Fix/updater update stuck in running status after failure (#731)

* Move running check to update function

* Move updating update status to process function

* Fail all source updates in case of update channel failure

In case the channel failed due to an exception, the update for the source failed completely.
This however was never handled and the pending updates for the source were never set to failed.
Due to this, the global updates running state was always true

* Remove completed update channel from available channels

* Always log specific update job failure
This commit is contained in:
schroda
2023-10-31 00:46:54 +01:00
committed by GitHub
parent 17bc2d2331
commit 9d2b098837

View File

@@ -14,6 +14,7 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@@ -127,43 +128,71 @@ class Updater : IUpdater {
*/ */
private fun updateStatus( private fun updateStatus(
jobs: List<UpdateJob>, jobs: List<UpdateJob>,
running: Boolean, running: Boolean? = null,
categories: Map<CategoryUpdateStatus, List<CategoryDataClass>>? = null, categories: Map<CategoryUpdateStatus, List<CategoryDataClass>>? = null,
skippedMangas: List<MangaDataClass>? = null, skippedMangas: List<MangaDataClass>? = null,
) { ) {
val isRunning =
running
?: jobs.any { job ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
}
val updateStatusCategories = categories ?: _status.value.categoryStatusMap val updateStatusCategories = categories ?: _status.value.categoryStatusMap
val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList() val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList()
_status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, running) } _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, isRunning) }
} }
private fun getOrCreateUpdateChannelFor(source: String): Channel<UpdateJob> { private fun getOrCreateUpdateChannelFor(source: String): Channel<UpdateJob> {
return updateChannels.getOrPut(source) { return updateChannels.getOrPut(source) {
logger.debug { "getOrCreateUpdateChannelFor: created channel for $source - channels: ${updateChannels.size + 1}" } logger.debug { "getOrCreateUpdateChannelFor: created channel for $source - channels: ${updateChannels.size + 1}" }
createUpdateChannel() createUpdateChannel(source)
} }
} }
private fun createUpdateChannel(): Channel<UpdateJob> { private fun createUpdateChannel(source: String): Channel<UpdateJob> {
val channel = Channel<UpdateJob>(Channel.UNLIMITED) val channel = Channel<UpdateJob>(Channel.UNLIMITED)
channel.consumeAsFlow() channel.consumeAsFlow()
.onEach { job -> .onEach { job ->
semaphore.withPermit { semaphore.withPermit {
updateStatus( process(job)
process(job),
tracker.any { (_, job) ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
},
)
} }
} }
.catch { logger.error(it) { "Error during updates" } } .catch {
logger.error(it) { "Error during updates (source: $source)" }
handleChannelUpdateFailure(source)
}
.onCompletion { updateChannels.remove(source) }
.launchIn(scope) .launchIn(scope)
return channel return channel
} }
private suspend fun process(job: UpdateJob): List<UpdateJob> { private fun handleChannelUpdateFailure(source: String) {
val isFailedSourceUpdate = { job: UpdateJob ->
val isForSource = job.manga.sourceId == source
val hasFailed = job.status == JobStatus.FAILED
isForSource && hasFailed
}
// fail all updates for source
tracker
.filter { (_, job) -> !isFailedSourceUpdate(job) }
.forEach { (mangaId, job) ->
tracker[mangaId] = job.copy(status = JobStatus.FAILED)
}
updateStatus(
tracker.values.toList(),
tracker.any { (_, job) ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
},
)
}
private suspend fun process(job: UpdateJob) {
tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING)
updateStatus(tracker.values.toList(), true) updateStatus(tracker.values.toList(), true)
tracker[job.manga.id] = tracker[job.manga.id] =
try { try {
logger.info { "Updating \"${job.manga.title}\" (source: ${job.manga.sourceId})" } logger.info { "Updating \"${job.manga.title}\" (source: ${job.manga.sourceId})" }
@@ -173,11 +202,12 @@ class Updater : IUpdater {
Chapter.getChapterList(job.manga.id, true) Chapter.getChapterList(job.manga.id, true)
job.copy(status = JobStatus.COMPLETE) job.copy(status = JobStatus.COMPLETE)
} catch (e: Exception) { } catch (e: Exception) {
if (e is CancellationException) throw e
logger.error(e) { "Error while updating ${job.manga.title}" } logger.error(e) { "Error while updating ${job.manga.title}" }
if (e is CancellationException) throw e
job.copy(status = JobStatus.FAILED) job.copy(status = JobStatus.FAILED)
} }
return tracker.values.toList()
updateStatus(tracker.values.toList())
} }
override fun addCategoriesToUpdateQueue( override fun addCategoriesToUpdateQueue(