update library grouped by source (#511)

* Update mangas grouped by source

* Limit parallel update requests
This commit is contained in:
schroda
2023-03-10 08:33:09 +01:00
committed by GitHub
parent a0081dec07
commit ec1d65f4c3
4 changed files with 34 additions and 10 deletions

View File

@@ -14,9 +14,12 @@ import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withPermit
import mu.KotlinLogging import mu.KotlinLogging
import suwayomi.tachidesk.manga.impl.Chapter import suwayomi.tachidesk.manga.impl.Chapter
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import suwayomi.tachidesk.server.serverConfig
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
class Updater : IUpdater { class Updater : IUpdater {
@@ -27,18 +30,29 @@ class Updater : IUpdater {
override val status = _status.asStateFlow() override val status = _status.asStateFlow()
private val tracker = ConcurrentHashMap<Int, UpdateJob>() private val tracker = ConcurrentHashMap<Int, UpdateJob>()
private var updateChannel = createUpdateChannel() private val updateChannels = ConcurrentHashMap<String, Channel<UpdateJob>>()
private val semaphore = Semaphore(serverConfig.maxParallelUpdateRequests)
private fun getOrCreateUpdateChannelFor(source: String): Channel<UpdateJob> {
return updateChannels.getOrPut(source) {
logger.debug { "getOrCreateUpdateChannelFor: created channel for $source - channels: ${updateChannels.size + 1}" }
createUpdateChannel()
}
}
private fun createUpdateChannel(): Channel<UpdateJob> { private fun createUpdateChannel(): Channel<UpdateJob> {
val channel = Channel<UpdateJob>(Channel.UNLIMITED) val channel = Channel<UpdateJob>(Channel.UNLIMITED)
channel.consumeAsFlow() channel.consumeAsFlow()
.onEach { job -> .onEach { job ->
_status.value = UpdateStatus( semaphore.withPermit {
process(job), _status.value = UpdateStatus(
tracker.any { (_, job) -> process(job),
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING tracker.any { (_, job) ->
} job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
) }
)
}
} }
.catch { logger.error(it) { "Error during updates" } } .catch { logger.error(it) { "Error during updates" } }
.launchIn(scope) .launchIn(scope)
@@ -49,7 +63,7 @@ class Updater : IUpdater {
tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING)
_status.update { UpdateStatus(tracker.values.toList(), true) } _status.update { UpdateStatus(tracker.values.toList(), true) }
tracker[job.manga.id] = try { tracker[job.manga.id] = try {
logger.info { "Updating ${job.manga.title}" } logger.info { "Updating \"${job.manga.title}\" (source: ${job.manga.sourceId})" }
Chapter.getChapterList(job.manga.id, true) Chapter.getChapterList(job.manga.id, true)
job.copy(status = JobStatus.COMPLETE) job.copy(status = JobStatus.COMPLETE)
} catch (e: Exception) { } catch (e: Exception) {
@@ -61,6 +75,7 @@ class Updater : IUpdater {
} }
override fun addMangaToQueue(manga: MangaDataClass) { override fun addMangaToQueue(manga: MangaDataClass) {
val updateChannel = getOrCreateUpdateChannelFor(manga.sourceId)
scope.launch { scope.launch {
updateChannel.send(UpdateJob(manga)) updateChannel.send(UpdateJob(manga))
} }
@@ -72,7 +87,7 @@ class Updater : IUpdater {
scope.coroutineContext.cancelChildren() scope.coroutineContext.cancelChildren()
tracker.clear() tracker.clear()
_status.update { UpdateStatus() } _status.update { UpdateStatus() }
updateChannel.cancel() updateChannels.forEach { (_, channel) -> channel.cancel() }
updateChannel = createUpdateChannel() updateChannels.clear()
} }
} }

View File

@@ -33,6 +33,9 @@ class ServerConfig(config: Config, moduleName: String = MODULE_NAME) : SystemPro
val downloadAsCbz: Boolean by overridableConfig val downloadAsCbz: Boolean by overridableConfig
val downloadsPath: String by overridableConfig val downloadsPath: String by overridableConfig
// updater
val maxParallelUpdateRequests: Int by overridableConfig
// Authentication // Authentication
val basicAuthEnabled: Boolean by overridableConfig val basicAuthEnabled: Boolean by overridableConfig
val basicAuthUsername: String by overridableConfig val basicAuthUsername: String by overridableConfig

View File

@@ -18,6 +18,9 @@ server.electronPath = ""
server.downloadAsCbz = false server.downloadAsCbz = false
server.downloadsPath = "" server.downloadsPath = ""
# updater
server.maxParallelUpdateRequests = 10 # sets how many sources can be updated in parallel. updates are grouped by source and all mangas of a source are updated synchronously
# Authentication # Authentication
server.basicAuthEnabled = false server.basicAuthEnabled = false
server.basicAuthUsername = "" server.basicAuthUsername = ""

View File

@@ -10,6 +10,9 @@ server.socksProxyPort = ""
# downloader # downloader
server.downloadAsCbz = false server.downloadAsCbz = false
# updater
server.maxParallelUpdateRequests = 10
# misc # misc
server.debugLogsEnabled = true server.debugLogsEnabled = true
server.systemTrayEnabled = false server.systemTrayEnabled = false