From 2195c3df765c3e1e435595d9edbec8ad3590bf46 Mon Sep 17 00:00:00 2001 From: Mitchell Syer Date: Mon, 7 Nov 2022 20:09:26 -0500 Subject: [PATCH] Downloader Rewrite (#437) * Downloader rewrite - Rewrite downloader to use coroutines instead of a thread - Remove unused Page functions - Add page progress - Add ProgressResponseBody - Add support for canceling a download in the middle of downloading - Fix clear download queue * Minor fix * Minor improvements - notifyAllClients now launches in another thread and only sends new data every second - Better handling of download queue checker in step() - Minor improvements and fixes * Reorder downloads * Download in parallel by source * Remove TODO --- .../tachiyomi/network/OkHttpExtensions.kt | 14 +- .../tachiyomi/network/ProgressResponseBody.kt | 44 +++++++ .../source/local/loader/EpubPageLoader.kt | 2 - .../source/local/loader/RarPageLoader.kt | 3 - .../source/local/loader/ZipPageLoader.kt | 2 - .../eu/kanade/tachiyomi/source/model/Page.kt | 40 +----- .../source/online/HttpSourceFetcher.kt | 2 - .../suwayomi/tachidesk/manga/MangaAPI.kt | 3 +- .../manga/controller/DownloadController.kt | 35 +++-- .../suwayomi/tachidesk/manga/impl/Page.kt | 4 +- .../manga/impl/download/DownloadManager.kt | 122 ++++++++++++++---- .../manga/impl/download/Downloader.kt | 119 +++++++++++++---- 12 files changed, 274 insertions(+), 116 deletions(-) create mode 100644 server/src/main/kotlin/eu/kanade/tachiyomi/network/ProgressResponseBody.kt diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/network/OkHttpExtensions.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/network/OkHttpExtensions.kt index a637bd15..c8f17d30 100644 --- a/server/src/main/kotlin/eu/kanade/tachiyomi/network/OkHttpExtensions.kt +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/network/OkHttpExtensions.kt @@ -116,13 +116,13 @@ fun Call.asObservableSuccess(): Observable { @Suppress("UNUSED_PARAMETER") fun OkHttpClient.newCallWithProgress(request: Request, listener: ProgressListener): Call { val progressClient = newBuilder() -// .cache(null) -// .addNetworkInterceptor { chain -> -// val originalResponse = chain.proceed(chain.request()) -// originalResponse.newBuilder() -// .body(ProgressResponseBody(originalResponse.body!!, listener)) -// .build() -// } + .cache(null) + .addNetworkInterceptor { chain -> + val originalResponse = chain.proceed(chain.request()) + originalResponse.newBuilder() + .body(ProgressResponseBody(originalResponse.body!!, listener)) + .build() + } .build() return progressClient.newCall(request) diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/network/ProgressResponseBody.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/network/ProgressResponseBody.kt new file mode 100644 index 00000000..72248f17 --- /dev/null +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/network/ProgressResponseBody.kt @@ -0,0 +1,44 @@ +package eu.kanade.tachiyomi.network + +import okhttp3.MediaType +import okhttp3.ResponseBody +import okio.Buffer +import okio.BufferedSource +import okio.ForwardingSource +import okio.Source +import okio.buffer +import java.io.IOException + +class ProgressResponseBody(private val responseBody: ResponseBody, private val progressListener: ProgressListener) : ResponseBody() { + + private val bufferedSource: BufferedSource by lazy { + source(responseBody.source()).buffer() + } + + override fun contentType(): MediaType? { + return responseBody.contentType() + } + + override fun contentLength(): Long { + return responseBody.contentLength() + } + + override fun source(): BufferedSource { + return bufferedSource + } + + private fun source(source: Source): Source { + return object : ForwardingSource(source) { + var totalBytesRead = 0L + + @Throws(IOException::class) + override fun read(sink: Buffer, byteCount: Long): Long { + val bytesRead = super.read(sink, byteCount) + // read() returns the number of bytes read, or -1 if this source is exhausted. + totalBytesRead += if (bytesRead != -1L) bytesRead else 0 + progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1L) + return bytesRead + } + } + } +} diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/EpubPageLoader.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/EpubPageLoader.kt index 55c23311..2e15eb04 100644 --- a/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/EpubPageLoader.kt +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/EpubPageLoader.kt @@ -1,6 +1,5 @@ package eu.kanade.tachiyomi.source.local.loader -import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.util.storage.EpubFile import java.io.File @@ -24,7 +23,6 @@ class EpubPageLoader(file: File) : PageLoader { val streamFn = { epub.getInputStream(epub.getEntry(path)!!) } ReaderPage(i).apply { stream = streamFn - status = Page.READY } } } diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/RarPageLoader.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/RarPageLoader.kt index 50d16e24..ea559bbf 100644 --- a/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/RarPageLoader.kt +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/RarPageLoader.kt @@ -2,7 +2,6 @@ package eu.kanade.tachiyomi.source.local.loader import com.github.junrar.Archive import com.github.junrar.rarfile.FileHeader -import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import suwayomi.tachidesk.manga.impl.util.storage.ImageUtil import java.io.ByteArrayInputStream @@ -46,7 +45,6 @@ class RarPageLoader(file: File) : PageLoader { ReaderPage(i).apply { stream = streamFn - status = Page.READY } } } @@ -58,7 +56,6 @@ class RarPageLoader(file: File) : PageLoader { ReaderPage(i).apply { stream = streamFn - status = Page.READY } } } diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/ZipPageLoader.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/ZipPageLoader.kt index 6aa0a026..9742ac03 100644 --- a/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/ZipPageLoader.kt +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/source/local/loader/ZipPageLoader.kt @@ -1,6 +1,5 @@ package eu.kanade.tachiyomi.source.local.loader -import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import suwayomi.tachidesk.manga.impl.util.storage.ImageUtil import java.io.File @@ -24,7 +23,6 @@ class ZipPageLoader(file: File) : PageLoader { val streamFn = { zip.getInputStream(entry) } ReaderPage(i).apply { stream = streamFn - status = Page.READY } } } diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/source/model/Page.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/source/model/Page.kt index 22436bfe..2784c438 100644 --- a/server/src/main/kotlin/eu/kanade/tachiyomi/source/model/Page.kt +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/source/model/Page.kt @@ -2,7 +2,8 @@ package eu.kanade.tachiyomi.source.model import android.net.Uri import eu.kanade.tachiyomi.network.ProgressListener -import rx.subjects.Subject +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.asStateFlow open class Page( val index: Int, @@ -11,48 +12,17 @@ open class Page( @Transient var uri: Uri? = null // Deprecated but can't be deleted due to extensions ) : ProgressListener { - val number: Int - get() = index + 1 - - @Transient - @Volatile - var status: Int = 0 - set(value) { - field = value - statusSubject?.onNext(value) - statusCallback?.invoke(this) - } - - @Transient - @Volatile - var progress: Int = 0 - set(value) { - field = value - statusCallback?.invoke(this) - } - - @Transient - private var statusSubject: Subject? = null - - @Transient - private var statusCallback: ((Page) -> Unit)? = null + private val _progress = MutableStateFlow(0) + val progress = _progress.asStateFlow() override fun update(bytesRead: Long, contentLength: Long, done: Boolean) { - progress = if (contentLength > 0) { + _progress.value = if (contentLength > 0) { (100 * bytesRead / contentLength).toInt() } else { -1 } } - fun setStatusSubject(subject: Subject?) { - this.statusSubject = subject - } - - fun setStatusCallback(f: ((Page) -> Unit)?) { - statusCallback = f - } - companion object { const val QUEUE = 0 const val LOAD_PAGE = 1 diff --git a/server/src/main/kotlin/eu/kanade/tachiyomi/source/online/HttpSourceFetcher.kt b/server/src/main/kotlin/eu/kanade/tachiyomi/source/online/HttpSourceFetcher.kt index 7b3ea4bd..26969fc2 100644 --- a/server/src/main/kotlin/eu/kanade/tachiyomi/source/online/HttpSourceFetcher.kt +++ b/server/src/main/kotlin/eu/kanade/tachiyomi/source/online/HttpSourceFetcher.kt @@ -4,9 +4,7 @@ import eu.kanade.tachiyomi.source.model.Page import rx.Observable fun HttpSource.getImageUrl(page: Page): Observable { - page.status = Page.LOAD_PAGE return fetchImageUrl(page) - .doOnError { page.status = Page.ERROR } .onErrorReturn { null } .doOnNext { page.imageUrl = it } .map { page } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/MangaAPI.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/MangaAPI.kt index 1b63c38f..2ee57ddf 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/MangaAPI.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/MangaAPI.kt @@ -106,12 +106,13 @@ object MangaAPI { get("start", DownloadController.start) get("stop", DownloadController.stop) - get("clear", DownloadController.stop) + get("clear", DownloadController.clear) } path("download") { get("{mangaId}/chapter/{chapterIndex}", DownloadController.queueChapter) delete("{mangaId}/chapter/{chapterIndex}", DownloadController.unqueueChapter) + patch("{mangaId}/chapter/{chapterIndex}/reorder/{to}", DownloadController.reorderChapter) post("batch", DownloadController.queueChapters) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt index 8c2cecb1..7d7f9547 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/DownloadController.kt @@ -46,10 +46,8 @@ object DownloadController { description("Start the downloader") } }, - behaviorOf = { ctx -> + behaviorOf = { DownloadManager.start() - - ctx.status(200) }, withResults = { httpCode(HttpCode.OK) @@ -65,9 +63,9 @@ object DownloadController { } }, behaviorOf = { ctx -> - DownloadManager.stop() - - ctx.status(200) + ctx.future( + future { DownloadManager.stop() } + ) }, withResults = { httpCode(HttpCode.OK) @@ -83,9 +81,9 @@ object DownloadController { } }, behaviorOf = { ctx -> - DownloadManager.clear() - - ctx.status(200) + ctx.future( + future { DownloadManager.clear() } + ) }, withResults = { httpCode(HttpCode.OK) @@ -155,4 +153,23 @@ object DownloadController { httpCode(HttpCode.OK) } ) + + /** clear download queue */ + val reorderChapter = handler( + pathParam("chapterIndex"), + pathParam("mangaId"), + pathParam("to"), + documentWith = { + withOperation { + summary("Downloader reorder chapter") + description("Reorder chapter in download queue") + } + }, + behaviorOf = { _, chapterIndex, mangaId, to -> + DownloadManager.reorder(chapterIndex, mangaId, to) + }, + withResults = { + httpCode(HttpCode.OK) + } + ) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Page.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Page.kt index 125fc692..96c4d32d 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Page.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/Page.kt @@ -10,6 +10,7 @@ package suwayomi.tachidesk.manga.impl import eu.kanade.tachiyomi.source.local.LocalSource import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.online.HttpSource +import kotlinx.coroutines.flow.StateFlow import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction @@ -37,7 +38,7 @@ object Page { return page.imageUrl!! } - suspend fun getPageImage(mangaId: Int, chapterIndex: Int, index: Int, useCache: Boolean = true): Pair { + suspend fun getPageImage(mangaId: Int, chapterIndex: Int, index: Int, useCache: Boolean = true, progressFlow: ((StateFlow) -> Unit)? = null): Pair { val mangaEntry = transaction { MangaTable.select { MangaTable.id eq mangaId }.first() } val source = getCatalogueSourceOrStub(mangaEntry[MangaTable.sourceReference]) val chapterEntry = transaction { @@ -55,6 +56,7 @@ object Page { pageEntry[PageTable.url], pageEntry[PageTable.imageUrl] ) + progressFlow?.invoke(tachiyomiPage.progress) // we treat Local source differently if (source.id == LocalSource.ID) { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt index 462d24f6..d5c93693 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt @@ -9,6 +9,16 @@ package suwayomi.tachidesk.manga.impl.download import io.javalin.websocket.WsContext import io.javalin.websocket.WsMessageContext +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.sample +import kotlinx.coroutines.launch import kotlinx.serialization.Serializable import mu.KotlinLogging import org.jetbrains.exposed.sql.and @@ -24,13 +34,17 @@ import suwayomi.tachidesk.manga.model.table.MangaTable import suwayomi.tachidesk.manga.model.table.toDataClass import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList +import kotlin.time.Duration.Companion.seconds private val logger = KotlinLogging.logger {} +private const val MAX_SOURCES_IN_PARAllEL = 5 + object DownloadManager { + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val clients = ConcurrentHashMap() private val downloadQueue = CopyOnWriteArrayList() - private var downloader: Downloader? = null + private val downloaders = ConcurrentHashMap() fun addClient(ctx: WsContext) { clients[ctx.sessionId] = ctx @@ -61,23 +75,73 @@ object DownloadManager { } } + private val notifyFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + + init { + scope.launch { + notifyFlow.sample(1.seconds).collect { + val status = getStatus() + clients.forEach { + it.value.send(status) + } + } + } + } + private fun notifyAllClients() { - val status = getStatus() - clients.forEach { - it.value.send(status) + scope.launch { + notifyFlow.emit(Unit) } } private fun getStatus(): DownloadStatus { return DownloadStatus( - if (downloader == null || - downloadQueue.none { it.state == Downloading } - ) { + if (downloadQueue.none { it.state == Downloading }) { "Stopped" } else { "Started" }, - downloadQueue + downloadQueue.toList() + ) + } + + private val downloaderWatch = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + init { + scope.launch { + downloaderWatch.sample(1.seconds).collect { + val runningDownloaders = downloaders.values.filter { it.isActive } + logger.info { "Running: ${runningDownloaders.size}" } + if (runningDownloaders.size < MAX_SOURCES_IN_PARAllEL) { + downloadQueue.asSequence() + .map { it.manga.sourceId.toLong() } + .distinct() + .minus( + runningDownloaders.map { it.sourceId }.toSet() + ) + .take(MAX_SOURCES_IN_PARAllEL - runningDownloaders.size) + .map { getDownloader(it) } + .forEach { + it.start() + } + notifyAllClients() + } + } + } + } + + private fun refreshDownloaders() { + scope.launch { + downloaderWatch.emit(Unit) + } + } + + private fun getDownloader(sourceId: Long) = downloaders.getOrPut(sourceId) { + Downloader( + scope = scope, + sourceId = sourceId, + downloadQueue = downloadQueue, + notifier = ::notifyAllClients, + onComplete = ::refreshDownloaders ) } @@ -99,7 +163,7 @@ object DownloadManager { ) fun enqueue(input: EnqueueInput) { - if (input.chapterIds == null) return + if (input.chapterIds.isNullOrEmpty()) return val chapters = transaction { (ChapterTable innerJoin MangaTable) @@ -136,6 +200,9 @@ object DownloadManager { start() notifyAllClients() } + scope.launch { + downloaderWatch.emit(Unit) + } } /** @@ -163,31 +230,32 @@ object DownloadManager { notifyAllClients() } + fun reorder(chapterIndex: Int, mangaId: Int, to: Int) { + require(to >= 0) { "'to' must be over or equal to 0" } + val download = downloadQueue.find { it.mangaId == mangaId && it.chapterIndex == chapterIndex } + ?: return + downloadQueue -= download + downloadQueue.add(to, download) + } + fun start() { - if (downloader != null && !downloader?.isAlive!!) { - // doesn't exist or is dead - downloader = null + scope.launch { + downloaderWatch.emit(Unit) } + } - if (downloader == null) { - downloader = Downloader(downloadQueue) { notifyAllClients() } - downloader!!.start() + suspend fun stop() { + coroutineScope { + downloaders.map { (_, downloader) -> + async { + downloader.stop() + } + }.awaitAll() } - notifyAllClients() } - fun stop() { - downloader?.let { - synchronized(it.shouldStop) { - it.shouldStop = true - } - } - downloader = null - notifyAllClients() - } - - fun clear() { + suspend fun clear() { stop() downloadQueue.clear() notifyAllClients() diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt index 923b8e66..30b7a921 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt @@ -7,7 +7,18 @@ package suwayomi.tachidesk.manga.impl.download * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.sample +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch import mu.KotlinLogging import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.transactions.transaction @@ -24,40 +35,92 @@ import java.util.concurrent.CopyOnWriteArrayList private val logger = KotlinLogging.logger {} -class Downloader(private val downloadQueue: CopyOnWriteArrayList, val notifier: () -> Unit) : Thread() { - var shouldStop: Boolean = false +class Downloader( + private val scope: CoroutineScope, + val sourceId: Long, + private val downloadQueue: CopyOnWriteArrayList, + private val notifier: () -> Unit, + private val onComplete: () -> Unit +) { + private var job: Job? = null + class StopDownloadException : Exception("Cancelled download") + class PauseDownloadException : Exception("Pause download") - class DownloadShouldStopException : Exception() - - fun step() { + private suspend fun step(download: DownloadChapter?) { notifier() - synchronized(shouldStop) { - if (shouldStop) throw DownloadShouldStopException() + currentCoroutineContext().ensureActive() + if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) { + if (download in downloadQueue) { + throw PauseDownloadException() + } else { + throw StopDownloadException() + } } } - override fun run() { - do { + val isActive + get() = job?.isActive == true + + fun start() { + if (!isActive) { + job = scope.launch { + run() + }.also { job -> + job.invokeOnCompletion { + if (it !is CancellationException) { + onComplete() + } + } + } + } + + notifier() + } + + suspend fun stop() { + job?.cancelAndJoin() + } + + private suspend fun run() { + while (downloadQueue.isNotEmpty() && currentCoroutineContext().isActive) { val download = downloadQueue.firstOrNull { - it.state == Queued || - (it.state == Error && it.tries < 3) // 3 re-tries per download + it.manga.sourceId.toLong() == sourceId && + (it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download } ?: break try { download.state = Downloading - step() + step(download) - download.chapter = runBlocking { getChapterDownloadReady(download.chapterIndex, download.mangaId) } - step() + download.chapter = getChapterDownloadReady(download.chapterIndex, download.mangaId) + step(download) val pageCount = download.chapter.pageCount for (pageNum in 0 until pageCount) { - runBlocking { getPageImage(download.mangaId, download.chapterIndex, pageNum) }.first.close() + var pageProgressJob: Job? = null + try { + getPageImage( + mangaId = download.mangaId, + chapterIndex = download.chapterIndex, + index = pageNum, + progressFlow = { flow -> + pageProgressJob = flow + .sample(100) + .distinctUntilChanged() + .onEach { + download.progress = (pageNum.toFloat() + (it.toFloat() * 0.01f)) / pageCount + step(null) // don't throw on canceled download here since we can't do anything + } + .launchIn(scope) + } + ).first.close() + } finally { + // always cancel the page progress job even if it throws an exception to avoid memory leaks + pageProgressJob?.cancel() + } // TODO: retry on error with 2,4,8 seconds of wait - // TODO: download multiple pages at once, possible solution: rx observer's strategy is used in Tachiyomi - // TODO: fine grained download percentage - download.progress = (pageNum + 1).toFloat() / pageCount - step() + download.progress = ((pageNum + 1).toFloat()) / pageCount + step(download) } download.state = Finished transaction { @@ -65,20 +128,22 @@ class Downloader(private val downloadQueue: CopyOnWriteArrayList