Download queue missing update fix (#450)

* Add immediate updates to download queue manager for updates that always needs to be delivered

* Update server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt

Co-authored-by: Mitchell Syer <Mitchellptbo@gmail.com>

* Revert change to make sure that data in status sent to client are actual

* Reduce number of immediate updates to clients

Co-authored-by: Mitchell Syer <Mitchellptbo@gmail.com>
This commit is contained in:
Valter Martinek
2022-11-16 17:33:51 +01:00
committed by GitHub
parent d6127d6811
commit e7cb88c757
2 changed files with 27 additions and 19 deletions

View File

@@ -80,17 +80,25 @@ object DownloadManager {
init { init {
scope.launch { scope.launch {
notifyFlow.sample(1.seconds).collect { notifyFlow.sample(1.seconds).collect {
val status = getStatus() sendStatusToAllClients()
clients.forEach {
it.value.send(status)
}
} }
} }
} }
private fun notifyAllClients() { private fun sendStatusToAllClients() {
scope.launch { val status = getStatus()
notifyFlow.emit(Unit) clients.forEach {
it.value.send(status)
}
}
private fun notifyAllClients(immediate: Boolean = false) {
if (immediate) {
sendStatusToAllClients()
} else {
scope.launch {
notifyFlow.emit(Unit)
}
} }
} }
@@ -206,7 +214,7 @@ object DownloadManager {
val addedChapters = inputs.mapNotNull { addToQueue(it.first, it.second) } val addedChapters = inputs.mapNotNull { addToQueue(it.first, it.second) }
if (addedChapters.isNotEmpty()) { if (addedChapters.isNotEmpty()) {
start() start()
notifyAllClients() notifyAllClients(true)
} }
scope.launch { scope.launch {
downloaderWatch.emit(Unit) downloaderWatch.emit(Unit)

View File

@@ -39,15 +39,15 @@ class Downloader(
private val scope: CoroutineScope, private val scope: CoroutineScope,
val sourceId: Long, val sourceId: Long,
private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>, private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>,
private val notifier: () -> Unit, private val notifier: (immediate: Boolean) -> Unit,
private val onComplete: () -> Unit private val onComplete: () -> Unit
) { ) {
private var job: Job? = null private var job: Job? = null
class StopDownloadException : Exception("Cancelled download") class StopDownloadException : Exception("Cancelled download")
class PauseDownloadException : Exception("Pause download") class PauseDownloadException : Exception("Pause download")
private suspend fun step(download: DownloadChapter?) { private suspend fun step(download: DownloadChapter?, immediate: Boolean) {
notifier() notifier(immediate)
currentCoroutineContext().ensureActive() currentCoroutineContext().ensureActive()
if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) { if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) {
if (download in downloadQueue) { if (download in downloadQueue) {
@@ -74,7 +74,7 @@ class Downloader(
} }
} }
notifier() notifier(false)
} }
suspend fun stop() { suspend fun stop() {
@@ -90,10 +90,10 @@ class Downloader(
try { try {
download.state = Downloading download.state = Downloading
step(download) step(download, true)
download.chapter = getChapterDownloadReady(download.chapterIndex, download.mangaId) download.chapter = getChapterDownloadReady(download.chapterIndex, download.mangaId)
step(download) step(download, false)
val pageCount = download.chapter.pageCount val pageCount = download.chapter.pageCount
for (pageNum in 0 until pageCount) { for (pageNum in 0 until pageCount) {
@@ -109,7 +109,7 @@ class Downloader(
.distinctUntilChanged() .distinctUntilChanged()
.onEach { .onEach {
download.progress = (pageNum.toFloat() + (it.toFloat() * 0.01f)) / pageCount download.progress = (pageNum.toFloat() + (it.toFloat() * 0.01f)) / pageCount
step(null) // don't throw on canceled download here since we can't do anything step(null, false) // don't throw on canceled download here since we can't do anything
} }
.launchIn(scope) .launchIn(scope)
} }
@@ -120,7 +120,7 @@ class Downloader(
} }
// TODO: retry on error with 2,4,8 seconds of wait // TODO: retry on error with 2,4,8 seconds of wait
download.progress = ((pageNum + 1).toFloat()) / pageCount download.progress = ((pageNum + 1).toFloat()) / pageCount
step(download) step(download, false)
} }
download.state = Finished download.state = Finished
transaction { transaction {
@@ -128,10 +128,10 @@ class Downloader(
it[isDownloaded] = true it[isDownloaded] = true
} }
} }
step(download) step(download, true)
downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex } downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex }
step(null) step(null, false)
} catch (e: CancellationException) { } catch (e: CancellationException) {
logger.debug("Downloader was stopped") logger.debug("Downloader was stopped")
downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued } downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued }
@@ -142,7 +142,7 @@ class Downloader(
download.tries++ download.tries++
download.state = Error download.state = Error
} finally { } finally {
notifier() notifier(false)
} }
} }
} }