From 6d539d34040c4e95692b57ce4fedfbeaa73083d0 Mon Sep 17 00:00:00 2001 From: schroda <50052685+schroda@users.noreply.github.com> Date: Sun, 31 Mar 2024 19:19:49 +0200 Subject: [PATCH] Fix/update subscription clear data loader cache (#908) * Set updater running flag to false only at the end of the update For clearing the data loader cache properly, the update status subscription requires the update to be running. For the last completed manga update the flag was immediately set to false which prevented the dataloader cache from getting cleared, returning outdated data for the last updated manga * Correctly clear the "MangaForIdsDataLoader" cache The cache keys for this dataloader are lists of manga ids. Thus, it is not possible to clear only the cached data of the provided manga id and instead each cache entry that includes the manga id has to be cleared * Ensure that manga dataloader caches gets cleared during global update The "StateFlow" drops value updates in case the collector is too slow, which was the case for the "UpdateSubscription". This caused the dataloader cache to not get properly cleared because the running state of the update was already set to false. --- .../tachidesk/graphql/cache/CustomCacheMap.kt | 46 ++++++++ .../graphql/dataLoaders/MangaDataLoader.kt | 29 +++-- .../graphql/mutations/UpdateMutation.kt | 32 ++++-- .../tachidesk/graphql/queries/UpdateQuery.kt | 7 +- .../tachidesk/graphql/types/MangaType.kt | 16 ++- .../manga/controller/UpdateController.kt | 2 +- .../tachidesk/manga/impl/update/IUpdater.kt | 5 +- .../tachidesk/manga/impl/update/Updater.kt | 103 +++++++++++++----- .../manga/impl/update/UpdaterSocket.kt | 4 +- 9 files changed, 190 insertions(+), 54 deletions(-) create mode 100644 server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt new file mode 100644 index 00000000..a6660f6c --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt @@ -0,0 +1,46 @@ +package suwayomi.tachidesk.graphql.cache + +import org.dataloader.CacheMap +import java.util.concurrent.CompletableFuture + +class CustomCacheMap : CacheMap { + private val cache: MutableMap> + + init { + cache = HashMap() + } + + override fun containsKey(key: K): Boolean { + return cache.containsKey(key) + } + + override fun get(key: K): CompletableFuture { + return cache[key]!! + } + + fun getKeys(): Collection { + return cache.keys.toSet() + } + + override fun getAll(): Collection> { + return cache.values + } + + override fun set( + key: K, + value: CompletableFuture, + ): CacheMap { + cache[key] = value + return this + } + + override fun delete(key: K): CacheMap { + cache.remove(key) + return this + } + + override fun clear(): CacheMap { + cache.clear() + return this + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt index fe4cb199..6cdbe455 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt @@ -10,11 +10,13 @@ package suwayomi.tachidesk.graphql.dataLoaders import com.expediagroup.graphql.dataloader.KotlinDataLoader import org.dataloader.DataLoader import org.dataloader.DataLoaderFactory +import org.dataloader.DataLoaderOptions import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger import org.jetbrains.exposed.sql.addLogger import org.jetbrains.exposed.sql.andWhere import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction +import suwayomi.tachidesk.graphql.cache.CustomCacheMap import suwayomi.tachidesk.graphql.types.MangaNodeList import suwayomi.tachidesk.graphql.types.MangaNodeList.Companion.toNodeList import suwayomi.tachidesk.graphql.types.MangaType @@ -95,18 +97,21 @@ class MangaForIdsDataLoader : KotlinDataLoader, MangaNodeList> { override val dataLoaderName = "MangaForIdsDataLoader" override fun getDataLoader(): DataLoader, MangaNodeList> = - DataLoaderFactory.newDataLoader { mangaIds -> - future { - transaction { - addLogger(Slf4jSqlDebugLogger) - val ids = mangaIds.flatten().distinct() - val manga = - MangaTable.select { MangaTable.id inList ids } - .map { MangaType(it) } - mangaIds.map { mangaIds -> - manga.filter { it.id in mangaIds }.toNodeList() + DataLoaderFactory.newDataLoader( + { mangaIds -> + future { + transaction { + addLogger(Slf4jSqlDebugLogger) + val ids = mangaIds.flatten().distinct() + val manga = + MangaTable.select { MangaTable.id inList ids } + .map { MangaType(it) } + mangaIds.map { mangaIds -> + manga.filter { it.id in mangaIds }.toNodeList() + } } } - } - } + }, + DataLoaderOptions.newOptions().setCacheMap(CustomCacheMap, MangaNodeList>()), + ) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt index b7ce3eaa..10fed203 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt @@ -1,5 +1,7 @@ package suwayomi.tachidesk.graphql.mutations +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.withTimeout import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction import org.kodein.di.DI @@ -10,6 +12,9 @@ import suwayomi.tachidesk.manga.impl.Category import suwayomi.tachidesk.manga.impl.update.IUpdater import suwayomi.tachidesk.manga.model.table.CategoryTable import suwayomi.tachidesk.manga.model.table.toDataClass +import suwayomi.tachidesk.server.JavalinSetup.future +import java.util.concurrent.CompletableFuture +import kotlin.time.Duration.Companion.seconds class UpdateMutation { private val updater by DI.global.instance() @@ -23,14 +28,22 @@ class UpdateMutation { val updateStatus: UpdateStatus, ) - fun updateLibraryManga(input: UpdateLibraryMangaInput): UpdateLibraryMangaPayload { + fun updateLibraryManga(input: UpdateLibraryMangaInput): CompletableFuture { updater.addCategoriesToUpdateQueue( Category.getCategoryList(), clear = true, forceAll = false, ) - return UpdateLibraryMangaPayload(input.clientMutationId, UpdateStatus(updater.status.value)) + return future { + UpdateLibraryMangaPayload( + input.clientMutationId, + updateStatus = + withTimeout(30.seconds) { + UpdateStatus(updater.status.first()) + }, + ) + } } data class UpdateCategoryMangaInput( @@ -43,7 +56,7 @@ class UpdateMutation { val updateStatus: UpdateStatus, ) - fun updateCategoryManga(input: UpdateCategoryMangaInput): UpdateCategoryMangaPayload { + fun updateCategoryManga(input: UpdateCategoryMangaInput): CompletableFuture { val categories = transaction { CategoryTable.select { CategoryTable.id inList input.categories }.map { @@ -52,10 +65,15 @@ class UpdateMutation { } updater.addCategoriesToUpdateQueue(categories, clear = true, forceAll = true) - return UpdateCategoryMangaPayload( - clientMutationId = input.clientMutationId, - updateStatus = UpdateStatus(updater.status.value), - ) + return future { + UpdateCategoryMangaPayload( + input.clientMutationId, + updateStatus = + withTimeout(30.seconds) { + UpdateStatus(updater.status.first()) + }, + ) + } } data class UpdateStopInput( diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt index 344c054b..59a7516e 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt @@ -1,16 +1,19 @@ package suwayomi.tachidesk.graphql.queries +import kotlinx.coroutines.flow.first import org.kodein.di.DI import org.kodein.di.conf.global import org.kodein.di.instance import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.manga.impl.update.IUpdater +import suwayomi.tachidesk.server.JavalinSetup.future +import java.util.concurrent.CompletableFuture class UpdateQuery { private val updater by DI.global.instance() - fun updateStatus(): UpdateStatus { - return UpdateStatus(updater.status.value) + fun updateStatus(): CompletableFuture { + return future { UpdateStatus(updater.status.first()) } } data class LastUpdateTimestampPayload(val timestamp: Long) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt index 84e7b237..bc1c149b 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt @@ -11,6 +11,7 @@ import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import eu.kanade.tachiyomi.source.model.UpdateStrategy import graphql.schema.DataFetchingEnvironment import org.jetbrains.exposed.sql.ResultRow +import suwayomi.tachidesk.graphql.cache.CustomCacheMap import suwayomi.tachidesk.graphql.server.primitives.Cursor import suwayomi.tachidesk.graphql.server.primitives.Edge import suwayomi.tachidesk.graphql.server.primitives.Node @@ -45,12 +46,25 @@ class MangaType( var chaptersLastFetchedAt: Long?, // todo ) : Node { companion object { + fun clearCacheFor( + mangaIds: List, + dataFetchingEnvironment: DataFetchingEnvironment, + ) { + mangaIds.forEach { clearCacheFor(it, dataFetchingEnvironment) } + } + fun clearCacheFor( mangaId: Int, dataFetchingEnvironment: DataFetchingEnvironment, ) { dataFetchingEnvironment.getDataLoader("MangaDataLoader").clear(mangaId) - dataFetchingEnvironment.getDataLoader("MangaForIdsDataLoader").clear(mangaId) + + val mangaForIdsDataLoader = + dataFetchingEnvironment.getDataLoader, MangaNodeList>("MangaForIdsDataLoader") + @Suppress("UNCHECKED_CAST") + (mangaForIdsDataLoader.cacheMap as CustomCacheMap, MangaNodeList>).getKeys() + .filter { it.contains(mangaId) }.forEach { mangaForIdsDataLoader.clear(it) } + dataFetchingEnvironment.getDataLoader("DownloadedChapterCountForMangaDataLoader").clear(mangaId) dataFetchingEnvironment.getDataLoader("UnreadChapterCountForMangaDataLoader").clear(mangaId) dataFetchingEnvironment.getDataLoader("LastReadChapterForMangaDataLoader").clear(mangaId) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt index 7f762b09..5e2a74fb 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt @@ -117,7 +117,7 @@ object UpdateController { }, behaviorOf = { ctx -> val updater by DI.global.instance() - ctx.json(updater.status.value) + ctx.json(updater.statusDeprecated.value) }, withResults = { json(HttpCode.OK) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt index 0670bfd8..09ac70bb 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt @@ -1,5 +1,6 @@ package suwayomi.tachidesk.manga.impl.update +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.StateFlow import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass @@ -12,7 +13,9 @@ interface IUpdater { forceAll: Boolean, ) - val status: StateFlow + val status: Flow + + val statusDeprecated: StateFlow fun reset() } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index bae5c3fa..55aeeb78 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -6,9 +6,12 @@ import eu.kanade.tachiyomi.source.model.UpdateStrategy import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch @@ -16,6 +19,8 @@ import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.sample import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore @@ -37,14 +42,33 @@ import java.util.Date import java.util.concurrent.ConcurrentHashMap import kotlin.math.absoluteValue import kotlin.time.Duration.Companion.hours +import kotlin.time.Duration.Companion.seconds +@OptIn(FlowPreview::class) class Updater : IUpdater { private val logger = KotlinLogging.logger {} + private val notifyFlowScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - private val _status = MutableStateFlow(UpdateStatus()) - override val status = _status.asStateFlow() + private val notifyFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val statusFlow = MutableSharedFlow() + override val status = statusFlow.onStart { emit(getStatus()) } + + init { + // has to be in its own scope (notifyFlowScope), otherwise, the collection gets canceled due to canceling the scopes (scope) children in the reset function + notifyFlowScope.launch { + notifyFlow.sample(1.seconds).collect { + updateStatus(immediate = true) + } + } + } + + private val _status = MutableStateFlow(UpdateStatus()) + override val statusDeprecated = _status.asStateFlow() + + private var updateStatusCategories: Map> = emptyMap() + private var updateStatusSkippedMangas: List = emptyList() private val tracker = ConcurrentHashMap() private val updateChannels = ConcurrentHashMap>() @@ -87,7 +111,7 @@ class Updater : IUpdater { val lastAutomatedUpdate = preferences.getLong(lastAutomatedUpdateKey, 0) preferences.edit().putLong(lastAutomatedUpdateKey, System.currentTimeMillis()).apply() - if (status.value.running) { + if (getStatus().running) { logger.debug { "Global update is already in progress" } return } @@ -123,23 +147,33 @@ class Updater : IUpdater { HAScheduler.schedule(::autoUpdateTask, updateInterval, timeToNextExecution, "global-update") } - /** - * Updates the status and sustains the "skippedMangas" - */ - private fun updateStatus( - jobs: List, - running: Boolean? = null, - categories: Map>? = null, - skippedMangas: List? = null, - ) { + private fun getStatus(running: Boolean? = null): UpdateStatus { + val jobs = tracker.values.toList() val isRunning = running ?: jobs.any { job -> job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING } - val updateStatusCategories = categories ?: _status.value.categoryStatusMap - val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList() - _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, isRunning) } + return UpdateStatus(this.updateStatusCategories, jobs, this.updateStatusSkippedMangas, isRunning) + } + + /** + * Pass "isRunning" to force a specific running state + */ + private suspend fun updateStatus( + immediate: Boolean = false, + isRunning: Boolean? = null, + ) { + if (immediate) { + val status = getStatus(running = isRunning) + + statusFlow.emit(status) + _status.update { status } + + return + } + + notifyFlow.emit(Unit) } private fun getOrCreateUpdateChannelFor(source: String): Channel { @@ -166,7 +200,7 @@ class Updater : IUpdater { return channel } - private fun handleChannelUpdateFailure(source: String) { + private suspend fun handleChannelUpdateFailure(source: String) { val isFailedSourceUpdate = { job: UpdateJob -> val isForSource = job.manga.sourceId == source val hasFailed = job.status == JobStatus.FAILED @@ -181,17 +215,12 @@ class Updater : IUpdater { tracker[mangaId] = job.copy(status = JobStatus.FAILED) } - updateStatus( - tracker.values.toList(), - tracker.any { (_, job) -> - job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING - }, - ) + updateStatus() } private suspend fun process(job: UpdateJob) { tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) - updateStatus(tracker.values.toList(), true) + updateStatus() tracker[job.manga.id] = try { @@ -207,7 +236,15 @@ class Updater : IUpdater { job.copy(status = JobStatus.FAILED) } - updateStatus(tracker.values.toList()) + val wasLastJob = tracker.values.none { it.status == JobStatus.PENDING || it.status == JobStatus.RUNNING } + + // in case this is the last update job, the running flag has to be true, before it gets set to false, to be able + // to properly clear the dataloader store in UpdateType + updateStatus(immediate = wasLastJob, isRunning = true) + + if (wasLastJob) { + updateStatus(isRunning = false) + } } override fun addCategoriesToUpdateQueue( @@ -274,10 +311,15 @@ class Updater : IUpdater { .toList() val skippedMangas = categoriesToUpdateMangas.subtract(mangasToUpdate.toSet()).toList() - // In case no manga gets updated and no update job was running before, the client would never receive an info about its update request - updateStatus(emptyList(), mangasToUpdate.isNotEmpty(), updateStatusCategories, skippedMangas) + this.updateStatusCategories = updateStatusCategories + this.updateStatusSkippedMangas = skippedMangas if (mangasToUpdate.isEmpty()) { + // In case no manga gets updated and no update job was running before, the client would never receive an info + // about its update request + scope.launch { + updateStatus(immediate = true) + } return } @@ -288,8 +330,9 @@ class Updater : IUpdater { } private fun addMangasToQueue(mangasToUpdate: List) { + // create all manga update jobs before adding them to the queue so that the client is able to calculate the + // progress properly right form the start mangasToUpdate.forEach { tracker[it.id] = UpdateJob(it) } - updateStatus(tracker.values.toList(), mangasToUpdate.isNotEmpty()) mangasToUpdate.forEach { addMangaToQueue(it) } } @@ -303,7 +346,11 @@ class Updater : IUpdater { override fun reset() { scope.coroutineContext.cancelChildren() tracker.clear() - updateStatus(emptyList(), false) + this.updateStatusCategories = emptyMap() + this.updateStatusSkippedMangas = emptyList() + scope.launch { + updateStatus(immediate = true, isRunning = false) + } updateChannels.forEach { (_, channel) -> channel.cancel() } updateChannels.clear() } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt index e7cdf2fb..de8c5428 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt @@ -23,12 +23,12 @@ object UpdaterSocket : Websocket() { ctx: WsContext, value: UpdateStatus?, ) { - ctx.send(value ?: updater.status.value) + ctx.send(value ?: updater.statusDeprecated.value) } override fun handleRequest(ctx: WsMessageContext) { when (ctx.message()) { - "STATUS" -> notifyClient(ctx, updater.status.value) + "STATUS" -> notifyClient(ctx, updater.statusDeprecated.value) else -> ctx.send( """