mirror of
https://github.com/Suwayomi/Tachidesk.git
synced 2025-12-10 06:42:07 +01:00
Feature/graphql download queue subscription send only updates (#1011)
* Emit only download changes instead of full status The download subscription emitted the full download status, which, depending on how big the queue was, took forever because the graphql subscription does not support data loader batching, causing it to run into the n+1 problem * Rename "DownloadManager#status" to "DownloadManager#updates" * Add initial queue to download subscription type Adds the current queue at the time of sending the initial message. This field is null for all following messages after the initial one * Optionally limit and omit download updates To prevent the n+1 dataloader issue, the max number of updates included in the download subscription can be limited. This way, the problem will be circumvented and instead, the latest download status should be (re-)fetched via the download status query, which does not run into this problem. * Formatting
This commit is contained in:
@@ -10,6 +10,7 @@ import suwayomi.tachidesk.graphql.types.ChapterType
|
|||||||
import suwayomi.tachidesk.graphql.types.DownloadStatus
|
import suwayomi.tachidesk.graphql.types.DownloadStatus
|
||||||
import suwayomi.tachidesk.manga.impl.Chapter
|
import suwayomi.tachidesk.manga.impl.Chapter
|
||||||
import suwayomi.tachidesk.manga.impl.download.DownloadManager
|
import suwayomi.tachidesk.manga.impl.download.DownloadManager
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.DEQUEUED
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.Status
|
import suwayomi.tachidesk.manga.impl.download.model.Status
|
||||||
import suwayomi.tachidesk.manga.model.table.ChapterTable
|
import suwayomi.tachidesk.manga.model.table.ChapterTable
|
||||||
import suwayomi.tachidesk.server.JavalinSetup.future
|
import suwayomi.tachidesk.server.JavalinSetup.future
|
||||||
@@ -94,7 +95,12 @@ class DownloadMutation {
|
|||||||
clientMutationId = clientMutationId,
|
clientMutationId = clientMutationId,
|
||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(DownloadManager.status.first { it.queue.any { it.chapter.id in chapters } })
|
DownloadStatus(
|
||||||
|
DownloadManager.updates
|
||||||
|
.first {
|
||||||
|
DownloadManager.getStatus().queue.any { it.chapter.id in chapters }
|
||||||
|
}.let { DownloadManager.getStatus() },
|
||||||
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -122,7 +128,11 @@ class DownloadMutation {
|
|||||||
clientMutationId = clientMutationId,
|
clientMutationId = clientMutationId,
|
||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(DownloadManager.status.first { it.queue.any { it.chapter.id == chapter } })
|
DownloadStatus(
|
||||||
|
DownloadManager.updates
|
||||||
|
.first { it.updates.any { it.downloadChapter.chapter.id == chapter } }
|
||||||
|
.let { DownloadManager.getStatus() },
|
||||||
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -152,7 +162,14 @@ class DownloadMutation {
|
|||||||
clientMutationId = clientMutationId,
|
clientMutationId = clientMutationId,
|
||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(DownloadManager.status.first { it.queue.none { it.chapter.id in chapters } })
|
DownloadStatus(
|
||||||
|
DownloadManager.updates
|
||||||
|
.first {
|
||||||
|
it.updates.none {
|
||||||
|
it.downloadChapter.chapter.id in chapters && it.type != DEQUEUED
|
||||||
|
}
|
||||||
|
}.let { DownloadManager.getStatus() },
|
||||||
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -180,7 +197,14 @@ class DownloadMutation {
|
|||||||
clientMutationId = clientMutationId,
|
clientMutationId = clientMutationId,
|
||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(DownloadManager.status.first { it.queue.none { it.chapter.id == chapter } })
|
DownloadStatus(
|
||||||
|
DownloadManager.updates
|
||||||
|
.first {
|
||||||
|
it.updates.none {
|
||||||
|
it.downloadChapter.chapter.id == chapter && it.type != DEQUEUED
|
||||||
|
}
|
||||||
|
}.let { DownloadManager.getStatus() },
|
||||||
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -206,7 +230,9 @@ class DownloadMutation {
|
|||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(
|
DownloadStatus(
|
||||||
DownloadManager.status.first { it.status == Status.Started },
|
DownloadManager.updates
|
||||||
|
.first { it.status == Status.Started }
|
||||||
|
.let { DownloadManager.getStatus() },
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -232,7 +258,9 @@ class DownloadMutation {
|
|||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(
|
DownloadStatus(
|
||||||
DownloadManager.status.first { it.status == Status.Stopped },
|
DownloadManager.updates
|
||||||
|
.first { it.status == Status.Stopped }
|
||||||
|
.let { DownloadManager.getStatus() },
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -258,7 +286,9 @@ class DownloadMutation {
|
|||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(
|
DownloadStatus(
|
||||||
DownloadManager.status.first { it.status == Status.Stopped && it.queue.isEmpty() },
|
DownloadManager.updates
|
||||||
|
.first { it.status == Status.Stopped }
|
||||||
|
.let { DownloadManager.getStatus() },
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
@@ -288,7 +318,9 @@ class DownloadMutation {
|
|||||||
downloadStatus =
|
downloadStatus =
|
||||||
withTimeout(30.seconds) {
|
withTimeout(30.seconds) {
|
||||||
DownloadStatus(
|
DownloadStatus(
|
||||||
DownloadManager.status.first { it.queue.indexOfFirst { it.chapter.id == chapter } <= to },
|
DownloadManager.updates
|
||||||
|
.first { it.updates.indexOfFirst { it.downloadChapter.chapter.id == chapter } <= to }
|
||||||
|
.let { DownloadManager.getStatus() },
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
package suwayomi.tachidesk.graphql.queries
|
package suwayomi.tachidesk.graphql.queries
|
||||||
|
|
||||||
import kotlinx.coroutines.flow.first
|
|
||||||
import suwayomi.tachidesk.graphql.types.DownloadStatus
|
import suwayomi.tachidesk.graphql.types.DownloadStatus
|
||||||
import suwayomi.tachidesk.manga.impl.download.DownloadManager
|
import suwayomi.tachidesk.manga.impl.download.DownloadManager
|
||||||
import suwayomi.tachidesk.server.JavalinSetup.future
|
import suwayomi.tachidesk.server.JavalinSetup.future
|
||||||
@@ -9,6 +8,6 @@ import java.util.concurrent.CompletableFuture
|
|||||||
class DownloadQuery {
|
class DownloadQuery {
|
||||||
fun downloadStatus(): CompletableFuture<DownloadStatus> =
|
fun downloadStatus(): CompletableFuture<DownloadStatus> =
|
||||||
future {
|
future {
|
||||||
DownloadStatus(DownloadManager.status.first())
|
DownloadStatus(DownloadManager.getStatus())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,14 +7,54 @@
|
|||||||
|
|
||||||
package suwayomi.tachidesk.graphql.subscriptions
|
package suwayomi.tachidesk.graphql.subscriptions
|
||||||
|
|
||||||
|
import com.expediagroup.graphql.generator.annotations.GraphQLDeprecated
|
||||||
|
import com.expediagroup.graphql.generator.annotations.GraphQLDescription
|
||||||
import kotlinx.coroutines.flow.Flow
|
import kotlinx.coroutines.flow.Flow
|
||||||
import kotlinx.coroutines.flow.map
|
import kotlinx.coroutines.flow.map
|
||||||
import suwayomi.tachidesk.graphql.types.DownloadStatus
|
import suwayomi.tachidesk.graphql.types.DownloadStatus
|
||||||
|
import suwayomi.tachidesk.graphql.types.DownloadUpdates
|
||||||
import suwayomi.tachidesk.manga.impl.download.DownloadManager
|
import suwayomi.tachidesk.manga.impl.download.DownloadManager
|
||||||
|
|
||||||
class DownloadSubscription {
|
class DownloadSubscription {
|
||||||
|
@GraphQLDeprecated("Replaced width downloadStatusChanged", ReplaceWith("downloadStatusChanged(input)"))
|
||||||
fun downloadChanged(): Flow<DownloadStatus> =
|
fun downloadChanged(): Flow<DownloadStatus> =
|
||||||
DownloadManager.status.map { downloadStatus ->
|
DownloadManager.status.map { downloadStatus ->
|
||||||
DownloadStatus(downloadStatus)
|
DownloadStatus(downloadStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data class DownloadChangedInput(
|
||||||
|
@GraphQLDescription(
|
||||||
|
"Sets a max number of updates that can be contained in a download update message." +
|
||||||
|
"Everything above this limit will be omitted and the \"downloadStatus\" should be re-fetched via the " +
|
||||||
|
"corresponding query. Due to the graphql subscription execution strategy not supporting batching for data loaders, " +
|
||||||
|
"the data loaders run into the n+1 problem, which can cause the server to get unresponsive until the status " +
|
||||||
|
"update has been handled. This is an issue e.g. when mass en- or dequeuing downloads.",
|
||||||
|
)
|
||||||
|
val maxUpdates: Int?,
|
||||||
|
)
|
||||||
|
|
||||||
|
fun downloadStatusChanged(input: DownloadChangedInput): Flow<DownloadUpdates> {
|
||||||
|
val omitUpdates = input.maxUpdates != null
|
||||||
|
val maxUpdates = input.maxUpdates ?: 50
|
||||||
|
|
||||||
|
return DownloadManager.updates.map { downloadUpdates ->
|
||||||
|
val omittedUpdates = omitUpdates && downloadUpdates.updates.size > maxUpdates
|
||||||
|
|
||||||
|
// the graphql subscription execution strategy does not support data loader batching which causes the n+1 problem,
|
||||||
|
// thus, too many updates (e.g. on mass enqueue or dequeue) causes unresponsiveness of the server until the
|
||||||
|
// update has been handled
|
||||||
|
val actualDownloadUpdates =
|
||||||
|
if (omittedUpdates) {
|
||||||
|
suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates(
|
||||||
|
downloadUpdates.status,
|
||||||
|
downloadUpdates.updates.subList(0, maxUpdates),
|
||||||
|
downloadUpdates.initial,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
downloadUpdates
|
||||||
|
}
|
||||||
|
|
||||||
|
DownloadUpdates(actualDownloadUpdates, omittedUpdates)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,6 +7,7 @@
|
|||||||
|
|
||||||
package suwayomi.tachidesk.graphql.types
|
package suwayomi.tachidesk.graphql.types
|
||||||
|
|
||||||
|
import com.expediagroup.graphql.generator.annotations.GraphQLDescription
|
||||||
import com.expediagroup.graphql.generator.annotations.GraphQLIgnore
|
import com.expediagroup.graphql.generator.annotations.GraphQLIgnore
|
||||||
import com.expediagroup.graphql.server.extensions.getValueFromDataLoader
|
import com.expediagroup.graphql.server.extensions.getValueFromDataLoader
|
||||||
import graphql.schema.DataFetchingEnvironment
|
import graphql.schema.DataFetchingEnvironment
|
||||||
@@ -18,6 +19,9 @@ import suwayomi.tachidesk.graphql.server.primitives.PageInfo
|
|||||||
import suwayomi.tachidesk.graphql.types.DownloadState.FINISHED
|
import suwayomi.tachidesk.graphql.types.DownloadState.FINISHED
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.Status
|
import suwayomi.tachidesk.manga.impl.download.model.Status
|
||||||
import java.util.concurrent.CompletableFuture
|
import java.util.concurrent.CompletableFuture
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState as OtherDownloadState
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadState as OtherDownloadState
|
||||||
@@ -35,6 +39,28 @@ data class DownloadStatus(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data class DownloadUpdates(
|
||||||
|
val state: DownloaderState,
|
||||||
|
val updates: List<suwayomi.tachidesk.graphql.types.DownloadUpdate>,
|
||||||
|
@GraphQLDescription("The current download queue at the time of sending initial message. Is null for all following messages")
|
||||||
|
val initial: List<DownloadType>?,
|
||||||
|
@GraphQLDescription(
|
||||||
|
"Indicates whether updates have been omitted based on the \"maxUpdates\" subscription variable. " +
|
||||||
|
"In case updates have been omitted, the \"downloadStatus\" query should be re-fetched.",
|
||||||
|
)
|
||||||
|
val omittedUpdates: Boolean,
|
||||||
|
) {
|
||||||
|
constructor(downloadUpdates: DownloadUpdates, omittedUpdates: Boolean) : this(
|
||||||
|
when (downloadUpdates.status) {
|
||||||
|
Status.Stopped -> DownloaderState.STOPPED
|
||||||
|
Status.Started -> DownloaderState.STARTED
|
||||||
|
},
|
||||||
|
downloadUpdates.updates.map { DownloadUpdate(it) },
|
||||||
|
downloadUpdates.initial?.map { DownloadType(it) },
|
||||||
|
omittedUpdates,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
class DownloadType(
|
class DownloadType(
|
||||||
@get:GraphQLIgnore
|
@get:GraphQLIgnore
|
||||||
val chapterId: Int,
|
val chapterId: Int,
|
||||||
@@ -43,6 +69,7 @@ class DownloadType(
|
|||||||
val state: DownloadState,
|
val state: DownloadState,
|
||||||
val progress: Float,
|
val progress: Float,
|
||||||
val tries: Int,
|
val tries: Int,
|
||||||
|
val position: Int,
|
||||||
) : Node {
|
) : Node {
|
||||||
constructor(downloadChapter: DownloadChapter) : this(
|
constructor(downloadChapter: DownloadChapter) : this(
|
||||||
downloadChapter.chapter.id,
|
downloadChapter.chapter.id,
|
||||||
@@ -55,6 +82,7 @@ class DownloadType(
|
|||||||
},
|
},
|
||||||
downloadChapter.progress,
|
downloadChapter.progress,
|
||||||
downloadChapter.tries,
|
downloadChapter.tries,
|
||||||
|
downloadChapter.position,
|
||||||
)
|
)
|
||||||
|
|
||||||
fun manga(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<MangaType> {
|
fun manga(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<MangaType> {
|
||||||
@@ -76,6 +104,16 @@ class DownloadType(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class DownloadUpdate(
|
||||||
|
val type: DownloadUpdateType,
|
||||||
|
val download: DownloadType,
|
||||||
|
) : Node {
|
||||||
|
constructor(downloadUpdate: DownloadUpdate) : this(
|
||||||
|
downloadUpdate.type,
|
||||||
|
DownloadType(downloadUpdate.downloadChapter),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
enum class DownloadState {
|
enum class DownloadState {
|
||||||
QUEUED,
|
QUEUED,
|
||||||
DOWNLOADING,
|
DOWNLOADING,
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ import kotlinx.coroutines.channels.BufferOverflow
|
|||||||
import kotlinx.coroutines.coroutineScope
|
import kotlinx.coroutines.coroutineScope
|
||||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||||
import kotlinx.coroutines.flow.launchIn
|
import kotlinx.coroutines.flow.launchIn
|
||||||
import kotlinx.coroutines.flow.map
|
|
||||||
import kotlinx.coroutines.flow.onEach
|
import kotlinx.coroutines.flow.onEach
|
||||||
import kotlinx.coroutines.flow.onStart
|
import kotlinx.coroutines.flow.onStart
|
||||||
import kotlinx.coroutines.flow.sample
|
import kotlinx.coroutines.flow.sample
|
||||||
@@ -36,6 +35,9 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading
|
|||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdates
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.Status
|
import suwayomi.tachidesk.manga.impl.download.model.Status
|
||||||
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
|
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
|
||||||
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
|
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
|
||||||
@@ -47,6 +49,7 @@ import uy.kohesive.injekt.Injekt
|
|||||||
import uy.kohesive.injekt.api.get
|
import uy.kohesive.injekt.api.get
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
import java.util.concurrent.CopyOnWriteArrayList
|
||||||
|
import java.util.concurrent.CopyOnWriteArraySet
|
||||||
import kotlin.reflect.jvm.jvmName
|
import kotlin.reflect.jvm.jvmName
|
||||||
import kotlin.time.Duration.Companion.seconds
|
import kotlin.time.Duration.Companion.seconds
|
||||||
|
|
||||||
@@ -57,6 +60,7 @@ object DownloadManager {
|
|||||||
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
|
||||||
private val clients = ConcurrentHashMap<String, WsContext>()
|
private val clients = ConcurrentHashMap<String, WsContext>()
|
||||||
private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>()
|
private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>()
|
||||||
|
private val downloadUpdates = CopyOnWriteArraySet<DownloadUpdate>()
|
||||||
private val downloaders = ConcurrentHashMap<String, Downloader>()
|
private val downloaders = ConcurrentHashMap<String, Downloader>()
|
||||||
|
|
||||||
private const val DOWNLOAD_QUEUE_KEY = "downloadQueueKey"
|
private const val DOWNLOAD_QUEUE_KEY = "downloadQueueKey"
|
||||||
@@ -81,6 +85,13 @@ object DownloadManager {
|
|||||||
scope.launch { saveQueueFlow.emit(Unit) }
|
scope.launch { saveQueueFlow.emit(Unit) }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun handleDownloadUpdate(
|
||||||
|
immediate: Boolean,
|
||||||
|
download: DownloadUpdate? = null,
|
||||||
|
) {
|
||||||
|
notifyAllClients(immediate, listOfNotNull(download))
|
||||||
|
}
|
||||||
|
|
||||||
fun restoreAndResumeDownloads() {
|
fun restoreAndResumeDownloads() {
|
||||||
scope.launch {
|
scope.launch {
|
||||||
logger.debug { "restoreAndResumeDownloads: Restore download queue..." }
|
logger.debug { "restoreAndResumeDownloads: Restore download queue..." }
|
||||||
@@ -124,9 +135,15 @@ object DownloadManager {
|
|||||||
|
|
||||||
private val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
private val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||||
|
|
||||||
|
@Deprecated("Replaced with updatesFlow", replaceWith = ReplaceWith("updatesFlow"))
|
||||||
private val statusFlow = MutableSharedFlow<DownloadStatus>()
|
private val statusFlow = MutableSharedFlow<DownloadStatus>()
|
||||||
|
|
||||||
|
@Deprecated("Replaced with updates", replaceWith = ReplaceWith("updates"))
|
||||||
val status = statusFlow.onStart { emit(getStatus()) }
|
val status = statusFlow.onStart { emit(getStatus()) }
|
||||||
|
|
||||||
|
private val updatesFlow = MutableSharedFlow<DownloadUpdates>()
|
||||||
|
val updates = updatesFlow.onStart { emit(getDownloadUpdates(addInitial = true)) }
|
||||||
|
|
||||||
init {
|
init {
|
||||||
scope.launch {
|
scope.launch {
|
||||||
notifyFlow.sample(1.seconds).collect {
|
notifyFlow.sample(1.seconds).collect {
|
||||||
@@ -147,12 +164,21 @@ object DownloadManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun notifyAllClients(immediate: Boolean = false) {
|
private fun notifyAllClients(
|
||||||
|
immediate: Boolean = false,
|
||||||
|
downloads: List<DownloadUpdate> = emptyList(),
|
||||||
|
) {
|
||||||
|
downloadUpdates.addAll(downloads)
|
||||||
|
|
||||||
if (immediate) {
|
if (immediate) {
|
||||||
val status = getStatus()
|
val status = getStatus()
|
||||||
|
val updates = getDownloadUpdates()
|
||||||
|
|
||||||
|
downloadUpdates.clear()
|
||||||
|
|
||||||
scope.launch {
|
scope.launch {
|
||||||
statusFlow.emit(status)
|
statusFlow.emit(status)
|
||||||
|
updatesFlow.emit(updates)
|
||||||
sendStatusToAllClients(status)
|
sendStatusToAllClients(status)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -164,7 +190,7 @@ object DownloadManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun getStatus(): DownloadStatus =
|
fun getStatus(): DownloadStatus =
|
||||||
DownloadStatus(
|
DownloadStatus(
|
||||||
if (downloadQueue.none { it.state == Downloading }) {
|
if (downloadQueue.none { it.state == Downloading }) {
|
||||||
Status.Stopped
|
Status.Stopped
|
||||||
@@ -174,6 +200,17 @@ object DownloadManager {
|
|||||||
downloadQueue.toList(),
|
downloadQueue.toList(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
private fun getDownloadUpdates(addInitial: Boolean = false): DownloadUpdates =
|
||||||
|
DownloadUpdates(
|
||||||
|
if (downloadQueue.none { it.state == Downloading }) {
|
||||||
|
Status.Stopped
|
||||||
|
} else {
|
||||||
|
Status.Started
|
||||||
|
},
|
||||||
|
downloadUpdates.toList(),
|
||||||
|
if (addInitial) downloadQueue.toList() else null,
|
||||||
|
)
|
||||||
|
|
||||||
private val downloaderWatch = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
private val downloaderWatch = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||||
|
|
||||||
init {
|
init {
|
||||||
@@ -234,7 +271,7 @@ object DownloadManager {
|
|||||||
scope = scope,
|
scope = scope,
|
||||||
sourceId = sourceId,
|
sourceId = sourceId,
|
||||||
downloadQueue = downloadQueue,
|
downloadQueue = downloadQueue,
|
||||||
notifier = ::notifyAllClients,
|
notifier = ::handleDownloadUpdate,
|
||||||
onComplete = ::refreshDownloaders,
|
onComplete = ::refreshDownloaders,
|
||||||
onDownloadFinished = ::triggerSaveDownloadQueue,
|
onDownloadFinished = ::triggerSaveDownloadQueue,
|
||||||
)
|
)
|
||||||
@@ -303,7 +340,7 @@ object DownloadManager {
|
|||||||
val addedChapters = inputs.mapNotNull { addToQueue(it.first, it.second) }
|
val addedChapters = inputs.mapNotNull { addToQueue(it.first, it.second) }
|
||||||
if (addedChapters.isNotEmpty()) {
|
if (addedChapters.isNotEmpty()) {
|
||||||
start()
|
start()
|
||||||
notifyAllClients(true)
|
notifyAllClients(false, addedChapters.map { DownloadUpdate(DownloadUpdateType.QUEUED, it) })
|
||||||
}
|
}
|
||||||
scope.launch {
|
scope.launch {
|
||||||
downloaderWatch.emit(Unit)
|
downloaderWatch.emit(Unit)
|
||||||
@@ -328,6 +365,7 @@ object DownloadManager {
|
|||||||
manga.id,
|
manga.id,
|
||||||
chapter,
|
chapter,
|
||||||
manga,
|
manga,
|
||||||
|
downloadQueue.size,
|
||||||
)
|
)
|
||||||
downloadQueue.add(newDownloadChapter)
|
downloadQueue.add(newDownloadChapter)
|
||||||
triggerSaveDownloadQueue()
|
triggerSaveDownloadQueue()
|
||||||
@@ -374,7 +412,7 @@ object DownloadManager {
|
|||||||
downloadQueue.removeAll(chapterDownloads)
|
downloadQueue.removeAll(chapterDownloads)
|
||||||
triggerSaveDownloadQueue()
|
triggerSaveDownloadQueue()
|
||||||
|
|
||||||
notifyAllClients()
|
notifyAllClients(false, chapterDownloads.toList().map { DownloadUpdate(DownloadUpdateType.DEQUEUED, it) })
|
||||||
}
|
}
|
||||||
|
|
||||||
fun reorder(
|
fun reorder(
|
||||||
@@ -410,6 +448,8 @@ object DownloadManager {
|
|||||||
|
|
||||||
downloadQueue -= download
|
downloadQueue -= download
|
||||||
downloadQueue.add(to, download)
|
downloadQueue.add(to, download)
|
||||||
|
download.position = to
|
||||||
|
notifyAllClients(false, listOf(DownloadUpdate(DownloadUpdateType.POSITION, download)))
|
||||||
triggerSaveDownloadQueue()
|
triggerSaveDownloadQueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -439,9 +479,10 @@ object DownloadManager {
|
|||||||
logger.debug { "clear" }
|
logger.debug { "clear" }
|
||||||
|
|
||||||
stop()
|
stop()
|
||||||
|
val removedDownloads = downloadQueue.toList().map { DownloadUpdate(DownloadUpdateType.DEQUEUED, it) }
|
||||||
downloadQueue.clear()
|
downloadQueue.clear()
|
||||||
triggerSaveDownloadQueue()
|
triggerSaveDownloadQueue()
|
||||||
notifyAllClients()
|
notifyAllClients(false, removedDownloads)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,6 +27,10 @@ import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading
|
|||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Error
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Finished
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Finished
|
||||||
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Queued
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdate
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.ERROR
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.FINISHED
|
||||||
|
import suwayomi.tachidesk.manga.impl.download.model.DownloadUpdateType.PROGRESS
|
||||||
import suwayomi.tachidesk.manga.model.table.ChapterTable
|
import suwayomi.tachidesk.manga.model.table.ChapterTable
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
import java.util.concurrent.CopyOnWriteArrayList
|
||||||
|
|
||||||
@@ -34,7 +38,7 @@ class Downloader(
|
|||||||
private val scope: CoroutineScope,
|
private val scope: CoroutineScope,
|
||||||
val sourceId: String,
|
val sourceId: String,
|
||||||
private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>,
|
private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>,
|
||||||
private val notifier: (immediate: Boolean) -> Unit,
|
private val notifier: (immediate: Boolean, download: DownloadUpdate?) -> Unit,
|
||||||
private val onComplete: () -> Unit,
|
private val onComplete: () -> Unit,
|
||||||
private val onDownloadFinished: () -> Unit,
|
private val onDownloadFinished: () -> Unit,
|
||||||
) {
|
) {
|
||||||
@@ -49,10 +53,11 @@ class Downloader(
|
|||||||
class PauseDownloadException : Exception("Pause download")
|
class PauseDownloadException : Exception("Pause download")
|
||||||
|
|
||||||
private suspend fun step(
|
private suspend fun step(
|
||||||
download: DownloadChapter?,
|
downloadUpdate: DownloadUpdate?,
|
||||||
immediate: Boolean,
|
immediate: Boolean,
|
||||||
) {
|
) {
|
||||||
notifier(immediate)
|
val download = downloadUpdate?.downloadChapter
|
||||||
|
notifier(immediate, downloadUpdate)
|
||||||
currentCoroutineContext().ensureActive()
|
currentCoroutineContext().ensureActive()
|
||||||
if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) {
|
if (download != null && download != availableSourceDownloads.firstOrNull { it.state != Error }) {
|
||||||
if (download in downloadQueue) {
|
if (download in downloadQueue) {
|
||||||
@@ -81,7 +86,7 @@ class Downloader(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.debug { "started" }
|
logger.debug { "started" }
|
||||||
notifier(false)
|
notifier(false, null)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,14 +95,14 @@ class Downloader(
|
|||||||
logger.debug { "stopped" }
|
logger.debug { "stopped" }
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun finishDownload(
|
private fun finishDownload(
|
||||||
logger: KLogger,
|
logger: KLogger,
|
||||||
download: DownloadChapter,
|
download: DownloadChapter,
|
||||||
) {
|
) {
|
||||||
downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex }
|
downloadQueue -= download
|
||||||
step(null, false)
|
notifier(true, DownloadUpdate(FINISHED, download))
|
||||||
logger.debug { "finished" }
|
|
||||||
onDownloadFinished()
|
onDownloadFinished()
|
||||||
|
logger.debug { "finished" }
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun run() {
|
private suspend fun run() {
|
||||||
@@ -122,12 +127,13 @@ class Downloader(
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
download.state = Downloading
|
download.state = Downloading
|
||||||
step(download, true)
|
step(DownloadUpdate(PROGRESS, download), true)
|
||||||
|
|
||||||
download.chapter = getChapterDownloadReadyByIndex(download.chapterIndex, download.mangaId)
|
download.chapter = getChapterDownloadReadyByIndex(download.chapterIndex, download.mangaId)
|
||||||
step(download, false)
|
|
||||||
|
|
||||||
ChapterDownloadHelper.download(download.mangaId, download.chapter.id, download, scope, this::step)
|
ChapterDownloadHelper.download(download.mangaId, download.chapter.id, download, scope) { downloadChapter, immediate ->
|
||||||
|
step(downloadChapter?.let { DownloadUpdate(PROGRESS, downloadChapter) }, immediate)
|
||||||
|
}
|
||||||
download.state = Finished
|
download.state = Finished
|
||||||
transaction {
|
transaction {
|
||||||
ChapterTable.update(
|
ChapterTable.update(
|
||||||
@@ -136,7 +142,6 @@ class Downloader(
|
|||||||
it[isDownloaded] = true
|
it[isDownloaded] = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
step(download, true)
|
|
||||||
finishDownload(downloadLogger, download)
|
finishDownload(downloadLogger, download)
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
logger.debug("Downloader was stopped")
|
logger.debug("Downloader was stopped")
|
||||||
@@ -149,7 +154,7 @@ class Downloader(
|
|||||||
download.tries++
|
download.tries++
|
||||||
download.state = Error
|
download.state = Error
|
||||||
} finally {
|
} finally {
|
||||||
notifier(false)
|
notifier(false, DownloadUpdate(ERROR, download))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ class DownloadChapter(
|
|||||||
val mangaId: Int,
|
val mangaId: Int,
|
||||||
var chapter: ChapterDataClass,
|
var chapter: ChapterDataClass,
|
||||||
var manga: MangaDataClass,
|
var manga: MangaDataClass,
|
||||||
|
var position: Int,
|
||||||
var state: DownloadState = Queued,
|
var state: DownloadState = Queued,
|
||||||
var progress: Float = 0f,
|
var progress: Float = 0f,
|
||||||
var tries: Int = 0,
|
var tries: Int = 0,
|
||||||
|
|||||||
@@ -16,3 +16,9 @@ data class DownloadStatus(
|
|||||||
val status: Status,
|
val status: Status,
|
||||||
val queue: List<DownloadChapter>,
|
val queue: List<DownloadChapter>,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
data class DownloadUpdates(
|
||||||
|
val status: Status,
|
||||||
|
val updates: List<DownloadUpdate>,
|
||||||
|
val initial: List<DownloadChapter>?,
|
||||||
|
)
|
||||||
|
|||||||
@@ -0,0 +1,15 @@
|
|||||||
|
package suwayomi.tachidesk.manga.impl.download.model
|
||||||
|
|
||||||
|
enum class DownloadUpdateType {
|
||||||
|
QUEUED,
|
||||||
|
DEQUEUED,
|
||||||
|
PROGRESS,
|
||||||
|
FINISHED,
|
||||||
|
ERROR,
|
||||||
|
POSITION,
|
||||||
|
}
|
||||||
|
|
||||||
|
data class DownloadUpdate(
|
||||||
|
val type: DownloadUpdateType,
|
||||||
|
val downloadChapter: DownloadChapter,
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user