made downloader faster with parallel downloads

This commit is contained in:
LagradOst 2023-08-19 21:37:14 +02:00
parent 10c1ea2f02
commit 98b6417140
2 changed files with 395 additions and 49 deletions

View file

@ -593,10 +593,8 @@ class ResultViewModel2 : ViewModel() {
folder, folder,
if (link.url.contains(".srt")) ".srt" else "vtt", if (link.url.contains(".srt")) ".srt" else "vtt",
false, false,
null null, createNotificationCallback = {}
) { )
// no notification
}
} }
} }

View file

@ -40,14 +40,20 @@ import com.lagradost.cloudstream3.utils.Coroutines.main
import com.lagradost.cloudstream3.utils.DataStore.getKey import com.lagradost.cloudstream3.utils.DataStore.getKey
import com.lagradost.cloudstream3.utils.DataStore.removeKey import com.lagradost.cloudstream3.utils.DataStore.removeKey
import com.lagradost.cloudstream3.utils.UIHelper.colorFromAttribute import com.lagradost.cloudstream3.utils.UIHelper.colorFromAttribute
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import okhttp3.internal.closeQuietly import okhttp3.internal.closeQuietly
import java.io.Closeable import java.io.Closeable
import java.io.File import java.io.File
import java.io.IOException import java.io.IOException
import java.io.InputStream
import java.io.OutputStream import java.io.OutputStream
import java.net.URL import java.net.URL
import java.util.* import java.util.*
@ -710,6 +716,8 @@ object VideoDownloadManager {
data class DownloadMetaData( data class DownloadMetaData(
private val id: Int?, private val id: Int?,
var bytesDownloaded: Long = 0, var bytesDownloaded: Long = 0,
var bytesWritten: Long = 0,
var totalBytes: Long? = null, var totalBytes: Long? = null,
// notification metadata // notification metadata
@ -732,10 +740,21 @@ object VideoDownloadManager {
val approxTotalBytes: Long val approxTotalBytes: Long
get() = totalBytes ?: hlsTotal?.let { total -> get() = totalBytes ?: hlsTotal?.let { total ->
(bytesDownloaded * (total / hlsProgress.toFloat())).toLong() (bytesDownloaded * (total / hlsProgress.toFloat())).toLong()
} ?: 0L } ?: bytesDownloaded
private val isHLS get() = hlsTotal != null private val isHLS get() = hlsTotal != null
private var stopListener: (() -> Unit)? = null
/** on cancel button pressed or failed invoke this once and only once */
fun setOnStop(callback: (() -> Unit)) {
stopListener = callback
}
fun removeStopListener() {
stopListener = null
}
private val downloadEventListener = { event: Pair<Int, DownloadActionType> -> private val downloadEventListener = { event: Pair<Int, DownloadActionType> ->
if (event.first == id) { if (event.first == id) {
when (event.second) { when (event.second) {
@ -747,6 +766,8 @@ object VideoDownloadManager {
type = DownloadType.IsStopped type = DownloadType.IsStopped
removeKey(KEY_RESUME_PACKAGES, event.first.toString()) removeKey(KEY_RESUME_PACKAGES, event.first.toString())
saveQueue() saveQueue()
stopListener?.invoke()
stopListener = null
} }
DownloadActionType.Resume -> { DownloadActionType.Resume -> {
@ -783,13 +804,14 @@ object VideoDownloadManager {
override fun close() { override fun close() {
// as we may need to resume hls downloads, we save the current written index // as we may need to resume hls downloads, we save the current written index
if (isHLS) { if (isHLS || totalBytes == null) {
updateFileInfo() updateFileInfo()
} }
if (id != null) { if (id != null) {
downloadEvent -= downloadEventListener downloadEvent -= downloadEventListener
downloadStatus -= id downloadStatus -= id
} }
stopListener = null
} }
var type var type
@ -846,6 +868,11 @@ object VideoDownloadManager {
updateFileInfo() updateFileInfo()
} }
if (internalType == DownloadType.IsStopped || internalType == DownloadType.IsFailed) {
stopListener?.invoke()
stopListener = null
}
// push all events, this *should* not crash, TODO MUTEX? // push all events, this *should* not crash, TODO MUTEX?
if (id != null) { if (id != null) {
downloadStatus[id] = type downloadStatus[id] = type
@ -874,6 +901,10 @@ object VideoDownloadManager {
if (type == DownloadType.IsDownloading) checkNotification() if (type == DownloadType.IsDownloading) checkNotification()
} }
fun addBytesWritten(length: Long) {
bytesWritten += length
}
/** adds the length + hsl progress and pushes a notification if necessary */ /** adds the length + hsl progress and pushes a notification if necessary */
fun addSegment(length: Long) { fun addSegment(length: Long) {
hlsProgress += 1 hlsProgress += 1
@ -885,6 +916,173 @@ object VideoDownloadManager {
} }
} }
/** bytes have the size end-start where the byte range is [start,end)
* note that ByteArray is a pointer and therefore cant be stored without cloning it */
data class LazyStreamDownloadResponse(
val bytes: ByteArray,
val startByte: Long,
val endByte: Long,
) {
val size get() = endByte - startByte
override fun toString(): String {
return "$startByte->$endByte"
}
override fun equals(other: Any?): Boolean {
if (other !is LazyStreamDownloadResponse) return false
return other.startByte == startByte && other.endByte == endByte
}
override fun hashCode(): Int {
return Objects.hash(startByte, endByte)
}
}
data class LazyStreamDownloadData(
private val url: String,
private val headers: Map<String, String>,
private val referer: String,
/** This specifies where chunck i starts and ends,
* bytes=${chuckStartByte[ i ]}-${chuckStartByte[ i+1 ] -1}
* where out of bounds => bytes=${chuckStartByte[ i ]}- */
private val chuckStartByte: LongArray,
val totalLength: Long?,
val downloadLength: Long?,
val chuckSize: Long,
val bufferSize: Int,
) {
val size get() = chuckStartByte.size
/** returns what byte it has downloaded,
* so start at 10 and download 4 bytes = return 14
*
* the range is [startByte, endByte) to be able to do [a, b) [b, c) ect
*
* [a, null) will return inclusive to eof = [a, eof]
*
* throws an error if initial get request fails, can be specified as return startByte
* */
@Throws
private suspend fun resolve(
startByte: Long,
endByte: Long?,
callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit)
): Long = withContext(Dispatchers.IO) {
var currentByte: Long = startByte
val stopAt = endByte ?: Long.MAX_VALUE
if (currentByte >= stopAt) return@withContext currentByte
val request = app.get(
url,
headers = headers + mapOf(
// range header is inclusive so [startByte, endByte-1] = [startByte, endByte)
// if nothing at end the server will continue until eof
"Range" to "bytes=$startByte-" // ${endByte?.minus(1)?.toString() ?: "" }
),
referer = referer,
verify = false
)
val requestStream = request.body.byteStream()
val buffer = ByteArray(bufferSize)
var read: Int
try {
while (requestStream.read(buffer, 0, bufferSize).also { read = it } >= 0) {
val start = currentByte
currentByte += read.toLong()
// this stops overflow
if (currentByte >= stopAt) {
callback(LazyStreamDownloadResponse(buffer, start, stopAt))
break
} else {
callback(LazyStreamDownloadResponse(buffer, start, currentByte))
}
}
} catch (e: CancellationException) {
throw e
} catch (t: Throwable) {
logError(t)
} finally {
requestStream.closeQuietly()
}
return@withContext currentByte
}
/** retries the resolve n times and returns true if successful */
suspend fun resolveSafe(
index: Int,
retries: Int = 3,
callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit)
): Boolean {
var start = chuckStartByte.getOrNull(index) ?: return false
val end = chuckStartByte.getOrNull(index + 1)
for (i in 0 until retries) {
try {
// in case
start = resolve(start, end, callback)
// no end defined, so we don't care exactly where it ended
if (end == null) return true
// we have download more or exactly what we needed
if (start >= end) return true
} catch (e: IllegalStateException) {
return false
} catch (e: CancellationException) {
return false
} catch (t: Throwable) {
continue
}
}
return false
}
}
@Throws
suspend fun streamLazy(
url: String,
headers: Map<String, String>,
referer: String,
startByte: Long,
/** how many bytes every connection should be, by default it is 10 MiB */
chuckSize: Long = (1 shl 20) * 10,
/** maximum bytes in the buffer that responds */
bufferSize: Int = DEFAULT_BUFFER_SIZE
): LazyStreamDownloadData {
// we don't want to make a separate connection for every 1kb
require(chuckSize > 1000)
val contentLength =
app.head(url = url, headers = headers, referer = referer, verify = false).size
var downloadLength: Long? = null
var totalLength: Long? = null
val ranges = if (contentLength == null) {
LongArray(1) { startByte }
} else {
downloadLength = contentLength - startByte
totalLength = contentLength
LongArray((downloadLength / chuckSize).toInt()) { idx ->
startByte + idx * chuckSize
}
}
return LazyStreamDownloadData(
url = url,
headers = headers,
referer = referer,
chuckStartByte = ranges,
downloadLength = downloadLength,
totalLength = totalLength,
chuckSize = chuckSize,
bufferSize = bufferSize
)
}
@Throws @Throws
suspend fun downloadThing( suspend fun downloadThing(
context: Context, context: Context,
@ -895,6 +1093,7 @@ object VideoDownloadManager {
tryResume: Boolean, tryResume: Boolean,
parentId: Int?, parentId: Int?,
createNotificationCallback: (CreateNotificationMetadata) -> Unit, createNotificationCallback: (CreateNotificationMetadata) -> Unit,
parallelConnections: Int = 3
): Int = withContext(Dispatchers.IO) { ): Int = withContext(Dispatchers.IO) {
// we cant download torrents with this implementation, aria2c might be used in the future // we cant download torrents with this implementation, aria2c might be used in the future
if (link.url.startsWith("magnet") || link.url.endsWith(".torrent")) { if (link.url.startsWith("magnet") || link.url.endsWith(".torrent")) {
@ -902,7 +1101,7 @@ object VideoDownloadManager {
} }
var fileStream: OutputStream? = null var fileStream: OutputStream? = null
var requestStream: InputStream? = null //var requestStream: InputStream? = null
val metadata = DownloadMetaData( val metadata = DownloadMetaData(
totalBytes = 0, totalBytes = 0,
bytesDownloaded = 0, bytesDownloaded = 0,
@ -926,11 +1125,13 @@ object VideoDownloadManager {
val fileLength = stream.fileLength ?: return@withContext ERROR_UNKNOWN val fileLength = stream.fileLength ?: return@withContext ERROR_UNKNOWN
val resumeAt = (if (resume) fileLength else 0) val resumeAt = (if (resume) fileLength else 0)
metadata.bytesDownloaded = resumeAt metadata.bytesDownloaded = resumeAt
metadata.bytesWritten = resumeAt
metadata.type = DownloadType.IsPending metadata.type = DownloadType.IsPending
// set up a connection val items = streamLazy(
val request = app.get( url = link.url.replace(" ", "%20"),
link.url.replace(" ", "%20"), referer = link.referer,
startByte = resumeAt,
headers = link.headers.appendAndDontOverride( headers = link.headers.appendAndDontOverride(
mapOf( mapOf(
"Accept-Encoding" to "identity", "Accept-Encoding" to "identity",
@ -941,17 +1142,12 @@ object VideoDownloadManager {
"sec-fetch-dest" to "video", "sec-fetch-dest" to "video",
"sec-fetch-user" to "?1", "sec-fetch-user" to "?1",
"sec-ch-ua-mobile" to "?0", "sec-ch-ua-mobile" to "?0",
) + if (resumeAt > 0) mapOf("Range" to "bytes=${resumeAt}-") else emptyMap() )
), )
referer = link.referer,
verify = false
) )
// init variables metadata.totalBytes = items.totalLength
val contentLength = request.size ?: 0 metadata.type = DownloadType.IsDownloading
metadata.totalBytes = contentLength + resumeAt
// save
metadata.setDownloadFileInfoTemplate( metadata.setDownloadFileInfoTemplate(
DownloadedFileInfo( DownloadedFileInfo(
totalBytes = metadata.approxTotalBytes, totalBytes = metadata.approxTotalBytes,
@ -961,23 +1157,166 @@ object VideoDownloadManager {
) )
) )
// total length is less than 5mb, that is too short and something has gone wrong val currentMutex = Mutex()
if (extension == "mp4" && metadata.approxTotalBytes < 5000000) return@withContext ERROR_TOO_SMALL_CONNECTION val current = (0 until items.size).iterator()
// read the buffer into the filestream, this is equivalent of transferTo val fileMutex = Mutex()
requestStream = request.body.byteStream() // start to data
metadata.type = DownloadType.IsDownloading val pendingData: HashMap<Long, LazyStreamDownloadResponse> =
hashMapOf()
val buffer = ByteArray(DEFAULT_BUFFER_SIZE) val jobs = (0 until parallelConnections).map {
var read: Int launch {
while (requestStream.read(buffer, 0, DEFAULT_BUFFER_SIZE).also { read = it } >= 0) {
fileStream.write(buffer, 0, read)
// this may seem a bit complex but it more or less acts as a queue system
// imagine we do the downloading [0,3] and it response in the order 0,2,3,1
// file: [_,_,_,_] queue: [_,_,_,_] Initial condition
// file: [X,_,_,_] queue: [_,_,_,_] + added 0 directly to file
// file: [X,_,_,_] queue: [_,_,X,_] + added 2 to queue
// file: [X,_,_,_] queue: [_,_,X,X] + added 3 to queue
// file: [X,X,_,_] queue: [_,_,X,X] + added 1 directly to file
// file: [X,X,X,X] queue: [_,_,_,_] write the queue and remove from it
val callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit) =
callback@{ response ->
if (!isActive) return@callback
fileMutex.withLock {
// wait until not paused // wait until not paused
while (metadata.type == DownloadType.IsPaused) delay(100) while (metadata.type == DownloadType.IsPaused) delay(100)
// if stopped then break to delete // if stopped then throw
if (metadata.type == DownloadType.IsStopped) break if (metadata.type == DownloadType.IsStopped || metadata.type == DownloadType.IsFailed) {
metadata.addBytes(read.toLong()) this.cancel()
return@callback
}
val responseSize = response.size
metadata.addBytes(response.size)
if (response.startByte == metadata.bytesWritten) {
// if we are first in the queue then write it directly
fileStream.write(
response.bytes,
0,
responseSize.toInt()
)
metadata.addBytesWritten(responseSize)
} else {
// otherwise append to queue, we need to clone the bytes as they will be overridden otherwise
pendingData[response.startByte] =
response.copy(bytes = response.bytes.clone())
}
while (true) {
// remove the current queue start, so no possibility of
// while(true) { continue } in case size = 0, and removed extra
// garbage
val pending = pendingData.remove(metadata.bytesWritten) ?: break
val size = pending.size
fileStream.write(
pending.bytes,
0,
size.toInt()
)
metadata.addBytesWritten(size)
}
}
}
// this will take up the first available job and resolve
while (true) {
if (!isActive) return@launch
fileMutex.withLock {
if (metadata.type == DownloadType.IsStopped) return@launch
}
// just in case, we never want this to fail due to multithreading
val index = currentMutex.withLock {
if (!current.hasNext()) return@launch
current.nextInt()
}
// in case something has gone wrong set to failed if the fail is not caused by
// user cancellation
if (!items.resolveSafe(index, callback = callback)) {
fileMutex.withLock {
if (metadata.type != DownloadType.IsStopped) {
metadata.type = DownloadType.IsFailed
}
}
return@launch
}
}
}
}
// fast stop as the jobs may be in a slow request
metadata.setOnStop {
jobs.forEach { job ->
try {
job.cancel()
} catch (t: Throwable) {
logError(t)
}
}
}
jobs.forEach { it.join() }
// jobs are finished so we don't want to stop them anymore
metadata.removeStopListener()
// set up a connection
//val request = app.get(
// link.url.replace(" ", "%20"),
// headers = link.headers.appendAndDontOverride(
// mapOf(
// "Accept-Encoding" to "identity",
// "accept" to "*/*",
// "user-agent" to USER_AGENT,
// "sec-ch-ua" to "\"Chromium\";v=\"91\", \" Not;A Brand\";v=\"99\"",
// "sec-fetch-mode" to "navigate",
// "sec-fetch-dest" to "video",
// "sec-fetch-user" to "?1",
// "sec-ch-ua-mobile" to "?0",
// ) + if (resumeAt > 0) mapOf("Range" to "bytes=${resumeAt}-") else emptyMap()
// ),
// referer = link.referer,
// verify = false
//)
// init variables
//val contentLength = request.size ?: 0
//metadata.totalBytes = contentLength + resumeAt
//// save
//metadata.setDownloadFileInfoTemplate(
// DownloadedFileInfo(
// totalBytes = metadata.approxTotalBytes,
// relativePath = relativePath ?: "",
// displayName = displayName,
// basePath = basePath
// )
//)
//// total length is less than 5mb, that is too short and something has gone wrong
//if (extension == "mp4" && metadata.approxTotalBytes < 5000000) return@withContext ERROR_TOO_SMALL_CONNECTION
//// read the buffer into the filestream, this is equivalent of transferTo
//requestStream = request.body.byteStream()
//metadata.type = DownloadType.IsDownloading
//val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
//var read: Int
//while (requestStream.read(buffer, 0, DEFAULT_BUFFER_SIZE).also { read = it } >= 0) {
// fileStream.write(buffer, 0, read)
// // wait until not paused
// while (metadata.type == DownloadType.IsPaused) delay(100)
// // if stopped then break to delete
// if (metadata.type == DownloadType.IsStopped) break
// metadata.addBytes(read.toLong())
//}
if (metadata.type == DownloadType.IsFailed) {
return@withContext ERROR_CONNECTION_ERROR
} }
if (metadata.type == DownloadType.IsStopped) { if (metadata.type == DownloadType.IsStopped) {
@ -1003,11 +1342,12 @@ object VideoDownloadManager {
// note that when failing we don't want to delete the file, // note that when failing we don't want to delete the file,
// only user interaction has that power // only user interaction has that power
metadata.removeStopListener()
metadata.type = DownloadType.IsFailed metadata.type = DownloadType.IsFailed
return@withContext ERROR_CONNECTION_ERROR return@withContext ERROR_CONNECTION_ERROR
} finally { } finally {
fileStream?.closeQuietly() fileStream?.closeQuietly()
requestStream?.closeQuietly() //requestStream?.closeQuietly()
metadata.close() metadata.close()
} }
} }
@ -1388,7 +1728,15 @@ object VideoDownloadManager {
} }
return suspendSafeApiCall { return suspendSafeApiCall {
downloadThing(context, link, name, folder, "mp4", tryResume, ep.id) { meta -> downloadThing(
context,
link,
name,
folder,
"mp4",
tryResume,
ep.id,
createNotificationCallback = { meta ->
main { main {
createNotification( createNotification(
context, context,
@ -1401,7 +1749,7 @@ object VideoDownloadManager {
notificationCallback notificationCallback
) )
} }
} })
}.also { extractorJob.cancel() } ?: ERROR_UNKNOWN }.also { extractorJob.cancel() } ?: ERROR_UNKNOWN
} }