Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
815874d
feat(engine): introduce StorageBackend abstraction layer
em3s Feb 5, 2026
1740f3c
feat(engine): add StorageBackend abstraction interfaces
em3s Feb 5, 2026
04e9720
feat(engine): add MemoryStorageBucket implementation
em3s Feb 5, 2026
81c271b
feat(engine): add MemoryStorageBackend implementation
em3s Feb 5, 2026
a1d03a3
feat(engine): add HBaseStorageBucket implementation
em3s Feb 5, 2026
311a1e2
feat(engine): add HBaseStorageBackend implementation
em3s Feb 5, 2026
8ec4f64
feat(engine): add DefaultStorageBackendFactory
em3s Feb 5, 2026
cf3397c
feat(engine): integrate StorageBackend into Graph
em3s Feb 5, 2026
bbe025f
feat(engine): Step 9 - Update HBaseOptions to use DefaultStorageBacke…
em3s Feb 5, 2026
493c1c4
feat(engine): Step 9 - Add EmbeddedStorageBackend for testing cluster
em3s Feb 5, 2026
4205aab
refactor(engine): rename MockStorageBackend to MockHBaseStorageBackend
em3s Feb 5, 2026
b7910e8
fix(engine): isolate buckets in MemoryStorageBackend
em3s Feb 5, 2026
db6df3b
refactor(engine): move MockHBaseStorageBackend to hbase package
em3s Feb 5, 2026
f5d0da8
refactor(engine): remove DefaultHBaseCluster
em3s Feb 5, 2026
269aa95
refactor(engine): rename EmbeddedStorageBackend to MiniHBaseStorageBa…
em3s Feb 5, 2026
f627cfc
refactor(engine): rename to HBaseTestingStorageBackend and fix server…
em3s Feb 5, 2026
dcc1f3f
fix(engine): address code review issues
em3s Feb 5, 2026
c3ad231
refactor(engine): improve StorageBackend design and address code review
em3s Feb 5, 2026
1926cf3
refactor(engine): extract DatastoreUri utility and add tests
em3s Feb 5, 2026
7d13aab
Merge remote-tracking branch 'origin/main' into feat/issue-173-engine…
em3s Feb 5, 2026
00d48ba
refactor(engine): simplify DefaultStorageBackendFactory initialization
em3s Feb 5, 2026
94a96db
feat(engine): add input validation to DatastoreUri.parse()
em3s Feb 5, 2026
3c04c7c
fix(engine): address code review round 3 issues
em3s Feb 6, 2026
2a6ca86
Merge remote-tracking branch 'origin/main' into feat/issue-173-engine…
em3s Feb 9, 2026
2dcf40c
fix(engine): update DefaultHBaseClusterTest for merge resolution
em3s Feb 9, 2026
f316409
Merge remote-tracking branch 'origin/main' into feat/issue-173-engine…
em3s Feb 9, 2026
95e9fce
fix(engine): resolve merge conflicts with main after storage backend PRs
em3s Feb 9, 2026
218c9ff
refactor(engine): remove v2 storage duplicates, wire v2 engine to v3 …
em3s Feb 9, 2026
e4df820
fix(engine): revert DatastoreUri to main — no uppercase/hyphen support
em3s Feb 9, 2026
17d06c1
chore: trigger PR update
em3s Feb 9, 2026
7347ac7
refactor(engine): rename Datastore*Label to HBaseStorageBackend*Label…
em3s Feb 9, 2026
f41829c
refactor(engine): make HBaseStorageTable implement HBaseTable via del…
em3s Feb 9, 2026
2e258ff
refactor(engine): rename StorageTable.getAll → get and batchAll → bat…
em3s Feb 9, 2026
20e1de7
refactor(engine): rename HBaseTable.get(List)/batch(List) to getAll/b…
em3s Feb 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.kakao.actionbase.engine.storage

import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTables

import reactor.core.publisher.Mono

/**
* Provides HBaseTables for v2 Label implementations that need direct HBase table access
* (e.g., Filters, CellUtil) beyond what StorageTable supports.
*/
@Deprecated("backwards compatibility for v2, use StorageBackend instead")
interface HBaseTablesProvider {
fun getHBaseTables(
namespace: String,
name: String,
): Mono<HBaseTables>
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.kakao.actionbase.engine.storage.hbase

import com.kakao.actionbase.engine.storage.HBaseTablesProvider
import com.kakao.actionbase.engine.storage.StorageBackend
import com.kakao.actionbase.engine.storage.StorageTable
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTable
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTables

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.AsyncAdmin
import org.apache.hadoop.hbase.client.AsyncConnection
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.security.UserGroupInformation
Expand All @@ -17,7 +20,8 @@ import reactor.core.scheduler.Schedulers

class HBaseStorageBackend private constructor(
private val connectionMono: Mono<AsyncConnection>,
) : StorageBackend {
) : StorageBackend,
HBaseTablesProvider {
override fun getStorageTable(
namespace: String,
name: String,
Expand All @@ -28,6 +32,18 @@ class HBaseStorageBackend private constructor(
HBaseStorageTable(hbaseTable)
}

override fun getHBaseTables(
namespace: String,
name: String,
): Mono<HBaseTables> =
connectionMono.map { conn ->
val table = conn.getTable(TableName.valueOf(namespace, name))
val hbaseTable = HBaseTable.create(table)
HBaseTables(hbaseTable, hbaseTable)
}

fun getAdminMono(): Mono<AsyncAdmin> = connectionMono.map { it.admin }.cache()

override fun close() {
connectionMono.block()?.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import reactor.core.publisher.Mono

class HBaseStorageTable(
private val table: HBaseTable,
) : StorageTable {
) : StorageTable,
HBaseTable by table {
override fun get(key: ByteArray): Mono<ByteArray?> {
val get = Get(key).addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER)
return table.get(get).handle { result, sink ->
Expand All @@ -31,7 +32,7 @@ class HBaseStorageTable(

override fun get(keys: List<ByteArray>): Mono<List<HBaseRecord>> {
val gets = keys.map { Get(it).addColumn(Constants.DEFAULT_COLUMN_FAMILY, Constants.DEFAULT_QUALIFIER) }
return table.get(gets).map { results ->
return table.getAll(gets).map { results ->
results.filter { !it.isEmpty }.map { result ->
HBaseRecord(
key = result.row,
Expand Down Expand Up @@ -107,7 +108,7 @@ class HBaseStorageTable(
)
}
}
return table.batch(mutations)
return table.batchAll(mutations)
}

override fun exists(key: ByteArray): Mono<Boolean> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.kakao.actionbase.engine.storage.hbase

import com.kakao.actionbase.engine.storage.HBaseTablesProvider
import com.kakao.actionbase.engine.storage.StorageBackend
import com.kakao.actionbase.engine.storage.StorageTable
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseConnections
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTable
import com.kakao.actionbase.v2.engine.storage.hbase.HBaseTables
import com.kakao.actionbase.v2.engine.storage.hbase.impl.NewMockTable

import org.apache.hadoop.hbase.TableName
Expand All @@ -17,7 +19,9 @@ import reactor.core.publisher.Mono
*
* Each namespace + name combination gets its own isolated table.
*/
class MockHBaseStorageBackend : StorageBackend {
class MockHBaseStorageBackend :
StorageBackend,
HBaseTablesProvider {
override fun getStorageTable(
namespace: String,
name: String,
Expand All @@ -26,6 +30,14 @@ class MockHBaseStorageBackend : StorageBackend {
return Mono.just(HBaseStorageTable(hbaseTable))
}

override fun getHBaseTables(
namespace: String,
name: String,
): Mono<HBaseTables> {
val hbaseTable = createMockHBaseTable(namespace, name)
return Mono.just(HBaseTables(hbaseTable, hbaseTable))
}

override fun close() {
// nothing to close
}
Expand Down
10 changes: 4 additions & 6 deletions engine/src/main/kotlin/com/kakao/actionbase/v2/engine/Graph.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.kakao.actionbase.v2.engine

import com.kakao.actionbase.engine.storage.DefaultStorageBackendFactory
import com.kakao.actionbase.v2.core.code.EdgeEncoderFactory
import com.kakao.actionbase.v2.core.code.EmptyEdgeIdEncoder
import com.kakao.actionbase.v2.core.code.IdEdgeEncoder
Expand All @@ -16,7 +17,6 @@ import com.kakao.actionbase.v2.engine.cdc.CdcContext
import com.kakao.actionbase.v2.engine.cdc.CdcFactory
import com.kakao.actionbase.v2.engine.client.kafka.KafkaClientFactory
import com.kakao.actionbase.v2.engine.client.web.WebClientFactory
import com.kakao.actionbase.v2.engine.compat.DefaultHBaseCluster
import com.kakao.actionbase.v2.engine.edge.MutationResult
import com.kakao.actionbase.v2.engine.edge.MutationResultItem
import com.kakao.actionbase.v2.engine.entity.AliasEntity
Expand Down Expand Up @@ -92,7 +92,6 @@ class Graph(
override val metastore: Database,
override val metadataTable: MetadataTable,
override val edgeEncoderFactory: EdgeEncoderFactory,
override val datastore: DefaultHBaseCluster,
private val systemStorages: Map<EntityName, StorageEntity>,
config: GraphConfig,
serviceLabel: Label,
Expand Down Expand Up @@ -892,7 +891,7 @@ class Graph(
intervalDisposable?.dispose()
log.info("Disposed Flux.interval for reloading metastore - {}", intervalDisposable)
HBaseConnections.closeConnections().block()
DefaultHBaseCluster.INSTANCE.close()
DefaultStorageBackendFactory.close()
}

fun status(name: EntityName): Mono<String> = getLabel(name).status()
Expand Down Expand Up @@ -941,7 +940,8 @@ class Graph(
kafkaClientFactory: KafkaClientFactory,
webClientFactory: WebClientFactory,
): Graph {
DefaultHBaseCluster.initialize(config.hbase)
// Initialize storage backend if not already initialized (idempotent)
DefaultStorageBackendFactory.initialize(config.hbase)
log.info("phase: {}", config.phase)
log.info("tenant: {}", config.tenant)
log.info("graph config: {}", config)
Expand Down Expand Up @@ -999,7 +999,6 @@ class Graph(
metadataTable,
edgeEncoderFactory,
storageEntities,
DefaultHBaseCluster.INSTANCE,
)

val serviceLabel =
Expand Down Expand Up @@ -1061,7 +1060,6 @@ class Graph(
defaults.metastore,
defaults.metadataTable,
defaults.edgeEncoderFactory,
defaults.datastore,
defaults.storages,
config,
serviceLabel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.kakao.actionbase.v2.engine

import com.kakao.actionbase.engine.EngineConstants
import com.kakao.actionbase.v2.core.code.EdgeEncoderFactory
import com.kakao.actionbase.v2.engine.compat.DefaultHBaseCluster
import com.kakao.actionbase.v2.engine.entity.EntityName
import com.kakao.actionbase.v2.engine.entity.StorageEntity
import com.kakao.actionbase.v2.engine.metadata.StorageType
Expand All @@ -16,7 +15,6 @@ interface GraphDefaults {
val metadataTable: MetadataTable
val storages: Map<EntityName, StorageEntity>
val edgeEncoderFactory: EdgeEncoderFactory
val datastore: DefaultHBaseCluster

fun getStorage(uri: String): StorageEntity? =
when {
Expand All @@ -35,5 +33,4 @@ data class AbstractGraphDefaults(
override val metadataTable: MetadataTable,
override val edgeEncoderFactory: EdgeEncoderFactory,
override val storages: Map<EntityName, StorageEntity>,
override val datastore: DefaultHBaseCluster,
) : GraphDefaults
Loading
Loading