From 439e0c8284308a731b5cbd4255cd4b08f75660f0 Mon Sep 17 00:00:00 2001 From: schroda <50052685+schroda@users.noreply.github.com> Date: Sun, 23 Mar 2025 00:34:43 +0100 Subject: [PATCH] Emit only updater job changes instead of full status (#1302) The update subscription emitted the full update status, which, depending on how big the status was, took forever because the graphql subscription does not support data loader batching, causing it to run into the n+1 problem --- .../graphql/mutations/UpdateMutation.kt | 59 ++++++--- .../tachidesk/graphql/queries/UpdateQuery.kt | 5 + .../subscriptions/DownloadSubscription.kt | 2 +- .../subscriptions/UpdateSubscription.kt | 51 ++++++++ .../tachidesk/graphql/types/CategoryType.kt | 10 ++ .../tachidesk/graphql/types/UpdateType.kt | 123 ++++++++++++++++-- .../tachidesk/manga/impl/update/IUpdater.kt | 5 + .../tachidesk/manga/impl/update/UpdateJob.kt | 11 ++ .../manga/impl/update/UpdateStatus.kt | 16 ++- .../tachidesk/manga/impl/update/Updater.kt | 107 ++++++++++++--- 10 files changed, 342 insertions(+), 47 deletions(-) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt index 08a3cb0a..a9217221 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt @@ -3,14 +3,11 @@ package suwayomi.tachidesk.graphql.mutations import graphql.execution.DataFetcherResult import kotlinx.coroutines.flow.first import kotlinx.coroutines.withTimeout -import org.jetbrains.exposed.sql.selectAll -import org.jetbrains.exposed.sql.transactions.transaction import suwayomi.tachidesk.graphql.asDataFetcherResult +import suwayomi.tachidesk.graphql.types.LibraryUpdateStatus import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.manga.impl.Category import suwayomi.tachidesk.manga.impl.update.IUpdater -import suwayomi.tachidesk.manga.model.table.CategoryTable -import suwayomi.tachidesk.manga.model.table.toDataClass import suwayomi.tachidesk.server.JavalinSetup.future import uy.kohesive.injekt.injectLazy import java.util.concurrent.CompletableFuture @@ -19,6 +16,38 @@ import kotlin.time.Duration.Companion.seconds class UpdateMutation { private val updater: IUpdater by injectLazy() + data class UpdateLibraryInput( + val clientMutationId: String? = null, + val categories: List?, + ) + + data class UpdateLibraryPayload( + val clientMutationId: String? = null, + val updateStatus: LibraryUpdateStatus, + ) + + fun updateLibrary(input: UpdateLibraryInput): CompletableFuture> { + updater.addCategoriesToUpdateQueue( + Category.getCategoryList().filter { input.categories?.contains(it.id) ?: true }, + clear = true, + forceAll = !input.categories.isNullOrEmpty(), + ) + + return future { + asDataFetcherResult { + UpdateLibraryPayload( + input.clientMutationId, + updateStatus = + withTimeout(30.seconds) { + LibraryUpdateStatus( + updater.updates.first(), + ) + }, + ) + } + } + } + data class UpdateLibraryMangaInput( val clientMutationId: String? = null, ) @@ -29,10 +58,11 @@ class UpdateMutation { ) fun updateLibraryManga(input: UpdateLibraryMangaInput): CompletableFuture> { - updater.addCategoriesToUpdateQueue( - Category.getCategoryList(), - clear = true, - forceAll = false, + updateLibrary( + UpdateLibraryInput( + clientMutationId = input.clientMutationId, + categories = null, + ), ) return future { @@ -59,13 +89,12 @@ class UpdateMutation { ) fun updateCategoryManga(input: UpdateCategoryMangaInput): CompletableFuture> { - val categories = - transaction { - CategoryTable.selectAll().where { CategoryTable.id inList input.categories }.map { - CategoryTable.toDataClass(it) - } - } - updater.addCategoriesToUpdateQueue(categories, clear = true, forceAll = true) + updateLibrary( + UpdateLibraryInput( + clientMutationId = input.clientMutationId, + categories = input.categories, + ), + ) return future { asDataFetcherResult { diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt index daa4cea6..94c0a67e 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt @@ -1,6 +1,8 @@ package suwayomi.tachidesk.graphql.queries +import com.expediagroup.graphql.generator.annotations.GraphQLDeprecated import kotlinx.coroutines.flow.first +import suwayomi.tachidesk.graphql.types.LibraryUpdateStatus import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.manga.impl.update.IUpdater import suwayomi.tachidesk.server.JavalinSetup.future @@ -10,8 +12,11 @@ import java.util.concurrent.CompletableFuture class UpdateQuery { private val updater: IUpdater by injectLazy() + @GraphQLDeprecated("Replaced with libraryUpdateStatus", ReplaceWith("libraryUpdateStatus")) fun updateStatus(): CompletableFuture = future { UpdateStatus(updater.status.first()) } + fun libraryUpdateStatus(): CompletableFuture = future { LibraryUpdateStatus(updater.getStatus()) } + data class LastUpdateTimestampPayload( val timestamp: Long, ) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt index 1bfefac8..9cfd2b97 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt @@ -16,7 +16,7 @@ import suwayomi.tachidesk.graphql.types.DownloadUpdates import suwayomi.tachidesk.manga.impl.download.DownloadManager class DownloadSubscription { - @GraphQLDeprecated("Replaced width downloadStatusChanged", ReplaceWith("downloadStatusChanged(input)")) + @GraphQLDeprecated("Replaced with downloadStatusChanged", ReplaceWith("downloadStatusChanged(input)")) fun downloadChanged(): Flow = DownloadManager.status.map { downloadStatus -> DownloadStatus(downloadStatus) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/UpdateSubscription.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/UpdateSubscription.kt index 6a66ce46..060aa53f 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/UpdateSubscription.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/UpdateSubscription.kt @@ -7,17 +7,68 @@ package suwayomi.tachidesk.graphql.subscriptions +import com.expediagroup.graphql.generator.annotations.GraphQLDeprecated +import com.expediagroup.graphql.generator.annotations.GraphQLDescription import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map import suwayomi.tachidesk.graphql.types.UpdateStatus +import suwayomi.tachidesk.graphql.types.UpdaterUpdates import suwayomi.tachidesk.manga.impl.update.IUpdater +import suwayomi.tachidesk.manga.impl.update.UpdateUpdates import uy.kohesive.injekt.injectLazy class UpdateSubscription { private val updater: IUpdater by injectLazy() + @GraphQLDeprecated("Replaced with updates", ReplaceWith("updates(input)")) fun updateStatusChanged(): Flow = updater.status.map { updateStatus -> UpdateStatus(updateStatus) } + + data class LibraryUpdateStatusChangedInput( + @GraphQLDescription( + "Sets a max number of updates that can be contained in a updater update message." + + "Everything above this limit will be omitted and the \"updateStatus\" should be re-fetched via the " + + "corresponding query. Due to the graphql subscription execution strategy not supporting batching for data loaders, " + + "the data loaders run into the n+1 problem, which can cause the server to get unresponsive until the status " + + "update has been handled. This is an issue e.g. when starting an update.", + ) + val maxUpdates: Int?, + ) + + fun libraryUpdateStatusChanged(input: LibraryUpdateStatusChangedInput): Flow { + val omitUpdates = input.maxUpdates != null + val maxUpdates = input.maxUpdates ?: 50 + + return updater.updates.map { updates -> + val categoryUpdatesCount = updates.categoryUpdates.size + val mangaUpdatesCount = updates.mangaUpdates.size + val totalUpdatesCount = categoryUpdatesCount + mangaUpdatesCount + + val needToOmitUpdates = omitUpdates && totalUpdatesCount > maxUpdates + if (!needToOmitUpdates) { + return@map UpdaterUpdates(updates, omittedUpdates = false) + } + + val maxUpdatesAfterCategoryUpdates = (maxUpdates - categoryUpdatesCount).coerceAtLeast(0) + + // the graphql subscription execution strategy does not support data loader batching which causes the n+1 problem, + // thus, too many updates (e.g. on mass enqueue or dequeue) causes unresponsiveness of the server until the + // update has been handled + UpdaterUpdates( + UpdateUpdates( + updates.isRunning, + updates.categoryUpdates.subList(0, maxUpdates), + updates.mangaUpdates.subList(0, maxUpdatesAfterCategoryUpdates), + updates.totalJobs, + updates.finishedJobs, + updates.skippedCategoriesCount, + updates.skippedMangasCount, + updates.initial, + ), + omittedUpdates = true, + ) + } + } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/CategoryType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/CategoryType.kt index b8a159df..21b04ea0 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/CategoryType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/CategoryType.kt @@ -15,6 +15,7 @@ import suwayomi.tachidesk.graphql.server.primitives.Edge import suwayomi.tachidesk.graphql.server.primitives.Node import suwayomi.tachidesk.graphql.server.primitives.NodeList import suwayomi.tachidesk.graphql.server.primitives.PageInfo +import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass import suwayomi.tachidesk.manga.model.dataclass.IncludeOrExclude import suwayomi.tachidesk.manga.model.table.CategoryTable import java.util.concurrent.CompletableFuture @@ -36,6 +37,15 @@ class CategoryType( IncludeOrExclude.fromValue(row[CategoryTable.includeInDownload]), ) + constructor(dataClass: CategoryDataClass) : this( + dataClass.id, + dataClass.order, + dataClass.name, + dataClass.default, + dataClass.includeInUpdate, + dataClass.includeInDownload, + ) + fun mangas(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture = dataFetchingEnvironment.getValueFromDataLoader("MangaForCategoryDataLoader", id) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/UpdateType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/UpdateType.kt index 1f455c19..7ccb2109 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/UpdateType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/UpdateType.kt @@ -1,11 +1,15 @@ package suwayomi.tachidesk.graphql.types +import com.expediagroup.graphql.generator.annotations.GraphQLDescription import com.expediagroup.graphql.generator.annotations.GraphQLIgnore import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import graphql.schema.DataFetchingEnvironment +import suwayomi.tachidesk.manga.impl.update.CategoryUpdateJob import suwayomi.tachidesk.manga.impl.update.CategoryUpdateStatus import suwayomi.tachidesk.manga.impl.update.JobStatus +import suwayomi.tachidesk.manga.impl.update.UpdateJob import suwayomi.tachidesk.manga.impl.update.UpdateStatus +import suwayomi.tachidesk.manga.impl.update.UpdateUpdates import java.util.concurrent.CompletableFuture private val jobStatusToMangaIdsToCacheClearedStatus = mutableMapOf>() @@ -47,14 +51,6 @@ class UpdateStatus( ) } -class UpdateStatusCategoryType( - @get:GraphQLIgnore - val categoryIds: List, -) { - fun categories(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture = - dataFetchingEnvironment.getValueFromDataLoader("CategoryForIdsDataLoader", categoryIds) -} - class UpdateStatusType( @get:GraphQLIgnore val mangaIds: List, @@ -85,6 +81,115 @@ class UpdateStatusType( } } - return dataFetchingEnvironment.getValueFromDataLoader, MangaNodeList>("MangaForIdsDataLoader", mangaIds) + return dataFetchingEnvironment.getValueFromDataLoader, MangaNodeList>( + "MangaForIdsDataLoader", + mangaIds, + ) } } + +class UpdateStatusCategoryType( + @get:GraphQLIgnore + val categoryIds: List, +) { + fun categories(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture = + dataFetchingEnvironment.getValueFromDataLoader("CategoryForIdsDataLoader", categoryIds) +} + +class LibraryUpdateStatus( + val categoryUpdates: List, + val mangaUpdates: List, + val jobsInfo: UpdaterJobsInfoType, +) { + constructor(updates: UpdateUpdates) : this( + categoryUpdates = updates.categoryUpdates.map(::CategoryUpdateType), + mangaUpdates = updates.mangaUpdates.map(::MangaUpdateType), + jobsInfo = + UpdaterJobsInfoType( + isRunning = updates.isRunning, + totalJobs = updates.totalJobs, + finishedJobs = updates.finishedJobs, + skippedCategoriesCount = updates.skippedCategoriesCount, + skippedMangasCount = updates.skippedMangasCount, + ), + ) +} + +enum class MangaJobStatus { + PENDING, + RUNNING, + COMPLETE, + FAILED, + SKIPPED, +} + +enum class CategoryJobStatus { + UPDATING, + SKIPPED, +} + +class MangaUpdateType( + val manga: MangaType, + val status: MangaJobStatus, +) { + constructor(job: UpdateJob) : this( + MangaType(job.manga), + when (job.status) { + JobStatus.PENDING -> MangaJobStatus.PENDING + JobStatus.RUNNING -> MangaJobStatus.RUNNING + JobStatus.COMPLETE -> MangaJobStatus.COMPLETE + JobStatus.FAILED -> MangaJobStatus.FAILED + JobStatus.SKIPPED -> MangaJobStatus.SKIPPED + }, + ) +} + +class CategoryUpdateType( + val category: CategoryType, + val status: CategoryJobStatus, +) { + constructor(job: CategoryUpdateJob) : this( + CategoryType(job.category), + when (job.status) { + CategoryUpdateStatus.UPDATING -> CategoryJobStatus.UPDATING + CategoryUpdateStatus.SKIPPED -> CategoryJobStatus.SKIPPED + }, + ) +} + +// wrap this info in a data class so that the update subscription updates the date of the update status in the clients cache +data class UpdaterJobsInfoType( + val isRunning: Boolean, + val totalJobs: Int, + val finishedJobs: Int, + val skippedCategoriesCount: Int, + val skippedMangasCount: Int, +) + +data class UpdaterUpdates( + val categoryUpdates: List, + val mangaUpdates: List, + @GraphQLDescription("The current update status at the time of sending the initial message. Is null for all following messages") + val initial: LibraryUpdateStatus?, + val jobsInfo: UpdaterJobsInfoType, + @GraphQLDescription( + "Indicates whether updates have been omitted based on the \"maxUpdates\" subscription variable. " + + "In case updates have been omitted, the \"updateStatus\" query should be re-fetched.", + ) + val omittedUpdates: Boolean, +) { + constructor(updates: UpdateUpdates, omittedUpdates: Boolean) : this( + categoryUpdates = updates.categoryUpdates.map(::CategoryUpdateType), + mangaUpdates = updates.mangaUpdates.map(::MangaUpdateType), + initial = updates.initial?.let { LibraryUpdateStatus(updates.initial) }, + jobsInfo = + UpdaterJobsInfoType( + isRunning = updates.isRunning, + totalJobs = updates.totalJobs, + finishedJobs = updates.finishedJobs, + skippedCategoriesCount = updates.skippedCategoriesCount, + skippedMangasCount = updates.skippedMangasCount, + ), + omittedUpdates = omittedUpdates, + ) +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt index 1dc00519..033c1b22 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt @@ -16,9 +16,14 @@ interface IUpdater { fun addMangasToQueue(mangas: List) + @Deprecated("Replaced with updates", replaceWith = ReplaceWith("updates")) val status: Flow + val updates: Flow + val statusDeprecated: StateFlow fun reset() + + fun getStatus(): UpdateUpdates } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt index 2dfb6cec..70a9e111 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateJob.kt @@ -1,5 +1,6 @@ package suwayomi.tachidesk.manga.impl.update +import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass enum class JobStatus { @@ -14,3 +15,13 @@ data class UpdateJob( val manga: MangaDataClass, val status: JobStatus = JobStatus.PENDING, ) + +enum class CategoryUpdateStatus { + UPDATING, + SKIPPED, +} + +data class CategoryUpdateJob( + val category: CategoryDataClass, + val status: CategoryUpdateStatus, +) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt index 292ee2d0..8dbe6942 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdateStatus.kt @@ -4,11 +4,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass -enum class CategoryUpdateStatus { - UPDATING, - SKIPPED, -} - data class UpdateStatus( val categoryStatusMap: Map> = emptyMap(), val mangaStatusMap: Map> = emptyMap(), @@ -33,3 +28,14 @@ data class UpdateStatus( numberOfJobs = jobs.size, ) } + +data class UpdateUpdates( + val isRunning: Boolean = false, + val categoryUpdates: List, + val mangaUpdates: List, + val totalJobs: Int, + val finishedJobs: Int, + val skippedCategoriesCount: Int, + val skippedMangasCount: Int, + val initial: UpdateUpdates?, +) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index af220b5b..200160c9 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -54,8 +54,14 @@ class Updater : IUpdater { private val notifyFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + @Deprecated("Replaced with updatesFlow", replaceWith = ReplaceWith("updatesFlow")) private val statusFlow = MutableSharedFlow() - override val status = statusFlow.onStart { emit(getStatus()) } + + @Deprecated("Replaced with updates", replaceWith = ReplaceWith("updates")) + override val status = statusFlow.onStart { emit(getStatusDeprecated(null)) } + + private val updatesFlow = MutableSharedFlow() + override val updates = updatesFlow.onStart { emit(getUpdates(addInitial = true)) } init { // has to be in its own scope (notifyFlowScope), otherwise, the collection gets canceled due to canceling the scopes (scope) children in the reset function @@ -69,8 +75,11 @@ class Updater : IUpdater { private val _status = MutableStateFlow(UpdateStatus()) override val statusDeprecated = _status.asStateFlow() + private val mangaUpdates = ConcurrentHashMap() + private val categoryUpdates = ConcurrentHashMap() private var updateStatusCategories: Map> = emptyMap() private var updateStatusSkippedMangas: List = emptyList() + private val tracker = ConcurrentHashMap() private val updateChannels = ConcurrentHashMap>() @@ -112,7 +121,7 @@ class Updater : IUpdater { val lastAutomatedUpdate = preferences.getLong(lastAutomatedUpdateKey, 0) preferences.edit().putLong(lastAutomatedUpdateKey, System.currentTimeMillis()).apply() - if (getStatus().running) { + if (getStatus().isRunning) { logger.debug { "Global update is already in progress" } return } @@ -157,28 +166,78 @@ class Updater : IUpdater { HAScheduler.schedule(::autoUpdateTask, updateInterval, timeToNextExecution, "global-update") } - private fun getStatus(running: Boolean? = null): UpdateStatus { + private fun isRunning(): Boolean = + tracker.values.toList().any { job -> job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING } + + // old status that is still required for the deprecated endpoints + private fun getStatusDeprecated(running: Boolean? = null): UpdateStatus { val jobs = tracker.values.toList() - val isRunning = - running - ?: jobs.any { job -> - job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING - } + val isRunning = running ?: isRunning() return UpdateStatus(this.updateStatusCategories, jobs, this.updateStatusSkippedMangas, isRunning) } + private fun getStatus( + categories: List, + mangas: List, + running: Boolean? = null, + addInitial: Boolean? = false, + ): UpdateUpdates = + UpdateUpdates( + running ?: isRunning(), + categories, + mangas, + tracker.size, + tracker.values.count { it.status == JobStatus.COMPLETE || it.status == JobStatus.FAILED }, + this.updateStatusCategories[CategoryUpdateStatus.SKIPPED]?.size ?: 0, + this.updateStatusSkippedMangas.size, + if (addInitial == true) getStatus() else null, + ) + + override fun getStatus(): UpdateUpdates = + getStatus( + this.updateStatusCategories[CategoryUpdateStatus.UPDATING] + ?.map { + CategoryUpdateJob( + it, + CategoryUpdateStatus.UPDATING, + ) + }.orEmpty(), + tracker.values.toList(), + ) + + private fun getUpdates( + running: Boolean? = null, + addInitial: Boolean? = null, + ): UpdateUpdates = + getStatus( + categoryUpdates.values.toList(), + mangaUpdates.values.toList(), + running, + addInitial = addInitial, + ) + /** * Pass "isRunning" to force a specific running state */ private suspend fun updateStatus( immediate: Boolean = false, + categoryUpdates: List = emptyList(), + mangaUpdates: List = emptyList(), isRunning: Boolean? = null, ) { + mangaUpdates.forEach { this.mangaUpdates[it.manga.id] = it } + categoryUpdates.forEach { this.categoryUpdates[it.category.id] = it } + if (immediate) { - val status = getStatus(running = isRunning) + val status = getStatusDeprecated(running = isRunning) + val updates = getUpdates(isRunning) + + this.mangaUpdates.clear() + this.categoryUpdates.clear() statusFlow.emit(status) _status.update { status } + updatesFlow.emit(updates) return } @@ -217,18 +276,15 @@ class Updater : IUpdater { } // fail all updates for source - tracker - .filter { (_, job) -> !isFailedSourceUpdate(job) } - .forEach { (mangaId, job) -> - tracker[mangaId] = job.copy(status = JobStatus.FAILED) - } + val sourceUpdateJobs = tracker.filter { (_, job) -> !isFailedSourceUpdate(job) } + sourceUpdateJobs.forEach { (mangaId, job) -> tracker[mangaId] = job.copy(status = JobStatus.FAILED) } - updateStatus() + updateStatus(mangaUpdates = sourceUpdateJobs.values.toList()) } private suspend fun process(job: UpdateJob) { tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) - updateStatus() + updateStatus(mangaUpdates = listOf(tracker[job.manga.id]!!)) tracker[job.manga.id] = try { @@ -248,7 +304,7 @@ class Updater : IUpdater { // in case this is the last update job, the running flag has to be true, before it gets set to false, to be able // to properly clear the dataloader store in UpdateType - updateStatus(immediate = wasLastJob, isRunning = true) + updateStatus(immediate = wasLastJob, isRunning = true, mangaUpdates = listOf(tracker[job.manga.id]!!)) if (wasLastJob) { updateStatus(isRunning = false) @@ -328,6 +384,18 @@ class Updater : IUpdater { return } + scope.launch { + updateStatus( + categoryUpdates = + updateStatusCategories[CategoryUpdateStatus.UPDATING] + ?.map { + CategoryUpdateJob(it, CategoryUpdateStatus.UPDATING) + }.orEmpty(), + mangaUpdates = mangasToUpdate.map { UpdateJob(it) }, + isRunning = true, + ) + } + addMangasToQueue( mangasToUpdate .sortedWith(compareBy(String.CASE_INSENSITIVE_ORDER, MangaDataClass::title)), @@ -350,12 +418,17 @@ class Updater : IUpdater { override fun reset() { scope.coroutineContext.cancelChildren() + tracker.clear() + this.mangaUpdates.clear() + this.categoryUpdates.clear() this.updateStatusCategories = emptyMap() this.updateStatusSkippedMangas = emptyList() + scope.launch { updateStatus(immediate = true, isRunning = false) } + updateChannels.forEach { (_, channel) -> channel.cancel() } updateChannels.clear() }