Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions nebula-dsl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@
<version>5.5.0</version>
</dependency>

<!-- Redis -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.3.2.RELEASE</version>
</dependency>

<!-- HikariCP -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.orbitalhq.nebula.kafka.KafkaDsl
import com.orbitalhq.nebula.logging.LogMessage
import com.orbitalhq.nebula.logging.StackLogStream
import com.orbitalhq.nebula.mongo.MongoDsl
import com.orbitalhq.nebula.redis.RedisDsl
import com.orbitalhq.nebula.s3.S3Dsl
import com.orbitalhq.nebula.sql.SqlDsl
import com.orbitalhq.nebula.taxi.TaxiPublisherDsl
Expand All @@ -33,7 +34,7 @@ data class NebulaStackWithSource(
class NebulaStack(
val name: StackName = NameGenerator.generateName(),
initialComponents: List<InfrastructureComponent<*>> = emptyList()
) : InfraDsl, KafkaDsl, S3Dsl, HttpDsl, SqlDsl, HazelcastDsl, MongoDsl, TaxiPublisherDsl {
) : InfraDsl, KafkaDsl, S3Dsl, HttpDsl, SqlDsl, HazelcastDsl, RedisDsl, MongoDsl, TaxiPublisherDsl {
private val _components = mutableListOf<InfrastructureComponent<*>>()

private val isStarted = AtomicBoolean(false)
Expand Down
30 changes: 30 additions & 0 deletions nebula-dsl/src/main/kotlin/com/orbitalhq/nebula/redis/RedisDsl.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.orbitalhq.nebula.redis

import com.orbitalhq.nebula.InfraDsl
import com.orbitalhq.nebula.core.ComponentName
import mu.KLogger
import mu.KotlinLogging

private val logger = KotlinLogging.logger {}

interface RedisDsl : InfraDsl {
fun redis(imageName: String = "redis:7-alpine",
componentName: ComponentName = "redis",
dsl: RedisBuilder.(KLogger) -> Unit): RedisExecutor {
val builder = RedisBuilder(imageName, componentName)
builder.dsl(logger)
return this.add(RedisExecutor(builder.build(), listOf(logger.name)))
}
}

class RedisBuilder(
private val imageName: String,
private val componentName: ComponentName,
) {
fun build(): RedisConfig = RedisConfig(imageName, componentName)
}

data class RedisConfig(
val imageName: String,
val componentName: ComponentName,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.orbitalhq.nebula.redis

import com.orbitalhq.nebula.HostConfig
import com.orbitalhq.nebula.InfrastructureComponent
import com.orbitalhq.nebula.NebulaConfig
import com.orbitalhq.nebula.StackRunner
import com.orbitalhq.nebula.containerInfoFrom
import com.orbitalhq.nebula.core.ComponentInfo
import com.orbitalhq.nebula.core.ComponentLifecycleEvent
import com.orbitalhq.nebula.core.ComponentName
import com.orbitalhq.nebula.core.ComponentType
import com.orbitalhq.nebula.events.ComponentLifecycleEventSource
import com.orbitalhq.nebula.logging.LogStream
import com.orbitalhq.nebula.logging.LoggerName
import io.github.oshai.kotlinlogging.KotlinLogging
import org.testcontainers.containers.GenericContainer
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.utility.DockerImageName
import reactor.core.publisher.Flux

val StackRunner.redis: List<RedisExecutor>
get() {
return this.component<RedisExecutor>()
}


data class RedisContainerConfig(
val port: Int
)
class RedisExecutor(private val config: RedisConfig, loggers: List<LoggerName>) : InfrastructureComponent<RedisContainerConfig> {
companion object {
private val logger = KotlinLogging.logger {}
}

private lateinit var container: GenericContainer<*>
override val name: ComponentName = config.componentName
override val type: ComponentType = "redis"
override val logStream: LogStream = LogStream(name, slf4jLoggerNames = loggers + listOf(RedisExecutor::class))
private val eventSource = ComponentLifecycleEventSource(logStream = logStream)

override fun start(nebulaConfig: NebulaConfig, hostConfig: HostConfig): ComponentInfo<RedisContainerConfig> {
eventSource.starting()
container = GenericContainer(DockerImageName.parse(config.imageName))
.withExposedPorts(6379)
.withNetwork(nebulaConfig.network)
.withNetworkAliases(config.componentName)
container.waitingFor(Wait.forListeningPort())
eventSource.startContainerAndEmitEvents(container, name)

componentInfo = ComponentInfo(
containerInfoFrom(container),
RedisContainerConfig(
container.firstMappedPort
),
type = type,
name = name,
id = id
)
eventSource.running()
logger.info { "Redis container started" }
return componentInfo!!
}

override fun stop() {
eventSource.stopping()
eventSource.stopContainerAndEmitEvents(container)
}

override var componentInfo: ComponentInfo<RedisContainerConfig>? = null
private set

override val lifecycleEvents: Flux<ComponentLifecycleEvent> = eventSource.events
override val currentState: ComponentLifecycleEvent
get() {
return eventSource.currentState
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.orbitalhq.nebula.redis

import com.orbitalhq.nebula.StackRunner
import com.orbitalhq.nebula.stack
import com.orbitalhq.nebula.start
import io.kotest.core.spec.style.DescribeSpec
import io.kotest.matchers.shouldBe
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisURI

class RedisExecutorTest : DescribeSpec({
lateinit var infra: StackRunner

describe("redis executor") {
it("should create a redis instance that can be connected to") {
infra = stack {
redis { }
}.start()

val port = infra.redis.single().componentInfo!!.componentConfig.port
val redisUri = RedisURI.Builder
.redis("localhost", port)
.build()

val client = RedisClient.create(redisUri)
val connection = client.connect()
val syncCommands = connection.sync()

// Test basic operations
syncCommands.set("test-key", "test-value")
val value = syncCommands.get("test-key")
value shouldBe "test-value"

connection.close()
client.shutdown()
}

it("should support multiple operations") {
infra = stack {
redis { }
}.start()

val port = infra.redis.single().componentInfo!!.componentConfig.port
val redisUri = RedisURI.Builder
.redis("localhost", port)
.build()

val client = RedisClient.create(redisUri)
val connection = client.connect()
val syncCommands = connection.sync()

// Test multiple key-value operations
syncCommands.set("key1", "value1")
syncCommands.set("key2", "value2")
syncCommands.set("key3", "value3")

syncCommands.get("key1") shouldBe "value1"
syncCommands.get("key2") shouldBe "value2"
syncCommands.get("key3") shouldBe "value3"

// Test delete operation
syncCommands.del("key2")
syncCommands.get("key2") shouldBe null

connection.close()
client.shutdown()
}
}

}) {
}
Loading