diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/DownloadMutation.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/DownloadMutation.kt index 5ff89c43..e9a2b470 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/DownloadMutation.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/DownloadMutation.kt @@ -10,6 +10,7 @@ import suwayomi.tachidesk.graphql.types.ChapterType import suwayomi.tachidesk.graphql.types.DownloadStatus import suwayomi.tachidesk.manga.impl.Chapter import suwayomi.tachidesk.manga.impl.download.DownloadManager +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.DEQUEUED import suwayomi.tachidesk.manga.impl.download.model.Status import suwayomi.tachidesk.manga.model.table.ChapterTable import suwayomi.tachidesk.server.JavalinSetup.future @@ -94,7 +95,12 @@ class DownloadMutation { clientMutationId = clientMutationId, downloadStatus = withTimeout(30.seconds) { - DownloadStatus(DownloadManager.status.first { it.queue.any { it.chapter.id in chapters } }) + DownloadStatus( + DownloadManager.updates + .first { + DownloadManager.getStatus().queue.any { it.chapter.id in chapters } + }.let { DownloadManager.getStatus() }, + ) }, ) } @@ -122,7 +128,11 @@ class DownloadMutation { clientMutationId = clientMutationId, downloadStatus = withTimeout(30.seconds) { - DownloadStatus(DownloadManager.status.first { it.queue.any { it.chapter.id == chapter } }) + DownloadStatus( + DownloadManager.updates + .first { it.updates.any { it.downloadChapter.chapter.id == chapter } } + .let { DownloadManager.getStatus() }, + ) }, ) } @@ -152,7 +162,14 @@ class DownloadMutation { clientMutationId = clientMutationId, downloadStatus = withTimeout(30.seconds) { - DownloadStatus(DownloadManager.status.first { it.queue.none { it.chapter.id in chapters } }) + DownloadStatus( + DownloadManager.updates + .first { + it.updates.none { + it.downloadChapter.chapter.id in chapters && it.type != DEQUEUED + } + }.let { DownloadManager.getStatus() }, + ) }, ) } @@ -180,7 +197,14 @@ class DownloadMutation { clientMutationId = clientMutationId, downloadStatus = withTimeout(30.seconds) { - DownloadStatus(DownloadManager.status.first { it.queue.none { it.chapter.id == chapter } }) + DownloadStatus( + DownloadManager.updates + .first { + it.updates.none { + it.downloadChapter.chapter.id == chapter && it.type != DEQUEUED + } + }.let { DownloadManager.getStatus() }, + ) }, ) } @@ -206,7 +230,9 @@ class DownloadMutation { downloadStatus = withTimeout(30.seconds) { DownloadStatus( - DownloadManager.status.first { it.status == Status.Started }, + DownloadManager.updates + .first { it.status == Status.Started } + .let { DownloadManager.getStatus() }, ) }, ) @@ -232,7 +258,9 @@ class DownloadMutation { downloadStatus = withTimeout(30.seconds) { DownloadStatus( - DownloadManager.status.first { it.status == Status.Stopped }, + DownloadManager.updates + .first { it.status == Status.Stopped } + .let { DownloadManager.getStatus() }, ) }, ) @@ -258,7 +286,9 @@ class DownloadMutation { downloadStatus = withTimeout(30.seconds) { DownloadStatus( - DownloadManager.status.first { it.status == Status.Stopped && it.queue.isEmpty() }, + DownloadManager.updates + .first { it.status == Status.Stopped } + .let { DownloadManager.getStatus() }, ) }, ) @@ -288,7 +318,9 @@ class DownloadMutation { downloadStatus = withTimeout(30.seconds) { DownloadStatus( - DownloadManager.status.first { it.queue.indexOfFirst { it.chapter.id == chapter } <= to }, + DownloadManager.updates + .first { it.updates.indexOfFirst { it.downloadChapter.chapter.id == chapter } <= to } + .let { DownloadManager.getStatus() }, ) }, ) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/DownloadQuery.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/DownloadQuery.kt index 104285fb..8722ef6a 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/DownloadQuery.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/DownloadQuery.kt @@ -1,6 +1,5 @@ package suwayomi.tachidesk.graphql.queries -import kotlinx.coroutines.flow.first import suwayomi.tachidesk.graphql.types.DownloadStatus import suwayomi.tachidesk.manga.impl.download.DownloadManager import suwayomi.tachidesk.server.JavalinSetup.future @@ -9,6 +8,6 @@ import java.util.concurrent.CompletableFuture class DownloadQuery { fun downloadStatus(): CompletableFuture = future { - DownloadStatus(DownloadManager.status.first()) + DownloadStatus(DownloadManager.getStatus()) } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt index cba69c86..1bfefac8 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt @@ -7,14 +7,54 @@ package suwayomi.tachidesk.graphql.subscriptions +import com.expediagroup.graphql.generator.annotations.GraphQLDeprecated +import com.expediagroup.graphql.generator.annotations.GraphQLDescription import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import suwayomi.tachidesk.graphql.types.DownloadStatus +import suwayomi.tachidesk.graphql.types.DownloadUpdates import suwayomi.tachidesk.manga.impl.download.DownloadManager class DownloadSubscription { + @GraphQLDeprecated("Replaced width downloadStatusChanged", ReplaceWith("downloadStatusChanged(input)")) fun downloadChanged(): Flow = DownloadManager.status.map { downloadStatus -> DownloadStatus(downloadStatus) } + + data class DownloadChangedInput( + @GraphQLDescription( + "Sets a max number of updates that can be contained in a download update message." + + "Everything above this limit will be omitted and the \"downloadStatus\" should be re-fetched via the " + + "corresponding query. Due to the graphql subscription execution strategy not supporting batching for data loaders, " + + "the data loaders run into the n+1 problem, which can cause the server to get unresponsive until the status " + + "update has been handled. This is an issue e.g. when mass en- or dequeuing downloads.", + ) + val maxUpdates: Int?, + ) + + fun downloadStatusChanged(input: DownloadChangedInput): Flow { + val omitUpdates = input.maxUpdates != null + val maxUpdates = input.maxUpdates ?: 50 + + return DownloadManager.updates.map { downloadUpdates -> + val omittedUpdates = omitUpdates && downloadUpdates.updates.size > maxUpdates + + // the graphql subscription execution strategy does not support data loader batching which causes the n+1 problem, + // thus, too many updates (e.g. on mass enqueue or dequeue) causes unresponsiveness of the server until the + // update has been handled + val actualDownloadUpdates = + if (omittedUpdates) { + suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates( + downloadUpdates.status, + downloadUpdates.updates.subList(0, maxUpdates), + downloadUpdates.initial, + ) + } else { + downloadUpdates + } + + DownloadUpdates(actualDownloadUpdates, omittedUpdates) + } + } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt index a231b447..c3d7da44 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt @@ -7,6 +7,7 @@ package suwayomi.tachidesk.graphql.types +import com.expediagroup.graphql.generator.annotations.GraphQLDescription import com.expediagroup.graphql.generator.annotations.GraphQLIgnore import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import graphql.schema.DataFetchingEnvironment @@ -18,6 +19,9 @@ import suwayomi.tachidesk.graphql.server.primitives.PageInfo import suwayomi.tachidesk.graphql.types.DownloadState.FINISHED import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates import suwayomi.tachidesk.manga.impl.download.model.Status import java.util.concurrent.CompletableFuture import suwayomi.tachidesk.manga.impl.download.model.DownloadState as OtherDownloadState @@ -35,6 +39,28 @@ data class DownloadStatus( ) } +data class DownloadUpdates( + val state: DownloaderState, + val updates: List, + @GraphQLDescription("The current download queue at the time of sending initial message. Is null for all following messages") + val initial: List?, + @GraphQLDescription( + "Indicates whether updates have been omitted based on the \"maxUpdates\" subscription variable. " + + "In case updates have been omitted, the \"downloadStatus\" query should be re-fetched.", + ) + val omittedUpdates: Boolean, +) { + constructor(downloadUpdates: DownloadUpdates, omittedUpdates: Boolean) : this( + when (downloadUpdates.status) { + Status.Stopped -> DownloaderState.STOPPED + Status.Started -> DownloaderState.STARTED + }, + downloadUpdates.updates.map { DownloadUpdate(it) }, + downloadUpdates.initial?.map { DownloadType(it) }, + omittedUpdates, + ) +} + class DownloadType( @get:GraphQLIgnore val chapterId: Int, @@ -43,6 +69,7 @@ class DownloadType( val state: DownloadState, val progress: Float, val tries: Int, + val position: Int, ) : Node { constructor(downloadChapter: DownloadChapter) : this( downloadChapter.chapter.id, @@ -55,6 +82,7 @@ class DownloadType( }, downloadChapter.progress, downloadChapter.tries, + downloadChapter.position, ) fun manga(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture { @@ -76,6 +104,16 @@ class DownloadType( } } +class DownloadUpdate( + val type: DownloadUpdateType, + val download: DownloadType, +) : Node { + constructor(downloadUpdate: DownloadUpdate) : this( + downloadUpdate.type, + DownloadType(downloadUpdate.downloadChapter), + ) +} + enum class DownloadState { QUEUED, DOWNLOADING, diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt index 08d8db3b..47eec13f 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt @@ -21,7 +21,6 @@ import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.flow.sample @@ -36,6 +35,9 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates import suwayomi.tachidesk.manga.impl.download.model.Status import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass @@ -47,6 +49,7 @@ import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.CopyOnWriteArraySet import kotlin.reflect.jvm.jvmName import kotlin.time.Duration.Companion.seconds @@ -57,6 +60,7 @@ object DownloadManager { private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val clients = ConcurrentHashMap() private val downloadQueue = CopyOnWriteArrayList() + private val downloadUpdates = CopyOnWriteArraySet() private val downloaders = ConcurrentHashMap() private const val DOWNLOAD_QUEUE_KEY = "downloadQueueKey" @@ -81,6 +85,13 @@ object DownloadManager { scope.launch { saveQueueFlow.emit(Unit) } } + private fun handleDownloadUpdate( + immediate: Boolean, + download: DownloadUpdate? = null, + ) { + notifyAllClients(immediate, listOfNotNull(download)) + } + fun restoreAndResumeDownloads() { scope.launch { logger.debug { "restoreAndResumeDownloads: Restore download queue..." } @@ -124,9 +135,15 @@ object DownloadManager { private val notifyFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + @Deprecated("Replaced with updatesFlow", replaceWith = ReplaceWith("updatesFlow")) private val statusFlow = MutableSharedFlow() + + @Deprecated("Replaced with updates", replaceWith = ReplaceWith("updates")) val status = statusFlow.onStart { emit(getStatus()) } + private val updatesFlow = MutableSharedFlow() + val updates = updatesFlow.onStart { emit(getDownloadUpdates(addInitial = true)) } + init { scope.launch { notifyFlow.sample(1.seconds).collect { @@ -147,12 +164,21 @@ object DownloadManager { } } - private fun notifyAllClients(immediate: Boolean = false) { + private fun notifyAllClients( + immediate: Boolean = false, + downloads: List = emptyList(), + ) { + downloadUpdates.addAll(downloads) + if (immediate) { val status = getStatus() + val updates = getDownloadUpdates() + + downloadUpdates.clear() scope.launch { statusFlow.emit(status) + updatesFlow.emit(updates) sendStatusToAllClients(status) } @@ -164,7 +190,7 @@ object DownloadManager { } } - private fun getStatus(): DownloadStatus = + fun getStatus(): DownloadStatus = DownloadStatus( if (downloadQueue.none { it.state == Downloading }) { Status.Stopped @@ -174,6 +200,17 @@ object DownloadManager { downloadQueue.toList(), ) + private fun getDownloadUpdates(addInitial: Boolean = false): DownloadUpdates = + DownloadUpdates( + if (downloadQueue.none { it.state == Downloading }) { + Status.Stopped + } else { + Status.Started + }, + downloadUpdates.toList(), + if (addInitial) downloadQueue.toList() else null, + ) + private val downloaderWatch = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) init { @@ -234,7 +271,7 @@ object DownloadManager { scope = scope, sourceId = sourceId, downloadQueue = downloadQueue, - notifier = ::notifyAllClients, + notifier = ::handleDownloadUpdate, onComplete = ::refreshDownloaders, onDownloadFinished = ::triggerSaveDownloadQueue, ) @@ -303,7 +340,7 @@ object DownloadManager { val addedChapters = inputs.mapNotNull { addToQueue(it.first, it.second) } if (addedChapters.isNotEmpty()) { start() - notifyAllClients(true) + notifyAllClients(false, addedChapters.map { DownloadUpdate(DownloadUpdateType.QUEUED, it) }) } scope.launch { downloaderWatch.emit(Unit) @@ -328,6 +365,7 @@ object DownloadManager { manga.id, chapter, manga, + downloadQueue.size, ) downloadQueue.add(newDownloadChapter) triggerSaveDownloadQueue() @@ -374,7 +412,7 @@ object DownloadManager { downloadQueue.removeAll(chapterDownloads) triggerSaveDownloadQueue() - notifyAllClients() + notifyAllClients(false, chapterDownloads.toList().map { DownloadUpdate(DownloadUpdateType.DEQUEUED, it) }) } fun reorder( @@ -410,6 +448,8 @@ object DownloadManager { downloadQueue -= download downloadQueue.add(to, download) + download.position = to + notifyAllClients(false, listOf(DownloadUpdate(DownloadUpdateType.POSITION, download))) triggerSaveDownloadQueue() } @@ -439,9 +479,10 @@ object DownloadManager { logger.debug { "clear" } stop() + val removedDownloads = downloadQueue.toList().map { DownloadUpdate(DownloadUpdateType.DEQUEUED, it) } downloadQueue.clear() triggerSaveDownloadQueue() - notifyAllClients() + notifyAllClients(false, removedDownloads) } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt index 1a4b7c79..c1413c87 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/Downloader.kt @@ -27,6 +27,10 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Finished import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.ERROR +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.FINISHED +import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.PROGRESS import suwayomi.tachidesk.manga.model.table.ChapterTable import java.util.concurrent.CopyOnWriteArrayList @@ -34,7 +38,7 @@ class Downloader( private val scope: CoroutineScope, val sourceId: String, private val downloadQueue: CopyOnWriteArrayList, - private val notifier: (immediate: Boolean) -> Unit, + private val notifier: (immediate: Boolean, download: DownloadUpdate?) -> Unit, private val onComplete: () -> Unit, private val onDownloadFinished: () -> Unit, ) { @@ -49,10 +53,11 @@ class Downloader( class PauseDownloadException : Exception("Pause download") private suspend fun step( - download: DownloadChapter?, + downloadUpdate: DownloadUpdate?, immediate: Boolean, ) { - notifier(immediate) + val download = downloadUpdate?.downloadChapter + notifier(immediate, downloadUpdate) currentCoroutineContext().ensureActive() if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) { if (download in downloadQueue) { @@ -81,7 +86,7 @@ class Downloader( } } logger.debug { "started" } - notifier(false) + notifier(false, null) } } @@ -90,14 +95,14 @@ class Downloader( logger.debug { "stopped" } } - private suspend fun finishDownload( + private fun finishDownload( logger: KLogger, download: DownloadChapter, ) { - downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex } - step(null, false) - logger.debug { "finished" } + downloadQueue -= download + notifier(true, DownloadUpdate(FINISHED, download)) onDownloadFinished() + logger.debug { "finished" } } private suspend fun run() { @@ -122,12 +127,13 @@ class Downloader( try { download.state = Downloading - step(download, true) + step(DownloadUpdate(PROGRESS, download), true) download.chapter = getChapterDownloadReadyByIndex(download.chapterIndex, download.mangaId) - step(download, false) - ChapterDownloadHelper.download(download.mangaId, download.chapter.id, download, scope, this::step) + ChapterDownloadHelper.download(download.mangaId, download.chapter.id, download, scope) { downloadChapter, immediate -> + step(downloadChapter?.let { DownloadUpdate(PROGRESS, downloadChapter) }, immediate) + } download.state = Finished transaction { ChapterTable.update( @@ -136,7 +142,6 @@ class Downloader( it[isDownloaded] = true } } - step(download, true) finishDownload(downloadLogger, download) } catch (e: CancellationException) { logger.debug("Downloader was stopped") @@ -149,7 +154,7 @@ class Downloader( download.tries++ download.state = Error } finally { - notifier(false) + notifier(false, DownloadUpdate(ERROR, download)) } } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt index 8939ec4b..d12e9177 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadChapter.kt @@ -16,6 +16,7 @@ class DownloadChapter( val mangaId: Int, var chapter: ChapterDataClass, var manga: MangaDataClass, + var position: Int, var state: DownloadState = Queued, var progress: Float = 0f, var tries: Int = 0, diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadStatus.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadStatus.kt index d66132f3..684976a2 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadStatus.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadStatus.kt @@ -16,3 +16,9 @@ data class DownloadStatus( val status: Status, val queue: List, ) + +data class DownloadUpdates( + val status: Status, + val updates: List, + val initial: List?, +) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadUpdate.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadUpdate.kt new file mode 100644 index 00000000..41697464 --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/model/DownloadUpdate.kt @@ -0,0 +1,15 @@ +package suwayomi.tachidesk.manga.impl.download.model + +enum class DownloadUpdateType { + QUEUED, + DEQUEUED, + PROGRESS, + FINISHED, + ERROR, + POSITION, +} + +data class DownloadUpdate( + val type: DownloadUpdateType, + val downloadChapter: DownloadChapter, +)