From 98b641714073e6bb84f74bd9ea3e19a03ed857aa Mon Sep 17 00:00:00 2001 From: LagradOst <11805592+LagradOst@users.noreply.github.com> Date: Sat, 19 Aug 2023 21:37:14 +0200 Subject: [PATCH] made downloader faster with parallel downloads --- .../ui/result/ResultViewModel2.kt | 6 +- .../utils/VideoDownloadManager.kt | 438 ++++++++++++++++-- 2 files changed, 395 insertions(+), 49 deletions(-) diff --git a/app/src/main/java/com/lagradost/cloudstream3/ui/result/ResultViewModel2.kt b/app/src/main/java/com/lagradost/cloudstream3/ui/result/ResultViewModel2.kt index 2fe3b012..bdd27091 100644 --- a/app/src/main/java/com/lagradost/cloudstream3/ui/result/ResultViewModel2.kt +++ b/app/src/main/java/com/lagradost/cloudstream3/ui/result/ResultViewModel2.kt @@ -593,10 +593,8 @@ class ResultViewModel2 : ViewModel() { folder, if (link.url.contains(".srt")) ".srt" else "vtt", false, - null - ) { - // no notification - } + null, createNotificationCallback = {} + ) } } diff --git a/app/src/main/java/com/lagradost/cloudstream3/utils/VideoDownloadManager.kt b/app/src/main/java/com/lagradost/cloudstream3/utils/VideoDownloadManager.kt index dc3eaa25..d8ef7e85 100644 --- a/app/src/main/java/com/lagradost/cloudstream3/utils/VideoDownloadManager.kt +++ b/app/src/main/java/com/lagradost/cloudstream3/utils/VideoDownloadManager.kt @@ -40,14 +40,20 @@ import com.lagradost.cloudstream3.utils.Coroutines.main import com.lagradost.cloudstream3.utils.DataStore.getKey import com.lagradost.cloudstream3.utils.DataStore.removeKey import com.lagradost.cloudstream3.utils.UIHelper.colorFromAttribute +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.cancel import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import okhttp3.internal.closeQuietly import java.io.Closeable import java.io.File import java.io.IOException -import java.io.InputStream import java.io.OutputStream import java.net.URL import java.util.* @@ -710,6 +716,8 @@ object VideoDownloadManager { data class DownloadMetaData( private val id: Int?, var bytesDownloaded: Long = 0, + var bytesWritten: Long = 0, + var totalBytes: Long? = null, // notification metadata @@ -732,10 +740,21 @@ object VideoDownloadManager { val approxTotalBytes: Long get() = totalBytes ?: hlsTotal?.let { total -> (bytesDownloaded * (total / hlsProgress.toFloat())).toLong() - } ?: 0L + } ?: bytesDownloaded private val isHLS get() = hlsTotal != null + private var stopListener: (() -> Unit)? = null + + /** on cancel button pressed or failed invoke this once and only once */ + fun setOnStop(callback: (() -> Unit)) { + stopListener = callback + } + + fun removeStopListener() { + stopListener = null + } + private val downloadEventListener = { event: Pair -> if (event.first == id) { when (event.second) { @@ -747,6 +766,8 @@ object VideoDownloadManager { type = DownloadType.IsStopped removeKey(KEY_RESUME_PACKAGES, event.first.toString()) saveQueue() + stopListener?.invoke() + stopListener = null } DownloadActionType.Resume -> { @@ -783,13 +804,14 @@ object VideoDownloadManager { override fun close() { // as we may need to resume hls downloads, we save the current written index - if (isHLS) { + if (isHLS || totalBytes == null) { updateFileInfo() } if (id != null) { downloadEvent -= downloadEventListener downloadStatus -= id } + stopListener = null } var type @@ -846,6 +868,11 @@ object VideoDownloadManager { updateFileInfo() } + if (internalType == DownloadType.IsStopped || internalType == DownloadType.IsFailed) { + stopListener?.invoke() + stopListener = null + } + // push all events, this *should* not crash, TODO MUTEX? if (id != null) { downloadStatus[id] = type @@ -874,6 +901,10 @@ object VideoDownloadManager { if (type == DownloadType.IsDownloading) checkNotification() } + fun addBytesWritten(length: Long) { + bytesWritten += length + } + /** adds the length + hsl progress and pushes a notification if necessary */ fun addSegment(length: Long) { hlsProgress += 1 @@ -885,6 +916,173 @@ object VideoDownloadManager { } } + /** bytes have the size end-start where the byte range is [start,end) + * note that ByteArray is a pointer and therefore cant be stored without cloning it */ + data class LazyStreamDownloadResponse( + val bytes: ByteArray, + val startByte: Long, + val endByte: Long, + ) { + val size get() = endByte - startByte + + override fun toString(): String { + return "$startByte->$endByte" + } + + override fun equals(other: Any?): Boolean { + if (other !is LazyStreamDownloadResponse) return false + return other.startByte == startByte && other.endByte == endByte + } + + override fun hashCode(): Int { + return Objects.hash(startByte, endByte) + } + } + + data class LazyStreamDownloadData( + private val url: String, + private val headers: Map, + private val referer: String, + /** This specifies where chunck i starts and ends, + * bytes=${chuckStartByte[ i ]}-${chuckStartByte[ i+1 ] -1} + * where out of bounds => bytes=${chuckStartByte[ i ]}- */ + private val chuckStartByte: LongArray, + val totalLength: Long?, + val downloadLength: Long?, + val chuckSize: Long, + val bufferSize: Int, + ) { + val size get() = chuckStartByte.size + + /** returns what byte it has downloaded, + * so start at 10 and download 4 bytes = return 14 + * + * the range is [startByte, endByte) to be able to do [a, b) [b, c) ect + * + * [a, null) will return inclusive to eof = [a, eof] + * + * throws an error if initial get request fails, can be specified as return startByte + * */ + @Throws + private suspend fun resolve( + startByte: Long, + endByte: Long?, + callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit) + ): Long = withContext(Dispatchers.IO) { + var currentByte: Long = startByte + val stopAt = endByte ?: Long.MAX_VALUE + if (currentByte >= stopAt) return@withContext currentByte + + val request = app.get( + url, + headers = headers + mapOf( + // range header is inclusive so [startByte, endByte-1] = [startByte, endByte) + // if nothing at end the server will continue until eof + "Range" to "bytes=$startByte-" // ${endByte?.minus(1)?.toString() ?: "" } + ), + referer = referer, + verify = false + ) + val requestStream = request.body.byteStream() + + val buffer = ByteArray(bufferSize) + var read: Int + + try { + while (requestStream.read(buffer, 0, bufferSize).also { read = it } >= 0) { + val start = currentByte + currentByte += read.toLong() + + // this stops overflow + if (currentByte >= stopAt) { + callback(LazyStreamDownloadResponse(buffer, start, stopAt)) + break + } else { + callback(LazyStreamDownloadResponse(buffer, start, currentByte)) + } + } + } catch (e: CancellationException) { + throw e + } catch (t: Throwable) { + logError(t) + } finally { + requestStream.closeQuietly() + } + + return@withContext currentByte + } + + /** retries the resolve n times and returns true if successful */ + suspend fun resolveSafe( + index: Int, + retries: Int = 3, + callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit) + ): Boolean { + var start = chuckStartByte.getOrNull(index) ?: return false + val end = chuckStartByte.getOrNull(index + 1) + + for (i in 0 until retries) { + try { + // in case + start = resolve(start, end, callback) + // no end defined, so we don't care exactly where it ended + if (end == null) return true + // we have download more or exactly what we needed + if (start >= end) return true + } catch (e: IllegalStateException) { + return false + } catch (e: CancellationException) { + return false + } catch (t: Throwable) { + continue + } + } + return false + } + + } + + @Throws + suspend fun streamLazy( + url: String, + headers: Map, + referer: String, + startByte: Long, + /** how many bytes every connection should be, by default it is 10 MiB */ + chuckSize: Long = (1 shl 20) * 10, + /** maximum bytes in the buffer that responds */ + bufferSize: Int = DEFAULT_BUFFER_SIZE + ): LazyStreamDownloadData { + // we don't want to make a separate connection for every 1kb + require(chuckSize > 1000) + + val contentLength = + app.head(url = url, headers = headers, referer = referer, verify = false).size + + var downloadLength: Long? = null + var totalLength: Long? = null + + val ranges = if (contentLength == null) { + LongArray(1) { startByte } + } else { + downloadLength = contentLength - startByte + totalLength = contentLength + LongArray((downloadLength / chuckSize).toInt()) { idx -> + startByte + idx * chuckSize + } + } + return LazyStreamDownloadData( + url = url, + headers = headers, + referer = referer, + chuckStartByte = ranges, + downloadLength = downloadLength, + totalLength = totalLength, + chuckSize = chuckSize, + bufferSize = bufferSize + ) + } + @Throws suspend fun downloadThing( context: Context, @@ -895,6 +1093,7 @@ object VideoDownloadManager { tryResume: Boolean, parentId: Int?, createNotificationCallback: (CreateNotificationMetadata) -> Unit, + parallelConnections: Int = 3 ): Int = withContext(Dispatchers.IO) { // we cant download torrents with this implementation, aria2c might be used in the future if (link.url.startsWith("magnet") || link.url.endsWith(".torrent")) { @@ -902,7 +1101,7 @@ object VideoDownloadManager { } var fileStream: OutputStream? = null - var requestStream: InputStream? = null + //var requestStream: InputStream? = null val metadata = DownloadMetaData( totalBytes = 0, bytesDownloaded = 0, @@ -926,11 +1125,13 @@ object VideoDownloadManager { val fileLength = stream.fileLength ?: return@withContext ERROR_UNKNOWN val resumeAt = (if (resume) fileLength else 0) metadata.bytesDownloaded = resumeAt + metadata.bytesWritten = resumeAt metadata.type = DownloadType.IsPending - // set up a connection - val request = app.get( - link.url.replace(" ", "%20"), + val items = streamLazy( + url = link.url.replace(" ", "%20"), + referer = link.referer, + startByte = resumeAt, headers = link.headers.appendAndDontOverride( mapOf( "Accept-Encoding" to "identity", @@ -941,17 +1142,12 @@ object VideoDownloadManager { "sec-fetch-dest" to "video", "sec-fetch-user" to "?1", "sec-ch-ua-mobile" to "?0", - ) + if (resumeAt > 0) mapOf("Range" to "bytes=${resumeAt}-") else emptyMap() - ), - referer = link.referer, - verify = false + ) + ) ) - // init variables - val contentLength = request.size ?: 0 - metadata.totalBytes = contentLength + resumeAt - - // save + metadata.totalBytes = items.totalLength + metadata.type = DownloadType.IsDownloading metadata.setDownloadFileInfoTemplate( DownloadedFileInfo( totalBytes = metadata.approxTotalBytes, @@ -961,23 +1157,166 @@ object VideoDownloadManager { ) ) - // total length is less than 5mb, that is too short and something has gone wrong - if (extension == "mp4" && metadata.approxTotalBytes < 5000000) return@withContext ERROR_TOO_SMALL_CONNECTION + val currentMutex = Mutex() + val current = (0 until items.size).iterator() - // read the buffer into the filestream, this is equivalent of transferTo - requestStream = request.body.byteStream() - metadata.type = DownloadType.IsDownloading + val fileMutex = Mutex() + // start to data + val pendingData: HashMap = + hashMapOf() - val buffer = ByteArray(DEFAULT_BUFFER_SIZE) - var read: Int - while (requestStream.read(buffer, 0, DEFAULT_BUFFER_SIZE).also { read = it } >= 0) { - fileStream.write(buffer, 0, read) + val jobs = (0 until parallelConnections).map { + launch { - // wait until not paused - while (metadata.type == DownloadType.IsPaused) delay(100) - // if stopped then break to delete - if (metadata.type == DownloadType.IsStopped) break - metadata.addBytes(read.toLong()) + // this may seem a bit complex but it more or less acts as a queue system + // imagine we do the downloading [0,3] and it response in the order 0,2,3,1 + // file: [_,_,_,_] queue: [_,_,_,_] Initial condition + // file: [X,_,_,_] queue: [_,_,_,_] + added 0 directly to file + // file: [X,_,_,_] queue: [_,_,X,_] + added 2 to queue + // file: [X,_,_,_] queue: [_,_,X,X] + added 3 to queue + // file: [X,X,_,_] queue: [_,_,X,X] + added 1 directly to file + // file: [X,X,X,X] queue: [_,_,_,_] write the queue and remove from it + + val callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit) = + callback@{ response -> + if (!isActive) return@callback + fileMutex.withLock { + // wait until not paused + while (metadata.type == DownloadType.IsPaused) delay(100) + // if stopped then throw + if (metadata.type == DownloadType.IsStopped || metadata.type == DownloadType.IsFailed) { + this.cancel() + return@callback + } + + val responseSize = response.size + metadata.addBytes(response.size) + + if (response.startByte == metadata.bytesWritten) { + // if we are first in the queue then write it directly + fileStream.write( + response.bytes, + 0, + responseSize.toInt() + ) + metadata.addBytesWritten(responseSize) + } else { + // otherwise append to queue, we need to clone the bytes as they will be overridden otherwise + pendingData[response.startByte] = + response.copy(bytes = response.bytes.clone()) + } + + while (true) { + // remove the current queue start, so no possibility of + // while(true) { continue } in case size = 0, and removed extra + // garbage + val pending = pendingData.remove(metadata.bytesWritten) ?: break + + val size = pending.size + + fileStream.write( + pending.bytes, + 0, + size.toInt() + ) + metadata.addBytesWritten(size) + } + } + } + + // this will take up the first available job and resolve + while (true) { + if (!isActive) return@launch + fileMutex.withLock { + if (metadata.type == DownloadType.IsStopped) return@launch + } + + // just in case, we never want this to fail due to multithreading + val index = currentMutex.withLock { + if (!current.hasNext()) return@launch + current.nextInt() + } + + // in case something has gone wrong set to failed if the fail is not caused by + // user cancellation + if (!items.resolveSafe(index, callback = callback)) { + fileMutex.withLock { + if (metadata.type != DownloadType.IsStopped) { + metadata.type = DownloadType.IsFailed + } + } + return@launch + } + } + } + } + + // fast stop as the jobs may be in a slow request + metadata.setOnStop { + jobs.forEach { job -> + try { + job.cancel() + } catch (t: Throwable) { + logError(t) + } + } + } + + jobs.forEach { it.join() } + + // jobs are finished so we don't want to stop them anymore + metadata.removeStopListener() + + // set up a connection + //val request = app.get( + // link.url.replace(" ", "%20"), + // headers = link.headers.appendAndDontOverride( + // mapOf( + // "Accept-Encoding" to "identity", + // "accept" to "*/*", + // "user-agent" to USER_AGENT, + // "sec-ch-ua" to "\"Chromium\";v=\"91\", \" Not;A Brand\";v=\"99\"", + // "sec-fetch-mode" to "navigate", + // "sec-fetch-dest" to "video", + // "sec-fetch-user" to "?1", + // "sec-ch-ua-mobile" to "?0", + // ) + if (resumeAt > 0) mapOf("Range" to "bytes=${resumeAt}-") else emptyMap() + // ), + // referer = link.referer, + // verify = false + //) + + // init variables + //val contentLength = request.size ?: 0 + //metadata.totalBytes = contentLength + resumeAt + //// save + //metadata.setDownloadFileInfoTemplate( + // DownloadedFileInfo( + // totalBytes = metadata.approxTotalBytes, + // relativePath = relativePath ?: "", + // displayName = displayName, + // basePath = basePath + // ) + //) + //// total length is less than 5mb, that is too short and something has gone wrong + //if (extension == "mp4" && metadata.approxTotalBytes < 5000000) return@withContext ERROR_TOO_SMALL_CONNECTION + //// read the buffer into the filestream, this is equivalent of transferTo + //requestStream = request.body.byteStream() + //metadata.type = DownloadType.IsDownloading + //val buffer = ByteArray(DEFAULT_BUFFER_SIZE) + //var read: Int + //while (requestStream.read(buffer, 0, DEFAULT_BUFFER_SIZE).also { read = it } >= 0) { + // fileStream.write(buffer, 0, read) + // // wait until not paused + // while (metadata.type == DownloadType.IsPaused) delay(100) + // // if stopped then break to delete + // if (metadata.type == DownloadType.IsStopped) break + // metadata.addBytes(read.toLong()) + //} + + + if (metadata.type == DownloadType.IsFailed) { + return@withContext ERROR_CONNECTION_ERROR } if (metadata.type == DownloadType.IsStopped) { @@ -1003,11 +1342,12 @@ object VideoDownloadManager { // note that when failing we don't want to delete the file, // only user interaction has that power + metadata.removeStopListener() metadata.type = DownloadType.IsFailed return@withContext ERROR_CONNECTION_ERROR } finally { fileStream?.closeQuietly() - requestStream?.closeQuietly() + //requestStream?.closeQuietly() metadata.close() } } @@ -1388,20 +1728,28 @@ object VideoDownloadManager { } return suspendSafeApiCall { - downloadThing(context, link, name, folder, "mp4", tryResume, ep.id) { meta -> - main { - createNotification( - context, - source, - link.name, - ep, - meta.type, - meta.bytesDownloaded, - meta.bytesTotal, - notificationCallback - ) - } - } + downloadThing( + context, + link, + name, + folder, + "mp4", + tryResume, + ep.id, + createNotificationCallback = { meta -> + main { + createNotification( + context, + source, + link.name, + ep, + meta.type, + meta.bytesDownloaded, + meta.bytesTotal, + notificationCallback + ) + } + }) }.also { extractorJob.cancel() } ?: ERROR_UNKNOWN }