Fix/update subscription clear data loader cache (#908)

* Set updater running flag to false only at the end of the update

For clearing the data loader cache properly, the update status subscription requires the update to be running.

For the last completed manga update the flag was immediately set to false which prevented the dataloader cache from getting cleared, returning outdated data for the last updated manga

* Correctly clear the "MangaForIdsDataLoader" cache

The cache keys for this dataloader are lists of manga ids.
Thus, it is not possible to clear only the cached data of the provided manga id and instead each cache entry that includes the manga id has to be cleared

* Ensure that manga dataloader caches gets cleared during global update

The "StateFlow" drops value updates in case the collector is too slow, which was the case for the "UpdateSubscription".

This caused the dataloader cache to not get properly cleared because the running state of the update was already set to false.
This commit is contained in:
schroda
2024-03-31 19:19:49 +02:00
committed by GitHub
parent b2aff1efc9
commit 6d539d3404
9 changed files with 190 additions and 54 deletions

View File

@@ -0,0 +1,46 @@
package suwayomi.tachidesk.graphql.cache
import org.dataloader.CacheMap
import java.util.concurrent.CompletableFuture
class CustomCacheMap<K, V> : CacheMap<K, V> {
private val cache: MutableMap<K, CompletableFuture<V>>
init {
cache = HashMap()
}
override fun containsKey(key: K): Boolean {
return cache.containsKey(key)
}
override fun get(key: K): CompletableFuture<V> {
return cache[key]!!
}
fun getKeys(): Collection<K> {
return cache.keys.toSet()
}
override fun getAll(): Collection<CompletableFuture<V>> {
return cache.values
}
override fun set(
key: K,
value: CompletableFuture<V>,
): CacheMap<K, V> {
cache[key] = value
return this
}
override fun delete(key: K): CacheMap<K, V> {
cache.remove(key)
return this
}
override fun clear(): CacheMap<K, V> {
cache.clear()
return this
}
}

View File

@@ -10,11 +10,13 @@ package suwayomi.tachidesk.graphql.dataLoaders
import com.expediagroup.graphql.dataloader.KotlinDataLoader
import org.dataloader.DataLoader
import org.dataloader.DataLoaderFactory
import org.dataloader.DataLoaderOptions
import org.jetbrains.exposed.sql.Slf4jSqlDebugLogger
import org.jetbrains.exposed.sql.addLogger
import org.jetbrains.exposed.sql.andWhere
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import suwayomi.tachidesk.graphql.cache.CustomCacheMap
import suwayomi.tachidesk.graphql.types.MangaNodeList
import suwayomi.tachidesk.graphql.types.MangaNodeList.Companion.toNodeList
import suwayomi.tachidesk.graphql.types.MangaType
@@ -95,18 +97,21 @@ class MangaForIdsDataLoader : KotlinDataLoader<List<Int>, MangaNodeList> {
override val dataLoaderName = "MangaForIdsDataLoader"
override fun getDataLoader(): DataLoader<List<Int>, MangaNodeList> =
DataLoaderFactory.newDataLoader { mangaIds ->
future {
transaction {
addLogger(Slf4jSqlDebugLogger)
val ids = mangaIds.flatten().distinct()
val manga =
MangaTable.select { MangaTable.id inList ids }
.map { MangaType(it) }
mangaIds.map { mangaIds ->
manga.filter { it.id in mangaIds }.toNodeList()
DataLoaderFactory.newDataLoader(
{ mangaIds ->
future {
transaction {
addLogger(Slf4jSqlDebugLogger)
val ids = mangaIds.flatten().distinct()
val manga =
MangaTable.select { MangaTable.id inList ids }
.map { MangaType(it) }
mangaIds.map { mangaIds ->
manga.filter { it.id in mangaIds }.toNodeList()
}
}
}
}
}
},
DataLoaderOptions.newOptions().setCacheMap(CustomCacheMap<List<Int>, MangaNodeList>()),
)
}

View File

@@ -1,5 +1,7 @@
package suwayomi.tachidesk.graphql.mutations
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeout
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import org.kodein.di.DI
@@ -10,6 +12,9 @@ import suwayomi.tachidesk.manga.impl.Category
import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.manga.model.table.CategoryTable
import suwayomi.tachidesk.manga.model.table.toDataClass
import suwayomi.tachidesk.server.JavalinSetup.future
import java.util.concurrent.CompletableFuture
import kotlin.time.Duration.Companion.seconds
class UpdateMutation {
private val updater by DI.global.instance<IUpdater>()
@@ -23,14 +28,22 @@ class UpdateMutation {
val updateStatus: UpdateStatus,
)
fun updateLibraryManga(input: UpdateLibraryMangaInput): UpdateLibraryMangaPayload {
fun updateLibraryManga(input: UpdateLibraryMangaInput): CompletableFuture<UpdateLibraryMangaPayload> {
updater.addCategoriesToUpdateQueue(
Category.getCategoryList(),
clear = true,
forceAll = false,
)
return UpdateLibraryMangaPayload(input.clientMutationId, UpdateStatus(updater.status.value))
return future {
UpdateLibraryMangaPayload(
input.clientMutationId,
updateStatus =
withTimeout(30.seconds) {
UpdateStatus(updater.status.first())
},
)
}
}
data class UpdateCategoryMangaInput(
@@ -43,7 +56,7 @@ class UpdateMutation {
val updateStatus: UpdateStatus,
)
fun updateCategoryManga(input: UpdateCategoryMangaInput): UpdateCategoryMangaPayload {
fun updateCategoryManga(input: UpdateCategoryMangaInput): CompletableFuture<UpdateCategoryMangaPayload> {
val categories =
transaction {
CategoryTable.select { CategoryTable.id inList input.categories }.map {
@@ -52,10 +65,15 @@ class UpdateMutation {
}
updater.addCategoriesToUpdateQueue(categories, clear = true, forceAll = true)
return UpdateCategoryMangaPayload(
clientMutationId = input.clientMutationId,
updateStatus = UpdateStatus(updater.status.value),
)
return future {
UpdateCategoryMangaPayload(
input.clientMutationId,
updateStatus =
withTimeout(30.seconds) {
UpdateStatus(updater.status.first())
},
)
}
}
data class UpdateStopInput(

View File

@@ -1,16 +1,19 @@
package suwayomi.tachidesk.graphql.queries
import kotlinx.coroutines.flow.first
import org.kodein.di.DI
import org.kodein.di.conf.global
import org.kodein.di.instance
import suwayomi.tachidesk.graphql.types.UpdateStatus
import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.server.JavalinSetup.future
import java.util.concurrent.CompletableFuture
class UpdateQuery {
private val updater by DI.global.instance<IUpdater>()
fun updateStatus(): UpdateStatus {
return UpdateStatus(updater.status.value)
fun updateStatus(): CompletableFuture<UpdateStatus> {
return future { UpdateStatus(updater.status.first()) }
}
data class LastUpdateTimestampPayload(val timestamp: Long)

View File

@@ -11,6 +11,7 @@ import com.expediagroup.graphql.server.extensions.getValueFromDataLoader
import eu.kanade.tachiyomi.source.model.UpdateStrategy
import graphql.schema.DataFetchingEnvironment
import org.jetbrains.exposed.sql.ResultRow
import suwayomi.tachidesk.graphql.cache.CustomCacheMap
import suwayomi.tachidesk.graphql.server.primitives.Cursor
import suwayomi.tachidesk.graphql.server.primitives.Edge
import suwayomi.tachidesk.graphql.server.primitives.Node
@@ -45,12 +46,25 @@ class MangaType(
var chaptersLastFetchedAt: Long?, // todo
) : Node {
companion object {
fun clearCacheFor(
mangaIds: List<Int>,
dataFetchingEnvironment: DataFetchingEnvironment,
) {
mangaIds.forEach { clearCacheFor(it, dataFetchingEnvironment) }
}
fun clearCacheFor(
mangaId: Int,
dataFetchingEnvironment: DataFetchingEnvironment,
) {
dataFetchingEnvironment.getDataLoader<Int, MangaType>("MangaDataLoader").clear(mangaId)
dataFetchingEnvironment.getDataLoader<Int, MangaNodeList>("MangaForIdsDataLoader").clear(mangaId)
val mangaForIdsDataLoader =
dataFetchingEnvironment.getDataLoader<List<Int>, MangaNodeList>("MangaForIdsDataLoader")
@Suppress("UNCHECKED_CAST")
(mangaForIdsDataLoader.cacheMap as CustomCacheMap<List<Int>, MangaNodeList>).getKeys()
.filter { it.contains(mangaId) }.forEach { mangaForIdsDataLoader.clear(it) }
dataFetchingEnvironment.getDataLoader<Int, Int>("DownloadedChapterCountForMangaDataLoader").clear(mangaId)
dataFetchingEnvironment.getDataLoader<Int, Int>("UnreadChapterCountForMangaDataLoader").clear(mangaId)
dataFetchingEnvironment.getDataLoader<Int, ChapterType>("LastReadChapterForMangaDataLoader").clear(mangaId)

View File

@@ -117,7 +117,7 @@ object UpdateController {
},
behaviorOf = { ctx ->
val updater by DI.global.instance<IUpdater>()
ctx.json(updater.status.value)
ctx.json(updater.statusDeprecated.value)
},
withResults = {
json<UpdateStatus>(HttpCode.OK)

View File

@@ -1,5 +1,6 @@
package suwayomi.tachidesk.manga.impl.update
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
@@ -12,7 +13,9 @@ interface IUpdater {
forceAll: Boolean,
)
val status: StateFlow<UpdateStatus>
val status: Flow<UpdateStatus>
val statusDeprecated: StateFlow<UpdateStatus>
fun reset()
}

View File

@@ -6,9 +6,12 @@ import eu.kanade.tachiyomi.source.model.UpdateStrategy
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.catch
@@ -16,6 +19,8 @@ import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
@@ -37,14 +42,33 @@ import java.util.Date
import java.util.concurrent.ConcurrentHashMap
import kotlin.math.absoluteValue
import kotlin.time.Duration.Companion.hours
import kotlin.time.Duration.Companion.seconds
@OptIn(FlowPreview::class)
class Updater : IUpdater {
private val logger = KotlinLogging.logger {}
private val notifyFlowScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val _status = MutableStateFlow(UpdateStatus())
override val status = _status.asStateFlow()
private val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
private val statusFlow = MutableSharedFlow<UpdateStatus>()
override val status = statusFlow.onStart { emit(getStatus()) }
init {
// has to be in its own scope (notifyFlowScope), otherwise, the collection gets canceled due to canceling the scopes (scope) children in the reset function
notifyFlowScope.launch {
notifyFlow.sample(1.seconds).collect {
updateStatus(immediate = true)
}
}
}
private val _status = MutableStateFlow(UpdateStatus())
override val statusDeprecated = _status.asStateFlow()
private var updateStatusCategories: Map<CategoryUpdateStatus, List<CategoryDataClass>> = emptyMap()
private var updateStatusSkippedMangas: List<MangaDataClass> = emptyList()
private val tracker = ConcurrentHashMap<Int, UpdateJob>()
private val updateChannels = ConcurrentHashMap<String, Channel<UpdateJob>>()
@@ -87,7 +111,7 @@ class Updater : IUpdater {
val lastAutomatedUpdate = preferences.getLong(lastAutomatedUpdateKey, 0)
preferences.edit().putLong(lastAutomatedUpdateKey, System.currentTimeMillis()).apply()
if (status.value.running) {
if (getStatus().running) {
logger.debug { "Global update is already in progress" }
return
}
@@ -123,23 +147,33 @@ class Updater : IUpdater {
HAScheduler.schedule(::autoUpdateTask, updateInterval, timeToNextExecution, "global-update")
}
/**
* Updates the status and sustains the "skippedMangas"
*/
private fun updateStatus(
jobs: List<UpdateJob>,
running: Boolean? = null,
categories: Map<CategoryUpdateStatus, List<CategoryDataClass>>? = null,
skippedMangas: List<MangaDataClass>? = null,
) {
private fun getStatus(running: Boolean? = null): UpdateStatus {
val jobs = tracker.values.toList()
val isRunning =
running
?: jobs.any { job ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
}
val updateStatusCategories = categories ?: _status.value.categoryStatusMap
val tmpSkippedMangas = skippedMangas ?: _status.value.mangaStatusMap[JobStatus.SKIPPED] ?: emptyList()
_status.update { UpdateStatus(updateStatusCategories, jobs, tmpSkippedMangas, isRunning) }
return UpdateStatus(this.updateStatusCategories, jobs, this.updateStatusSkippedMangas, isRunning)
}
/**
* Pass "isRunning" to force a specific running state
*/
private suspend fun updateStatus(
immediate: Boolean = false,
isRunning: Boolean? = null,
) {
if (immediate) {
val status = getStatus(running = isRunning)
statusFlow.emit(status)
_status.update { status }
return
}
notifyFlow.emit(Unit)
}
private fun getOrCreateUpdateChannelFor(source: String): Channel<UpdateJob> {
@@ -166,7 +200,7 @@ class Updater : IUpdater {
return channel
}
private fun handleChannelUpdateFailure(source: String) {
private suspend fun handleChannelUpdateFailure(source: String) {
val isFailedSourceUpdate = { job: UpdateJob ->
val isForSource = job.manga.sourceId == source
val hasFailed = job.status == JobStatus.FAILED
@@ -181,17 +215,12 @@ class Updater : IUpdater {
tracker[mangaId] = job.copy(status = JobStatus.FAILED)
}
updateStatus(
tracker.values.toList(),
tracker.any { (_, job) ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
},
)
updateStatus()
}
private suspend fun process(job: UpdateJob) {
tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING)
updateStatus(tracker.values.toList(), true)
updateStatus()
tracker[job.manga.id] =
try {
@@ -207,7 +236,15 @@ class Updater : IUpdater {
job.copy(status = JobStatus.FAILED)
}
updateStatus(tracker.values.toList())
val wasLastJob = tracker.values.none { it.status == JobStatus.PENDING || it.status == JobStatus.RUNNING }
// in case this is the last update job, the running flag has to be true, before it gets set to false, to be able
// to properly clear the dataloader store in UpdateType
updateStatus(immediate = wasLastJob, isRunning = true)
if (wasLastJob) {
updateStatus(isRunning = false)
}
}
override fun addCategoriesToUpdateQueue(
@@ -274,10 +311,15 @@ class Updater : IUpdater {
.toList()
val skippedMangas = categoriesToUpdateMangas.subtract(mangasToUpdate.toSet()).toList()
// In case no manga gets updated and no update job was running before, the client would never receive an info about its update request
updateStatus(emptyList(), mangasToUpdate.isNotEmpty(), updateStatusCategories, skippedMangas)
this.updateStatusCategories = updateStatusCategories
this.updateStatusSkippedMangas = skippedMangas
if (mangasToUpdate.isEmpty()) {
// In case no manga gets updated and no update job was running before, the client would never receive an info
// about its update request
scope.launch {
updateStatus(immediate = true)
}
return
}
@@ -288,8 +330,9 @@ class Updater : IUpdater {
}
private fun addMangasToQueue(mangasToUpdate: List<MangaDataClass>) {
// create all manga update jobs before adding them to the queue so that the client is able to calculate the
// progress properly right form the start
mangasToUpdate.forEach { tracker[it.id] = UpdateJob(it) }
updateStatus(tracker.values.toList(), mangasToUpdate.isNotEmpty())
mangasToUpdate.forEach { addMangaToQueue(it) }
}
@@ -303,7 +346,11 @@ class Updater : IUpdater {
override fun reset() {
scope.coroutineContext.cancelChildren()
tracker.clear()
updateStatus(emptyList(), false)
this.updateStatusCategories = emptyMap()
this.updateStatusSkippedMangas = emptyList()
scope.launch {
updateStatus(immediate = true, isRunning = false)
}
updateChannels.forEach { (_, channel) -> channel.cancel() }
updateChannels.clear()
}

View File

@@ -23,12 +23,12 @@ object UpdaterSocket : Websocket<UpdateStatus>() {
ctx: WsContext,
value: UpdateStatus?,
) {
ctx.send(value ?: updater.status.value)
ctx.send(value ?: updater.statusDeprecated.value)
}
override fun handleRequest(ctx: WsMessageContext) {
when (ctx.message()) {
"STATUS" -> notifyClient(ctx, updater.status.value)
"STATUS" -> notifyClient(ctx, updater.statusDeprecated.value)
else ->
ctx.send(
"""