Optimize Download Queue (#1627)

* Optimize download Queue

* Lint

* Fix name of DownloadStatus file

* Re-add synchronous status fetch
This commit is contained in:
Mitchell Syer
2025-09-09 18:13:31 -04:00
committed by GitHub
parent 055c1c47f6
commit 679e2c0da9
10 changed files with 129 additions and 75 deletions

View File

@@ -17,7 +17,6 @@ import suwayomi.tachidesk.manga.impl.download.model.Status
import suwayomi.tachidesk.manga.model.table.ChapterTable
import suwayomi.tachidesk.server.JavalinSetup.Attribute
import suwayomi.tachidesk.server.JavalinSetup.future
import suwayomi.tachidesk.server.JavalinSetup.getAttribute
import suwayomi.tachidesk.server.user.requireUser
import java.util.concurrent.CompletableFuture
import kotlin.time.Duration.Companion.seconds
@@ -114,7 +113,7 @@ class DownloadMutation {
DownloadStatus(
DownloadManager.updates
.first {
DownloadManager.getStatus().queue.any { it.chapter.id in chapters }
DownloadManager.getStatus().queue.any { it.chapterId in chapters }
}.let { DownloadManager.getStatus() },
)
},
@@ -150,7 +149,7 @@ class DownloadMutation {
withTimeout(30.seconds) {
DownloadStatus(
DownloadManager.updates
.first { it.updates.any { it.downloadChapter.chapter.id == chapter } }
.first { it.updates.any { it.downloadQueueItem.chapterId == chapter } }
.let { DownloadManager.getStatus() },
)
},
@@ -188,7 +187,7 @@ class DownloadMutation {
DownloadManager.updates
.first {
it.updates.any {
it.downloadChapter.chapter.id in chapters && it.type == DEQUEUED
it.downloadQueueItem.chapterId in chapters && it.type == DEQUEUED
}
}.let { DownloadManager.getStatus() },
)
@@ -227,7 +226,7 @@ class DownloadMutation {
DownloadManager.updates
.first {
it.updates.any {
it.downloadChapter.chapter.id == chapter && it.type == DEQUEUED
it.downloadQueueItem.chapterId == chapter && it.type == DEQUEUED
}
}.let { DownloadManager.getStatus() },
)
@@ -361,7 +360,7 @@ class DownloadMutation {
withTimeout(30.seconds) {
DownloadStatus(
DownloadManager.updates
.first { it.updates.indexOfFirst { it.downloadChapter.chapter.id == chapter } <= to }
.first { it.updates.indexOfFirst { it.downloadQueueItem.chapterId == chapter } <= to }
.let { DownloadManager.getStatus() },
)
},

View File

@@ -17,7 +17,7 @@ import suwayomi.tachidesk.graphql.server.primitives.Node
import suwayomi.tachidesk.graphql.server.primitives.NodeList
import suwayomi.tachidesk.graphql.server.primitives.PageInfo
import suwayomi.tachidesk.graphql.types.DownloadState.FINISHED
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadQueueItem
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType
@@ -71,8 +71,8 @@ class DownloadType(
val tries: Int,
val position: Int,
) : Node {
constructor(downloadChapter: DownloadChapter) : this(
downloadChapter.chapter.id,
constructor(downloadChapter: DownloadQueueItem) : this(
downloadChapter.chapterId,
downloadChapter.mangaId,
when (downloadChapter.state) {
OtherDownloadState.Queued -> DownloadState.QUEUED
@@ -110,7 +110,7 @@ class DownloadUpdate(
) : Node {
constructor(downloadUpdate: DownloadUpdate) : this(
downloadUpdate.type,
DownloadType(downloadUpdate.downloadChapter),
DownloadType(downloadUpdate.downloadQueueItem),
)
}

View File

@@ -6,7 +6,7 @@ import suwayomi.tachidesk.manga.impl.chapter.getChapterDownloadReady
import suwayomi.tachidesk.manga.impl.download.fileProvider.ChaptersFilesProvider
import suwayomi.tachidesk.manga.impl.download.fileProvider.impl.ArchiveProvider
import suwayomi.tachidesk.manga.impl.download.fileProvider.impl.FolderProvider
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadQueueItem
import suwayomi.tachidesk.manga.impl.util.getChapterCbzPath
import suwayomi.tachidesk.manga.impl.util.getChapterDownloadPath
import suwayomi.tachidesk.manga.model.table.ChapterTable
@@ -39,9 +39,9 @@ object ChapterDownloadHelper {
suspend fun download(
mangaId: Int,
chapterId: Int,
download: DownloadChapter,
download: DownloadQueueItem,
scope: CoroutineScope,
step: suspend (DownloadChapter?, Boolean) -> Unit,
step: suspend (DownloadQueueItem?, Boolean) -> Unit,
): Boolean = provider(mangaId, chapterId).download().execute(download, scope, step)
// return the appropriate provider based on how the download was saved. For the logic is simple but will evolve when new types of downloads are available

View File

@@ -48,7 +48,7 @@ object Page {
suspend fun getPageImage(
mangaId: Int,
chapterIndex: Int,
chapterId: Int,
index: Int,
format: String? = null,
progressFlow: ((StateFlow<Int>) -> Unit)? = null,
@@ -58,10 +58,8 @@ object Page {
transaction {
ChapterTable
.selectAll()
.where {
(ChapterTable.sourceOrder eq chapterIndex) and
(ChapterTable.manga eq mangaId)
}.first()
.where { ChapterTable.id eq chapterId }
.first()
}
val chapterId = chapterEntry[ChapterTable.id].value

View File

@@ -31,12 +31,14 @@ import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.transactions.transaction
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadQueueItem
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates
import suwayomi.tachidesk.manga.impl.download.model.OldDownloadStatus
import suwayomi.tachidesk.manga.impl.download.model.Status
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
@@ -58,9 +60,9 @@ private val logger = KotlinLogging.logger {}
object DownloadManager {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val clients = ConcurrentHashMap<String, WsContext>()
private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>()
private val downloadQueue = CopyOnWriteArrayList<DownloadQueueItem>()
private val downloadUpdates = CopyOnWriteArraySet<DownloadUpdate>()
private val downloaders = ConcurrentHashMap<String, Downloader>()
private val downloaders = ConcurrentHashMap<Long, Downloader>()
private const val DOWNLOAD_QUEUE_KEY = "downloadQueueKey"
private val sharedPreferences =
@@ -76,7 +78,7 @@ object DownloadManager {
private fun saveDownloadQueue() {
sharedPreferences
.edit()
.putStringSet(DOWNLOAD_QUEUE_KEY, downloadQueue.map { it.chapter.id.toString() }.toSet())
.putStringSet(DOWNLOAD_QUEUE_KEY, downloadQueue.map { it.chapterId.toString() }.toSet())
.apply()
}
@@ -157,12 +159,6 @@ object DownloadManager {
saveQueueFlow.onEach { saveDownloadQueue() }.launchIn(scope)
}
private fun sendStatusToAllClients(status: DownloadStatus) {
clients.forEach {
it.value.send(status)
}
}
private fun notifyAllClients(
immediate: Boolean = false,
downloads: List<DownloadUpdate> = emptyList(),
@@ -171,11 +167,11 @@ object DownloadManager {
val outdatedUpdates =
downloadUpdates.filter { update ->
downloads.any { download ->
download.downloadChapter.chapter.id ==
update.downloadChapter.chapter.id
download.downloadQueueItem.chapterId ==
update.downloadQueueItem.chapterId
}
}
downloadUpdates.removeAll(outdatedUpdates)
downloadUpdates.removeAll(outdatedUpdates.toSet())
downloadUpdates.addAll(downloads)
// There is a problem where too many immediate updates can cause the client to lag out (e.g., in case it has to
@@ -196,10 +192,14 @@ object DownloadManager {
if (immediate) {
val status = getStatus()
scope.launch {
statusFlow.emit(status)
sendStatusToAllClients(status)
if (clients.isNotEmpty()) {
val status = getOldStatus(status)
clients.forEach {
it.value.send(status)
}
}
}
return
@@ -220,6 +220,40 @@ object DownloadManager {
downloadQueue.toList(),
)
fun getOldStatus(status: DownloadStatus): OldDownloadStatus =
OldDownloadStatus(
status.status,
run {
val items = status.queue
val mangaIds = items.map { it.mangaId }.toSet()
val chapterIds = items.map { it.chapterId }.toSet()
transaction {
val mangas =
MangaTable
.selectAll()
.where { MangaTable.id inList mangaIds }
.associateBy({ it[MangaTable.id].value }, { MangaTable.toDataClass(it) })
val chapters =
ChapterTable
.selectAll()
.where { ChapterTable.id inList chapterIds }
.associateBy({ it[ChapterTable.id].value }, { ChapterTable.toDataClass(it) })
items.mapNotNull {
DownloadChapter(
it.chapterIndex,
it.mangaId,
chapters[it.chapterId] ?: return@mapNotNull null,
mangas[it.mangaId] ?: return@mapNotNull null,
it.position,
it.state,
it.progress,
it.tries,
)
}
}
},
)
private fun getDownloadUpdates(addInitial: Boolean = false): DownloadUpdates =
DownloadUpdates(
if (downloaders.values.any { it.isActive }) {
@@ -264,7 +298,7 @@ object DownloadManager {
if (runningDownloaders.size < serverConfig.maxSourcesInParallel.value) {
availableDownloads
.asSequence()
.map { it.manga.sourceId }
.map { it.sourceId }
.distinct()
.minus(
runningDownloaders.map { it.sourceId }.toSet(),
@@ -285,7 +319,7 @@ object DownloadManager {
}
}
private fun getDownloader(sourceId: String) =
private fun getDownloader(sourceId: Long) =
downloaders.getOrPut(sourceId) {
Downloader(
scope = scope,
@@ -375,18 +409,19 @@ object DownloadManager {
private fun addToQueue(
manga: MangaDataClass,
chapter: ChapterDataClass,
): DownloadChapter? {
val downloadChapter = downloadQueue.firstOrNull { it.mangaId == manga.id && it.chapterIndex == chapter.index }
): DownloadQueueItem? {
val downloadChapter = downloadQueue.firstOrNull { it.chapterId == chapter.id }
val addToQueue = downloadChapter == null
if (addToQueue) {
val newDownloadChapter =
DownloadChapter(
DownloadQueueItem(
chapter.id,
chapter.index,
manga.id,
chapter,
manga,
manga.sourceId.toLong(),
downloadQueue.size,
0,
)
downloadQueue.add(newDownloadChapter)
triggerSaveDownloadQueue()
@@ -394,12 +429,12 @@ object DownloadManager {
return newDownloadChapter
}
val retryDownload = downloadChapter?.state == Error
val retryDownload = downloadChapter.state == Error
if (retryDownload) {
logger.debug { "Chapter ${chapter.id} download failed, retry download ($downloadChapter)" }
downloadChapter?.state = Queued
downloadChapter?.progress = 0f
downloadChapter.state = Queued
downloadChapter.progress = 0f
return downloadChapter
}
@@ -410,7 +445,7 @@ object DownloadManager {
fun dequeue(input: EnqueueInput) {
if (input.chapterIds.isNullOrEmpty()) return
dequeue(downloadQueue.filter { it.chapter.id in input.chapterIds }.toSet())
dequeue(downloadQueue.filter { it.chapterId in input.chapterIds }.toSet())
}
fun dequeue(
@@ -424,10 +459,10 @@ object DownloadManager {
mangaIds: List<Int>,
chaptersToIgnore: List<Int> = emptyList(),
) {
dequeue(downloadQueue.filter { it.mangaId in mangaIds && it.chapter.id !in chaptersToIgnore }.toSet())
dequeue(downloadQueue.filter { it.mangaId in mangaIds && it.chapterId !in chaptersToIgnore }.toSet())
}
private fun dequeue(chapterDownloads: Set<DownloadChapter>) {
private fun dequeue(chapterDownloads: Set<DownloadQueueItem>) {
logger.debug { "dequeue ${chapterDownloads.size} chapters [${chapterDownloads.joinToString(separator = ", ") { "$it" }}]" }
downloadQueue.removeAll(chapterDownloads)
@@ -453,14 +488,14 @@ object DownloadManager {
to: Int,
) {
val download =
downloadQueue.find { it.chapter.id == chapterId }
downloadQueue.find { it.chapterId == chapterId }
?: return
reorder(download, to)
}
private fun reorder(
download: DownloadChapter,
download: DownloadQueueItem,
to: Int,
) {
require(to >= 0) { "'to' must be over or equal to 0" }
@@ -506,11 +541,3 @@ object DownloadManager {
notifyAllClients(false, removedDownloads)
}
}
enum class DownloaderState(
val state: Int,
) {
Stopped(0),
Running(1),
Paused(2),
}

View File

@@ -17,12 +17,11 @@ import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.update
import suwayomi.tachidesk.manga.impl.ChapterDownloadHelper
import suwayomi.tachidesk.manga.impl.chapter.getChapterDownloadReadyById
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadQueueItem
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Finished
@@ -38,8 +37,8 @@ import java.util.concurrent.CopyOnWriteArrayList
class Downloader(
private val scope: CoroutineScope,
val sourceId: String,
private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>,
val sourceId: Long,
private val downloadQueue: CopyOnWriteArrayList<DownloadQueueItem>,
private val notifier: (immediate: Boolean, download: DownloadUpdate?) -> Unit,
private val onComplete: () -> Unit,
private val onDownloadFinished: () -> Unit,
@@ -52,7 +51,7 @@ class Downloader(
private var job: Job? = null
private val availableSourceDownloads
get() = downloadQueue.filter { it.manga.sourceId == sourceId }
get() = downloadQueue.filter { it.sourceId == sourceId }
class StopDownloadException : Exception("Cancelled download")
@@ -64,7 +63,7 @@ class Downloader(
downloadUpdate: DownloadUpdate?,
immediate: Boolean,
) {
val download = downloadUpdate?.downloadChapter
val download = downloadUpdate?.downloadQueueItem
notifier(immediate, downloadUpdate)
currentCoroutineContext().ensureActive()
if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) {
@@ -105,7 +104,7 @@ class Downloader(
private fun finishDownload(
logger: KLogger,
download: DownloadChapter,
download: DownloadQueueItem,
) {
notifier(true, DownloadUpdate(FINISHED, download))
downloadQueue -= download
@@ -137,19 +136,21 @@ class Downloader(
download.state = Downloading
step(DownloadUpdate(PROGRESS, download), true)
download.chapter = getChapterDownloadReadyById(download.chapter.id)
val chapter = getChapterDownloadReadyById(download.chapterId)
if (download.chapter.pageCount <= 0) {
if (chapter.pageCount <= 0) {
throw EmptyChapterException()
}
ChapterDownloadHelper.download(download.mangaId, download.chapter.id, download, scope) { downloadChapter, immediate ->
download.pageCount = chapter.pageCount
ChapterDownloadHelper.download(download.mangaId, download.chapterId, download, scope) { downloadChapter, immediate ->
step(downloadChapter?.let { DownloadUpdate(PROGRESS, downloadChapter) }, immediate)
}
download.state = Finished
transaction {
ChapterTable.update(
{ (ChapterTable.manga eq download.mangaId) and (ChapterTable.sourceOrder eq download.chapterIndex) },
{ (ChapterTable.id eq download.chapterId) },
) {
it[isDownloaded] = true
}

View File

@@ -17,7 +17,7 @@ import org.jetbrains.exposed.sql.update
import suwayomi.tachidesk.graphql.types.DownloadConversion
import suwayomi.tachidesk.manga.impl.Page
import suwayomi.tachidesk.manga.impl.chapter.getChapterDownloadReady
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadQueueItem
import suwayomi.tachidesk.manga.impl.util.KoreaderHelper
import suwayomi.tachidesk.manga.impl.util.createComicInfoFile
import suwayomi.tachidesk.manga.impl.util.getChapterCachePath
@@ -104,9 +104,9 @@ abstract class ChaptersFilesProvider<Type : FileType>(
@OptIn(FlowPreview::class)
private suspend fun downloadImpl(
download: DownloadChapter,
download: DownloadQueueItem,
scope: CoroutineScope,
step: suspend (DownloadChapter?, Boolean) -> Unit,
step: suspend (DownloadQueueItem?, Boolean) -> Unit,
): Boolean {
val existingDownloadPageCount =
try {
@@ -114,7 +114,7 @@ abstract class ChaptersFilesProvider<Type : FileType>(
} catch (_: Exception) {
0
}
val pageCount = download.chapter.pageCount
val pageCount = download.pageCount
check(pageCount > 0) { "pageCount must be greater than 0 - ChapterForDownload#getChapterDownloadReady not called" }
check(existingDownloadPageCount == 0 || existingDownloadPageCount == pageCount) {
@@ -153,7 +153,7 @@ abstract class ChaptersFilesProvider<Type : FileType>(
Page
.getPageImage(
mangaId = download.mangaId,
chapterIndex = download.chapterIndex,
chapterId = download.chapterId,
index = pageNum,
) { flow ->
pageProgressJob =
@@ -213,7 +213,7 @@ abstract class ChaptersFilesProvider<Type : FileType>(
/**
* This function should never be called without calling [getChapterDownloadReady] beforehand.
*/
override fun download(): FileDownload3Args<DownloadChapter, CoroutineScope, suspend (DownloadChapter?, Boolean) -> Unit> =
override fun download(): FileDownload3Args<DownloadQueueItem, CoroutineScope, suspend (DownloadQueueItem?, Boolean) -> Unit> =
FileDownload3Args(::downloadImpl)
abstract override fun delete(): Boolean

View File

@@ -0,0 +1,24 @@
package suwayomi.tachidesk.manga.impl.download.model
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
class DownloadQueueItem(
val chapterId: Int,
val chapterIndex: Int,
val mangaId: Int,
val sourceId: Long,
var position: Int,
var pageCount: Int,
var state: DownloadState = Queued,
var progress: Float = 0f,
var tries: Int = 0,
) {
override fun toString(): String = "$mangaId - $chapterId | state= $state, tries= $tries, progress= $progress"
}

View File

@@ -12,13 +12,18 @@ enum class Status {
Started,
}
data class DownloadStatus(
data class OldDownloadStatus(
val status: Status,
val queue: List<DownloadChapter>,
)
data class DownloadStatus(
val status: Status,
val queue: List<DownloadQueueItem>,
)
data class DownloadUpdates(
val status: Status,
val updates: List<DownloadUpdate>,
val initial: List<DownloadChapter>?,
val initial: List<DownloadQueueItem>?,
)

View File

@@ -13,5 +13,5 @@ enum class DownloadUpdateType {
data class DownloadUpdate(
val type: DownloadUpdateType,
val downloadChapter: DownloadChapter,
val downloadQueueItem: DownloadQueueItem,
)