diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt new file mode 100644 index 00000000..a6660f6c --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/cache/CustomCacheMap.kt @@ -0,0 +1,46 @@ +package suwayomi.tachidesk.graphql.cache + +import org.dataloader.CacheMap +import java.util.concurrent.CompletableFuture + +class CustomCacheMap : CacheMap { + private val cache: MutableMap> + + init { + cache = HashMap() + } + + override fun containsKey(key: K): Boolean { + return cache.containsKey(key) + } + + override fun get(key: K): CompletableFuture { + return cache[key]!! + } + + fun getKeys(): Collection { + return cache.keys.toSet() + } + + override fun getAll(): Collection> { + return cache.values + } + + override fun set( + key: K, + value: CompletableFuture, + ): CacheMap { + cache[key] = value + return this + } + + override fun delete(key: K): CacheMap { + cache.remove(key) + return this + } + + override fun clear(): CacheMap { + cache.clear() + return this + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt index fe4cb199..6cdbe455 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/MangaDataLoader.kt @@ -10,11 +10,13 @@ package suwayomi.tachidesk.graphql.dataLoaders import com.expediagroup.graphql.dataloader.KotlinDataLoader import org.dataloader.DataLoader import org.dataloader.DataLoaderFactory +import org.dataloader.DataLoaderOptions import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger import org.jetbrains.exposed.sql.addLogger import org.jetbrains.exposed.sql.andWhere import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction +import suwayomi.tachidesk.graphql.cache.CustomCacheMap import suwayomi.tachidesk.graphql.types.MangaNodeList import suwayomi.tachidesk.graphql.types.MangaNodeList.Companion.toNodeList import suwayomi.tachidesk.graphql.types.MangaType @@ -95,18 +97,21 @@ class MangaForIdsDataLoader : KotlinDataLoader, MangaNodeList> { override val dataLoaderName = "MangaForIdsDataLoader" override fun getDataLoader(): DataLoader, MangaNodeList> = - DataLoaderFactory.newDataLoader { mangaIds -> - future { - transaction { - addLogger(Slf4jSqlDebugLogger) - val ids = mangaIds.flatten().distinct() - val manga = - MangaTable.select { MangaTable.id inList ids } - .map { MangaType(it) } - mangaIds.map { mangaIds -> - manga.filter { it.id in mangaIds }.toNodeList() + DataLoaderFactory.newDataLoader( + { mangaIds -> + future { + transaction { + addLogger(Slf4jSqlDebugLogger) + val ids = mangaIds.flatten().distinct() + val manga = + MangaTable.select { MangaTable.id inList ids } + .map { MangaType(it) } + mangaIds.map { mangaIds -> + manga.filter { it.id in mangaIds }.toNodeList() + } } } - } - } + }, + DataLoaderOptions.newOptions().setCacheMap(CustomCacheMap, MangaNodeList>()), + ) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt index b7ce3eaa..10fed203 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/mutations/UpdateMutation.kt @@ -1,5 +1,7 @@ package suwayomi.tachidesk.graphql.mutations +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.withTimeout import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction import org.kodein.di.DI @@ -10,6 +12,9 @@ import suwayomi.tachidesk.manga.impl.Category import suwayomi.tachidesk.manga.impl.update.IUpdater import suwayomi.tachidesk.manga.model.table.CategoryTable import suwayomi.tachidesk.manga.model.table.toDataClass +import suwayomi.tachidesk.server.JavalinSetup.future +import java.util.concurrent.CompletableFuture +import kotlin.time.Duration.Companion.seconds class UpdateMutation { private val updater by DI.global.instance() @@ -23,14 +28,22 @@ class UpdateMutation { val updateStatus: UpdateStatus, ) - fun updateLibraryManga(input: UpdateLibraryMangaInput): UpdateLibraryMangaPayload { + fun updateLibraryManga(input: UpdateLibraryMangaInput): CompletableFuture { updater.addCategoriesToUpdateQueue( Category.getCategoryList(), clear = true, forceAll = false, ) - return UpdateLibraryMangaPayload(input.clientMutationId, UpdateStatus(updater.status.value)) + return future { + UpdateLibraryMangaPayload( + input.clientMutationId, + updateStatus = + withTimeout(30.seconds) { + UpdateStatus(updater.status.first()) + }, + ) + } } data class UpdateCategoryMangaInput( @@ -43,7 +56,7 @@ class UpdateMutation { val updateStatus: UpdateStatus, ) - fun updateCategoryManga(input: UpdateCategoryMangaInput): UpdateCategoryMangaPayload { + fun updateCategoryManga(input: UpdateCategoryMangaInput): CompletableFuture { val categories = transaction { CategoryTable.select { CategoryTable.id inList input.categories }.map { @@ -52,10 +65,15 @@ class UpdateMutation { } updater.addCategoriesToUpdateQueue(categories, clear = true, forceAll = true) - return UpdateCategoryMangaPayload( - clientMutationId = input.clientMutationId, - updateStatus = UpdateStatus(updater.status.value), - ) + return future { + UpdateCategoryMangaPayload( + input.clientMutationId, + updateStatus = + withTimeout(30.seconds) { + UpdateStatus(updater.status.first()) + }, + ) + } } data class UpdateStopInput( diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt index 344c054b..59a7516e 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/queries/UpdateQuery.kt @@ -1,16 +1,19 @@ package suwayomi.tachidesk.graphql.queries +import kotlinx.coroutines.flow.first import org.kodein.di.DI import org.kodein.di.conf.global import org.kodein.di.instance import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.manga.impl.update.IUpdater +import suwayomi.tachidesk.server.JavalinSetup.future +import java.util.concurrent.CompletableFuture class UpdateQuery { private val updater by DI.global.instance() - fun updateStatus(): UpdateStatus { - return UpdateStatus(updater.status.value) + fun updateStatus(): CompletableFuture { + return future { UpdateStatus(updater.status.first()) } } data class LastUpdateTimestampPayload(val timestamp: Long) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt index 84e7b237..bc1c149b 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt @@ -11,6 +11,7 @@ import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import eu.kanade.tachiyomi.source.model.UpdateStrategy import graphql.schema.DataFetchingEnvironment import org.jetbrains.exposed.sql.ResultRow +import suwayomi.tachidesk.graphql.cache.CustomCacheMap import suwayomi.tachidesk.graphql.server.primitives.Cursor import suwayomi.tachidesk.graphql.server.primitives.Edge import suwayomi.tachidesk.graphql.server.primitives.Node @@ -45,12 +46,25 @@ class MangaType( var chaptersLastFetchedAt: Long?, // todo ) : Node { companion object { + fun clearCacheFor( + mangaIds: List, + dataFetchingEnvironment: DataFetchingEnvironment, + ) { + mangaIds.forEach { clearCacheFor(it, dataFetchingEnvironment) } + } + fun clearCacheFor( mangaId: Int, dataFetchingEnvironment: DataFetchingEnvironment, ) { dataFetchingEnvironment.getDataLoader("MangaDataLoader").clear(mangaId) - dataFetchingEnvironment.getDataLoader("MangaForIdsDataLoader").clear(mangaId) + + val mangaForIdsDataLoader = + dataFetchingEnvironment.getDataLoader, MangaNodeList>("MangaForIdsDataLoader") + @Suppress("UNCHECKED_CAST") + (mangaForIdsDataLoader.cacheMap as CustomCacheMap, MangaNodeList>).getKeys() + .filter { it.contains(mangaId) }.forEach { mangaForIdsDataLoader.clear(it) } + dataFetchingEnvironment.getDataLoader("DownloadedChapterCountForMangaDataLoader").clear(mangaId) dataFetchingEnvironment.getDataLoader("UnreadChapterCountForMangaDataLoader").clear(mangaId) dataFetchingEnvironment.getDataLoader("LastReadChapterForMangaDataLoader").clear(mangaId) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt index 7f762b09..5e2a74fb 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/controller/UpdateController.kt @@ -117,7 +117,7 @@ object UpdateController { }, behaviorOf = { ctx -> val updater by DI.global.instance() - ctx.json(updater.status.value) + ctx.json(updater.statusDeprecated.value) }, withResults = { json(HttpCode.OK) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt index 0670bfd8..09ac70bb 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/IUpdater.kt @@ -1,5 +1,6 @@ package suwayomi.tachidesk.manga.impl.update +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.StateFlow import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass @@ -12,7 +13,9 @@ interface IUpdater { forceAll: Boolean, ) - val status: StateFlow + val status: Flow + + val statusDeprecated: StateFlow fun reset() } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index bae5c3fa..55aeeb78 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -6,9 +6,12 @@ import eu.kanade.tachiyomi.source.model.UpdateStrategy import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.catch @@ -16,6 +19,8 @@ import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.sample import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore @@ -37,14 +42,33 @@ import java.util.Date import java.util.concurrent.ConcurrentHashMap import kotlin.math.absoluteValue import kotlin.time.Duration.Companion.hours +import kotlin.time.Duration.Companion.seconds +@OptIn(FlowPreview::class) class Updater : IUpdater { private val logger = KotlinLogging.logger {} + private val notifyFlowScope = CoroutineScope(SupervisorJob() + Dispatchers.Default) private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) - private val _status = MutableStateFlow(UpdateStatus()) - override val status = _status.asStateFlow() + private val notifyFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) + private val statusFlow = MutableSharedFlow() + override val status = statusFlow.onStart { emit(getStatus()) } + + init { + // has to be in its own scope (notifyFlowScope), otherwise, the collection gets canceled due to canceling the scopes (scope) children in the reset function + notifyFlowScope.launch { + notifyFlow.sample(1.seconds).collect { + updateStatus(immediate = true) + } + } + } + + private val _status = MutableStateFlow(UpdateStatus()) + override val statusDeprecated = _status.asStateFlow() + + private var updateStatusCategories: Map> = emptyMap() + private var updateStatusSkippedMangas: List = emptyList() private val tracker = ConcurrentHashMap() private val updateChannels = ConcurrentHashMap>() @@ -87,7 +111,7 @@ class Updater : IUpdater { val lastAutomatedUpdate = preferences.getLong(lastAutomatedUpdateKey, 0) preferences.edit().putLong(lastAutomatedUpdateKey, System.currentTimeMillis()).apply() - if (status.value.running) { + if (getStatus().running) { logger.debug { "Global update is already in progress" } return } @@ -123,23 +147,33 @@ class Updater : IUpdater { HAScheduler.schedule(::autoUpdateTask, updateInterval, timeToNextExecution, "global-update") } - /** - * Updates the status and sustains the "skippedMangas" - */ - private fun updateStatus( - jobs: List, - running: Boolean? = null, - categories: Map>? = null, - skippedMangas: List? = null, - ) { + private fun getStatus(running: Boolean? = null): UpdateStatus { + val jobs = tracker.values.toList() val isRunning = running ?: jobs.any { job -> job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING } - val updateStatusCategories = categories ?: _status.value.categoryStatusMap - val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList() - _status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, isRunning) } + return UpdateStatus(this.updateStatusCategories, jobs, this.updateStatusSkippedMangas, isRunning) + } + + /** + * Pass "isRunning" to force a specific running state + */ + private suspend fun updateStatus( + immediate: Boolean = false, + isRunning: Boolean? = null, + ) { + if (immediate) { + val status = getStatus(running = isRunning) + + statusFlow.emit(status) + _status.update { status } + + return + } + + notifyFlow.emit(Unit) } private fun getOrCreateUpdateChannelFor(source: String): Channel { @@ -166,7 +200,7 @@ class Updater : IUpdater { return channel } - private fun handleChannelUpdateFailure(source: String) { + private suspend fun handleChannelUpdateFailure(source: String) { val isFailedSourceUpdate = { job: UpdateJob -> val isForSource = job.manga.sourceId == source val hasFailed = job.status == JobStatus.FAILED @@ -181,17 +215,12 @@ class Updater : IUpdater { tracker[mangaId] = job.copy(status = JobStatus.FAILED) } - updateStatus( - tracker.values.toList(), - tracker.any { (_, job) -> - job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING - }, - ) + updateStatus() } private suspend fun process(job: UpdateJob) { tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) - updateStatus(tracker.values.toList(), true) + updateStatus() tracker[job.manga.id] = try { @@ -207,7 +236,15 @@ class Updater : IUpdater { job.copy(status = JobStatus.FAILED) } - updateStatus(tracker.values.toList()) + val wasLastJob = tracker.values.none { it.status == JobStatus.PENDING || it.status == JobStatus.RUNNING } + + // in case this is the last update job, the running flag has to be true, before it gets set to false, to be able + // to properly clear the dataloader store in UpdateType + updateStatus(immediate = wasLastJob, isRunning = true) + + if (wasLastJob) { + updateStatus(isRunning = false) + } } override fun addCategoriesToUpdateQueue( @@ -274,10 +311,15 @@ class Updater : IUpdater { .toList() val skippedMangas = categoriesToUpdateMangas.subtract(mangasToUpdate.toSet()).toList() - // In case no manga gets updated and no update job was running before, the client would never receive an info about its update request - updateStatus(emptyList(), mangasToUpdate.isNotEmpty(), updateStatusCategories, skippedMangas) + this.updateStatusCategories = updateStatusCategories + this.updateStatusSkippedMangas = skippedMangas if (mangasToUpdate.isEmpty()) { + // In case no manga gets updated and no update job was running before, the client would never receive an info + // about its update request + scope.launch { + updateStatus(immediate = true) + } return } @@ -288,8 +330,9 @@ class Updater : IUpdater { } private fun addMangasToQueue(mangasToUpdate: List) { + // create all manga update jobs before adding them to the queue so that the client is able to calculate the + // progress properly right form the start mangasToUpdate.forEach { tracker[it.id] = UpdateJob(it) } - updateStatus(tracker.values.toList(), mangasToUpdate.isNotEmpty()) mangasToUpdate.forEach { addMangaToQueue(it) } } @@ -303,7 +346,11 @@ class Updater : IUpdater { override fun reset() { scope.coroutineContext.cancelChildren() tracker.clear() - updateStatus(emptyList(), false) + this.updateStatusCategories = emptyMap() + this.updateStatusSkippedMangas = emptyList() + scope.launch { + updateStatus(immediate = true, isRunning = false) + } updateChannels.forEach { (_, channel) -> channel.cancel() } updateChannels.clear() } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt index e7cdf2fb..de8c5428 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/UpdaterSocket.kt @@ -23,12 +23,12 @@ object UpdaterSocket : Websocket() { ctx: WsContext, value: UpdateStatus?, ) { - ctx.send(value ?: updater.status.value) + ctx.send(value ?: updater.statusDeprecated.value) } override fun handleRequest(ctx: WsMessageContext) { when (ctx.message()) { - "STATUS" -> notifyClient(ctx, updater.status.value) + "STATUS" -> notifyClient(ctx, updater.statusDeprecated.value) else -> ctx.send( """