diff --git a/server/build.gradle.kts b/server/build.gradle.kts index cd386dee..cbca5f9f 100644 --- a/server/build.gradle.kts +++ b/server/build.gradle.kts @@ -67,6 +67,7 @@ dependencies { implementation("com.expediagroup", "graphql-kotlin-server", "6.3.0") implementation("com.expediagroup", "graphql-kotlin-schema-generator", "6.3.0") implementation("com.graphql-java", "graphql-java-extended-scalars", "19.0") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.5.0-RC-native-mt") testImplementation(libs.mockk) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/GraphQL.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/GraphQL.kt index 46600e72..8e0225e3 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/GraphQL.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/GraphQL.kt @@ -13,6 +13,7 @@ import suwayomi.tachidesk.graphql.controller.GraphQLController object GraphQL { fun defineEndpoints() { post("graphql", GraphQLController::execute) + ws("graphql", GraphQLController::webSocket) // graphql playground get("graphql", GraphQLController::playground) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/controller/GraphQLController.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/controller/GraphQLController.kt index 5dd3812a..433b64d6 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/controller/GraphQLController.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/controller/GraphQLController.kt @@ -7,20 +7,19 @@ package suwayomi.tachidesk.graphql.controller -import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.javalin.http.Context -import suwayomi.tachidesk.graphql.impl.getGraphQLServer +import io.javalin.websocket.WsConfig +import suwayomi.tachidesk.graphql.server.TachideskGraphQLServer import suwayomi.tachidesk.server.JavalinSetup.future object GraphQLController { - private val mapper = jacksonObjectMapper() - private val tachideskGraphQLServer = getGraphQLServer(mapper) + private val server = TachideskGraphQLServer.create() /** execute graphql query */ fun execute(ctx: Context) { ctx.future( future { - tachideskGraphQLServer.execute(ctx) + server.execute(ctx) } ) } @@ -39,4 +38,13 @@ object GraphQLController { ctx.html(body ?: "Could not load playground") } + + fun webSocket(ws: WsConfig) { + ws.onMessage { ctx -> + server.handleSubscriptionMessage(ctx) + } + ws.onClose { ctx -> + server.handleSubscriptionDisconnect(ctx) + } + } } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/TachideskDataLoaderRegistryFactory.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/TachideskDataLoaderRegistryFactory.kt deleted file mode 100644 index a035f03d..00000000 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/dataLoaders/TachideskDataLoaderRegistryFactory.kt +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (C) Contributors to the Suwayomi project - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ - -package suwayomi.tachidesk.graphql.dataLoaders - -import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory - -val tachideskDataLoaderRegistryFactory = KotlinDataLoaderRegistryFactory( - MangaDataLoader(), - ChapterDataLoader(), - ChaptersForMangaDataLoader(), - ChapterMetaDataLoader(), - MangaMetaDataLoader(), - MangaForCategoryDataLoader(), - CategoryMetaDataLoader(), - CategoriesForMangaDataLoader() -) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLServer.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLServer.kt deleted file mode 100644 index dbe2c6c7..00000000 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLServer.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (C) Contributors to the Suwayomi project - * - * This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ - -package suwayomi.tachidesk.graphql.impl - -import com.expediagroup.graphql.server.execution.GraphQLRequestHandler -import com.expediagroup.graphql.server.execution.GraphQLServer -import com.fasterxml.jackson.databind.ObjectMapper -import io.javalin.http.Context -import suwayomi.tachidesk.graphql.dataLoaders.tachideskDataLoaderRegistryFactory - -class TachideskGraphQLServer( - requestParser: JavalinGraphQLRequestParser, - contextFactory: TachideskGraphQLContextFactory, - requestHandler: GraphQLRequestHandler -) : GraphQLServer(requestParser, contextFactory, requestHandler) - -fun getGraphQLServer(mapper: ObjectMapper): TachideskGraphQLServer { - val requestParser = JavalinGraphQLRequestParser(mapper) - val contextFactory = TachideskGraphQLContextFactory() - val graphQL = getGraphQLObject() - val requestHandler = GraphQLRequestHandler(graphQL, tachideskDataLoaderRegistryFactory) - - return TachideskGraphQLServer(requestParser, contextFactory, requestHandler) -} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/JavalinGraphQLRequestParser.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt similarity index 63% rename from server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/JavalinGraphQLRequestParser.kt rename to server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt index ad9b976f..7dc243b4 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/JavalinGraphQLRequestParser.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/JavalinGraphQLRequestParser.kt @@ -5,25 +5,18 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -package suwayomi.tachidesk.graphql.impl +package suwayomi.tachidesk.graphql.server import com.expediagroup.graphql.server.execution.GraphQLRequestParser import com.expediagroup.graphql.server.types.GraphQLServerRequest -import com.fasterxml.jackson.databind.ObjectMapper import io.javalin.http.Context import java.io.IOException -/** - * Custom logic for how Javalin parses the incoming [Context] into the [GraphQLServerRequest] - */ -class JavalinGraphQLRequestParser( - private val mapper: ObjectMapper -) : GraphQLRequestParser { +class JavalinGraphQLRequestParser : GraphQLRequestParser { @Suppress("BlockingMethodInNonBlockingContext") override suspend fun parseRequest(context: Context): GraphQLServerRequest = try { - val rawRequest = context.body() - mapper.readValue(rawRequest, GraphQLServerRequest::class.java) + context.bodyAsClass(GraphQLServerRequest::class.java) } catch (e: IOException) { throw IOException("Unable to parse GraphQL payload.") } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskDataLoaderRegistryFactory.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskDataLoaderRegistryFactory.kt new file mode 100644 index 00000000..1186a1ce --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskDataLoaderRegistryFactory.kt @@ -0,0 +1,28 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server + +import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory +import suwayomi.tachidesk.graphql.dataLoaders.* + +class TachideskDataLoaderRegistryFactory { + companion object { + fun create(): KotlinDataLoaderRegistryFactory { + return KotlinDataLoaderRegistryFactory( + MangaDataLoader(), + ChapterDataLoader(), + ChaptersForMangaDataLoader(), + ChapterMetaDataLoader(), + MangaMetaDataLoader(), + MangaForCategoryDataLoader(), + CategoryMetaDataLoader(), + CategoriesForMangaDataLoader() + ) + } + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLContextFactory.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLContextFactory.kt similarity index 69% rename from server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLContextFactory.kt rename to server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLContextFactory.kt index 970c6c8f..34147ae8 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLContextFactory.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLContextFactory.kt @@ -5,27 +5,37 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -package suwayomi.tachidesk.graphql.impl +package suwayomi.tachidesk.graphql.server import com.expediagroup.graphql.generator.execution.GraphQLContext import com.expediagroup.graphql.server.execution.GraphQLContextFactory import io.javalin.http.Context +import io.javalin.websocket.WsContext /** * Custom logic for how Tachidesk should create its context given the [Context] */ class TachideskGraphQLContextFactory : GraphQLContextFactory { - override suspend fun generateContextMap(request: Context): Map<*, Any> = - mutableMapOf( + override suspend fun generateContextMap(request: Context): Map<*, Any> = emptyMap() +// mutableMapOf( // "user" to User( // email = "fake@site.com", // firstName = "Someone", // lastName = "You Don't know", // universityId = 4 // ) - ).also { map -> +// ).also { map -> // request.headers["my-custom-header"]?.let { customHeader -> // map["customHeader"] = customHeader // } - } +// } + + fun generateContextMap(request: WsContext): Map<*, Any> = emptyMap() } + +/** + * Create a [GraphQLContext] from [this] map + * @return a new [GraphQLContext] + */ +fun Map<*, Any?>.toGraphQLContext(): graphql.GraphQLContext = + graphql.GraphQLContext.of(this) diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLSchema.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt similarity index 86% rename from server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLSchema.kt rename to server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt index 257426c2..e5de168b 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/impl/TachideskGraphQLSchema.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLSchema.kt @@ -5,20 +5,19 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ -package suwayomi.tachidesk.graphql.impl +package suwayomi.tachidesk.graphql.server import com.expediagroup.graphql.generator.SchemaGeneratorConfig import com.expediagroup.graphql.generator.TopLevelObject import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks -import com.expediagroup.graphql.generator.scalars.IDValueUnboxer import com.expediagroup.graphql.generator.toSchema -import graphql.GraphQL import graphql.scalars.ExtendedScalars import graphql.schema.GraphQLType import suwayomi.tachidesk.graphql.mutations.ChapterMutation import suwayomi.tachidesk.graphql.queries.CategoryQuery import suwayomi.tachidesk.graphql.queries.ChapterQuery import suwayomi.tachidesk.graphql.queries.MangaQuery +import suwayomi.tachidesk.graphql.subscriptions.DownloadSubscription import kotlin.reflect.KClass import kotlin.reflect.KType @@ -42,9 +41,8 @@ val schema = toSchema( ), mutations = listOf( TopLevelObject(ChapterMutation()) + ), + subscriptions = listOf( + TopLevelObject(DownloadSubscription()) ) ) - -fun getGraphQLObject(): GraphQL = GraphQL.newGraphQL(schema) - .valueUnboxer(IDValueUnboxer()) - .build() diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt new file mode 100644 index 00000000..65f6a170 --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/TachideskGraphQLServer.kt @@ -0,0 +1,57 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server + +import com.expediagroup.graphql.generator.execution.FlowSubscriptionExecutionStrategy +import com.expediagroup.graphql.server.execution.GraphQLRequestHandler +import com.expediagroup.graphql.server.execution.GraphQLServer +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import graphql.GraphQL +import io.javalin.http.Context +import io.javalin.websocket.WsCloseContext +import io.javalin.websocket.WsMessageContext +import suwayomi.tachidesk.graphql.server.subscriptions.ApolloSubscriptionProtocolHandler +import suwayomi.tachidesk.graphql.server.subscriptions.GraphQLSubscriptionHandler + +class TachideskGraphQLServer( + requestParser: JavalinGraphQLRequestParser, + contextFactory: TachideskGraphQLContextFactory, + requestHandler: GraphQLRequestHandler, + subscriptionHandler: GraphQLSubscriptionHandler +) : GraphQLServer(requestParser, contextFactory, requestHandler) { + private val objectMapper = jacksonObjectMapper() + private val subscriptionProtocolHandler = ApolloSubscriptionProtocolHandler(contextFactory, subscriptionHandler, objectMapper) + + fun handleSubscriptionMessage(context: WsMessageContext) { + subscriptionProtocolHandler.handleMessage(context) + .map { objectMapper.writeValueAsString(it) } + .map { context.send(it) } + .subscribe() + } + + fun handleSubscriptionDisconnect(context: WsCloseContext) { + subscriptionProtocolHandler.handleDisconnect(context) + } + + companion object { + private fun getGraphQLObject(): GraphQL = GraphQL.newGraphQL(schema) + .subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy()) + .build() + + fun create(): TachideskGraphQLServer { + val graphQL = getGraphQLObject() + + val requestParser = JavalinGraphQLRequestParser() + val contextFactory = TachideskGraphQLContextFactory() + val requestHandler = GraphQLRequestHandler(graphQL, TachideskDataLoaderRegistryFactory.create()) + val subscriptionHandler = GraphQLSubscriptionHandler(graphQL, TachideskDataLoaderRegistryFactory.create()) + + return TachideskGraphQLServer(requestParser, contextFactory, requestHandler, subscriptionHandler) + } + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt new file mode 100644 index 00000000..53df508e --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionProtocolHandler.kt @@ -0,0 +1,194 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server.subscriptions + +import com.expediagroup.graphql.server.types.GraphQLRequest +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.kotlin.convertValue +import com.fasterxml.jackson.module.kotlin.readValue +import io.javalin.websocket.WsContext +import io.javalin.websocket.WsMessageContext +import kotlinx.coroutines.reactor.asFlux +import kotlinx.coroutines.runBlocking +import org.slf4j.LoggerFactory +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.toFlux +import suwayomi.tachidesk.graphql.server.TachideskGraphQLContextFactory +import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ClientMessages.* +import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.* +import suwayomi.tachidesk.graphql.server.toGraphQLContext +import java.time.Duration + +/** + * Implementation of the `graphql-ws` protocol defined by Apollo + * https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md + * ported for Javalin + */ +class ApolloSubscriptionProtocolHandler( + private val contextFactory: TachideskGraphQLContextFactory, + private val subscriptionHandler: GraphQLSubscriptionHandler, + private val objectMapper: ObjectMapper +) { + private val sessionState = ApolloSubscriptionSessionState() + private val logger = LoggerFactory.getLogger(ApolloSubscriptionProtocolHandler::class.java) + private val keepAliveMessage = SubscriptionOperationMessage(type = GQL_CONNECTION_KEEP_ALIVE.type) + private val basicConnectionErrorMessage = SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type) + private val acknowledgeMessage = SubscriptionOperationMessage(GQL_CONNECTION_ACK.type) + + @Suppress("Detekt.TooGenericExceptionCaught") + fun handleMessage(context: WsMessageContext): Flux { + val operationMessage = convertToMessageOrNull(context.message()) ?: return Flux.just(basicConnectionErrorMessage) + logger.debug("GraphQL subscription client message, sessionId=${context.sessionId} operationMessage=$operationMessage") + + return try { + when (operationMessage.type) { + GQL_CONNECTION_INIT.type -> onInit(operationMessage, context) + GQL_START.type -> startSubscription(operationMessage, context) + GQL_STOP.type -> onStop(operationMessage, context) + GQL_CONNECTION_TERMINATE.type -> onDisconnect(context) + else -> onUnknownOperation(operationMessage, context) + } + } catch (exception: Exception) { + onException(exception) + } + } + + fun handleDisconnect(context: WsContext) { + onDisconnect(context) + } + + @Suppress("Detekt.TooGenericExceptionCaught") + private fun convertToMessageOrNull(payload: String): SubscriptionOperationMessage? { + return try { + objectMapper.readValue(payload) + } catch (exception: Exception) { + logger.error("Error parsing the subscription message", exception) + null + } + } + + /** + * If the keep alive configuration is set, send a message back to client at every interval until the session is terminated. + * Otherwise just return empty flux to append to the acknowledge message. + */ + private fun getKeepAliveFlux(context: WsContext): Flux { + val keepAliveInterval: Long? = 2000 + if (keepAliveInterval != null) { + return Flux.interval(Duration.ofMillis(keepAliveInterval)) + .map { keepAliveMessage } + .doOnSubscribe { sessionState.saveKeepAliveSubscription(context, it) } + } + + return Flux.empty() + } + + @Suppress("Detekt.TooGenericExceptionCaught") + private fun startSubscription( + operationMessage: SubscriptionOperationMessage, + context: WsContext + ): Flux { + val graphQLContext = sessionState.getGraphQLContext(context) + + if (operationMessage.id == null) { + logger.error("GraphQL subscription operation id is required") + return Flux.just(basicConnectionErrorMessage) + } + + if (sessionState.doesOperationExist(context, operationMessage)) { + logger.info("Already subscribed to operation ${operationMessage.id} for session ${context.sessionId}") + return Flux.empty() + } + + val payload = operationMessage.payload + + if (payload == null) { + logger.error("GraphQL subscription payload was null instead of a GraphQLRequest object") + sessionState.stopOperation(context, operationMessage) + return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)) + } + + try { + val request = objectMapper.convertValue(payload) + return subscriptionHandler.executeSubscription(request, graphQLContext) + .asFlux() + .map { + if (it.errors?.isNotEmpty() == true) { + SubscriptionOperationMessage(type = GQL_ERROR.type, id = operationMessage.id, payload = it) + } else { + SubscriptionOperationMessage(type = GQL_DATA.type, id = operationMessage.id, payload = it) + } + } + .concatWith(onComplete(operationMessage, context).toFlux()) + .doOnSubscribe { sessionState.saveOperation(context, operationMessage, it) } + } catch (exception: Exception) { + logger.error("Error running graphql subscription", exception) + // Do not terminate the session, just stop the operation messages + sessionState.stopOperation(context, operationMessage) + return Flux.just(SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id)) + } + } + + private fun onInit(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux { + saveContext(operationMessage, context) + val acknowledgeMessage = Mono.just(acknowledgeMessage) + val keepAliveFlux = getKeepAliveFlux(context) + return acknowledgeMessage.concatWith(keepAliveFlux) + .onErrorReturn(getConnectionErrorMessage(operationMessage)) + } + + /** + * Generate the context and save it for all future messages. + */ + private fun saveContext(operationMessage: SubscriptionOperationMessage, context: WsContext) { + runBlocking { + val graphQLContext = contextFactory.generateContextMap(context).toGraphQLContext() + sessionState.saveContext(context, graphQLContext) + } + } + + /** + * Called with the publisher has completed on its own. + */ + private fun onComplete( + operationMessage: SubscriptionOperationMessage, + context: WsContext + ): Mono { + return sessionState.completeOperation(context, operationMessage) + } + + /** + * Called with the client has called stop manually, or on error, and we need to cancel the publisher + */ + private fun onStop( + operationMessage: SubscriptionOperationMessage, + context: WsContext + ): Flux { + return sessionState.stopOperation(context, operationMessage).toFlux() + } + + private fun onDisconnect(context: WsContext): Flux { + sessionState.terminateSession(context) + return Flux.empty() + } + + private fun onUnknownOperation(operationMessage: SubscriptionOperationMessage, context: WsContext): Flux { + logger.error("Unknown subscription operation $operationMessage") + sessionState.stopOperation(context, operationMessage) + return Flux.just(getConnectionErrorMessage(operationMessage)) + } + + private fun onException(exception: Exception): Flux { + logger.error("Error parsing the subscription message", exception) + return Flux.just(basicConnectionErrorMessage) + } + + private fun getConnectionErrorMessage(operationMessage: SubscriptionOperationMessage): SubscriptionOperationMessage { + return SubscriptionOperationMessage(type = GQL_CONNECTION_ERROR.type, id = operationMessage.id) + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt new file mode 100644 index 00000000..7e2358ed --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/ApolloSubscriptionSessionState.kt @@ -0,0 +1,125 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server.subscriptions + +import graphql.GraphQLContext +import io.javalin.websocket.WsContext +import org.reactivestreams.Subscription +import reactor.core.publisher.Mono +import suwayomi.tachidesk.graphql.server.subscriptions.SubscriptionOperationMessage.ServerMessages.GQL_COMPLETE +import suwayomi.tachidesk.graphql.server.toGraphQLContext +import java.util.concurrent.ConcurrentHashMap + +internal class ApolloSubscriptionSessionState { + + // Sessions are saved by web socket session id + internal val activeKeepAliveSessions = ConcurrentHashMap() + + // Operations are saved by web socket session id, then operation id + internal val activeOperations = ConcurrentHashMap>() + + // The graphQL context is saved by web socket session id + private val cachedGraphQLContext = ConcurrentHashMap() + + /** + * Save the context created from the factory and possibly updated in the onConnect hook. + * This allows us to include some initial state to be used when handling all the messages. + * This will be removed in [terminateSession]. + */ + fun saveContext(context: WsContext, graphQLContext: GraphQLContext) { + cachedGraphQLContext[context.sessionId] = graphQLContext + } + + /** + * Return the graphQL context for this session. + */ + fun getGraphQLContext(context: WsContext): GraphQLContext = cachedGraphQLContext[context.sessionId] ?: emptyMap().toGraphQLContext() + + /** + * Save the session that is sending keep alive messages. + * This will override values without cancelling the subscription, so it is the responsibility of the consumer to cancel. + * These messages will be stopped on [terminateSession]. + */ + fun saveKeepAliveSubscription(context: WsContext, subscription: Subscription) { + activeKeepAliveSessions[context.sessionId] = subscription + } + + /** + * Save the operation that is sending data to the client. + * This will override values without cancelling the subscription so it is the responsibility of the consumer to cancel. + * These messages will be stopped on [stopOperation]. + */ + fun saveOperation(context: WsContext, operationMessage: SubscriptionOperationMessage, subscription: Subscription) { + val id = operationMessage.id + if (id != null) { + val operationsForSession: ConcurrentHashMap = activeOperations.getOrPut(context.sessionId) { ConcurrentHashMap() } + operationsForSession[id] = subscription + } + } + + /** + * Send the [GQL_COMPLETE] message. + * This can happen when the publisher finishes or if the client manually sends the stop message. + */ + fun completeOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono { + return getCompleteMessage(operationMessage) + .doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = false) } + } + + /** + * Stop the subscription sending data and send the [GQL_COMPLETE] message. + * Does NOT terminate the session. + */ + fun stopOperation(context: WsContext, operationMessage: SubscriptionOperationMessage): Mono { + return getCompleteMessage(operationMessage) + .doFinally { removeActiveOperation(context, operationMessage.id, cancelSubscription = true) } + } + + private fun getCompleteMessage(operationMessage: SubscriptionOperationMessage): Mono { + val id = operationMessage.id + if (id != null) { + return Mono.just(SubscriptionOperationMessage(type = GQL_COMPLETE.type, id = id)) + } + return Mono.empty() + } + + /** + * Remove active running subscription from the cache and cancel if needed + */ + private fun removeActiveOperation(context: WsContext, id: String?, cancelSubscription: Boolean) { + val operationsForSession = activeOperations[context.sessionId] + val subscription = operationsForSession?.get(id) + if (subscription != null) { + if (cancelSubscription) { + subscription.cancel() + } + operationsForSession.remove(id) + if (operationsForSession.isEmpty()) { + activeOperations.remove(context.sessionId) + } + } + } + + /** + * Terminate the session, cancelling the keep alive messages and all operations active for this session. + */ + fun terminateSession(context: WsContext) { + activeOperations[context.sessionId]?.forEach { (_, subscription) -> subscription.cancel() } + activeOperations.remove(context.sessionId) + cachedGraphQLContext.remove(context.sessionId) + activeKeepAliveSessions[context.sessionId]?.cancel() + activeKeepAliveSessions.remove(context.sessionId) + context.closeSession() + } + + /** + * Looks up the operation for the client, to check if it already exists + */ + fun doesOperationExist(context: WsContext, operationMessage: SubscriptionOperationMessage): Boolean = + activeOperations[context.sessionId]?.containsKey(operationMessage.id) ?: false +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FluxSubscriptionSource.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FluxSubscriptionSource.kt new file mode 100644 index 00000000..ab4c1675 --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/FluxSubscriptionSource.kt @@ -0,0 +1,20 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server.subscriptions + +import reactor.core.publisher.Flux +import reactor.core.publisher.FluxSink + +class FluxSubscriptionSource() { + private var sink: FluxSink? = null + val emitter: Flux = Flux.create { emitter -> sink = emitter } + + fun publish(value: T) { + sink?.next(value) + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/GraphQLSubscriptionHandler.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/GraphQLSubscriptionHandler.kt new file mode 100644 index 00000000..a402e642 --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/GraphQLSubscriptionHandler.kt @@ -0,0 +1,43 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server.subscriptions + +import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory +import com.expediagroup.graphql.server.extensions.toExecutionInput +import com.expediagroup.graphql.server.extensions.toGraphQLError +import com.expediagroup.graphql.server.extensions.toGraphQLKotlinType +import com.expediagroup.graphql.server.extensions.toGraphQLResponse +import com.expediagroup.graphql.server.types.GraphQLRequest +import com.expediagroup.graphql.server.types.GraphQLResponse +import graphql.ExecutionResult +import graphql.GraphQL +import graphql.GraphQLContext +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.map + +open class GraphQLSubscriptionHandler( + private val graphQL: GraphQL, + private val dataLoaderRegistryFactory: KotlinDataLoaderRegistryFactory? = null +) { + open fun executeSubscription( + graphQLRequest: GraphQLRequest, + graphQLContext: GraphQLContext = GraphQLContext.of(emptyMap()) + ): Flow> { + val dataLoaderRegistry = dataLoaderRegistryFactory?.generate() + val input = graphQLRequest.toExecutionInput(dataLoaderRegistry, graphQLContext) + + val res = graphQL.execute(input) + val data = res.getData>() + val mapped = data.map { result -> result.toGraphQLResponse() } + return mapped.catch { throwable -> + val error = throwable.toGraphQLError() + emit(GraphQLResponse(errors = listOf(error.toGraphQLKotlinType()))) + } + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/SubscriptionOperationMessage.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/SubscriptionOperationMessage.kt new file mode 100644 index 00000000..c118eb7c --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/server/subscriptions/SubscriptionOperationMessage.kt @@ -0,0 +1,39 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.server.subscriptions + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties + +/** + * The `graphql-ws` protocol from Apollo Client has some special text messages to signal events. + * Along with the HTTP WebSocket event handling we need to have some extra logic + * + * https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md + */ +@JsonIgnoreProperties(ignoreUnknown = true) +data class SubscriptionOperationMessage( + val type: String, + val id: String? = null, + val payload: Any? = null +) { + enum class ClientMessages(val type: String) { + GQL_CONNECTION_INIT("connection_init"), + GQL_START("start"), + GQL_STOP("stop"), + GQL_CONNECTION_TERMINATE("connection_terminate") + } + + enum class ServerMessages(val type: String) { + GQL_CONNECTION_ACK("connection_ack"), + GQL_CONNECTION_ERROR("connection_error"), + GQL_DATA("data"), + GQL_ERROR("error"), + GQL_COMPLETE("complete"), + GQL_CONNECTION_KEEP_ALIVE("ka") + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt new file mode 100644 index 00000000..6f1db4f6 --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/subscriptions/DownloadSubscription.kt @@ -0,0 +1,24 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.subscriptions + +import graphql.schema.DataFetchingEnvironment +import reactor.core.publisher.Flux +import suwayomi.tachidesk.graphql.server.subscriptions.FluxSubscriptionSource +import suwayomi.tachidesk.graphql.types.DownloadType +import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter + +val downloadSubscriptionSource = FluxSubscriptionSource() + +class DownloadSubscription { + fun downloadChanged(dataFetchingEnvironment: DataFetchingEnvironment): Flux { + return downloadSubscriptionSource.emitter.map { downloadChapter -> + DownloadType(downloadChapter) + } + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/ChapterType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/ChapterType.kt index 03a1102c..1d8f0e2c 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/ChapterType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/ChapterType.kt @@ -10,6 +10,7 @@ package suwayomi.tachidesk.graphql.types import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import graphql.schema.DataFetchingEnvironment import org.jetbrains.exposed.sql.ResultRow +import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass import suwayomi.tachidesk.manga.model.table.ChapterTable import java.util.concurrent.CompletableFuture @@ -50,6 +51,24 @@ class ChapterType( // transaction { ChapterTable.select { manga eq chapterEntry[manga].value }.count().toInt() }, ) + constructor(dataClass: ChapterDataClass) : this( + dataClass.id, + dataClass.url, + dataClass.name, + dataClass.uploadDate, + dataClass.chapterNumber, + dataClass.scanlator, + dataClass.mangaId, + dataClass.read, + dataClass.bookmarked, + dataClass.lastPageRead, + dataClass.lastReadAt, + dataClass.index, + dataClass.fetchedAt, + dataClass.downloaded, + dataClass.pageCount + ) + fun manga(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture { return dataFetchingEnvironment.getValueFromDataLoader("MangaDataLoader", mangaId) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt new file mode 100644 index 00000000..9e58d813 --- /dev/null +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/DownloadType.kt @@ -0,0 +1,46 @@ +/* + * Copyright (C) Contributors to the Suwayomi project + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ + +package suwayomi.tachidesk.graphql.types + +import com.expediagroup.graphql.generator.annotations.GraphQLIgnore +import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter +import suwayomi.tachidesk.manga.impl.download.model.DownloadState +import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass +import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass + +class DownloadType( + val chapterId: Int, + val chapterIndex: Int, + val mangaId: Int, + var state: DownloadState = DownloadState.Queued, + var progress: Float = 0f, + var tries: Int = 0, + @GraphQLIgnore + var mangaDataClass: MangaDataClass, + @GraphQLIgnore + var chapterDataClass: ChapterDataClass +) { + constructor(downloadChapter: DownloadChapter) : this( + downloadChapter.chapter.id, + downloadChapter.chapterIndex, + downloadChapter.mangaId, + downloadChapter.state, + downloadChapter.progress, + downloadChapter.tries, + downloadChapter.manga, + downloadChapter.chapter + ) + + fun manga(): MangaType { + return MangaType(mangaDataClass) + } + + fun chapter(): ChapterType { + return ChapterType(chapterDataClass) + } +} diff --git a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt index 40cd2222..1c1030ce 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/graphql/types/MangaType.kt @@ -10,6 +10,7 @@ package suwayomi.tachidesk.graphql.types import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import graphql.schema.DataFetchingEnvironment import org.jetbrains.exposed.sql.ResultRow +import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass import suwayomi.tachidesk.manga.model.dataclass.toGenreList import suwayomi.tachidesk.manga.model.table.MangaStatus import suwayomi.tachidesk.manga.model.table.MangaTable @@ -53,6 +54,25 @@ class MangaType( row[MangaTable.chaptersLastFetchedAt] ) + constructor(dataClass: MangaDataClass) : this( + dataClass.id, + dataClass.sourceId, + dataClass.url, + dataClass.title, + dataClass.thumbnailUrl, + dataClass.initialized, + dataClass.artist, + dataClass.author, + dataClass.description, + dataClass.genre, + dataClass.status, + dataClass.inLibrary, + dataClass.inLibraryAt, + dataClass.realUrl, + dataClass.lastFetchedAt, + dataClass.chaptersLastFetchedAt + ) + fun chapters(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture> { return dataFetchingEnvironment.getValueFromDataLoader>("ChaptersForMangaDataLoader", id) } diff --git a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt index 7ddd40d5..fe9a13a5 100644 --- a/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt +++ b/server/src/main/kotlin/suwayomi/tachidesk/manga/impl/download/DownloadManager.kt @@ -24,6 +24,7 @@ import mu.KotlinLogging import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.transactions.transaction +import suwayomi.tachidesk.graphql.subscriptions.downloadSubscriptionSource import suwayomi.tachidesk.manga.impl.download.model.DownloadChapter import suwayomi.tachidesk.manga.impl.download.model.DownloadState.Downloading import suwayomi.tachidesk.manga.impl.download.model.DownloadStatus @@ -100,6 +101,9 @@ object DownloadManager { notifyFlow.emit(Unit) } } + /*if (downloadChapter != null) { TODO GRAPHQL + downloadSubscriptionSource.publish(downloadChapter) + }*/ } private fun getStatus(): DownloadStatus { @@ -234,6 +238,7 @@ object DownloadManager { manga ) downloadQueue.add(downloadChapter) + downloadSubscriptionSource.publish(downloadChapter) logger.debug { "Added chapter ${chapter.id} to download queue (${manga.title} | ${chapter.name})" } return downloadChapter }