diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt index 6def341d..7316038a 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/update/Updater.kt @@ -14,9 +14,12 @@ import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.update import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit import mu.KotlinLogging import suwayomi.tachidesk.manga.impl.Chapter import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass +import suwayomi.tachidesk.server.serverConfig import java.util.concurrent.ConcurrentHashMap class Updater : IUpdater { @@ -27,18 +30,29 @@ class Updater : IUpdater { override val status = _status.asStateFlow() private val tracker = ConcurrentHashMap() - private var updateChannel = createUpdateChannel() + private val updateChannels = ConcurrentHashMap>() + + private val semaphore = Semaphore(serverConfig.maxParallelUpdateRequests) + + private fun getOrCreateUpdateChannelFor(source: String): Channel { + return updateChannels.getOrPut(source) { + logger.debug { "getOrCreateUpdateChannelFor: created channel for $source - channels: ${updateChannels.size + 1}" } + createUpdateChannel() + } + } private fun createUpdateChannel(): Channel { val channel = Channel(Channel.UNLIMITED) channel.consumeAsFlow() .onEach { job -> - _status.value = UpdateStatus( - process(job), - tracker.any { (_, job) -> - job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING - } - ) + semaphore.withPermit { + _status.value = UpdateStatus( + process(job), + tracker.any { (_, job) -> + job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING + } + ) + } } .catch { logger.error(it) { "Error during updates" } } .launchIn(scope) @@ -49,7 +63,7 @@ class Updater : IUpdater { tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) _status.update { UpdateStatus(tracker.values.toList(), true) } tracker[job.manga.id] = try { - logger.info { "Updating ${job.manga.title}" } + logger.info { "Updating \"${job.manga.title}\" (source: ${job.manga.sourceId})" } Chapter.getChapterList(job.manga.id, true) job.copy(status = JobStatus.COMPLETE) } catch (e: Exception) { @@ -61,6 +75,7 @@ class Updater : IUpdater { } override fun addMangaToQueue(manga: MangaDataClass) { + val updateChannel = getOrCreateUpdateChannelFor(manga.sourceId) scope.launch { updateChannel.send(UpdateJob(manga)) } @@ -72,7 +87,7 @@ class Updater : IUpdater { scope.coroutineContext.cancelChildren() tracker.clear() _status.update { UpdateStatus() } - updateChannel.cancel() - updateChannel = createUpdateChannel() + updateChannels.forEach { (_, channel) -> channel.cancel() } + updateChannels.clear() } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt b/server/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt index 054a92e9..1fac844d 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt @@ -33,6 +33,9 @@ class ServerConfig(config: Config, moduleName: String = MODULE_NAME) : SystemPro val downloadAsCbz: Boolean by overridableConfig val downloadsPath: String by overridableConfig + // updater + val maxParallelUpdateRequests: Int by overridableConfig + // Authentication val basicAuthEnabled: Boolean by overridableConfig val basicAuthUsername: String by overridableConfig diff --git a/server/src/main/resources/server-reference.conf b/server/src/main/resources/server-reference.conf index 023664f6..d6a1d889 100644 --- a/server/src/main/resources/server-reference.conf +++ b/server/src/main/resources/server-reference.conf @@ -18,6 +18,9 @@ server.electronPath = "" server.downloadAsCbz = false server.downloadsPath = "" +# updater +server.maxParallelUpdateRequests = 10 # sets how many sources can be updated in parallel. updates are grouped by source and all mangas of a source are updated synchronously + # Authentication server.basicAuthEnabled = false server.basicAuthUsername = "" diff --git a/server/src/test/resources/server-reference.conf b/server/src/test/resources/server-reference.conf index 3073f859..8c04e2f6 100644 --- a/server/src/test/resources/server-reference.conf +++ b/server/src/test/resources/server-reference.conf @@ -10,6 +10,9 @@ server.socksProxyPort = "" # downloader server.downloadAsCbz = false +# updater +server.maxParallelUpdateRequests = 10 + # misc server.debugLogsEnabled = true server.systemTrayEnabled = false