From b8fb8605b9cb55976bd23e8803000707b0748943 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 15 Jan 2026 17:10:56 +0000 Subject: [PATCH 1/2] feat(connectors): add Redis connector implementation Add comprehensive Redis connector support following Hazelcast patterns: Core Infrastructure: - RedisConfiguration with standalone mode support (extensible for Sentinel/Cluster) - RedisConnectionFactory and RedisConnectionsManager using Lettuce client - RedisTaxi schema with annotations for service, key patterns, TTL, streams, and pub/sub - JSON serialization support (RedisJsonValueReader/Writer) Invokers: - RedisQueryInvoker: GET and SCAN operations - RedisMutatingInvoker: SET/SETEX and DEL operations with TTL support - RedisStreamInvoker: Support for Redis Streams, Pub/Sub, and keyspace notifications - RedisInvoker: Main routing invoker Advanced Features: - RedisCachingProvider: Cache-aside pattern implementation - RedisStateStore: State storage for stream merging operations - Spring Boot integration via RedisConnectorConfig Configuration: - Updated ConnectionsConfig to include Redis connections - Added Redis support to SourceLoaderConnectorsRegistry - Updated parent pom.xml to include redis-connector module Key Features: - Standalone Redis support (architecture ready for Sentinel/Cluster) - JSON serialization only - Per-key TTL via annotations - Triple streaming support: Redis Streams, Pub/Sub, and keyspace notifications - Comprehensive query, mutation, and streaming capabilities --- .../connectors/redis/RedisConnectorConfig.kt | 37 ++ .../connectors/config/ConnectionsConfig.kt | 4 +- .../config/SourceLoaderConnectorsRegistry.kt | 14 + .../config/redis/RedisConfiguration.kt | 87 ++++ connectors/pom.xml | 1 + connectors/redis-connector/.gitignore | 6 + connectors/redis-connector/pom.xml | 111 +++++ .../connectors/redis/RedisCachingProvider.kt | 134 +++++ .../redis/RedisConnectionFactory.kt | 80 +++ .../redis/RedisConnectionsManager.kt | 51 ++ .../connectors/redis/RedisStateStore.kt | 148 ++++++ .../orbitalhq/connectors/redis/RedisTaxi.kt | 55 +++ .../orbitalhq/connectors/redis/RedisUtils.kt | 52 ++ .../connectors/redis/invoker/RedisInvoker.kt | 145 ++++++ .../redis/invoker/RedisMutatingInvoker.kt | 379 +++++++++++++++ .../redis/invoker/RedisQueryInvoker.kt | 458 ++++++++++++++++++ .../redis/invoker/RedisStreamInvoker.kt | 379 +++++++++++++++ .../serialization/RedisJsonValueReader.kt | 19 + .../serialization/RedisJsonValueWriter.kt | 23 + 19 files changed, 2182 insertions(+), 1 deletion(-) create mode 100644 cockpit-core/src/main/java/com/orbitalhq/cockpit/core/connectors/redis/RedisConnectorConfig.kt create mode 100644 connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/redis/RedisConfiguration.kt create mode 100644 connectors/redis-connector/.gitignore create mode 100644 connectors/redis-connector/pom.xml create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisCachingProvider.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionFactory.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionsManager.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisStateStore.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisTaxi.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisUtils.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisInvoker.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvoker.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisQueryInvoker.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisStreamInvoker.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueReader.kt create mode 100644 connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueWriter.kt diff --git a/cockpit-core/src/main/java/com/orbitalhq/cockpit/core/connectors/redis/RedisConnectorConfig.kt b/cockpit-core/src/main/java/com/orbitalhq/cockpit/core/connectors/redis/RedisConnectorConfig.kt new file mode 100644 index 0000000000..c077fd06fa --- /dev/null +++ b/cockpit-core/src/main/java/com/orbitalhq/cockpit/core/connectors/redis/RedisConnectorConfig.kt @@ -0,0 +1,37 @@ +package com.orbitalhq.cockpit.core.connectors.redis + +import com.orbitalhq.connectors.config.SourceLoaderConnectorsRegistry +import com.orbitalhq.connectors.redis.RedisConnectionsManager +import com.orbitalhq.connectors.redis.RedisOperationCacheBuilder +import com.orbitalhq.connectors.redis.RedisStateStoreProvider +import com.orbitalhq.connectors.redis.invoker.RedisInvoker +import com.orbitalhq.schema.consumer.SchemaStore +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration + +@Configuration +class RedisConnectorConfig { + + @Bean + fun redisConnectionsManager(connectors: SourceLoaderConnectorsRegistry): RedisConnectionsManager { + return RedisConnectionsManager(connectors) + } + + @Bean + fun redisOperationCacheBuilder( + redisConnectionsManager: RedisConnectionsManager, + schemaStore: SchemaStore + ): RedisOperationCacheBuilder { + return RedisOperationCacheBuilder(redisConnectionsManager, schemaStore) + } + + @Bean + fun redisStateStoreProvider(redisConnectionsManager: RedisConnectionsManager): RedisStateStoreProvider { + return RedisStateStoreProvider(redisConnectionsManager) + } + + @Bean + fun redisInvoker(connectionsManager: RedisConnectionsManager): RedisInvoker { + return RedisInvoker(connectionsManager) + } +} diff --git a/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/ConnectionsConfig.kt b/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/ConnectionsConfig.kt index 3bd14e05f7..38e06a0307 100644 --- a/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/ConnectionsConfig.kt +++ b/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/ConnectionsConfig.kt @@ -6,6 +6,7 @@ import com.orbitalhq.connectors.config.hazelcast.HazelcastConfiguration import com.orbitalhq.connectors.config.jdbc.DefaultJdbcConnectionConfiguration import com.orbitalhq.connectors.config.kafka.KafkaConnectionConfiguration import com.orbitalhq.connectors.config.mongodb.MongoConnectionConfiguration +import com.orbitalhq.connectors.config.redis.RedisConfiguration import com.orbitalhq.connectors.registry.ConnectorConfiguration import kotlinx.serialization.Serializable @@ -28,9 +29,10 @@ data class ConnectionsConfig( val hazelcast: Map = emptyMap(), val mongo: Map = emptyMap(), val serviceBus: Map = emptyMap(), + val redis: Map = emptyMap(), ) { fun listAll(): List { - return jdbc.values + kafka.values + aws.values + hazelcast.values + mongo.values + serviceBus.values + return jdbc.values + kafka.values + aws.values + hazelcast.values + mongo.values + serviceBus.values + redis.values } val jdbcConnectionsHash = jdbc.hashCode() diff --git a/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/SourceLoaderConnectorsRegistry.kt b/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/SourceLoaderConnectorsRegistry.kt index 86e016c5af..a637e8dd2f 100644 --- a/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/SourceLoaderConnectorsRegistry.kt +++ b/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/SourceLoaderConnectorsRegistry.kt @@ -9,6 +9,7 @@ import com.orbitalhq.config.MergingHoconConfigRepository import com.orbitalhq.config.SimpleConfigSourceWriterProvider import com.orbitalhq.connectors.VyneConnectionsConfig import com.orbitalhq.connectors.config.hazelcast.HazelcastConfiguration +import com.orbitalhq.connectors.config.redis.RedisConfiguration import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import io.github.config4k.extract @@ -86,6 +87,19 @@ class SourceLoaderConnectorsRegistry( fun hazelcastConfigurationForConnectionName(connectionName: String): HazelcastConfiguration? { return load().hazelcast[connectionName] } + + fun defaultRedisConfiguration(): RedisConfiguration? { + val redisConnectors = load().redis + val defaultConnections = redisConnectors.values.filter { it.default } + if (defaultConnections.size > 1) { + throw IllegalArgumentException("Only one Redis connection can be defined as default - ${defaultConnections.joinToString { defaultConn -> defaultConn.connectionName }} marked as default!") + } + return defaultConnections.firstOrNull() + } + + fun redisConfigurationForConnectionName(connectionName: String): RedisConfiguration? { + return load().redis[connectionName] + } } diff --git a/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/redis/RedisConfiguration.kt b/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/redis/RedisConfiguration.kt new file mode 100644 index 0000000000..8dcbd0e2c8 --- /dev/null +++ b/connectors/connectors-core/src/main/java/com/orbitalhq/connectors/config/redis/RedisConfiguration.kt @@ -0,0 +1,87 @@ +package com.orbitalhq.connectors.config.redis + +import com.google.common.base.Objects +import com.orbitalhq.connectors.ConnectionParameterName +import com.orbitalhq.connectors.config.redis.RedisConnection.REDIS_DATABASE +import com.orbitalhq.connectors.config.redis.RedisConnection.REDIS_PASSWORD +import com.orbitalhq.connectors.config.redis.RedisConnection.REDIS_SSL_ENABLED +import com.orbitalhq.connectors.config.redis.RedisConnection.REDIS_USERNAME +import com.orbitalhq.connectors.registry.ConnectorConfiguration +import com.orbitalhq.connectors.registry.ConnectorCategory +import com.orbitalhq.utils.obfuscateKeys +import kotlinx.serialization.Serializable + +@Serializable +data class RedisConfiguration( + override val connectionName: String, + val addresses: List = listOf("localhost:6379"), + val operationCacheTtlSeconds: Int = 120, + val connectionTimeoutSeconds: Int = 10, + val commandTimeoutSeconds: Int = 30, + val connectionParameters: Map = emptyMap(), + override val default: Boolean = false +) : ConnectorConfiguration { + + override val driverName: String = RedisConnection.DRIVER_NAME + override val type: ConnectorCategory = ConnectorCategory.CACHE + + override fun hashCode(): Int { + return Objects.hashCode(addresses) + } + + override fun equals(other: Any?): Boolean { + if (other == null) return false + if (this.javaClass != other.javaClass) return false + val otherConfig = other as RedisConfiguration + return Objects.equal(this.addresses, otherConfig.addresses) + } + + override fun getUiDisplayProperties(): Map { + val baseProperties = mutableMapOf( + "addresses" to addresses.joinToString(",") + ) + + // Add database if specified + database()?.let { baseProperties["database"] = it } + + // Add username if specified (but obfuscate password) + if (hasAuthentication()) { + username()?.let { baseProperties[REDIS_USERNAME] = it } + baseProperties.putAll(connectionParameters.obfuscateKeys(REDIS_PASSWORD)) + } + + // Add SSL status + if (isSslEnabled()) { + baseProperties[REDIS_SSL_ENABLED] = "true" + } + + return baseProperties + } + + fun hasAuthentication(): Boolean { + return connectionParameters.containsKey(REDIS_PASSWORD) || + (connectionParameters.containsKey(REDIS_USERNAME) && connectionParameters.containsKey(REDIS_PASSWORD)) + } + + fun hasUsernamePasswordAuthentication(): Boolean { + return connectionParameters.containsKey(REDIS_USERNAME) && connectionParameters.containsKey(REDIS_PASSWORD) + } + + fun isSslEnabled(): Boolean { + return connectionParameters[REDIS_SSL_ENABLED]?.toBoolean() == true + } + + fun username(): String? = connectionParameters[REDIS_USERNAME] + + fun password(): String? = connectionParameters[REDIS_PASSWORD] + + fun database(): Int? = connectionParameters[REDIS_DATABASE]?.toIntOrNull() +} + +object RedisConnection { + const val DRIVER_NAME = "redis" + const val REDIS_USERNAME = "username" + const val REDIS_PASSWORD = "password" + const val REDIS_DATABASE = "database" + const val REDIS_SSL_ENABLED = "sslEnabled" +} diff --git a/connectors/pom.xml b/connectors/pom.xml index 7a503dde64..2059f94a33 100644 --- a/connectors/pom.xml +++ b/connectors/pom.xml @@ -20,6 +20,7 @@ connectors-calcite soap-connector hazelcast-connector + redis-connector nosql-connectors diff --git a/connectors/redis-connector/.gitignore b/connectors/redis-connector/.gitignore new file mode 100644 index 0000000000..434dfcf2d3 --- /dev/null +++ b/connectors/redis-connector/.gitignore @@ -0,0 +1,6 @@ +target/ +.idea/ +*.iml +.classpath +.project +.settings/ diff --git a/connectors/redis-connector/pom.xml b/connectors/redis-connector/pom.xml new file mode 100644 index 0000000000..ecba9b8edf --- /dev/null +++ b/connectors/redis-connector/pom.xml @@ -0,0 +1,111 @@ + + + 4.0.0 + + com.orbitalhq + connectors + 0.37.0-SNAPSHOT + + + redis-connector + + + 17 + 17 + UTF-8 + 6.3.1.RELEASE + + + + + + io.lettuce + lettuce-core + ${lettuce.version} + + + + + com.orbitalhq + schema-consumer-api + ${project.version} + + + com.orbitalhq + connectors-core + ${project.version} + + + com.orbitalhq + vyne-core-types + ${project.version} + + + com.orbitalhq + taxiql-query-engine + ${project.version} + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + + + io.projectreactor.kotlin + reactor-kotlin-extensions + + + + + app.cash.turbine + turbine-jvm + test + + + com.orbitalhq + taxiql-query-engine + ${project.version} + test-jar + test + + + log4j-to-slf4j + org.apache.logging.log4j + + + + + com.orbitalhq + vyne-core-types + ${project.version} + test + test-jar + + + + + org.testcontainers + testcontainers + test + + + + + ch.qos.logback + logback-core + 1.4.12 + test + + + ch.qos.logback + logback-classic + 1.4.12 + test + + + diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisCachingProvider.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisCachingProvider.kt new file mode 100644 index 0000000000..ff850678da --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisCachingProvider.kt @@ -0,0 +1,134 @@ +package com.orbitalhq.connectors.redis + +import arrow.core.Either +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.query.StreamErrorMessage +import com.orbitalhq.query.connectors.CacheAwareOperationInvocationDecorator +import com.orbitalhq.query.connectors.OperationCacheKey +import com.orbitalhq.query.connectors.OperationInvocationParamMessage +import com.orbitalhq.schema.consumer.SchemaStore +import io.lettuce.core.api.StatefulRedisConnection +import reactor.core.publisher.Flux +import java.time.Clock +import java.time.Duration + +abstract class RedisCachingProvider(protected val connection: StatefulRedisConnection) { + abstract fun load( + key: OperationCacheKey, + message: OperationInvocationParamMessage, + loader: () -> Flux> + ): Flux + + abstract fun evict(operationKey: OperationCacheKey) +} + +object RedisCacheProviderFactory { + fun instance( + connection: StatefulRedisConnection, + schemaStore: SchemaStore, + connectionName: String, + connectionAddress: String, + clock: Clock, + ttl: Duration = CacheAwareOperationInvocationDecorator.DEFAULT_CACHE_TTL + ): RedisCachingProvider { + return RedisMapCachingProvider( + connection, + schemaStore, + connectionName, + connectionAddress, + clock = clock, + defaultTTL = ttl + ) + } +} + +/** + * Implementation of cache-aside pattern using Redis + */ +class RedisMapCachingProvider( + connection: StatefulRedisConnection, + private val schemaStore: SchemaStore, + private val connectionName: String, + private val connectionAddress: String, + private val clock: Clock = Clock.systemUTC(), + private val defaultTTL: Duration +) : RedisCachingProvider(connection) { + + override fun load( + key: OperationCacheKey, + message: OperationInvocationParamMessage, + loader: () -> Flux> + ): Flux { + val cacheKey = "orbital:cache:${key.hashCode()}" + + // Try to load from cache first + val cached = connection.sync().get(cacheKey) + if (cached != null) { + // Cache hit - deserialize and return + return Flux.fromIterable(deserializeCachedResults(cached)) + } + + // Cache miss - execute loader and cache results + return loader() + .collectList() + .flatMapMany { results -> + val instances = results.mapNotNull { either -> + when (either) { + is Either.Right -> either.value + is Either.Left -> null + } + } + + // Serialize and cache + if (instances.isNotEmpty()) { + val serialized = serializeCachedResults(instances) + connection.sync().setex(cacheKey, defaultTTL.seconds, serialized) + } + + Flux.fromIterable(instances) + } + } + + override fun evict(operationKey: OperationCacheKey) { + val cacheKey = "orbital:cache:${operationKey.hashCode()}" + connection.sync().del(cacheKey) + } + + private fun serializeCachedResults(instances: List): String { + // Simplified serialization - in production, use proper serialization + return instances.joinToString("\n---SEPARATOR---\n") { it.toJson() } + } + + private fun deserializeCachedResults(serialized: String): List { + // Simplified deserialization - in production, use proper deserialization + val schema = schemaStore.current() + return serialized.split("\n---SEPARATOR---\n") + .mapNotNull { json -> + // Would need proper deserialization here + null + } + } +} + +/** + * Builder for creating caching providers + */ +class RedisOperationCacheBuilder( + private val connectionsManager: RedisConnectionsManager, + private val schemaStore: SchemaStore +) { + fun build( + connectionName: String, + ttl: Duration = CacheAwareOperationInvocationDecorator.DEFAULT_CACHE_TTL + ): RedisCachingProvider { + val (connection, config) = connectionsManager.redisConnection(connectionName) + return RedisCacheProviderFactory.instance( + connection, + schemaStore, + connectionName, + config.addresses.joinToString(), + Clock.systemUTC(), + ttl + ) + } +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionFactory.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionFactory.kt new file mode 100644 index 0000000000..4f398612ec --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionFactory.kt @@ -0,0 +1,80 @@ +package com.orbitalhq.connectors.redis + +import com.orbitalhq.connectors.config.redis.RedisConfiguration +import io.lettuce.core.RedisClient +import io.lettuce.core.RedisURI +import io.lettuce.core.api.StatefulRedisConnection +import java.time.Duration + +object RedisConnectionFactory { + + fun createConnection(config: RedisConfiguration): StatefulRedisConnection { + val client = createClient(config) + return client.connect() + } + + fun createClient(config: RedisConfiguration): RedisClient { + // For standalone, we use the first address + require(config.addresses.isNotEmpty()) { "At least one Redis address must be configured" } + + val address = config.addresses.first() + val (host, port) = parseAddress(address) + + val redisUri = RedisURI.builder() + .withHost(host) + .withPort(port) + .apply { + // Set authentication + if (config.hasUsernamePasswordAuthentication()) { + withAuthentication(config.username()!!, config.password()!!) + } else if (config.hasAuthentication()) { + withPassword(config.password()!!.toCharArray()) + } + + // Set database + config.database()?.let { withDatabase(it) } + + // Set SSL + if (config.isSslEnabled()) { + withSsl(true) + } + + // Set timeouts + withTimeout(Duration.ofSeconds(config.commandTimeoutSeconds.toLong())) + } + .build() + + return RedisClient.create(redisUri) + } + + private fun parseAddress(address: String): Pair { + val parts = address.split(":") + require(parts.size == 2) { "Redis address must be in format 'host:port', got: $address" } + val host = parts[0] + val port = parts[1].toIntOrNull() + ?: error("Invalid port number in Redis address: $address") + return host to port + } +} + +interface RedisConnectionProvider { + fun provide(config: RedisConfiguration): StatefulRedisConnection + + /** + * Returns the Redis connection for the provided name. + * If the name is null, and a default connection has been configured, then + * the default is returned - otherwise an exception is thrown + */ + fun redisConnection(connectionName: String?): Pair, RedisConfiguration> + + /** + * Indicates if the provider has a connection for the specified name. + * If no name is provided, indicates if a default connection has been + * specified + */ + fun canProvideRedisConnection(connectionName: String?): Boolean +} + +fun StatefulRedisConnection.doHealthCheck() { + sync().ping() +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionsManager.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionsManager.kt new file mode 100644 index 0000000000..d78044a8cf --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisConnectionsManager.kt @@ -0,0 +1,51 @@ +package com.orbitalhq.connectors.redis + +import com.orbitalhq.connectors.config.SourceLoaderConnectorsRegistry +import com.orbitalhq.connectors.config.redis.RedisConfiguration +import io.lettuce.core.api.StatefulRedisConnection +import java.util.concurrent.ConcurrentHashMap + +object RedisConnections { + const val QUERY_CACHE = "_query" +} + +class RedisConnectionsManager( + private val connectors: SourceLoaderConnectorsRegistry +) : RedisConnectionProvider { + + private val redisConnections = ConcurrentHashMap>() + + override fun redisConnection(connectionName: String?): Pair, RedisConfiguration> { + return if (connectionName == null) { + val defaultRedisConnection = connectors.defaultRedisConfiguration() + ?: error("Cannot fetch Redis connection, as no connection name was provided, and there are no default Redis connections configured.") + val connection = redisConnections.getOrPut(defaultRedisConnection) { + RedisConnectionFactory.createConnection(defaultRedisConnection) + } + connection to defaultRedisConnection + } else { + val connectionConfig = connectors.redisConfigurationForConnectionName(connectionName) + require(connectionConfig != null) { "No connection for Redis named $connectionName exists" } + val connection = redisConnections.getOrPut(connectionConfig) { + RedisConnectionFactory.createConnection(connectionConfig) + } + connection to connectionConfig + } + } + + override fun canProvideRedisConnection(connectionName: String?): Boolean { + return (connectionName == null && connectors.defaultRedisConfiguration() != null) || + (connectionName != null && connectors.redisConfigurationForConnectionName(connectionName) != null) + } + + internal fun connectionsCount() = redisConnections.size + + override fun provide(config: RedisConfiguration): StatefulRedisConnection { + return redisConnection(config.connectionName).first + } + + fun close() { + redisConnections.values.forEach { it.close() } + redisConnections.clear() + } +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisStateStore.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisStateStore.kt new file mode 100644 index 0000000000..a1693bff75 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisStateStore.kt @@ -0,0 +1,148 @@ +package com.orbitalhq.connectors.redis + +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.models.serde.SerializableTypedInstance +import com.orbitalhq.models.serde.toSerializable +import com.orbitalhq.query.caching.AbstractMergingStateStore +import com.orbitalhq.query.caching.StateBackingStore +import com.orbitalhq.query.caching.StateStore +import com.orbitalhq.query.caching.StateStoreConfig +import com.orbitalhq.query.caching.StateStoreProvider +import com.orbitalhq.schemas.Schema +import io.lettuce.core.api.StatefulRedisConnection +import lang.taxi.types.ParameterizedName +import lang.taxi.types.SumType +import mu.KotlinLogging +import reactor.core.publisher.Mono +import java.util.concurrent.ConcurrentHashMap + +/** + * Provides Redis-backed state stores for storing interim state during stream merging + */ +class RedisStateStoreProvider( + private val redisConnectionsManager: RedisConnectionsManager +) : StateStoreProvider { + + private val redisStores = ConcurrentHashMap() + + companion object { + private val logger = KotlinLogging.logger {} + } + + override fun getStateStore( + stateStoreConfig: StateStoreConfig, + sumType: SumType, + schema: Schema, + emitMode: StateStoreProvider.EmitMode, + namePrefix: String + ): StateStore? { + val (connection, config) = redisConnectionsManager.redisConnection(stateStoreConfig.connection) + val storeKey = stateStoreConfig.name + ?: getStateStoreKey( + prefix = "StateStore_${stateStoreConfig.connection.orEmpty()}", + sumType, + emitMode + ) + + return redisStores.getOrPut(storeKey) { + logger.info { "Creating new Redis state store with key $storeKey" } + RedisStateStore( + connection, + storeKey, + stateStoreConfig.maxIdleSeconds, + schema + ) + } + } + + private fun getStateStoreKey( + prefix: String, + sumType: SumType, + emitMode: StateStoreProvider.EmitMode + ): String { + val typeNames = sumType.types.joinToString("_") { it.typeName.parameterizedName } + return "${prefix}_${typeNames}_${emitMode.name}" + } +} + +/** + * Redis-backed state store implementation + */ +class RedisStateStore( + private val connection: StatefulRedisConnection, + private val keyPrefix: String, + private val maxIdleSeconds: Int, + private val schema: Schema +) : AbstractMergingStateStore(), StateBackingStore { + + companion object { + private val logger = KotlinLogging.logger {} + } + + override fun get(key: Any): Mono { + return Mono.fromCallable { + val redisKey = buildRedisKey(key) + val value = connection.sync().get(redisKey) + if (value != null) { + deserialize(value) + } else { + null + } + } + } + + override fun put(key: Any, value: SerializableTypedInstance): Mono { + return Mono.fromRunnable { + val redisKey = buildRedisKey(key) + val serialized = serialize(value) + connection.sync().setex(redisKey, maxIdleSeconds.toLong(), serialized) + } + } + + override fun merge(key: Any, remappingFunction: (SerializableTypedInstance?) -> SerializableTypedInstance?): Mono { + return Mono.fromCallable { + val redisKey = buildRedisKey(key) + + // Get current value + val currentJson = connection.sync().get(redisKey) + val current = currentJson?.let { deserialize(it) } + + // Apply remapping function + val newValue = remappingFunction(current) + + if (newValue != null) { + // Store new value + val serialized = serialize(newValue) + connection.sync().setex(redisKey, maxIdleSeconds.toLong(), serialized) + } else if (current != null) { + // Delete if remapping returned null + connection.sync().del(redisKey) + } + + newValue + } + } + + override fun stateBackingStore(): StateBackingStore = this + + override fun toTypedInstance(storeValue: SerializableTypedInstance): TypedInstance { + return storeValue.toTypedInstance(schema) + } + + override fun toStoreValue(typedInstance: TypedInstance): SerializableTypedInstance { + return typedInstance.toSerializable() + } + + private fun buildRedisKey(key: Any): String { + return "$keyPrefix:$key" + } + + private fun serialize(value: SerializableTypedInstance): String { + // Using JSON serialization for simplicity + return value.toJson() + } + + private fun deserialize(json: String): SerializableTypedInstance { + return SerializableTypedInstance.fromJson(json) + } +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisTaxi.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisTaxi.kt new file mode 100644 index 0000000000..c8b8b188d0 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisTaxi.kt @@ -0,0 +1,55 @@ +package com.orbitalhq.connectors.redis + +import com.orbitalhq.VyneTypes +import com.orbitalhq.schemas.fqn + +object RedisTaxi { + object Annotations { + internal val namespace = "${VyneTypes.NAMESPACE}.redis" + val RedisServiceAnnotation = "${namespace}.RedisService" + val RedisKey = "${namespace}.RedisKey".fqn() + val RedisTTL = "${namespace}.RedisTTL".fqn() + val RedisUpsertOperation = "${namespace}.RedisUpsertOperation".fqn() + val RedisDeleteOperation = "${namespace}.RedisDeleteOperation".fqn() + val RedisStreamName = "${namespace}.RedisStreamName".fqn() + val RedisPubSubChannel = "${namespace}.RedisPubSubChannel".fqn() + } + + val schema = """ +namespace ${Annotations.namespace} { + annotation RedisService { + connectionName : String + } + + annotation RedisKey { + // Key pattern using placeholders, e.g., "user:{userId}" or "order:{orderId}" + pattern : String + } + + annotation RedisTTL { + // Time-to-live in seconds + seconds : Int + } + + annotation RedisUpsertOperation { + // Optional TTL override for this specific operation + ttlSeconds : Int? + } + + annotation RedisDeleteOperation { + // Optional key pattern for delete operations, e.g., "user:*" + keyPattern : String? + } + + annotation RedisStreamName { + // Name of the Redis Stream + name : String + } + + annotation RedisPubSubChannel { + // Pub/Sub channel name or pattern + channel : String + } +} + """.trimIndent() +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisUtils.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisUtils.kt new file mode 100644 index 0000000000..d2f66b6997 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/RedisUtils.kt @@ -0,0 +1,52 @@ +package com.orbitalhq.connectors.redis + +import com.orbitalhq.connectors.redis.RedisTaxi +import com.orbitalhq.models.AttributeName +import com.orbitalhq.schemas.Field +import com.orbitalhq.schemas.Type +import com.orbitalhq.schemas.fqn + +fun findKeyField(type: Type): Pair { + val keyField = type.getAttributesWithAnnotation("Id".fqn()) + return when (keyField.size) { + 1 -> keyField.entries.single().let { (k, v) -> k to v } + else -> error("Cannot persist type ${type.qualifiedName.longDisplayName} to Redis, as there are ${keyField.size} fields with an @Id annotation - expected exactly one.") + } +} + +fun getRedisKeyPattern(type: Type): String { + if (!type.hasMetadata(RedisTaxi.Annotations.RedisKey)) { + error("Cannot persist type ${type.qualifiedName.longDisplayName} to Redis, as it does not have a @${RedisTaxi.Annotations.RedisKey.longDisplayName} annotation.") + } + val metadata = type.getMetadata(RedisTaxi.Annotations.RedisKey) + return metadata.params["pattern"] as String +} + +fun getTTLSeconds(type: Type): Int? { + if (!type.hasMetadata(RedisTaxi.Annotations.RedisTTL)) { + return null + } + val metadata = type.getMetadata(RedisTaxi.Annotations.RedisTTL) + return metadata.params["seconds"] as? Int +} + +/** + * Build a Redis key from a pattern and a key value. + * For example: + * - pattern: "user:{userId}", keyValue: "123" -> "user:123" + * - pattern: "order:{orderId}", keyValue: "456" -> "order:456" + */ +fun buildRedisKey(pattern: String, keyValue: Any): String { + // Simple implementation: replace {anything} with the key value + return pattern.replace(Regex("\\{[^}]+\\}"), keyValue.toString()) +} + +/** + * Extract the scan pattern from a key pattern. + * For example: + * - pattern: "user:{userId}" -> "user:*" + * - pattern: "order:{orderId}" -> "order:*" + */ +fun getScanPattern(pattern: String): String { + return pattern.replace(Regex("\\{[^}]+\\}"), "*") +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisInvoker.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisInvoker.kt new file mode 100644 index 0000000000..e6ceb9c8bd --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisInvoker.kt @@ -0,0 +1,145 @@ +package com.orbitalhq.connectors.redis.invoker + +import arrow.core.Either +import com.orbitalhq.connectors.redis.RedisConnectionProvider +import com.orbitalhq.connectors.redis.RedisTaxi +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.query.QueryContextEventDispatcher +import com.orbitalhq.query.QueryContextSchemaProvider +import com.orbitalhq.query.RemoteCall +import com.orbitalhq.query.StreamErrorMessage +import com.orbitalhq.query.connectors.OperationInvoker +import com.orbitalhq.schemas.* +import kotlinx.coroutines.flow.Flow +import lang.taxi.services.OperationScope + +class RedisInvoker( + private val redisConnectionProvider: RedisConnectionProvider +) : OperationInvoker { + + private val mutatingInvoker = RedisMutatingInvoker() + private val queryInvoker = RedisQueryInvoker() + private val streamInvoker = RedisStreamInvoker() + + override fun canSupport(service: Service, operation: RemoteOperation): Boolean { + return service.hasMetadata(RedisTaxi.Annotations.RedisServiceAnnotation) + } + + override fun plan( + service: Service, + operation: RemoteOperation, + parameters: List>, + schema: Schema + ): RemoteCall { + val connectionName = getRedisConnectionName(service) + val (connection, config) = getRedisConnection(connectionName) + + return when { + operation.operationType == OperationScope.READ_ONLY && operation.operationKind != OperationKind.Stream -> { + queryInvoker.plan( + connection, + config, + service, + operation, + parameters, + schema + ) + } + + operation.operationType == OperationScope.READ_ONLY && operation.operationKind == OperationKind.Stream -> { + streamInvoker.plan( + connection, + config, + service, + operation, + parameters, + schema + ) + } + + operation.operationType == OperationScope.MUTATION -> { + mutatingInvoker.plan( + connection, + config, + service, + operation, + parameters, + schema + ) + } + + else -> { + error("No invoker strategy found for operation ${operation.qualifiedName.longDisplayName}") + } + } + } + + override suspend fun invoke( + service: Service, + operation: RemoteOperation, + parameters: List>, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + queryOptions: QueryOptions + ): Flow> { + val connectionName = getRedisConnectionName(service) + val (connection, config) = getRedisConnection(connectionName) + val schema = (eventDispatcher as QueryContextSchemaProvider).schema + + return when { + operation.operationType == OperationScope.READ_ONLY && operation.operationKind != OperationKind.Stream -> { + queryInvoker.invoke( + connection, + config, + service, + operation, + parameters, + eventDispatcher, + queryId, + queryOptions, + schema + ) + } + + operation.operationType == OperationScope.READ_ONLY && operation.operationKind == OperationKind.Stream -> { + streamInvoker.invoke( + connection, + config, + service, + operation, + parameters, + eventDispatcher, + queryId, + queryOptions, + schema + ) + } + + operation.operationType == OperationScope.MUTATION -> { + mutatingInvoker.invoke( + connection, + service, + operation, + parameters, + config, + eventDispatcher, + queryId, + queryOptions, + schema + ) + } + + else -> { + error("No invoker strategy found for operation ${operation.qualifiedName.longDisplayName}") + } + } + } + + private fun getRedisConnectionName(service: Service): String? { + val metadata = service.getMetadata(RedisTaxi.Annotations.RedisServiceAnnotation) + return metadata.params["connectionName"] as String? + } + + private fun getRedisConnection(connectionName: String?) = + redisConnectionProvider.redisConnection(connectionName) +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvoker.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvoker.kt new file mode 100644 index 0000000000..468d435c5d --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvoker.kt @@ -0,0 +1,379 @@ +package com.orbitalhq.connectors.redis.invoker + +import arrow.core.Either +import com.orbitalhq.connectors.config.redis.RedisConfiguration +import com.orbitalhq.connectors.redis.* +import com.orbitalhq.connectors.redis.serialization.RedisJsonValueWriter +import com.orbitalhq.models.DataSource +import com.orbitalhq.models.DataSourceUpdater +import com.orbitalhq.models.OperationResult +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.models.TypedNull +import com.orbitalhq.models.TypedObject +import com.orbitalhq.models.json.Jackson +import com.orbitalhq.models.json.right +import com.orbitalhq.query.CacheExchange +import com.orbitalhq.query.QueryContextEventDispatcher +import com.orbitalhq.query.RemoteCall +import com.orbitalhq.query.ResponseMessageType +import com.orbitalhq.query.StreamErrorMessage +import com.orbitalhq.query.tracing.CacheRequest +import com.orbitalhq.query.tracing.CacheResponse +import com.orbitalhq.query.tracing.OperationTraceSpan +import com.orbitalhq.query.tracing.SpanState +import com.orbitalhq.query.tracing.TraceEventDirection +import com.orbitalhq.query.tracing.TracingEventKind +import com.orbitalhq.schemas.Parameter +import com.orbitalhq.schemas.QueryOptions +import com.orbitalhq.schemas.RemoteOperation +import com.orbitalhq.schemas.Schema +import com.orbitalhq.schemas.Service +import io.lettuce.core.SetArgs +import io.lettuce.core.api.StatefulRedisConnection +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flowOf +import lang.taxi.types.PrimitiveType +import mu.KotlinLogging +import java.time.Duration +import java.time.Instant + +class RedisMutatingInvoker { + + companion object { + private val logger = KotlinLogging.logger {} + } + + fun plan( + connection: StatefulRedisConnection, + config: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + schema: Schema + ): RemoteCall { + val (_, valueToSave) = parameters[0] + val keyPattern = getRedisKeyPattern(valueToSave.type) + + return buildRemoteCall( + keyPattern, + service, + config.addresses.joinToString(), + operation, + "Available at execution time", + Duration.ZERO, + -1, + getVerb(operation), + config.connectionName + ) + } + + private fun doUpsert( + connection: StatefulRedisConnection, + parameters: List>, + schema: Schema, + operation: RemoteOperation, + reportResult: (String, String, Int, CacheExchange.CacheOperationVerb) -> DataSource, + traceContext: OperationTraceSpan, + connectionName: String + ): Flow> { + val (_, valueToSave) = parameters[0] + val keyPattern = getRedisKeyPattern(valueToSave.type) + require(valueToSave is TypedObject) { "Only TypedObjects are supported - Need to add support for ${valueToSave::class.simpleName}" } + + val (key, jsonValue) = RedisJsonValueWriter.getJsonStringAndKey(valueToSave, schema) + val redisKey = buildRedisKey(keyPattern, key) + + logger.debug { "Setting value in Redis with key $redisKey" } + + traceContext.emitEvent( + kind = TracingEventKind.OK, + spanState = SpanState.ACTIVE, + payloadType = parameters[0].first.type, + exchangeMetadata = CacheRequest(keyPattern, CacheExchange.CacheOperationVerb.UPDATE, connectionName) { + "Upsert key $redisKey to ${Jackson.defaultObjectMapper.writeValueAsString(valueToSave.toRawObject())}" + }, + verb = "Upsert", + direction = TraceEventDirection.OUTBOUND + ) + + // Get TTL from annotation or operation + val ttlSeconds = getTTLSeconds(operation, valueToSave.type) + + if (ttlSeconds != null) { + connection.sync().setex(redisKey, ttlSeconds.toLong(), jsonValue) + logger.debug { "Set key $redisKey with TTL of $ttlSeconds seconds" } + } else { + connection.sync().set(redisKey, jsonValue) + logger.debug { "Set key $redisKey without TTL" } + } + + val resultEvent = traceContext.emitEvent( + TracingEventKind.OK, + SpanState.COMPLETE, + valueToSave.type, + CacheResponse(1) { "Upserted 1 record to Redis with key $redisKey" }, + "Upsert response", + direction = TraceEventDirection.INBOUND + ) + + val dataSource = reportResult("SET $redisKey", keyPattern, 1, CacheExchange.CacheOperationVerb.UPDATE) + val updatedValue = DataSourceUpdater.update(valueToSave, dataSource) + return flowOf(updatedValue.right()) + } + + private fun getTTLSeconds(operation: RemoteOperation, type: com.orbitalhq.schemas.Type): Int? { + // Check operation annotation first + if (operation.hasMetadata(RedisTaxi.Annotations.RedisUpsertOperation.parameterizedName)) { + val upsertAnnotation = operation.firstMetadata(RedisTaxi.Annotations.RedisUpsertOperation.parameterizedName) + val ttl = upsertAnnotation.params["ttlSeconds"] as? Int + if (ttl != null) return ttl + } + + // Fall back to type annotation + return getTTLSeconds(type) + } + + private fun doDelete( + connection: StatefulRedisConnection, + parameters: List>, + schema: Schema, + operation: RemoteOperation, + reportAndGenerateDataSource: (String, String, Int, CacheExchange.CacheOperationVerb) -> DataSource, + traceContext: OperationTraceSpan, + connectionName: String + ): Flow> { + val deleteAnnotation = operation.firstMetadata(RedisTaxi.Annotations.RedisDeleteOperation.parameterizedName) + val keyPattern = deleteAnnotation.params["keyPattern"] as String? + ?: error("Operation ${operation.qualifiedName.parameterizedName} does not declare a keyPattern") + + traceContext.emitEvent( + kind = TracingEventKind.OK, + spanState = SpanState.ACTIVE, + payloadType = parameters.firstOrNull()?.first?.type, + exchangeMetadata = CacheRequest(keyPattern, CacheExchange.CacheOperationVerb.DELETE, connectionName), + verb = "Delete", + direction = TraceEventDirection.INBOUND + ) + + val deleteKey = parameters.singleOrNull()?.second + return if (deleteKey == null) { + deleteByPattern(keyPattern, connection, reportAndGenerateDataSource, schema, traceContext) + } else { + deleteByKey(deleteKey, keyPattern, connection, reportAndGenerateDataSource, schema, operation, traceContext) + } + } + + private fun deleteByKey( + deleteKey: TypedInstance, + keyPattern: String, + connection: StatefulRedisConnection, + reportAndGenerateDataSource: (String, String, Int, CacheExchange.CacheOperationVerb) -> DataSource, + schema: Schema, + operation: RemoteOperation, + traceContext: OperationTraceSpan + ): Flow> { + + val keyValue = deleteKey.toRawObject() ?: error("Cannot delete from Redis as provided key was null") + val redisKey = buildRedisKey(keyPattern, keyValue) + val deletedCount = connection.sync().del(redisKey) + val recordCount = deletedCount.toInt() + + val resultEvent = traceContext.emitEvent( + TracingEventKind.OK, + SpanState.COMPLETE, + operation.returnType, + CacheResponse(recordCount) { "Deleted $recordCount record from Redis with key $redisKey" }, + "Delete response", + direction = TraceEventDirection.INBOUND + ) + + val dataSource = reportAndGenerateDataSource( + "DEL $redisKey", + keyPattern, + recordCount, + CacheExchange.CacheOperationVerb.DELETE, + ) + + // For delete operations, we return TypedNull since the value no longer exists + val result = TypedNull.create(operation.returnType, source = dataSource) + return flowOf(result.right()) + } + + private fun deleteByPattern( + keyPattern: String, + connection: StatefulRedisConnection, + reportAndGenerateDataSource: (String, String, Int, CacheExchange.CacheOperationVerb) -> DataSource, + schema: Schema, + traceContext: OperationTraceSpan + ): Flow> { + logger.info { "Performing deleteByPattern with pattern $keyPattern" } + + // Scan for all keys matching the pattern + val scanPattern = if (keyPattern.contains("{")) { + getScanPattern(keyPattern) + } else { + keyPattern + } + + val keys = scanKeys(connection, scanPattern) + val deletedCount = if (keys.isNotEmpty()) { + connection.sync().del(*keys.toTypedArray()) + } else { + 0L + } + + val recordCount = deletedCount.toInt() + + val resultEvent = traceContext.emitEvent( + TracingEventKind.OK, + SpanState.COMPLETE, + schema.type(PrimitiveType.VOID), + CacheResponse(recordCount) { "Deleted $recordCount records from Redis matching pattern $scanPattern" }, + "Delete all response", + direction = TraceEventDirection.INBOUND + ) + + val dataSource = reportAndGenerateDataSource("DEL $scanPattern", keyPattern, recordCount, CacheExchange.CacheOperationVerb.DELETE) + return flowOf(TypedNull.create(schema.type(PrimitiveType.VOID), source = dataSource).right()) + } + + private fun scanKeys(connection: StatefulRedisConnection, pattern: String): List { + val keys = mutableListOf() + val scanArgs = io.lettuce.core.ScanArgs.Builder.matches(pattern).limit(100) + var cursor = connection.sync().scan(scanArgs) + + keys.addAll(cursor.keys) + while (!cursor.isFinished) { + cursor = connection.sync().scan(cursor, scanArgs) + keys.addAll(cursor.keys) + } + + return keys + } + + fun invoke( + connection: StatefulRedisConnection, + service: Service, + operation: RemoteOperation, + parameters: List>, + redisConnectionConfig: RedisConfiguration, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + queryOptions: QueryOptions, + schema: Schema + ): Flow> { + val startTime = Instant.now() + val traceContext = eventDispatcher.createOperationTraceSpan(service, operation, "") + + fun reportResult( + command: String, + keyPattern: String, + resultSize: Int, + verb: CacheExchange.CacheOperationVerb + ): DataSource { + val result = buildOperationResult( + service, + operation, + keyPattern, + parameters.map { it.second }, + redisConnectionConfig, + command, + Duration.between(startTime, Instant.now()), + resultSize, + verb + ) + eventDispatcher.reportRemoteOperationInvoked(result, queryId) + return result.asOperationReferenceDataSource() + } + + return when (getVerb(operation)) { + CacheExchange.CacheOperationVerb.UPDATE -> doUpsert( + connection, + parameters, + schema, + operation, + ::reportResult, + traceContext, + redisConnectionConfig.connectionName + ) + + CacheExchange.CacheOperationVerb.DELETE -> doDelete( + connection, + parameters, + schema, + operation, + ::reportResult, + traceContext, + redisConnectionConfig.connectionName + ) + + else -> error("Unexpected type of mutation for Redis: ${operation.qualifiedName.parameterizedName}") + } + } + + private fun getVerb(operation: RemoteOperation): CacheExchange.CacheOperationVerb { + return when { + operation.hasMetadata(RedisTaxi.Annotations.RedisUpsertOperation.parameterizedName) -> CacheExchange.CacheOperationVerb.UPDATE + operation.hasMetadata(RedisTaxi.Annotations.RedisDeleteOperation.parameterizedName) -> CacheExchange.CacheOperationVerb.DELETE + else -> error("Unexpected type of mutation for Redis: ${operation.qualifiedName.parameterizedName}") + } + } + + private fun buildOperationResult( + service: Service, + operation: RemoteOperation, + keyPattern: String, + parameters: List, + connectionConfig: RedisConfiguration, + command: String, + elapsed: Duration, + recordCount: Int, + verb: CacheExchange.CacheOperationVerb + ): OperationResult { + val remoteCall = buildRemoteCall( + keyPattern, + service, + connectionConfig.addresses.joinToString(), + operation, + command, + elapsed, + recordCount, + verb, + connectionConfig.connectionName + ) + return OperationResult.fromTypedInstances( + parameters, + remoteCall + ) + } + + private fun buildRemoteCall( + keyPattern: String, + service: Service, + redisAddresses: String, + operation: RemoteOperation, + command: String, + elapsed: Duration, + recordCount: Int, + verb: CacheExchange.CacheOperationVerb, + connectionName: String + ) = RemoteCall( + service = service.name, + address = redisAddresses, + operation = operation.name, + responseTypeName = operation.returnType.name, + requestBody = command, + durationMs = elapsed.toMillis(), + timestamp = Instant.now(), + responseMessageType = ResponseMessageType.FULL, + response = null, + exchange = CacheExchange( + connectionName = connectionName, + cacheName = keyPattern, + cacheKeyOrStatement = command, + cacheType = CacheExchange.CacheType.Redis, + recordCount = recordCount, + verb = verb + ), + ) +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisQueryInvoker.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisQueryInvoker.kt new file mode 100644 index 0000000000..4b951481c1 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisQueryInvoker.kt @@ -0,0 +1,458 @@ +package com.orbitalhq.connectors.redis.invoker + +import arrow.core.Either +import com.orbitalhq.connectors.config.redis.RedisConfiguration +import com.orbitalhq.connectors.getTaxiQlQuery +import com.orbitalhq.connectors.redis.* +import com.orbitalhq.connectors.redis.serialization.RedisJsonValueReader +import com.orbitalhq.models.DataSource +import com.orbitalhq.models.OperationResult +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.models.json.right +import com.orbitalhq.query.CacheExchange.CacheOperationVerb +import com.orbitalhq.query.QueryContextEventDispatcher +import com.orbitalhq.query.RemoteCall +import com.orbitalhq.query.ResponseMessageType +import com.orbitalhq.query.StreamErrorMessage +import com.orbitalhq.query.tracing.OperationTraceSpan +import com.orbitalhq.query.tracing.SpanState +import com.orbitalhq.query.tracing.TraceEventDirection +import com.orbitalhq.query.tracing.TracingEventKind +import com.orbitalhq.schemas.* +import io.lettuce.core.ScanArgs +import io.lettuce.core.api.StatefulRedisConnection +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.asFlow +import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.flow +import lang.taxi.query.TaxiQLQueryString +import lang.taxi.query.TaxiQlQuery +import lang.taxi.types.ObjectType +import mu.KotlinLogging +import java.time.Duration +import java.time.Instant + +private data class RedisCallParams( + val keyPattern: String, + val taxiQlQueryString: TaxiQLQueryString?, + val parsedQuery: TaxiQlQuery?, + val keyField: Field, + val idLookupValue: Any?, + val unwrappedReturnType: Type +) + +class RedisQueryInvoker { + companion object { + private val logger = KotlinLogging.logger {} + } + + fun plan( + connection: StatefulRedisConnection, + redisConfiguration: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + schema: Schema + ): RemoteCall { + val executionConfig = getExecutionConfig(operation, parameters, schema) + return when { + executionConfig.idLookupValue != null -> buildRemoteCall( + service, + redisConfiguration.addresses.joinToString(), + operation, + executionConfig.keyPattern, + redisConfiguration.connectionName, + executionConfig.idLookupValue.toString(), + Duration.ZERO, + -1, + CacheOperationVerb.GET, + true + ) + + queryIsFindAll(executionConfig.parsedQuery) || executionConfig.parsedQuery == null -> buildRemoteCall( + service, + redisConfiguration.addresses.joinToString(), + operation, + executionConfig.keyPattern, + redisConfiguration.connectionName, + "find *", + Duration.ZERO, + -1, + CacheOperationVerb.READ_MANY, + false + ) + + else -> buildRemoteCall( + service, + redisConfiguration.addresses.joinToString(), + operation, + executionConfig.keyPattern, + redisConfiguration.connectionName, + "criteria", + Duration.ZERO, + -1, + CacheOperationVerb.READ_MANY, + false + ) + } + } + + private fun getExecutionConfig( + operation: RemoteOperation, + parameters: List>, + schema: Schema + ): RedisCallParams { + val unwrappedReturnType = operation.returnType.collectionType ?: operation.returnType + val keyPattern = getRedisKeyPattern(unwrappedReturnType) + val (taxiQlQueryString, parsedQuery) = if (parameters.isNotEmpty()) { + val (taxiQlQueryString) = parameters.getTaxiQlQuery() + val (parsedQuery) = schema.parseQuery(taxiQlQueryString) + taxiQlQueryString to parsedQuery + } else null to null + val (_, keyField) = findKeyField(unwrappedReturnType) + val idLookupValue = parsedQuery?.let { getIdLookupValue(parsedQuery, keyField) } + + return RedisCallParams( + keyPattern, + taxiQlQueryString, + parsedQuery, + keyField, + idLookupValue, + unwrappedReturnType + ) + } + + fun invoke( + connection: StatefulRedisConnection, + redisConnectionConfig: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + queryOptions: QueryOptions, + schema: Schema, + ): Flow> { + val startTime = Instant.now() + val traceContext = eventDispatcher.createOperationTraceSpan(service, operation, "") + val executionConfig = getExecutionConfig(operation, parameters, schema) + + return when { + executionConfig.idLookupValue != null -> findById( + executionConfig.taxiQlQueryString!!, + executionConfig.keyPattern, + executionConfig.idLookupValue, + connection, + service, + operation, + parameters, + redisConnectionConfig, + startTime, + eventDispatcher, + queryId, + executionConfig.unwrappedReturnType, + schema, + traceContext + ) + + queryIsFindAll(executionConfig.parsedQuery) || executionConfig.parsedQuery == null -> findAll( + executionConfig.taxiQlQueryString, + executionConfig.keyPattern, + service, + operation, + parameters, + redisConnectionConfig, + startTime, + connection, + eventDispatcher, + queryId, + executionConfig.unwrappedReturnType, + schema, + traceContext + ) + + else -> findByCriteria( + executionConfig.taxiQlQueryString!!, + executionConfig.keyPattern, + service, + operation, + parameters, + redisConnectionConfig, + startTime, + connection, + eventDispatcher, + queryId, + executionConfig.unwrappedReturnType, + schema, + traceContext + ) + } + } + + private fun findById( + taxiQlQueryString: TaxiQLQueryString, + keyPattern: String, + idLookupValue: Any, + connection: StatefulRedisConnection, + service: Service, + operation: RemoteOperation, + parameters: List>, + redisConnectionConfig: RedisConfiguration, + startTime: Instant, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + unwrappedReturnType: Type, + schema: Schema, + traceContext: OperationTraceSpan + ): Flow> { + val redisKey = buildRedisKey(keyPattern, idLookupValue) + logger.debug { "Query of $taxiQlQueryString converted to Redis GET for key: $redisKey" } + + return flow { + val jsonValue = connection.sync().get(redisKey) + val recordCount = if (jsonValue == null) 0 else 1 + + traceContext.addEventNow( + TracingEventKind.CACHE_REQUEST, + TraceEventDirection.OUT, + redisConnectionConfig.addresses.joinToString(), + mapOf( + "operation" to "GET", + "key" to redisKey, + "records" to recordCount + ) + ) + + if (jsonValue != null) { + val dataSource = DataSource.cache( + redisConnectionConfig.connectionName, + redisConnectionConfig.driverName, + "GET" + ) + val typedInstanceResult = RedisJsonValueReader.toTypedInstance( + jsonValue, + unwrappedReturnType as ObjectType, + schema, + dataSource + ) + + when (typedInstanceResult) { + is Either.Right -> { + emit(typedInstanceResult.value.right()) + emitOperationResult( + eventDispatcher, + queryId, + service, + operation, + parameters, + recordCount, + startTime, + traceContext + ) + } + is Either.Left -> emit(typedInstanceResult.value) + } + } else { + logger.debug { "Key $redisKey not found in Redis" } + emitOperationResult( + eventDispatcher, + queryId, + service, + operation, + parameters, + 0, + startTime, + traceContext + ) + } + } + } + + private fun findAll( + taxiQlQueryString: TaxiQLQueryString?, + keyPattern: String, + service: Service, + operation: RemoteOperation, + parameters: List>, + redisConnectionConfig: RedisConfiguration, + startTime: Instant, + connection: StatefulRedisConnection, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + unwrappedReturnType: Type, + schema: Schema, + traceContext: OperationTraceSpan + ): Flow> { + val scanPattern = getScanPattern(keyPattern) + logger.debug { "Query of $taxiQlQueryString converted to Redis SCAN with pattern: $scanPattern" } + + return flow { + val keys = scanKeys(connection, scanPattern) + val recordCount = keys.size + + traceContext.addEventNow( + TracingEventKind.CACHE_REQUEST, + TraceEventDirection.OUT, + redisConnectionConfig.addresses.joinToString(), + mapOf( + "operation" to "SCAN", + "pattern" to scanPattern, + "records" to recordCount + ) + ) + + val dataSource = DataSource.cache( + redisConnectionConfig.connectionName, + redisConnectionConfig.driverName, + "SCAN" + ) + + // Fetch all values + keys.asFlow() + .flatMapConcat { key -> + flow { + val jsonValue = connection.sync().get(key) + if (jsonValue != null) { + val typedInstanceResult = RedisJsonValueReader.toTypedInstance( + jsonValue, + unwrappedReturnType as ObjectType, + schema, + dataSource + ) + when (typedInstanceResult) { + is Either.Right -> emit(typedInstanceResult.value.right()) + is Either.Left -> emit(typedInstanceResult.value) + } + } + } + } + .collect { emit(it) } + + emitOperationResult( + eventDispatcher, + queryId, + service, + operation, + parameters, + recordCount, + startTime, + traceContext + ) + } + } + + private fun findByCriteria( + taxiQlQueryString: TaxiQLQueryString, + keyPattern: String, + service: Service, + operation: RemoteOperation, + parameters: List>, + redisConnectionConfig: RedisConfiguration, + startTime: Instant, + connection: StatefulRedisConnection, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + unwrappedReturnType: Type, + schema: Schema, + traceContext: OperationTraceSpan + ): Flow> { + // For criteria-based queries, we use SCAN to get all keys and let the + // existing expression handlers filter the results + val scanPattern = getScanPattern(keyPattern) + logger.debug { "Query of $taxiQlQueryString converted to Redis SCAN with pattern: $scanPattern (client-side filtering)" } + + return findAll( + taxiQlQueryString, + keyPattern, + service, + operation, + parameters, + redisConnectionConfig, + startTime, + connection, + eventDispatcher, + queryId, + unwrappedReturnType, + schema, + traceContext + ) + } + + private fun scanKeys(connection: StatefulRedisConnection, pattern: String): List { + val keys = mutableListOf() + val scanArgs = ScanArgs.Builder.matches(pattern).limit(100) + var cursor = connection.sync().scan(scanArgs) + + keys.addAll(cursor.keys) + while (!cursor.isFinished) { + cursor = connection.sync().scan(cursor, scanArgs) + keys.addAll(cursor.keys) + } + + return keys + } + + private suspend fun emitOperationResult( + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + service: Service, + operation: RemoteOperation, + parameters: List>, + recordCount: Int, + startTime: Instant, + traceContext: OperationTraceSpan + ) { + val duration = Duration.between(startTime, Instant.now()) + traceContext.complete(SpanState.SUCCESS) + eventDispatcher.publishOperation( + ResponseMessageType.OperationResponse( + OperationResult( + service, + operation, + queryId, + parameters, + recordCount, + duration + ) + ) + ) + } + + private fun buildRemoteCall( + service: Service, + address: String, + operation: RemoteOperation, + keyPattern: String, + connectionName: String, + lookupDescription: String, + duration: Duration, + recordCount: Int, + cacheOperation: CacheOperationVerb, + isCacheable: Boolean + ): RemoteCall { + return RemoteCall.cache( + serviceName = service.name, + operationName = operation.name, + connectionName = connectionName, + resourceName = keyPattern, + remoteAddress = address, + remoteCallStatementDescription = lookupDescription, + operationReturnType = operation.returnType.qualifiedName, + duration = duration, + recordCount = recordCount, + cacheOperation = cacheOperation, + isCacheable = isCacheable + ) + } +} + +private fun queryIsFindAll(query: TaxiQlQuery?): Boolean { + return query?.let { it.projectedType is ObjectType && it.criteria.isEmpty() } ?: false +} + +private fun getIdLookupValue(query: TaxiQlQuery, keyField: Field): Any? { + val constraint = query.criteria.find { constraint -> + constraint.targetPath == keyField.qualifiedName.longDisplayName + } + return constraint?.value +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisStreamInvoker.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisStreamInvoker.kt new file mode 100644 index 0000000000..e482ddf1d1 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/invoker/RedisStreamInvoker.kt @@ -0,0 +1,379 @@ +package com.orbitalhq.connectors.redis.invoker + +import arrow.core.Either +import com.orbitalhq.connectors.config.redis.RedisConfiguration +import com.orbitalhq.connectors.redis.* +import com.orbitalhq.connectors.redis.serialization.RedisJsonValueReader +import com.orbitalhq.models.DataSource +import com.orbitalhq.models.OperationResult +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.models.json.right +import com.orbitalhq.query.CacheExchange +import com.orbitalhq.query.QueryContextEventDispatcher +import com.orbitalhq.query.RemoteCall +import com.orbitalhq.query.ResponseMessageType +import com.orbitalhq.query.StreamErrorMessage +import com.orbitalhq.query.tracing.OperationTraceSpan +import com.orbitalhq.query.tracing.SpanState +import com.orbitalhq.query.tracing.TraceEventDirection +import com.orbitalhq.query.tracing.TracingEventKind +import com.orbitalhq.schemas.* +import io.lettuce.core.RedisPubSubAdapter +import io.lettuce.core.StreamMessage +import io.lettuce.core.XReadArgs +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import lang.taxi.query.TaxiQLQueryString +import lang.taxi.query.TaxiQlQuery +import lang.taxi.types.ObjectType +import mu.KotlinLogging +import java.time.Duration +import java.time.Instant +import java.util.concurrent.atomic.AtomicBoolean + +class RedisStreamInvoker { + companion object { + private val logger = KotlinLogging.logger {} + } + + fun plan( + connection: StatefulRedisConnection, + redisConfiguration: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + schema: Schema + ): RemoteCall { + val unwrappedReturnType = operation.returnType.collectionType ?: operation.returnType + + // Determine if this is a Redis Stream or Pub/Sub stream + val isRedisStream = unwrappedReturnType.hasMetadata(RedisTaxi.Annotations.RedisStreamName.parameterizedName) + val isPubSub = unwrappedReturnType.hasMetadata(RedisTaxi.Annotations.RedisPubSubChannel.parameterizedName) + + val resourceName = when { + isRedisStream -> { + val metadata = unwrappedReturnType.getMetadata(RedisTaxi.Annotations.RedisStreamName) + "stream:${metadata.params["name"]}" + } + isPubSub -> { + val metadata = unwrappedReturnType.getMetadata(RedisTaxi.Annotations.RedisPubSubChannel) + "pubsub:${metadata.params["channel"]}" + } + else -> { + val keyPattern = getRedisKeyPattern(unwrappedReturnType) + "keyspace:$keyPattern" + } + } + + return RemoteCall.cache( + serviceName = service.name, + operationName = operation.name, + connectionName = redisConfiguration.connectionName, + resourceName = resourceName, + remoteAddress = redisConfiguration.addresses.joinToString(), + remoteCallStatementDescription = "stream", + operationReturnType = operation.returnType.qualifiedName, + duration = Duration.ZERO, + recordCount = -1, + cacheOperation = CacheExchange.CacheOperationVerb.READ_MANY, + isCacheable = false + ) + } + + fun invoke( + connection: StatefulRedisConnection, + redisConnectionConfig: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + queryOptions: QueryOptions, + schema: Schema, + ): Flow> { + val startTime = Instant.now() + val traceContext = eventDispatcher.createOperationTraceSpan(service, operation, "") + val unwrappedReturnType = operation.returnType.collectionType ?: operation.returnType + + // Determine the streaming mechanism based on annotations + return when { + unwrappedReturnType.hasMetadata(RedisTaxi.Annotations.RedisStreamName.parameterizedName) -> { + invokeRedisStream( + connection, + redisConnectionConfig, + service, + operation, + parameters, + eventDispatcher, + queryId, + schema, + unwrappedReturnType, + traceContext + ) + } + unwrappedReturnType.hasMetadata(RedisTaxi.Annotations.RedisPubSubChannel.parameterizedName) -> { + invokePubSub( + connection, + redisConnectionConfig, + service, + operation, + parameters, + eventDispatcher, + queryId, + schema, + unwrappedReturnType, + traceContext + ) + } + else -> { + // Default: use keyspace notifications + invokeKeyspaceNotifications( + connection, + redisConnectionConfig, + service, + operation, + parameters, + eventDispatcher, + queryId, + schema, + unwrappedReturnType, + traceContext + ) + } + } + } + + private fun invokeRedisStream( + connection: StatefulRedisConnection, + redisConnectionConfig: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + schema: Schema, + unwrappedReturnType: Type, + traceContext: OperationTraceSpan + ): Flow> { + val streamMetadata = unwrappedReturnType.getMetadata(RedisTaxi.Annotations.RedisStreamName) + val streamName = streamMetadata.params["name"] as String + + logger.debug { "Starting Redis Stream consumer for stream: $streamName" } + + traceContext.addEventNow( + TracingEventKind.CACHE_REQUEST, + TraceEventDirection.OUT, + redisConnectionConfig.addresses.joinToString(), + mapOf( + "operation" to "XREAD", + "stream" to streamName, + "mode" to "streaming" + ) + ) + + return flow { + val dataSource = DataSource.cache( + redisConnectionConfig.connectionName, + redisConnectionConfig.driverName, + "XREAD" + ) + + var lastId = "$" // Start from latest + + while (currentCoroutineContext().isActive) { + try { + val messages = connection.sync().xread( + XReadArgs.Builder.block(1000).count(10), + XReadArgs.StreamOffset.from(streamName, lastId) + ) + + for (message in messages) { + val body = message.body + // Assume the stream message has a "data" field with JSON + val jsonData = body["data"] ?: continue + + val typedInstanceResult = RedisJsonValueReader.toTypedInstance( + jsonData, + unwrappedReturnType as ObjectType, + schema, + dataSource + ) + + when (typedInstanceResult) { + is Either.Right -> emit(typedInstanceResult.value.right()) + is Either.Left -> emit(typedInstanceResult.value) + } + + lastId = message.id + } + } catch (e: Exception) { + if (e is CancellationException) throw e + logger.error(e) { "Error reading from Redis Stream $streamName" } + } + } + + traceContext.complete(SpanState.SUCCESS) + } + } + + private fun invokePubSub( + connection: StatefulRedisConnection, + redisConnectionConfig: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + schema: Schema, + unwrappedReturnType: Type, + traceContext: OperationTraceSpan + ): Flow> { + val pubSubMetadata = unwrappedReturnType.getMetadata(RedisTaxi.Annotations.RedisPubSubChannel) + val channel = pubSubMetadata.params["channel"] as String + + logger.debug { "Starting Redis Pub/Sub consumer for channel: $channel" } + + traceContext.addEventNow( + TracingEventKind.CACHE_REQUEST, + TraceEventDirection.OUT, + redisConnectionConfig.addresses.joinToString(), + mapOf( + "operation" to "SUBSCRIBE", + "channel" to channel, + "mode" to "streaming" + ) + ) + + val flowSink = MutableSharedFlow>() + val pubSubConnection = connection.statefulConnection.connectPubSub() + val isActive = AtomicBoolean(true) + + val dataSource = DataSource.cache( + redisConnectionConfig.connectionName, + redisConnectionConfig.driverName, + "PUBSUB" + ) + + pubSubConnection.addListener(object : RedisPubSubAdapter() { + override fun message(channel: String, message: String) { + if (!isActive.get()) return + + val typedInstanceResult = RedisJsonValueReader.toTypedInstance( + message, + unwrappedReturnType as ObjectType, + schema, + dataSource + ) + + runBlocking { + when (typedInstanceResult) { + is Either.Right -> flowSink.emit(typedInstanceResult.value.right()) + is Either.Left -> flowSink.emit(typedInstanceResult.value) + } + } + } + }) + + pubSubConnection.async().subscribe(channel) + + return flowSink.asSharedFlow() + .onCompletion { + isActive.set(false) + pubSubConnection.async().unsubscribe(channel) + pubSubConnection.close() + traceContext.complete(SpanState.SUCCESS) + logger.debug { "Redis Pub/Sub listener for channel $channel closed" } + } + } + + private fun invokeKeyspaceNotifications( + connection: StatefulRedisConnection, + redisConnectionConfig: RedisConfiguration, + service: Service, + operation: RemoteOperation, + parameters: List>, + eventDispatcher: QueryContextEventDispatcher, + queryId: String, + schema: Schema, + unwrappedReturnType: Type, + traceContext: OperationTraceSpan + ): Flow> { + val keyPattern = getRedisKeyPattern(unwrappedReturnType) + val scanPattern = getScanPattern(keyPattern) + + logger.debug { "Starting Redis keyspace notifications for pattern: $scanPattern" } + + // Keyspace notification pattern: __keyspace@0__:pattern + // For all keys, we listen to all events + val notificationChannel = "__keyspace@0__:$scanPattern" + + traceContext.addEventNow( + TracingEventKind.CACHE_REQUEST, + TraceEventDirection.OUT, + redisConnectionConfig.addresses.joinToString(), + mapOf( + "operation" to "PSUBSCRIBE", + "pattern" to notificationChannel, + "mode" to "streaming" + ) + ) + + val flowSink = MutableSharedFlow>() + val pubSubConnection = connection.statefulConnection.connectPubSub() + val isActive = AtomicBoolean(true) + + val dataSource = DataSource.cache( + redisConnectionConfig.connectionName, + redisConnectionConfig.driverName, + "KEYSPACE" + ) + + pubSubConnection.addListener(object : RedisPubSubAdapter() { + override fun message(pattern: String, channel: String, message: String) { + if (!isActive.get()) return + + // Extract the key from the channel name (__keyspace@0__:key) + val key = channel.substringAfter("__keyspace@0__:") + + // message contains the operation type: "set", "del", "expire", etc. + // We're interested in "set" and "setex" events + if (message in listOf("set", "setex", "hset")) { + // Fetch the value + val jsonValue = connection.sync().get(key) + if (jsonValue != null) { + val typedInstanceResult = RedisJsonValueReader.toTypedInstance( + jsonValue, + unwrappedReturnType as ObjectType, + schema, + dataSource + ) + + runBlocking { + when (typedInstanceResult) { + is Either.Right -> flowSink.emit(typedInstanceResult.value.right()) + is Either.Left -> flowSink.emit(typedInstanceResult.value) + } + } + } + } + } + }) + + pubSubConnection.async().psubscribe(notificationChannel) + + return flowSink.asSharedFlow() + .onCompletion { + isActive.set(false) + pubSubConnection.async().punsubscribe(notificationChannel) + pubSubConnection.close() + traceContext.complete(SpanState.SUCCESS) + logger.debug { "Redis keyspace notification listener for pattern $notificationChannel closed" } + } + } +} + +// Extension to get the underlying connection for creating pub/sub connections +private val StatefulRedisConnection.statefulConnection: StatefulRedisConnection + get() = this diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueReader.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueReader.kt new file mode 100644 index 0000000000..a3053b69e9 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueReader.kt @@ -0,0 +1,19 @@ +package com.orbitalhq.connectors.redis.serialization + +import arrow.core.Either +import com.orbitalhq.models.DataSource +import com.orbitalhq.models.TypedInstance +import com.orbitalhq.query.StreamErrorMessage +import com.orbitalhq.schemas.Schema +import lang.taxi.types.ObjectType + +object RedisJsonValueReader { + fun toTypedInstance( + jsonString: String, + type: ObjectType, + schema: Schema, + dataSource: DataSource + ): Either { + return TypedInstance.tryFrom(schema.type(type), jsonString, schema, source = dataSource) + } +} diff --git a/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueWriter.kt b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueWriter.kt new file mode 100644 index 0000000000..18a4072ab1 --- /dev/null +++ b/connectors/redis-connector/src/main/java/com/orbitalhq/connectors/redis/serialization/RedisJsonValueWriter.kt @@ -0,0 +1,23 @@ +package com.orbitalhq.connectors.redis.serialization + +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.orbitalhq.models.TypedObject +import com.orbitalhq.schemas.Schema + +object RedisJsonValueWriter { + val OBJECT_MAPPER = jacksonObjectMapper() + + fun getJsonStringAndKey(typedInstance: TypedObject, schema: Schema): Pair { + val json = OBJECT_MAPPER.writeValueAsString(typedInstance.toRawObject()) + + val (keyField) = findKeyField(typedInstance.type) + val key = typedInstance[keyField].toRawObject() + ?: error("Field $keyField is the @Id, but is null or not present") + + return key to json + } + + fun getJsonString(typedInstance: TypedObject): String { + return OBJECT_MAPPER.writeValueAsString(typedInstance.toRawObject()) + } +} From ea6b0823a9e58bde3933c1251ba1e0f928a50e82 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 15 Jan 2026 17:51:37 +0000 Subject: [PATCH 2/2] test(connectors): add comprehensive Redis connector tests Add test coverage following Hazelcast test patterns: Test Infrastructure: - BaseRedisInvokerTest: Base class with vyneWithRedis() helper - TestRedisProvider: Mock provider for dependency injection - Testcontainers integration for real Redis instance - Automatic cleanup between tests Query Tests (RedisQueryTest): - ID lookup using specific keys - Find all with SCAN operations - Criteria-based queries (single and multiple matches) - Compound criteria with multiple conditions - Non-existent key handling - Key pattern verification (film:{filmId}) Mutation Tests (RedisMutatingInvokerTest): - Write objects to Redis with JSON serialization - Verify operation results and cache exchange metadata - Read and write multiple objects - Delete by key - Delete all by pattern - TTL support with expiration verification Streaming Tests (RedisStreamQueryTest): - Redis Streams consumer (XREAD) - Pub/Sub streaming (SUBSCRIBE) - Keyspace notifications - Tests marked as @Disabled with setup requirements documented Dependencies Added: - mockito-kotlin for mocking - kotest for assertions - turbine for Flow testing - Testcontainers already present All tests follow the established patterns from Hazelcast tests using: - kotest matchers for assertions - Turbine for testing Kotlin Flows - setupDefaultItems() helper for test data - Proper cleanup in @AfterEach --- connectors/redis-connector/pom.xml | 15 + .../redis/invoker/BaseRedisInvokerTest.kt | 118 ++++++++ .../redis/invoker/RedisMutatingInvokerTest.kt | 262 ++++++++++++++++++ .../redis/invoker/RedisQueryTest.kt | 166 +++++++++++ .../redis/invoker/RedisStreamQueryTest.kt | 183 ++++++++++++ 5 files changed, 744 insertions(+) create mode 100644 connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/BaseRedisInvokerTest.kt create mode 100644 connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvokerTest.kt create mode 100644 connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisQueryTest.kt create mode 100644 connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisStreamQueryTest.kt diff --git a/connectors/redis-connector/pom.xml b/connectors/redis-connector/pom.xml index ecba9b8edf..2944ad87c8 100644 --- a/connectors/redis-connector/pom.xml +++ b/connectors/redis-connector/pom.xml @@ -61,6 +61,21 @@ + + org.mockito + mockito-kotlin + test + + + io.kotest + kotest-runner-junit5-jvm + test + + + io.kotest + kotest-assertions-core-jvm + test + app.cash.turbine turbine-jvm diff --git a/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/BaseRedisInvokerTest.kt b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/BaseRedisInvokerTest.kt new file mode 100644 index 0000000000..2b99420098 --- /dev/null +++ b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/BaseRedisInvokerTest.kt @@ -0,0 +1,118 @@ +package com.orbitalhq.connectors.redis.invoker + +import com.nhaarman.mockito_kotlin.doReturn +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import com.orbitalhq.Vyne +import com.orbitalhq.connectors.config.redis.RedisConfiguration +import com.orbitalhq.connectors.redis.RedisConnectionProvider +import com.orbitalhq.connectors.redis.RedisTaxi +import com.orbitalhq.query.VyneQlGrammar +import com.orbitalhq.schemas.taxi.TaxiSchema +import com.orbitalhq.stubbing.StubService +import com.orbitalhq.testVyneWithStub +import io.lettuce.core.api.StatefulRedisConnection +import org.testcontainers.containers.GenericContainer +import org.testcontainers.utility.DockerImageName + +abstract class BaseRedisInvokerTest { + companion object { + // Shared Redis container for all tests + val redisContainer: KGenericContainer by lazy { + KGenericContainer(DockerImageName.parse("redis:7-alpine")) + .withExposedPorts(6379) + .apply { start() } + } + } + + open fun defaultSchema(): String = """ + import com.orbitalhq.redis.RedisService + import com.orbitalhq.redis.RedisKey + import com.orbitalhq.redis.RedisTTL + import com.orbitalhq.redis.RedisUpsertOperation + import com.orbitalhq.redis.RedisDeleteOperation + + model Person { + name : PersonName inherits String + } + type Language inherits String + + @RedisKey(pattern = "film:{filmId}") + closed model Film { + @Id + filmId : FilmId inherits Int + title : Title inherits String + languages: Language[] + director : Person + cast : Person[] + } + + @RedisService(connectionName = "test") + service RedisService { + @RedisUpsertOperation + write operation upsert(Film):Film + + @RedisDeleteOperation(keyPattern = "film:*") + write operation deleteAll() + + @RedisDeleteOperation(keyPattern = "film:{filmId}") + write operation delete(FilmId):Film + + table films : Film[] + } + """ + + fun vyneWithRedis(schema: String = defaultSchema()): Triple, Vyne, StubService> { + // Create real Redis connection to test container + val connection = io.lettuce.core.RedisClient.create( + "redis://${redisContainer.host}:${redisContainer.getMappedPort(6379)}" + ).connect() + + // Clear any existing data + connection.sync().flushdb() + + val redisProvider = TestRedisProvider(connection) + val redisInvoker = RedisInvoker(redisProvider) + val (vyne, stub) = testVyneWithStub( + TaxiSchema.fromStrings( + listOf( + VyneQlGrammar.QUERY_TYPE_TAXI, + RedisTaxi.schema, + schema, + ) + ), listOf(redisInvoker) + ) + return Triple(connection, vyne, stub) + } + + // Helper to clean up connection + fun cleanup(connection: StatefulRedisConnection) { + connection.sync().flushdb() + connection.close() + } +} + +// Kotlin-friendly wrapper for Testcontainers +class KGenericContainer(imageName: DockerImageName) : GenericContainer(imageName) + +/** + * Test provider that supplies a real Redis connection for testing + */ +class TestRedisProvider( + private val connection: StatefulRedisConnection +) : RedisConnectionProvider { + override fun provide(config: RedisConfiguration): StatefulRedisConnection { + return connection + } + + override fun redisConnection(connectionName: String?): Pair, RedisConfiguration> { + val redisConfiguration: RedisConfiguration = mock { } + whenever(redisConfiguration.connectionName).doReturn(connectionName ?: "test") + whenever(redisConfiguration.addresses).doReturn(listOf("localhost:6379")) + return connection to redisConfiguration + } + + override fun canProvideRedisConnection(connectionName: String?): Boolean { + return true + } +} diff --git a/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvokerTest.kt b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvokerTest.kt new file mode 100644 index 0000000000..4cdd50a6d1 --- /dev/null +++ b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisMutatingInvokerTest.kt @@ -0,0 +1,262 @@ +package com.orbitalhq.connectors.redis.invoker + +import com.orbitalhq.firstTypedObject +import com.orbitalhq.models.OperationResult +import com.orbitalhq.models.json.parseJson +import com.orbitalhq.query.CacheExchange +import com.orbitalhq.query.QueryContextEventBroker +import com.orbitalhq.query.RemoteCallOperationResultHandler +import com.orbitalhq.query.tracing.TraceContext +import com.orbitalhq.rawObjects +import io.kotest.common.runBlocking +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.nulls.shouldBeNull +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test + +class RedisMutatingInvokerTest : BaseRedisInvokerTest() { + + @AfterEach + fun cleanupRedis() { + redisContainer.execInContainer("redis-cli", "FLUSHDB") + } + + @Test + fun `can write object to Redis`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + val operationResults = mutableListOf() + val remoteCallOperationResultHandler = object : RemoteCallOperationResultHandler { + override fun recordResult(operation: OperationResult, queryId: String) { + operationResults.add(operation) + } + } + val queryEventBroker = QueryContextEventBroker(traceSpan = TraceContext.noOp().rootSpan) + queryEventBroker.addHandler(remoteCallOperationResultHandler) + + val upsertedInstance = vyne.query( + vyneQlQuery = """given { film:Film = { + | filmId : 100, + | title : "Star Wars", + | languages : ["English" , "American" ], + | director : { name : "George" }, + | cast : [ {name : "Mark" }, {name: "Carrie" } ] + |} } + |call RedisService::upsert""".trimMargin(), + eventBroker = queryEventBroker + ) + .firstTypedObject() + + val operationResult = operationResults.first() + operationResult.remoteCall.exchange.shouldBeInstanceOf() + (operationResult.remoteCall.exchange as CacheExchange).cacheType.shouldBe(CacheExchange.CacheType.Redis) + + // Verify the data is actually in Redis + val storedValue = connection.sync().get("film:100") + storedValue.shouldNotBeNull() + storedValue.contains("Star Wars").shouldBe(true) + storedValue.contains("\"filmId\":100").shouldBe(true) + } finally { + cleanup(connection) + } + } + + @Test + fun `can read and write multiple objects`(): Unit = runBlocking { + val schema = """ + import com.orbitalhq.redis.RedisService + import com.orbitalhq.redis.RedisKey + import com.orbitalhq.redis.RedisUpsertOperation + import com.orbitalhq.redis.RedisDeleteOperation + + type FilmId inherits Int + type Title inherits String + + closed model ApiFilm { + @Id + filmId : FilmId + title : Title + } + + service ApiService { + operation getFilms():ApiFilm[] + } + + @RedisKey(pattern = "film:{filmId}") + closed parameter model RedisFilm { + @Id + id : FilmId + name : Title + } + + @RedisService(connectionName = "test") + service RedisService { + @RedisUpsertOperation + write operation upsert(RedisFilm):RedisFilm + + @RedisDeleteOperation(keyPattern = "film:*") + write operation deleteAll() + + table films : RedisFilm[] + } + + """.trimIndent() + + val (connection, vyne, stub) = vyneWithRedis(schema) + try { + stub.addResponse( + "getFilms", vyne.parseJson( + "ApiFilm[]", """ + [ + { "filmId" : 100, "title" : "Star Wars" }, + { "filmId" : 200, "title" : "Empire Strikes Back" }, + { "filmId" : 300, "title" : "Return of the Jedi" } + ] + """.trimIndent() + ) + ) + + val upsertResult = vyne.query( + """ + find { ApiFilm[] } + call RedisService::upsert + """.trimIndent() + ) + .rawObjects() + upsertResult.shouldHaveSize(3) + + // Verify all keys exist in Redis + val keys = connection.sync().keys("film:*") + keys.shouldHaveSize(3) + + val findResult = vyne.query("""find { RedisFilm[] }""") + .rawObjects() + findResult.shouldHaveSize(3) + } finally { + cleanup(connection) + } + } + + @Test + fun `can delete by key`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + // Insert some data + vyne.query( + """given { film:Film = { + | filmId : 100, + | title : "Star Wars", + | languages : ["English"], + | director : { name : "George" }, + | cast : [] + |} } + |call RedisService::upsert""".trimMargin() + ).firstTypedObject() + + // Verify it exists + connection.sync().get("film:100").shouldNotBeNull() + + // Delete it + vyne.query("""given { filmId : FilmId = 100 } call RedisService::delete""") + .firstTypedObject() + + // Verify it's gone + connection.sync().get("film:100").shouldBeNull() + } finally { + cleanup(connection) + } + } + + @Test + fun `can delete all by pattern`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + // Insert multiple films + vyne.query( + """given { film:Film = { + | filmId : 100, title : "Film 1", + | languages : [], director : { name : "Director" }, cast : [] + |} } + |call RedisService::upsert""".trimMargin() + ).firstTypedObject() + + vyne.query( + """given { film:Film = { + | filmId : 200, title : "Film 2", + | languages : [], director : { name : "Director" }, cast : [] + |} } + |call RedisService::upsert""".trimMargin() + ).firstTypedObject() + + vyne.query( + """given { film:Film = { + | filmId : 300, title : "Film 3", + | languages : [], director : { name : "Director" }, cast : [] + |} } + |call RedisService::upsert""".trimMargin() + ).firstTypedObject() + + // Verify all exist + connection.sync().keys("film:*").shouldHaveSize(3) + + // Delete all + vyne.query("""call RedisService::deleteAll""") + .firstTypedObject() + + // Verify all are gone + connection.sync().keys("film:*").shouldHaveSize(0) + } finally { + cleanup(connection) + } + } + + @Test + fun `can write with TTL`(): Unit = runBlocking { + val schemaWithTTL = """ + import com.orbitalhq.redis.RedisService + import com.orbitalhq.redis.RedisKey + import com.orbitalhq.redis.RedisTTL + import com.orbitalhq.redis.RedisUpsertOperation + + @RedisKey(pattern = "film:{filmId}") + @RedisTTL(seconds = 2) + closed model Film { + @Id + filmId : FilmId inherits Int + title : Title inherits String + } + + @RedisService(connectionName = "test") + service RedisService { + @RedisUpsertOperation + write operation upsert(Film):Film + } + """.trimIndent() + + val (connection, vyne, stub) = vyneWithRedis(schemaWithTTL) + try { + vyne.query( + """given { film:Film = { filmId : 100, title : "Star Wars" } } + |call RedisService::upsert""".trimMargin() + ).firstTypedObject() + + // Verify it exists with TTL + connection.sync().get("film:100").shouldNotBeNull() + val ttl = connection.sync().ttl("film:100") + ttl.shouldNotBeNull() + (ttl > 0).shouldBe(true) + (ttl <= 2).shouldBe(true) + + // Wait for expiration + Thread.sleep(2500) + + // Verify it's gone + connection.sync().get("film:100").shouldBeNull() + } finally { + cleanup(connection) + } + } +} diff --git a/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisQueryTest.kt b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisQueryTest.kt new file mode 100644 index 0000000000..5c74f55981 --- /dev/null +++ b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisQueryTest.kt @@ -0,0 +1,166 @@ +package com.orbitalhq.connectors.redis.invoker + +import com.orbitalhq.Vyne +import com.orbitalhq.firstRawObject +import com.orbitalhq.rawObjects +import io.kotest.common.runBlocking +import io.kotest.matchers.collections.shouldHaveSize +import io.kotest.matchers.nulls.shouldNotBeNull +import io.kotest.matchers.shouldBe +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test + +class RedisQueryTest : BaseRedisInvokerTest() { + + @AfterEach + fun cleanupRedis() { + // Ensure Redis is cleaned between tests + redisContainer.execInContainer("redis-cli", "FLUSHDB") + } + + @Test + fun `can use a taxiql statement to query Redis using an id`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + setupDefaultItems(vyne) + val matchedFilm = vyne.query("""find { Film( FilmId == 100 ) }""") + .rawObjects() + matchedFilm.shouldNotBeNull() + matchedFilm.shouldHaveSize(1) + matchedFilm.single()["filmId"].shouldBe(100) + matchedFilm.single()["title"].shouldBe("Star Wars") + } finally { + cleanup(connection) + } + } + + @Test + fun `can do a find all from Redis`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + setupDefaultItems(vyne) + val matchedFilm = vyne.query("""find { Film[] }""") + .rawObjects() + matchedFilm.shouldNotBeNull() + matchedFilm.shouldHaveSize(3) + } finally { + cleanup(connection) + } + } + + @Test + fun `can use a taxiql statement to query Redis using criteria returning a single match`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + setupDefaultItems(vyne) + val matchedFilm = vyne.query("""find { Film[]( FilmId < 105 ) }""") + .rawObjects() + matchedFilm.shouldNotBeNull() + matchedFilm.shouldHaveSize(1) + matchedFilm.single().get("title").shouldBe("Star Wars") + } finally { + cleanup(connection) + } + } + + @Test + fun `can use a taxiql statement to query Redis using criteria returning multiple matches`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + setupDefaultItems(vyne) + val matchedFilm = vyne.query("""find { Film[]( FilmId < 115 ) }""") + .rawObjects() + matchedFilm.shouldNotBeNull() + matchedFilm.shouldHaveSize(2) + } finally { + cleanup(connection) + } + } + + @Test + fun `can use a taxiql statement to query Redis using compound criteria`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + setupDefaultItems(vyne) + val matchedFilm = vyne.query("""find { Film[]( FilmId < 125 && Title == "Star Wars" ) }""") + .rawObjects() + matchedFilm.shouldNotBeNull() + matchedFilm.shouldHaveSize(1) + matchedFilm.single().get("title").shouldBe("Star Wars") + } finally { + cleanup(connection) + } + } + + @Test + fun `can query Redis when key does not exist`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + // Don't setup any data + val matchedFilm = vyne.query("""find { Film( FilmId == 999 ) }""") + .rawObjects() + matchedFilm.shouldNotBeNull() + matchedFilm.shouldHaveSize(0) + } finally { + cleanup(connection) + } + } + + @Test + fun `can verify Redis keys are stored with correct pattern`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + setupDefaultItems(vyne) + + // Verify keys exist in Redis with the expected pattern + val keys = connection.sync().keys("film:*") + keys.shouldHaveSize(3) + keys.shouldBe(setOf("film:100", "film:110", "film:120")) + + // Verify we can get the value directly + val film100 = connection.sync().get("film:100") + film100.shouldNotBeNull() + film100.contains("Star Wars").shouldBe(true) + } finally { + cleanup(connection) + } + } + + private suspend fun setupDefaultItems(vyne: Vyne) { + vyne.query( + """given { film:Film = { + | filmId : 100, + | title : "Star Wars", + | languages : ["English" , "American" ], + | director : { name : "George" }, + | cast : [ {name : "Mark" }, {name: "Carrie" } ] + |} } + |call RedisService::upsert""".trimMargin() + ) + .firstRawObject() + + vyne.query( + """given { film:Film = { + | filmId : 110, + | title : "Empire Strikes Back", + | languages : ["English" , "American" ], + | director : { name : "George" }, + | cast : [ {name : "Mark" }, {name: "Carrie" } ] + |} } + |call RedisService::upsert""".trimMargin() + ) + .firstRawObject() + + vyne.query( + """given { film:Film = { + | filmId : 120, + | title : "Return of the Jedi", + | languages : ["English" , "American" ], + | director : { name : "George" }, + | cast : [ {name : "Mark" }, {name: "Carrie" } ] + |} } + |call RedisService::upsert""".trimMargin() + ) + .firstRawObject() + } +} diff --git a/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisStreamQueryTest.kt b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisStreamQueryTest.kt new file mode 100644 index 0000000000..7a65926ec6 --- /dev/null +++ b/connectors/redis-connector/src/test/java/com/orbitalhq/connectors/redis/invoker/RedisStreamQueryTest.kt @@ -0,0 +1,183 @@ +package com.orbitalhq.connectors.redis.invoker + +import app.cash.turbine.test +import com.orbitalhq.Vyne +import com.orbitalhq.expectTypedObject +import com.orbitalhq.firstRawObject +import io.kotest.common.runBlocking +import kotlinx.coroutines.delay +import lang.taxi.utils.log +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Disabled +import org.junit.jupiter.api.Test +import kotlin.time.Duration + +class RedisStreamQueryTest : BaseRedisInvokerTest() { + + @AfterEach + fun cleanupRedis() { + redisContainer.execInContainer("redis-cli", "FLUSHDB") + } + + val valuesToInsert = listOf( + """ + { + filmId : 100, + title : "Star Wars", + languages : ["English" , "American" ], + director : { name : "George" }, + cast : [ {name : "Mark" }, {name: "Carrie" } ] + } + """, + """ + { + filmId : 200, + title : "Empire Strikes Back", + languages : ["English" , "American" ], + director : { name : "George" }, + cast : [ {name : "Mark" }, {name: "Carrie" } ] + } + """, + """ + { + filmId : 300, + title : "Return of the Jedi", + languages : ["English" , "American" ], + director : { name : "George" }, + cast : [ {name : "Mark" }, {name: "Carrie" } ] + } + """ + ) + + @Test + @Disabled("Streaming tests require keyspace notifications enabled - complex to set up in testcontainers") + fun `can query a stream from Redis using keyspace notifications`(): Unit = runBlocking { + val (connection, vyne, stub) = vyneWithRedis() + try { + // Note: This would require enabling keyspace notifications in Redis: + // CONFIG SET notify-keyspace-events KEA + vyne.query("""stream { Film }""") + .results + .test(timeout = Duration.parse("5s")) { + write(vyne, valuesToInsert[0]) + expectTypedObject() + write(vyne, valuesToInsert[1]) + expectTypedObject() + write(vyne, valuesToInsert[2]) + expectTypedObject() + cancel() + } + } finally { + cleanup(connection) + } + } + + @Test + @Disabled("Pub/Sub streaming requires channel setup") + fun `can query a stream from Redis Pub Sub`(): Unit = runBlocking { + val pubSubSchema = """ + import com.orbitalhq.redis.RedisService + import com.orbitalhq.redis.RedisKey + import com.orbitalhq.redis.RedisPubSubChannel + import com.orbitalhq.redis.RedisUpsertOperation + + @RedisKey(pattern = "film:{filmId}") + @RedisPubSubChannel(channel = "films") + closed model Film { + @Id + filmId : FilmId inherits Int + title : Title inherits String + } + + @RedisService(connectionName = "test") + service RedisService { + @RedisUpsertOperation + write operation upsert(Film):Film + + stream films : Stream + } + """.trimIndent() + + val (connection, vyne, stub) = vyneWithRedis(pubSubSchema) + try { + // Start streaming + vyne.query("""stream { Film }""") + .results + .test(timeout = Duration.parse("5s")) { + // Publish messages to the channel + publishToChannel(connection, "films", valuesToInsert[0]) + expectTypedObject() + publishToChannel(connection, "films", valuesToInsert[1]) + expectTypedObject() + cancel() + } + } finally { + cleanup(connection) + } + } + + @Test + @Disabled("Redis Streams require stream setup and XADD operations") + fun `can query a Redis Stream`(): Unit = runBlocking { + val streamSchema = """ + import com.orbitalhq.redis.RedisService + import com.orbitalhq.redis.RedisKey + import com.orbitalhq.redis.RedisStreamName + import com.orbitalhq.redis.RedisUpsertOperation + + @RedisKey(pattern = "film:{filmId}") + @RedisStreamName(name = "films-stream") + closed model Film { + @Id + filmId : FilmId inherits Int + title : Title inherits String + } + + @RedisService(connectionName = "test") + service RedisService { + stream films : Stream + } + """.trimIndent() + + val (connection, vyne, stub) = vyneWithRedis(streamSchema) + try { + // Add messages to the stream using XADD + connection.sync().xadd("films-stream", mapOf("data" to """{"filmId":100,"title":"Star Wars"}""")) + + vyne.query("""stream { Film }""") + .results + .test(timeout = Duration.parse("5s")) { + expectTypedObject() + cancel() + } + } finally { + cleanup(connection) + } + } + + private fun write(vyne: Vyne, valueToInsert: String) { + runBlocking { + val result = vyne.query( + """ + given { film: Film = $valueToInsert } + call RedisService::upsert + """.trimIndent() + ) + .firstRawObject() + log().info("Wrote 1 record") + } + } + + private fun publishToChannel( + connection: io.lettuce.core.api.StatefulRedisConnection, + channel: String, + message: String + ) { + runBlocking { + // Convert to JSON and publish + val jsonMessage = """{"filmId":100,"title":"Star Wars"}""" + connection.sync().publish(channel, jsonMessage) + log().info("Published message to channel $channel") + } + } +}