diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index f591cb53..c7ccf5d3 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch @@ -127,43 +128,71 @@ class Updater : IUpdater { */ private fun updateStatus( jobs: List, - running: Boolean, + running: Boolean? = null, categories: Map>? = null, skippedMangas: List? = null, ) { + val isRunning = + running + ?: jobs.any { job -> + job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING + } val updateStatusCategories = categories ?: _status.value.categoryStatusMap val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList() - _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, running) } + _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, isRunning) } } private fun getOrCreateUpdateChannelFor(source: String): Channel { return updateChannels.getOrPut(source) { logger.debug { "getOrCreateUpdateChannelFor: created channel for $source - channels: ${updateChannels.size + 1}" } - createUpdateChannel() + createUpdateChannel(source) } } - private fun createUpdateChannel(): Channel { + private fun createUpdateChannel(source: String): Channel { val channel = Channel(Channel.UNLIMITED) channel.consumeAsFlow() .onEach { job -> semaphore.withPermit { - updateStatus( - process(job), - tracker.any { (_, job) -> - job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING - }, - ) + process(job) } } - .catch { logger.error(it) { "Error during updates" } } + .catch { + logger.error(it) { "Error during updates (source: $source)" } + handleChannelUpdateFailure(source) + } + .onCompletion { updateChannels.remove(source) } .launchIn(scope) return channel } - private suspend fun process(job: UpdateJob): List { + private fun handleChannelUpdateFailure(source: String) { + val isFailedSourceUpdate = { job: UpdateJob -> + val isForSource = job.manga.sourceId == source + val hasFailed = job.status == JobStatus.FAILED + + isForSource && hasFailed + } + + // fail all updates for source + tracker + .filter { (_, job) -> !isFailedSourceUpdate(job) } + .forEach { (mangaId, job) -> + tracker[mangaId] = job.copy(status = JobStatus.FAILED) + } + + updateStatus( + tracker.values.toList(), + tracker.any { (_, job) -> + job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING + }, + ) + } + + private suspend fun process(job: UpdateJob) { tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) updateStatus(tracker.values.toList(), true) + tracker[job.manga.id] = try { logger.info { "Updating \"${job.manga.title}\" (source: ${job.manga.sourceId})" } @@ -173,11 +202,12 @@ class Updater : IUpdater { Chapter.getChapterList(job.manga.id, true) job.copy(status = JobStatus.COMPLETE) } catch (e: Exception) { - if (e is CancellationException) throw e logger.error(e) { "Error while updating ${job.manga.title}" } + if (e is CancellationException) throw e job.copy(status = JobStatus.FAILED) } - return tracker.values.toList() + + updateStatus(tracker.values.toList()) } override fun addCategoriesToUpdateQueue(