Improve listener system

This commit is contained in:
Syer10
2025-10-07 12:33:24 -04:00
parent 2a8d937992
commit 1641a0e9f4
21 changed files with 102 additions and 111 deletions

View File

@@ -15,7 +15,6 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.onStart
@@ -34,16 +33,6 @@ class ServerListeners {
)
val mangaListener = _mangaListener.asSharedFlow()
private val _chapterIdsListener = MutableSharedFlow<List<Long>>(
extraBufferCapacity = Channel.UNLIMITED,
)
val chapterIdsListener = _chapterIdsListener.asSharedFlow()
private val _mangaChapterIdsListener = MutableSharedFlow<List<Long>>(
extraBufferCapacity = Channel.UNLIMITED,
)
val mangaChapterIdsListener = _mangaChapterIdsListener.asSharedFlow()
private val categoryMangaListener = MutableSharedFlow<Long>(
extraBufferCapacity = Channel.UNLIMITED,
)
@@ -65,9 +54,23 @@ class ServerListeners {
.buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.flatMapLatest { flow }
fun updateManga(vararg ids: Long) {
fun updateManga(ids: List<Long>) {
val ids = ids.filter { id -> id >= 0 }
if (ids.isEmpty()) {
return
}
scope.launch {
_mangaListener.emit(ids.toList())
_mangaListener.emit(ids)
}
}
fun updateManga(vararg ids: Long) {
val ids = ids.filter { id -> id >= 0 }
if (ids.isEmpty()) {
return
}
scope.launch {
_mangaListener.emit(ids)
}
}
@@ -88,36 +91,6 @@ class ServerListeners {
}
}
fun <T> combineChapters(
flow: Flow<T>,
chapterIdPredate: (suspend (List<Long>) -> Boolean)? = null,
mangaIdPredate: (suspend (List<Long>) -> Boolean)? = null,
): Flow<T> {
val idsListener = _chapterIdsListener
.filter { chapterIdPredate?.invoke(it) ?: false }
.startWith(Unit)
.combine(
_mangaChapterIdsListener.filter { mangaIdPredate?.invoke(it) ?: false }
.startWith(Unit),
) { _, _ -> }
return idsListener
.buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
.flatMapLatest { flow }
}
fun updateChapters(chapterIds: List<Long>) {
scope.launch {
_chapterIdsListener.emit(chapterIds)
}
}
fun updateChapters(vararg chapterIds: Long) {
scope.launch {
_chapterIdsListener.emit(chapterIds.toList())
}
}
companion object {
private val log = logging()
}

View File

@@ -23,8 +23,9 @@ class DeleteChapterDownload(
) {
suspend fun await(
chapterId: Long,
mangaId: Long?,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterId)
) = asFlow(chapterId, mangaId)
.catch {
onError(it)
log.warn(it) { "Failed to delete chapter download for $chapterId" }
@@ -44,8 +45,9 @@ class DeleteChapterDownload(
suspend fun await(
chapterIds: List<Long>,
mangaIds: List<Long>?,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterIds)
) = asFlow(chapterIds, mangaIds)
.catch {
onError(it)
log.warn(it) { "Failed to delete chapter download for $chapterIds" }
@@ -63,23 +65,23 @@ class DeleteChapterDownload(
}
.collect()
fun asFlow(chapterId: Long) =
fun asFlow(chapterId: Long, mangaId: Long?) =
chapterRepository.deleteDownloadedChapter(chapterId)
.onEach { serverListeners.updateChapters(chapterId) }
.onEach { serverListeners.updateManga(mangaId ?: -1) }
@JvmName("asFlowChapter")
fun asFlow(chapter: Chapter) =
chapterRepository.deleteDownloadedChapter(chapter.id)
.onEach { serverListeners.updateChapters(chapter.id) }
.onEach { serverListeners.updateManga(chapter.mangaId) }
fun asFlow(chapterIds: List<Long>) =
fun asFlow(chapterIds: List<Long>, mangaIds: List<Long>?) =
chapterRepository.deleteDownloadedChapters(chapterIds)
.onEach { serverListeners.updateChapters(chapterIds) }
.onEach { serverListeners.updateManga(mangaIds.orEmpty()) }
@JvmName("asFlowChapters")
fun asFlow(chapter: List<Chapter>) =
chapterRepository.deleteDownloadedChapters(chapter.map { it.id })
.onEach { serverListeners.updateChapters(chapter.map { it.id }) }
.onEach { serverListeners.updateManga(chapter.map { it.mangaId }) }
companion object {
private val log = logging()

View File

@@ -22,8 +22,9 @@ class GetChapter(
) {
suspend fun await(
chapterId: Long,
mangaId: Long?,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterId)
) = asFlow(chapterId, mangaId)
.take(1)
.catch {
onError(it)
@@ -42,16 +43,16 @@ class GetChapter(
}
.singleOrNull()
fun asFlow(chapterId: Long) =
serverListeners.combineChapters(
fun asFlow(chapterId: Long, mangaId: Long?) =
serverListeners.combineMangaUpdates(
chapterRepository.getChapter(chapterId),
chapterIdPredate = { ids -> chapterId in ids },
predate = { ids -> mangaId in ids },
)
fun asFlow(chapter: Chapter) =
serverListeners.combineChapters(
serverListeners.combineMangaUpdates(
chapterRepository.getChapter(chapter.id),
chapterIdPredate = { ids -> chapter.id in ids },
predate = { ids -> chapter.mangaId in ids },
)
companion object {

View File

@@ -43,15 +43,15 @@ class GetChapters(
.singleOrNull()
fun asFlow(mangaId: Long) =
serverListeners.combineChapters(
serverListeners.combineMangaUpdates(
chapterRepository.getChapters(mangaId),
chapterIdPredate = { ids -> false }, // todo
predate = { ids -> mangaId in ids },
)
fun asFlow(manga: Manga) =
serverListeners.combineChapters(
serverListeners.combineMangaUpdates(
chapterRepository.getChapters(manga.id),
chapterIdPredate = { ids -> false }, // todo
predate = { ids -> manga.id in ids },
)
companion object {

View File

@@ -40,13 +40,13 @@ class RefreshChapters(
}
.singleOrNull()
fun asFlow(mangaId: Long) =
fun asFlow(mangaId: Long, ) =
chapterRepository.fetchChapters(mangaId)
.onEach { serverListeners.updateChapters(mangaId) }
.onEach { serverListeners.updateManga(mangaId) }
fun asFlow(manga: Manga) =
chapterRepository.fetchChapters(manga.id)
.onEach { serverListeners.updateChapters(manga.id) }
.onEach { serverListeners.updateManga(manga.id) }
companion object {
private val log = logging()

View File

@@ -23,11 +23,12 @@ class UpdateChapter(
) {
suspend fun await(
chapterId: Long,
mangaId: Long?,
bookmarked: Boolean? = null,
read: Boolean? = null,
lastPageRead: Int? = null,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterId, bookmarked, read, lastPageRead)
) = asFlow(chapterId, mangaId, bookmarked, read, lastPageRead)
.catch {
onError(it)
log.warn(it) { "Failed to update chapter bookmark for chapter $chapterId" }
@@ -49,11 +50,12 @@ class UpdateChapter(
suspend fun await(
chapterIds: List<Long>,
mangaIds: List<Long>?,
bookmarked: Boolean? = null,
read: Boolean? = null,
lastPageRead: Int? = null,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterIds, bookmarked, read, lastPageRead)
) = asFlow(chapterIds, mangaIds, bookmarked, read, lastPageRead)
.catch {
onError(it)
log.warn(it) { "Failed to update chapter bookmark for chapters $chapterIds" }
@@ -76,6 +78,7 @@ class UpdateChapter(
fun asFlow(
chapterId: Long,
mangaId: Long?,
bookmarked: Boolean? = null,
read: Boolean? = null,
lastPageRead: Int? = null,
@@ -84,7 +87,7 @@ class UpdateChapter(
bookmarked = bookmarked,
read = read,
lastPageRead = lastPageRead,
).onEach { serverListeners.updateChapters(chapterId) }
).onEach { serverListeners.updateManga(mangaId ?: -1) }
fun asFlow(
chapter: Chapter,
@@ -96,10 +99,11 @@ class UpdateChapter(
bookmarked = bookmarked,
read = read,
lastPageRead = lastPageRead,
).onEach { serverListeners.updateChapters(chapter.id) }
).onEach { serverListeners.updateManga(chapter.mangaId) }
fun asFlow(
chapterIds: List<Long>,
mangaIds: List<Long>?,
bookmarked: Boolean? = null,
read: Boolean? = null,
lastPageRead: Int? = null,
@@ -108,7 +112,7 @@ class UpdateChapter(
bookmarked = bookmarked,
read = read,
lastPageRead = lastPageRead,
).onEach { serverListeners.updateChapters(chapterIds) }
).onEach { serverListeners.updateManga(mangaIds.orEmpty()) }
@JvmName("asFlowChapters")
fun asFlow(
@@ -121,7 +125,7 @@ class UpdateChapter(
bookmarked = bookmarked,
read = read,
lastPageRead = lastPageRead,
).onEach { serverListeners.updateChapters(chapters.map { it.id }) }
).onEach { serverListeners.updateManga(chapters.map { it.mangaId }.distinct()) }
companion object {
private val log = logging()

View File

@@ -22,9 +22,10 @@ class UpdateChapterLastPageRead(
) {
suspend fun await(
chapterId: Long,
mangaId: Long?,
lastPageRead: Int,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterId, lastPageRead)
) = asFlow(chapterId, mangaId, lastPageRead)
.catch {
onError(it)
log.warn(it) { "Failed to update chapter last page read for chapter $chapterId" }
@@ -44,11 +45,12 @@ class UpdateChapterLastPageRead(
fun asFlow(
chapterId: Long,
mangaId: Long?,
lastPageRead: Int,
) = chapterRepository.updateChapter(
chapterId = chapterId,
lastPageRead = lastPageRead,
).onEach { serverListeners.updateChapters(chapterId) }
).onEach { serverListeners.updateManga(mangaId ?: -1) }
fun asFlow(
chapter: Chapter,
@@ -56,7 +58,7 @@ class UpdateChapterLastPageRead(
) = chapterRepository.updateChapter(
chapterId = chapter.id,
lastPageRead = lastPageRead,
).onEach { serverListeners.updateChapters(chapter.id) }
).onEach { serverListeners.updateManga(chapter.mangaId) }
companion object {
private val log = logging()

View File

@@ -41,7 +41,7 @@ class UpdateChapterMeta(
"juiPageOffset",
pageOffset.toString(),
).collect()
serverListeners.updateChapters(chapter.id)
serverListeners.updateManga(chapter.mangaId)
}
emit(Unit)
}

View File

@@ -22,9 +22,10 @@ class UpdateChapterRead(
) {
suspend fun await(
chapterId: Long,
mangaId: Long?,
read: Boolean,
onError: suspend (Throwable) -> Unit = {},
) = asFlow(chapterId, read)
) = asFlow(chapterId, mangaId, read)
.catch {
onError(it)
log.warn(it) { "Failed to update chapter read status for chapter $chapterId" }
@@ -44,19 +45,21 @@ class UpdateChapterRead(
fun asFlow(
chapterId: Long,
mangaId: Long?,
read: Boolean,
) = chapterRepository.updateChapter(
chapterId = chapterId,
read = read,
).onEach { serverListeners.updateChapters(chapterId) }
).onEach { serverListeners.updateManga(mangaId ?: -1) }
fun asFlow(
chapterIds: List<Long>,
mangaIds: List<Long>?,
read: Boolean,
) = chapterRepository.updateChapters(
chapterIds = chapterIds,
read = read,
).onEach { serverListeners.updateChapters(chapterIds) }
).onEach { serverListeners.updateManga(mangaIds.orEmpty()) }
fun asFlow(
chapter: Chapter,
@@ -64,7 +67,7 @@ class UpdateChapterRead(
) = chapterRepository.updateChapter(
chapterId = chapter.id,
read = read,
).onEach { serverListeners.updateChapters(chapter.id) }
).onEach { serverListeners.updateManga(chapter.mangaId) }
companion object {
private val log = logging()

View File

@@ -23,12 +23,9 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.runningFold
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.datetime.LocalDate
@@ -96,20 +93,20 @@ class UpdatesPager(
private val changedChapters = MutableStateFlow(emptyMap<Long, Chapter>())
init {
serverListeners.chapterIdsListener
.onEach { updatedChapterIds ->
val chapters = coroutineScope {
updatedChapterIds.mapNotNull { id -> chapterIds.value.find { it == id } }.map {
async {
getChapter.await(it)
}
}.awaitAll().filterNotNull().associateBy { it.id }
}
changedChapters.update { it + chapters }
}
.launchIn(this)
}
// init {
// serverListeners.chapterIdsListener
// .onEach { updatedChapterIds ->
// val chapters = coroutineScope {
// updatedChapterIds.mapNotNull { id -> chapterIds.value.find { it == id } }.map {
// async {
// getChapter.await(it)
// }
// }.awaitAll().filterNotNull().associateBy { it.id }
// }
// changedChapters.update { it + chapters }
// }
// .launchIn(this)
// }
val updates = combine(
foldedUpdates,