Subscriptions!

This commit is contained in:
Valter Martinek
2022-11-11 22:56:14 +01:00
committed by Syer10
parent e2fa003239
commit 847a5fe71b
20 changed files with 658 additions and 77 deletions

View File

@@ -67,6 +67,7 @@ dependencies {
implementation("com.expediagroup", "graphql-kotlin-server", "6.3.0")
implementation("com.expediagroup", "graphql-kotlin-schema-generator", "6.3.0")
implementation("com.graphql-java", "graphql-java-extended-scalars", "19.0")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.5.0-RC-native-mt")
testImplementation(libs.mockk)
}

View File

@@ -13,6 +13,7 @@ import suwayomi.tachidesk.graphql.controller.GraphQLController
object GraphQL {
fun defineEndpoints() {
post("graphql", GraphQLController::execute)
ws("graphql", GraphQLController::webSocket)
// graphql playground
get("graphql", GraphQLController::playground)

View File

@@ -7,20 +7,19 @@
package suwayomi.tachidesk.graphql.controller
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.javalin.http.Context
import suwayomi.tachidesk.graphql.impl.getGraphQLServer
import io.javalin.websocket.WsConfig
import suwayomi.tachidesk.graphql.server.TachideskGraphQLServer
import suwayomi.tachidesk.server.JavalinSetup.future
object GraphQLController {
private val mapper = jacksonObjectMapper()
private val tachideskGraphQLServer = getGraphQLServer(mapper)
private val server = TachideskGraphQLServer.create()
/** execute graphql query */
fun execute(ctx: Context) {
ctx.future(
future {
tachideskGraphQLServer.execute(ctx)
server.execute(ctx)
}
)
}
@@ -39,4 +38,13 @@ object GraphQLController {
ctx.html(body ?: "Could not load playground")
}
fun webSocket(ws: WsConfig) {
ws.onMessage { ctx ->
server.handleSubscriptionMessage(ctx)
}
ws.onClose { ctx ->
server.handleSubscriptionDisconnect(ctx)
}
}
}

View File

@@ -1,21 +0,0 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.dataLoaders
import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory
val tachideskDataLoaderRegistryFactory = KotlinDataLoaderRegistryFactory(
MangaDataLoader(),
ChapterDataLoader(),
ChaptersForMangaDataLoader(),
ChapterMetaDataLoader(),
MangaMetaDataLoader(),
MangaForCategoryDataLoader(),
CategoryMetaDataLoader(),
CategoriesForMangaDataLoader()
)

View File

@@ -1,29 +0,0 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.impl
import com.expediagroup.graphql.server.execution.GraphQLRequestHandler
import com.expediagroup.graphql.server.execution.GraphQLServer
import com.fasterxml.jackson.databind.ObjectMapper
import io.javalin.http.Context
import suwayomi.tachidesk.graphql.dataLoaders.tachideskDataLoaderRegistryFactory
class TachideskGraphQLServer(
requestParser: JavalinGraphQLRequestParser,
contextFactory: TachideskGraphQLContextFactory,
requestHandler: GraphQLRequestHandler
) : GraphQLServer<Context>(requestParser, contextFactory, requestHandler)
fun getGraphQLServer(mapper: ObjectMapper): TachideskGraphQLServer {
val requestParser = JavalinGraphQLRequestParser(mapper)
val contextFactory = TachideskGraphQLContextFactory()
val graphQL = getGraphQLObject()
val requestHandler = GraphQLRequestHandler(graphQL, tachideskDataLoaderRegistryFactory)
return TachideskGraphQLServer(requestParser, contextFactory, requestHandler)
}

View File

@@ -5,25 +5,18 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.impl
package suwayomi.tachidesk.graphql.server
import com.expediagroup.graphql.server.execution.GraphQLRequestParser
import com.expediagroup.graphql.server.types.GraphQLServerRequest
import com.fasterxml.jackson.databind.ObjectMapper
import io.javalin.http.Context
import java.io.IOException
/**
* Custom logic for how Javalin parses the incoming [Context] into the [GraphQLServerRequest]
*/
class JavalinGraphQLRequestParser(
private val mapper: ObjectMapper
) : GraphQLRequestParser<Context> {
class JavalinGraphQLRequestParser : GraphQLRequestParser<Context> {
@Suppress("BlockingMethodInNonBlockingContext")
override suspend fun parseRequest(context: Context): GraphQLServerRequest = try {
val rawRequest = context.body()
mapper.readValue(rawRequest, GraphQLServerRequest::class.java)
context.bodyAsClass(GraphQLServerRequest::class.java)
} catch (e: IOException) {
throw IOException("Unable to parse GraphQL payload.")
}

View File

@@ -0,0 +1,28 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server
import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory
import suwayomi.tachidesk.graphql.dataLoaders.*
class TachideskDataLoaderRegistryFactory {
companion object {
fun create(): KotlinDataLoaderRegistryFactory {
return KotlinDataLoaderRegistryFactory(
MangaDataLoader(),
ChapterDataLoader(),
ChaptersForMangaDataLoader(),
ChapterMetaDataLoader(),
MangaMetaDataLoader(),
MangaForCategoryDataLoader(),
CategoryMetaDataLoader(),
CategoriesForMangaDataLoader()
)
}
}
}

View File

@@ -5,27 +5,37 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.impl
package suwayomi.tachidesk.graphql.server
import com.expediagroup.graphql.generator.execution.GraphQLContext
import com.expediagroup.graphql.server.execution.GraphQLContextFactory
import io.javalin.http.Context
import io.javalin.websocket.WsContext
/**
* Custom logic for how Tachidesk should create its context given the [Context]
*/
class TachideskGraphQLContextFactory : GraphQLContextFactory<GraphQLContext, Context> {
override suspend fun generateContextMap(request: Context): Map<*, Any> =
mutableMapOf<Any, Any>(
override suspend fun generateContextMap(request: Context): Map<*, Any> = emptyMap<Any, Any>()
// mutableMapOf<Any, Any>(
// "user" to User(
// email = "fake@site.com",
// firstName = "Someone",
// lastName = "You Don't know",
// universityId = 4
// )
).also { map ->
// ).also { map ->
// request.headers["my-custom-header"]?.let { customHeader ->
// map["customHeader"] = customHeader
// }
}
// }
fun generateContextMap(request: WsContext): Map<*, Any> = emptyMap<Any, Any>()
}
/**
* Create a [GraphQLContext] from [this] map
* @return a new [GraphQLContext]
*/
fun Map<*, Any?>.toGraphQLContext(): graphql.GraphQLContext =
graphql.GraphQLContext.of(this)

View File

@@ -5,20 +5,19 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.impl
package suwayomi.tachidesk.graphql.server
import com.expediagroup.graphql.generator.SchemaGeneratorConfig
import com.expediagroup.graphql.generator.TopLevelObject
import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
import com.expediagroup.graphql.generator.scalars.IDValueUnboxer
import com.expediagroup.graphql.generator.toSchema
import graphql.GraphQL
import graphql.scalars.ExtendedScalars
import graphql.schema.GraphQLType
import suwayomi.tachidesk.graphql.mutations.ChapterMutation
import suwayomi.tachidesk.graphql.queries.CategoryQuery
import suwayomi.tachidesk.graphql.queries.ChapterQuery
import suwayomi.tachidesk.graphql.queries.MangaQuery
import suwayomi.tachidesk.graphql.subscriptions.DownloadSubscription
import kotlin.reflect.KClass
import kotlin.reflect.KType
@@ -42,9 +41,8 @@ val schema = toSchema(
),
mutations = listOf(
TopLevelObject(ChapterMutation())
),
subscriptions = listOf(
TopLevelObject(DownloadSubscription())
)
)
fun getGraphQLObject(): GraphQL = GraphQL.newGraphQL(schema)
.valueUnboxer(IDValueUnboxer())
.build()

View File

@@ -0,0 +1,57 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server
import com.expediagroup.graphql.generator.execution.FlowSubscriptionExecutionStrategy
import com.expediagroup.graphql.server.execution.GraphQLRequestHandler
import com.expediagroup.graphql.server.execution.GraphQLServer
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import graphql.GraphQL
import io.javalin.http.Context
import io.javalin.websocket.WsCloseContext
import io.javalin.websocket.WsMessageContext
import suwayomi.tachidesk.graphql.server.subscriptions.ApolloSubscriptionProtocolHandler
import suwayomi.tachidesk.graphql.server.subscriptions.GraphQLSubscriptionHandler
class TachideskGraphQLServer(
requestParser: JavalinGraphQLRequestParser,
contextFactory: TachideskGraphQLContextFactory,
requestHandler: GraphQLRequestHandler,
subscriptionHandler: GraphQLSubscriptionHandler
) : GraphQLServer<Context>(requestParser, contextFactory, requestHandler) {
private val objectMapper = jacksonObjectMapper()
private val subscriptionProtocolHandler = ApolloSubscriptionProtocolHandler(contextFactory, subscriptionHandler, objectMapper)
fun handleSubscriptionMessage(context: WsMessageContext) {
subscriptionProtocolHandler.handleMessage(context)
.map { objectMapper.writeValueAsString(it) }
.map { context.send(it) }
.subscribe()
}
fun handleSubscriptionDisconnect(context: WsCloseContext) {
subscriptionProtocolHandler.handleDisconnect(context)
}
companion object {
private fun getGraphQLObject(): GraphQL = GraphQL.newGraphQL(schema)
.subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy())
.build()
fun create(): TachideskGraphQLServer {
val graphQL = getGraphQLObject()
val requestParser = JavalinGraphQLRequestParser()
val contextFactory = TachideskGraphQLContextFactory()
val requestHandler = GraphQLRequestHandler(graphQL, TachideskDataLoaderRegistryFactory.create())
val subscriptionHandler = GraphQLSubscriptionHandler(graphQL, TachideskDataLoaderRegistryFactory.create())
return TachideskGraphQLServer(requestParser, contextFactory, requestHandler, subscriptionHandler)
}
}
}

View File

@@ -0,0 +1,194 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server.subscriptions
import com.expediagroup.graphql.server.types.GraphQLRequest
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.convertValue
import com.fasterxml.jackson.module.kotlin.readValue
import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.runBlocking
import org.slf4j.LoggerFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import suwayomi.tachidesk.graphql.server.TachideskGraphQLContextFactory
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ClientMessages.*
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.*
import suwayomi.tachidesk.graphql.server.toGraphQLContext
import java.time.Duration
/**
* Implementation of the `graphql-ws` protocol defined by Apollo
* https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
* ported for Javalin
*/
class ApolloSubscriptionProtocolHandler(
private val contextFactory: TachideskGraphQLContextFactory,
private val subscriptionHandler: GraphQLSubscriptionHandler,
private val objectMapper: ObjectMapper
) {
private val sessionState = ApolloSubscriptionSessionState()
private val logger = LoggerFactory.getLogger(ApolloSubscriptionProtocolHandler::class.java)
private val keepAliveMessage = SubscriptionOperationMessage(type = GQL_CONNECTION_KEEP_ALIVE.type)
private val basicConnectionErrorMessage = SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type)
private val acknowledgeMessage = SubscriptionOperationMessage(GQL_CONNECTION_ACK.type)
@Suppress("Detekt.TooGenericExceptionCaught")
fun handleMessage(context: WsMessageContext): Flux<SubscriptionOperationMessage> {
val operationMessage = convertToMessageOrNull(context.message()) ?: return Flux.just(basicConnectionErrorMessage)
logger.debug("GraphQL subscription client message, sessionId=${context.sessionId} operationMessage=$operationMessage")
return try {
when (operationMessage.type) {
GQL_CONNECTION_INIT.type -> onInit(operationMessage, context)
GQL_START.type -> startSubscription(operationMessage, context)
GQL_STOP.type -> onStop(operationMessage, context)
GQL_CONNECTION_TERMINATE.type -> onDisconnect(context)
else -> onUnknownOperation(operationMessage, context)
}
} catch (exception: Exception) {
onException(exception)
}
}
fun handleDisconnect(context: WsContext) {
onDisconnect(context)
}
@Suppress("Detekt.TooGenericExceptionCaught")
private fun convertToMessageOrNull(payload: String): SubscriptionOperationMessage? {
return try {
objectMapper.readValue(payload)
} catch (exception: Exception) {
logger.error("Error parsing the subscription message", exception)
null
}
}
/**
* If the keep alive configuration is set, send a message back to client at every interval until the session is terminated.
* Otherwise just return empty flux to append to the acknowledge message.
*/
private fun getKeepAliveFlux(context: WsContext): Flux<SubscriptionOperationMessage> {
val keepAliveInterval: Long? = 2000
if (keepAliveInterval != null) {
return Flux.interval(Duration.ofMillis(keepAliveInterval))
.map { keepAliveMessage }
.doOnSubscribe { sessionState.saveKeepAliveSubscription(context, it) }
}
return Flux.empty()
}
@Suppress("Detekt.TooGenericExceptionCaught")
private fun startSubscription(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Flux<SubscriptionOperationMessage> {
val graphQLContext = sessionState.getGraphQLContext(context)
if (operationMessage.id == null) {
logger.error("GraphQL subscription operation id is required")
return Flux.just(basicConnectionErrorMessage)
}
if (sessionState.doesOperationExist(context, operationMessage)) {
logger.info("Already subscribed to operation ${operationMessage.id} for session ${context.sessionId}")
return Flux.empty()
}
val payload = operationMessage.payload
if (payload == null) {
logger.error("GraphQL subscription payload was null instead of a GraphQLRequest object")
sessionState.stopOperation(context, operationMessage)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
try {
val request = objectMapper.convertValue<GraphQLRequest>(payload)
return subscriptionHandler.executeSubscription(request, graphQLContext)
.asFlux()
.map {
if (it.errors?.isNotEmpty() == true) {
SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it)
} else {
SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it)
}
}
.concatWith(onComplete(operationMessage, context).toFlux())
.doOnSubscribe { sessionState.saveOperation(context, operationMessage, it) }
} catch (exception: Exception) {
logger.error("Error running graphql subscription", exception)
// Do not terminate the session, just stop the operation messages
sessionState.stopOperation(context, operationMessage)
return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id))
}
}
private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux<SubscriptionOperationMessage> {
saveContext(operationMessage, context)
val acknowledgeMessage = Mono.just(acknowledgeMessage)
val keepAliveFlux = getKeepAliveFlux(context)
return acknowledgeMessage.concatWith(keepAliveFlux)
.onErrorReturn(getConnectionErrorMessage(operationMessage))
}
/**
* Generate the context and save it for all future messages.
*/
private fun saveContext(operationMessage: SubscriptionOperationMessage, context: WsContext) {
runBlocking {
val graphQLContext = contextFactory.generateContextMap(context).toGraphQLContext()
sessionState.saveContext(context, graphQLContext)
}
}
/**
* Called with the publisher has completed on its own.
*/
private fun onComplete(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Mono<SubscriptionOperationMessage> {
return sessionState.completeOperation(context, operationMessage)
}
/**
* Called with the client has called stop manually, or on error, and we need to cancel the publisher
*/
private fun onStop(
operationMessage: SubscriptionOperationMessage,
context: WsContext
): Flux<SubscriptionOperationMessage> {
return sessionState.stopOperation(context, operationMessage).toFlux()
}
private fun onDisconnect(context: WsContext): Flux<SubscriptionOperationMessage> {
sessionState.terminateSession(context)
return Flux.empty()
}
private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux<SubscriptionOperationMessage> {
logger.error("Unknown subscription operation $operationMessage")
sessionState.stopOperation(context, operationMessage)
return Flux.just(getConnectionErrorMessage(operationMessage))
}
private fun onException(exception: Exception): Flux<SubscriptionOperationMessage> {
logger.error("Error parsing the subscription message", exception)
return Flux.just(basicConnectionErrorMessage)
}
private fun getConnectionErrorMessage(operationMessage: SubscriptionOperationMessage): SubscriptionOperationMessage {
return SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)
}
}

View File

@@ -0,0 +1,125 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server.subscriptions
import graphql.GraphQLContext
import io.javalin.websocket.WsContext
import org.reactivestreams.Subscription
import reactor.core.publisher.Mono
import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE
import suwayomi.tachidesk.graphql.server.toGraphQLContext
import java.util.concurrent.ConcurrentHashMap
internal class ApolloSubscriptionSessionState {
// Sessions are saved by web socket session id
internal val activeKeepAliveSessions = ConcurrentHashMap<String, Subscription>()
// Operations are saved by web socket session id, then operation id
internal val activeOperations = ConcurrentHashMap<String, ConcurrentHashMap<String, Subscription>>()
// The graphQL context is saved by web socket session id
private val cachedGraphQLContext = ConcurrentHashMap<String, GraphQLContext>()
/**
* Save the context created from the factory and possibly updated in the onConnect hook.
* This allows us to include some initial state to be used when handling all the messages.
* This will be removed in [terminateSession].
*/
fun saveContext(context: WsContext, graphQLContext: GraphQLContext) {
cachedGraphQLContext[context.sessionId] = graphQLContext
}
/**
* Return the graphQL context for this session.
*/
fun getGraphQLContext(context: WsContext): GraphQLContext = cachedGraphQLContext[context.sessionId] ?: emptyMap<Any, Any>().toGraphQLContext()
/**
* Save the session that is sending keep alive messages.
* This will override values without cancelling the subscription, so it is the responsibility of the consumer to cancel.
* These messages will be stopped on [terminateSession].
*/
fun saveKeepAliveSubscription(context: WsContext, subscription: Subscription) {
activeKeepAliveSessions[context.sessionId] = subscription
}
/**
* Save the operation that is sending data to the client.
* This will override values without cancelling the subscription so it is the responsibility of the consumer to cancel.
* These messages will be stopped on [stopOperation].
*/
fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Subscription) {
val id = operationMessage.id
if (id != null) {
val operationsForSession: ConcurrentHashMap<String, Subscription> = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() }
operationsForSession[id] = subscription
}
}
/**
* Send the [GQL_COMPLETE] message.
* This can happen when the publisher finishes or if the client manually sends the stop message.
*/
fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
return getCompleteMessage(operationMessage)
.doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) }
}
/**
* Stop the subscription sending data and send the [GQL_COMPLETE] message.
* Does NOT terminate the session.
*/
fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
return getCompleteMessage(operationMessage)
.doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) }
}
private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Mono<SubscriptionOperationMessage> {
val id = operationMessage.id
if (id != null) {
return Mono.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id))
}
return Mono.empty()
}
/**
* Remove active running subscription from the cache and cancel if needed
*/
private fun removeActiveOperation(context: WsContext, id: String?, cancelSubscription: Boolean) {
val operationsForSession = activeOperations[context.sessionId]
val subscription = operationsForSession?.get(id)
if (subscription != null) {
if (cancelSubscription) {
subscription.cancel()
}
operationsForSession.remove(id)
if (operationsForSession.isEmpty()) {
activeOperations.remove(context.sessionId)
}
}
}
/**
* Terminate the session, cancelling the keep alive messages and all operations active for this session.
*/
fun terminateSession(context: WsContext) {
activeOperations[context.sessionId]?.forEach { (_, subscription) -> subscription.cancel() }
activeOperations.remove(context.sessionId)
cachedGraphQLContext.remove(context.sessionId)
activeKeepAliveSessions[context.sessionId]?.cancel()
activeKeepAliveSessions.remove(context.sessionId)
context.closeSession()
}
/**
* Looks up the operation for the client, to check if it already exists
*/
fun doesOperationExist(context: WsContext, operationMessage: SubscriptionOperationMessage): Boolean =
activeOperations[context.sessionId]?.containsKey(operationMessage.id) ?: false
}

View File

@@ -0,0 +1,20 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server.subscriptions
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
class FluxSubscriptionSource<T : Any>() {
private var sink: FluxSink<T>? = null
val emitter: Flux<T> = Flux.create<T> { emitter -> sink = emitter }
fun publish(value: T) {
sink?.next(value)
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server.subscriptions
import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory
import com.expediagroup.graphql.server.extensions.toExecutionInput
import com.expediagroup.graphql.server.extensions.toGraphQLError
import com.expediagroup.graphql.server.extensions.toGraphQLKotlinType
import com.expediagroup.graphql.server.extensions.toGraphQLResponse
import com.expediagroup.graphql.server.types.GraphQLRequest
import com.expediagroup.graphql.server.types.GraphQLResponse
import graphql.ExecutionResult
import graphql.GraphQL
import graphql.GraphQLContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.map
open class GraphQLSubscriptionHandler(
private val graphQL: GraphQL,
private val dataLoaderRegistryFactory: KotlinDataLoaderRegistryFactory? = null
) {
open fun executeSubscription(
graphQLRequest: GraphQLRequest,
graphQLContext: GraphQLContext = GraphQLContext.of(emptyMap<Any, Any>())
): Flow<GraphQLResponse<*>> {
val dataLoaderRegistry = dataLoaderRegistryFactory?.generate()
val input = graphQLRequest.toExecutionInput(dataLoaderRegistry, graphQLContext)
val res = graphQL.execute(input)
val data = res.getData<Flow<ExecutionResult>>()
val mapped = data.map { result -> result.toGraphQLResponse() }
return mapped.catch { throwable ->
val error = throwable.toGraphQLError()
emit(GraphQLResponse<Any?>(errors = listOf(error.toGraphQLKotlinType())))
}
}
}

View File

@@ -0,0 +1,39 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.server.subscriptions
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
/**
* The `graphql-ws` protocol from Apollo Client has some special text messages to signal events.
* Along with the HTTP WebSocket event handling we need to have some extra logic
*
* https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
*/
@JsonIgnoreProperties(ignoreUnknown = true)
data class SubscriptionOperationMessage(
val type: String,
val id: String? = null,
val payload: Any? = null
) {
enum class ClientMessages(val type: String) {
GQL_CONNECTION_INIT("connection_init"),
GQL_START("start"),
GQL_STOP("stop"),
GQL_CONNECTION_TERMINATE("connection_terminate")
}
enum class ServerMessages(val type: String) {
GQL_CONNECTION_ACK("connection_ack"),
GQL_CONNECTION_ERROR("connection_error"),
GQL_DATA("data"),
GQL_ERROR("error"),
GQL_COMPLETE("complete"),
GQL_CONNECTION_KEEP_ALIVE("ka")
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.subscriptions
import graphql.schema.DataFetchingEnvironment
import reactor.core.publisher.Flux
import suwayomi.tachidesk.graphql.server.subscriptions.FluxSubscriptionSource
import suwayomi.tachidesk.graphql.types.DownloadType
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
val downloadSubscriptionSource = FluxSubscriptionSource<DownloadChapter>()
class DownloadSubscription {
fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flux<DownloadType> {
return downloadSubscriptionSource.emitter.map { downloadChapter ->
DownloadType(downloadChapter)
}
}
}

View File

@@ -10,6 +10,7 @@ package suwayomi.tachidesk.graphql.types
import com.expediagroup.graphql.server.extensions.getValueFromDataLoader
import graphql.schema.DataFetchingEnvironment
import org.jetbrains.exposed.sql.ResultRow
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
import suwayomi.tachidesk.manga.model.table.ChapterTable
import java.util.concurrent.CompletableFuture
@@ -50,6 +51,24 @@ class ChapterType(
// transaction { ChapterTable.select { manga eq chapterEntry[manga].value }.count().toInt() },
)
constructor(dataClass: ChapterDataClass) : this(
dataClass.id,
dataClass.url,
dataClass.name,
dataClass.uploadDate,
dataClass.chapterNumber,
dataClass.scanlator,
dataClass.mangaId,
dataClass.read,
dataClass.bookmarked,
dataClass.lastPageRead,
dataClass.lastReadAt,
dataClass.index,
dataClass.fetchedAt,
dataClass.downloaded,
dataClass.pageCount
)
fun manga(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<MangaType> {
return dataFetchingEnvironment.getValueFromDataLoader<Int, MangaType>("MangaDataLoader", mangaId)
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright (C) Contributors to the Suwayomi project
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */
package suwayomi.tachidesk.graphql.types
import com.expediagroup.graphql.generator.annotations.GraphQLIgnore
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadState
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
class DownloadType(
val chapterId: Int,
val chapterIndex: Int,
val mangaId: Int,
var state: DownloadState = DownloadState.Queued,
var progress: Float = 0f,
var tries: Int = 0,
@GraphQLIgnore
var mangaDataClass: MangaDataClass,
@GraphQLIgnore
var chapterDataClass: ChapterDataClass
) {
constructor(downloadChapter: DownloadChapter) : this(
downloadChapter.chapter.id,
downloadChapter.chapterIndex,
downloadChapter.mangaId,
downloadChapter.state,
downloadChapter.progress,
downloadChapter.tries,
downloadChapter.manga,
downloadChapter.chapter
)
fun manga(): MangaType {
return MangaType(mangaDataClass)
}
fun chapter(): ChapterType {
return ChapterType(chapterDataClass)
}
}

View File

@@ -10,6 +10,7 @@ package suwayomi.tachidesk.graphql.types
import com.expediagroup.graphql.server.extensions.getValueFromDataLoader
import graphql.schema.DataFetchingEnvironment
import org.jetbrains.exposed.sql.ResultRow
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
import suwayomi.tachidesk.manga.model.dataclass.toGenreList
import suwayomi.tachidesk.manga.model.table.MangaStatus
import suwayomi.tachidesk.manga.model.table.MangaTable
@@ -53,6 +54,25 @@ class MangaType(
row[MangaTable.chaptersLastFetchedAt]
)
constructor(dataClass: MangaDataClass) : this(
dataClass.id,
dataClass.sourceId,
dataClass.url,
dataClass.title,
dataClass.thumbnailUrl,
dataClass.initialized,
dataClass.artist,
dataClass.author,
dataClass.description,
dataClass.genre,
dataClass.status,
dataClass.inLibrary,
dataClass.inLibraryAt,
dataClass.realUrl,
dataClass.lastFetchedAt,
dataClass.chaptersLastFetchedAt
)
fun chapters(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<List<ChapterType>> {
return dataFetchingEnvironment.getValueFromDataLoader<Int, List<ChapterType>>("ChaptersForMangaDataLoader", id)
}

View File

@@ -24,6 +24,7 @@ import mu.KotlinLogging
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction
import suwayomi.tachidesk.graphql.subscriptions.downloadSubscriptionSource
import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter
import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading
import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus
@@ -100,6 +101,9 @@ object DownloadManager {
notifyFlow.emit(Unit)
}
}
/*if (downloadChapter != null) { TODO GRAPHQL
downloadSubscriptionSource.publish(downloadChapter)
}*/
}
private fun getStatus(): DownloadStatus {
@@ -234,6 +238,7 @@ object DownloadManager {
manga
)
downloadQueue.add(downloadChapter)
downloadSubscriptionSource.publish(downloadChapter)
logger.debug { "Added chapter ${chapter.id} to download queue (${manga.title} | ${chapter.name})" }
return downloadChapter
}