made HSL downloader even faster

This commit is contained in:
LagradOst 2023-08-20 01:29:50 +02:00
parent 98b6417140
commit c4852ce440
2 changed files with 132 additions and 88 deletions

View file

@ -2,6 +2,7 @@ package com.lagradost.cloudstream3.utils
import com.lagradost.cloudstream3.ErrorLoadingException import com.lagradost.cloudstream3.ErrorLoadingException
import com.lagradost.cloudstream3.app import com.lagradost.cloudstream3.app
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import javax.crypto.Cipher import javax.crypto.Cipher
import javax.crypto.spec.IvParameterSpec import javax.crypto.spec.IvParameterSpec
@ -196,6 +197,8 @@ object M3u8Helper2 {
return if(condition()) out else null return if(condition()) out else null
} catch (e: IllegalArgumentException) { } catch (e: IllegalArgumentException) {
return null return null
} catch (e : CancellationException) {
return null
} catch (t: Throwable) { } catch (t: Throwable) {
delay(failDelay) delay(failDelay)
} }
@ -213,6 +216,8 @@ object M3u8Helper2 {
return resolveLink(index) return resolveLink(index)
} catch (e: IllegalArgumentException) { } catch (e: IllegalArgumentException) {
return null return null
} catch (e : CancellationException) {
return null
} catch (t: Throwable) { } catch (t: Throwable) {
delay(failDelay) delay(failDelay)
} }

View file

@ -43,6 +43,7 @@ import com.lagradost.cloudstream3.utils.UIHelper.colorFromAttribute
import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
@ -1083,6 +1084,39 @@ object VideoDownloadManager {
) )
} }
/** Helper function to make sure duplicate attributes don't get overriden or inserted without lowercase cmp
* example: map("a" to 1) appendAndDontOverride map("A" to 2, "a" to 3, "c" to 4) = map("a" to 1, "c" to 4)
* */
private fun <V> Map<String, V>.appendAndDontOverride(rhs: Map<String, V>): Map<String, V> {
val out = this.toMutableMap()
val current = this.keys.map { it.lowercase() }
for ((key, value) in rhs) {
if (current.contains(key.lowercase())) continue
out[key] = value
}
return out
}
private fun List<Job>.cancel() {
forEach { job ->
try {
job.cancel()
} catch (t: Throwable) {
logError(t)
}
}
}
private suspend fun List<Job>.join() {
forEach { job ->
try {
job.join()
} catch (t: Throwable) {
logError(t)
}
}
}
@Throws @Throws
suspend fun downloadThing( suspend fun downloadThing(
context: Context, context: Context,
@ -1166,8 +1200,9 @@ object VideoDownloadManager {
hashMapOf() hashMapOf()
val jobs = (0 until parallelConnections).map { val jobs = (0 until parallelConnections).map {
launch { launch(Dispatchers.IO) {
// @downloadexplanation
// this may seem a bit complex but it more or less acts as a queue system // this may seem a bit complex but it more or less acts as a queue system
// imagine we do the downloading [0,3] and it response in the order 0,2,3,1 // imagine we do the downloading [0,3] and it response in the order 0,2,3,1
// file: [_,_,_,_] queue: [_,_,_,_] Initial condition // file: [_,_,_,_] queue: [_,_,_,_] Initial condition
@ -1177,6 +1212,10 @@ object VideoDownloadManager {
// file: [X,X,_,_] queue: [_,_,X,X] + added 1 directly to file // file: [X,X,_,_] queue: [_,_,X,X] + added 1 directly to file
// file: [X,X,X,X] queue: [_,_,_,_] write the queue and remove from it // file: [X,X,X,X] queue: [_,_,_,_] write the queue and remove from it
// note that this is a bit more complex compared to hsl as ever segment
// will return several bytearrays, and is therefore chained by the byte
// so every request has a front and back byte instead of an index
// this *requires* that no gap exist due because of resolve
val callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit) = val callback: (suspend CoroutineScope.(LazyStreamDownloadResponse) -> Unit) =
callback@{ response -> callback@{ response ->
if (!isActive) return@callback if (!isActive) return@callback
@ -1228,10 +1267,11 @@ object VideoDownloadManager {
while (true) { while (true) {
if (!isActive) return@launch if (!isActive) return@launch
fileMutex.withLock { fileMutex.withLock {
if (metadata.type == DownloadType.IsStopped) return@launch if (metadata.type == DownloadType.IsStopped
|| metadata.type == DownloadType.IsFailed) return@launch
} }
// just in case, we never want this to fail due to multithreading // mutex just in case, we never want this to fail due to multithreading
val index = currentMutex.withLock { val index = currentMutex.withLock {
if (!current.hasNext()) return@launch if (!current.hasNext()) return@launch
current.nextInt() current.nextInt()
@ -1253,68 +1293,14 @@ object VideoDownloadManager {
// fast stop as the jobs may be in a slow request // fast stop as the jobs may be in a slow request
metadata.setOnStop { metadata.setOnStop {
jobs.forEach { job -> jobs.cancel()
try {
job.cancel()
} catch (t: Throwable) {
logError(t)
}
}
} }
jobs.forEach { it.join() } jobs.join()
// jobs are finished so we don't want to stop them anymore // jobs are finished so we don't want to stop them anymore
metadata.removeStopListener() metadata.removeStopListener()
// set up a connection
//val request = app.get(
// link.url.replace(" ", "%20"),
// headers = link.headers.appendAndDontOverride(
// mapOf(
// "Accept-Encoding" to "identity",
// "accept" to "*/*",
// "user-agent" to USER_AGENT,
// "sec-ch-ua" to "\"Chromium\";v=\"91\", \" Not;A Brand\";v=\"99\"",
// "sec-fetch-mode" to "navigate",
// "sec-fetch-dest" to "video",
// "sec-fetch-user" to "?1",
// "sec-ch-ua-mobile" to "?0",
// ) + if (resumeAt > 0) mapOf("Range" to "bytes=${resumeAt}-") else emptyMap()
// ),
// referer = link.referer,
// verify = false
//)
// init variables
//val contentLength = request.size ?: 0
//metadata.totalBytes = contentLength + resumeAt
//// save
//metadata.setDownloadFileInfoTemplate(
// DownloadedFileInfo(
// totalBytes = metadata.approxTotalBytes,
// relativePath = relativePath ?: "",
// displayName = displayName,
// basePath = basePath
// )
//)
//// total length is less than 5mb, that is too short and something has gone wrong
//if (extension == "mp4" && metadata.approxTotalBytes < 5000000) return@withContext ERROR_TOO_SMALL_CONNECTION
//// read the buffer into the filestream, this is equivalent of transferTo
//requestStream = request.body.byteStream()
//metadata.type = DownloadType.IsDownloading
//val buffer = ByteArray(DEFAULT_BUFFER_SIZE)
//var read: Int
//while (requestStream.read(buffer, 0, DEFAULT_BUFFER_SIZE).also { read = it } >= 0) {
// fileStream.write(buffer, 0, read)
// // wait until not paused
// while (metadata.type == DownloadType.IsPaused) delay(100)
// // if stopped then break to delete
// if (metadata.type == DownloadType.IsStopped) break
// metadata.addBytes(read.toLong())
//}
if (metadata.type == DownloadType.IsFailed) { if (metadata.type == DownloadType.IsFailed) {
return@withContext ERROR_CONNECTION_ERROR return@withContext ERROR_CONNECTION_ERROR
} }
@ -1342,7 +1328,6 @@ object VideoDownloadManager {
// note that when failing we don't want to delete the file, // note that when failing we don't want to delete the file,
// only user interaction has that power // only user interaction has that power
metadata.removeStopListener()
metadata.type = DownloadType.IsFailed metadata.type = DownloadType.IsFailed
return@withContext ERROR_CONNECTION_ERROR return@withContext ERROR_CONNECTION_ERROR
} finally { } finally {
@ -1352,19 +1337,6 @@ object VideoDownloadManager {
} }
} }
/** Helper function to make sure duplicate attributes don't get overriden or inserted without lowercase cmp
* example: map("a" to 1) appendAndDontOverride map("A" to 2, "a" to 3, "c" to 4) = map("a" to 1, "c" to 4)
* */
private fun <V> Map<String, V>.appendAndDontOverride(rhs: Map<String, V>): Map<String, V> {
val out = this.toMutableMap()
val current = this.keys.map { it.lowercase() }
for ((key, value) in rhs) {
if (current.contains(key.lowercase())) continue
out[key] = value
}
return out
}
@Throws @Throws
private suspend fun downloadHLS( private suspend fun downloadHLS(
context: Context, context: Context,
@ -1429,28 +1401,95 @@ object VideoDownloadManager {
metadata.hlsTotal = items.size metadata.hlsTotal = items.size
metadata.type = DownloadType.IsDownloading metadata.type = DownloadType.IsDownloading
val currentMutex = Mutex()
val current = (0 until items.size).iterator()
val fileMutex = Mutex()
val pendingData: HashMap<Int, ByteArray> = hashMapOf()
// see @downloadexplanation for explanation of this download strategy,
// this keeps all jobs working at all times,
// does several connections in parallel instead of a regular for loop to improve // does several connections in parallel instead of a regular for loop to improve
// download speed // download speed
(startAt until items.size).chunked(parallelConnections).forEach { subset -> val jobs = (0 until parallelConnections).map {
// wait until not paused launch(Dispatchers.IO) {
while (metadata.type == DownloadType.IsPaused) delay(100) while (true) {
// if stopped then break to delete if (!isActive) return@launch
if (metadata.type == DownloadType.IsStopped) return@forEach fileMutex.withLock {
if (metadata.type == DownloadType.IsStopped
|| metadata.type == DownloadType.IsFailed
) return@launch
}
subset.amap { idx -> // mutex just in case, we never want this to fail due to multithreading
idx to items.resolveLinkSafe(idx)?.also { bytes -> val index = currentMutex.withLock {
metadata.addSegment(bytes.size.toLong()) if (!current.hasNext()) return@launch
current.nextInt()
}
// in case something has gone wrong set to failed if the fail is not caused by
// user cancellation
val bytes = items.resolveLinkSafe(index) ?: run {
fileMutex.withLock {
if (metadata.type != DownloadType.IsStopped) {
metadata.type = DownloadType.IsFailed
}
}
return@launch
}
try {
fileMutex.lock()
// user pause
while (metadata.type == DownloadType.IsPaused) delay(100)
// if stopped then break to delete
if (metadata.type == DownloadType.IsStopped || !isActive) return@launch
// send notification, no matter the actual write order
metadata.addSegment(bytes.size.toLong())
// directly write the bytes if you are first
if (metadata.hlsWrittenProgress == index) {
fileStream.write(bytes)
metadata.setWrittenSegment(index)
} else {
// no need to clone as there will be no modification of this bytearray
pendingData[index] = bytes
}
// write the cached bytes submitted by other threads
while (true) {
fileStream.write(
pendingData.remove(metadata.hlsWrittenProgress) ?: break
)
metadata.setWrittenSegment(metadata.hlsWrittenProgress)
}
} catch (t : Throwable) {
// this is in case of write fail
if (metadata.type != DownloadType.IsStopped) {
metadata.type = DownloadType.IsFailed
}
} finally {
fileMutex.unlock()
}
} }
}.forEach { (idx, bytes) ->
if (bytes == null) {
metadata.type = DownloadType.IsFailed
return@withContext ERROR_CONNECTION_ERROR
}
fileStream.write(bytes)
metadata.setWrittenSegment(idx)
} }
} }
// fast stop as the jobs may be in a slow request
metadata.setOnStop {
jobs.cancel()
}
jobs.join()
metadata.removeStopListener()
if (metadata.type == DownloadType.IsFailed) {
return@withContext ERROR_CONNECTION_ERROR
}
if (metadata.type == DownloadType.IsStopped) { if (metadata.type == DownloadType.IsStopped) {
// we need to close before delete // we need to close before delete
fileStream.closeQuietly() fileStream.closeQuietly()