From 9d2b098837cab10b3ff28bc02b3f00d30064656c Mon Sep 17 00:00:00 2001 From: schroda <50052685+schroda@users.noreply.github.com> Date: Tue, 31 Oct 2023 00:46:54 +0100 Subject: [PATCH] Fix/updater update stuck in running status after failure (#731) * Move running check to update function * Move updating update status to process function * Fail all source updates in case of update channel failure In case the channel failed due to an exception, the update for the source failed completely. This however was never handled and the pending updates for the source were never set to failed. Due to this, the global updates running state was always true * Remove completed update channel from available channels * Always log specific update job failure --- .../tachidesk/manga/impl/update/Updater.kt | 58 ++++++++++++++----- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index f591cb53..c7ccf5d3 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch @@ -127,43 +128,71 @@ class Updater : IUpdater { */ private fun updateStatus( jobs: List, - running: Boolean, + running: Boolean? = null, categories: Map>? = null, skippedMangas: List? = null, ) { + val isRunning = + running + ?: jobs.any { job -> + job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING + } val updateStatusCategories = categories ?: _status.value.categoryStatusMap val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList() - _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, running) } + _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, isRunning) } } private fun getOrCreateUpdateChannelFor(source: String): Channel { return updateChannels.getOrPut(source) { logger.debug { "getOrCreateUpdateChannelFor: created channel for $source - channels: ${updateChannels.size + 1}" } - createUpdateChannel() + createUpdateChannel(source) } } - private fun createUpdateChannel(): Channel { + private fun createUpdateChannel(source: String): Channel { val channel = Channel(Channel.UNLIMITED) channel.consumeAsFlow() .onEach { job -> semaphore.withPermit { - updateStatus( - process(job), - tracker.any { (_, job) -> - job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING - }, - ) + process(job) } } - .catch { logger.error(it) { "Error during updates" } } + .catch { + logger.error(it) { "Error during updates (source: $source)" } + handleChannelUpdateFailure(source) + } + .onCompletion { updateChannels.remove(source) } .launchIn(scope) return channel } - private suspend fun process(job: UpdateJob): List { + private fun handleChannelUpdateFailure(source: String) { + val isFailedSourceUpdate = { job: UpdateJob -> + val isForSource = job.manga.sourceId == source + val hasFailed = job.status == JobStatus.FAILED + + isForSource && hasFailed + } + + // fail all updates for source + tracker + .filter { (_, job) -> !isFailedSourceUpdate(job) } + .forEach { (mangaId, job) -> + tracker[mangaId] = job.copy(status = JobStatus.FAILED) + } + + updateStatus( + tracker.values.toList(), + tracker.any { (_, job) -> + job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING + }, + ) + } + + private suspend fun process(job: UpdateJob) { tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) updateStatus(tracker.values.toList(), true) + tracker[job.manga.id] = try { logger.info { "Updating \"${job.manga.title}\" (source: ${job.manga.sourceId})" } @@ -173,11 +202,12 @@ class Updater : IUpdater { Chapter.getChapterList(job.manga.id, true) job.copy(status = JobStatus.COMPLETE) } catch (e: Exception) { - if (e is CancellationException) throw e logger.error(e) { "Error while updating ${job.manga.title}" } + if (e is CancellationException) throw e job.copy(status = JobStatus.FAILED) } - return tracker.values.toList() + + updateStatus(tracker.values.toList()) } override fun addCategoriesToUpdateQueue(