Skip to content

Commit b4d75b8

Browse files
[kv] Implement KvTablet RocksDB Lazy Open
1 parent b03cec4 commit b4d75b8

15 files changed

Lines changed: 4408 additions & 295 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1919,6 +1919,23 @@ public class ConfigOptions {
19191919
.withDescription(
19201920
"The max fetch size for fetching log to apply to kv during recovering kv.");
19211921

1922+
// ------------------------------------------------------------------------
1923+
// ConfigOptions for KV lazy open
1924+
// ------------------------------------------------------------------------
1925+
1926+
/**
 * Boolean option under the key {@code kv.lazy-open.enabled} that controls whether KvTablet lazy
 * open is enabled. Defaults to {@code false}.
 */
public static final ConfigOption<Boolean> KV_LAZY_OPEN_ENABLED =
1927+
key("kv.lazy-open.enabled")
1928+
.booleanType()
1929+
.defaultValue(false)
1930+
.withDescription("Whether to enable KvTablet lazy open.");
1931+
1932+
/**
 * Duration option under the key {@code kv.lazy-open.idle-timeout}: the idle time after which an
 * open KvTablet becomes eligible for release back to the lazy state. Defaults to 24 hours.
 */
public static final ConfigOption<Duration> KV_LAZY_OPEN_IDLE_TIMEOUT =
1933+
key("kv.lazy-open.idle-timeout")
1934+
.durationType()
1935+
.defaultValue(Duration.ofHours(24))
1936+
.withDescription(
1937+
"Idle time before an open KvTablet is eligible for release back to lazy state.");
1938+
19221939
// ------------------------------------------------------------------------
19231940
// ConfigOptions for metrics
19241941
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ public class MetricNames {
147147
"preWriteBufferTruncateAsDuplicatedPerSecond";
148148
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
149149
"preWriteBufferTruncateAsErrorPerSecond";
150+
// Metric names for KvTablet counts, one per state suggested by the name (open / lazy / failed).
// NOTE(review): what each metric counts is defined where it is registered, which is not
// visible here — confirm against the registering code.
public static final String KV_TABLET_OPEN_COUNT = "kvTabletOpenCount";
151+
public static final String KV_TABLET_LAZY_COUNT = "kvTabletLazyCount";
152+
public static final String KV_TABLET_FAILED_COUNT = "kvTabletFailedCount";
150153

151154
// --------------------------------------------------------------------------------------------
152155
// RocksDB metrics
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.kv;
19+
20+
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.utils.clock.Clock;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.io.Closeable;
27+
import java.util.Collection;
28+
import java.util.Comparator;
29+
import java.util.List;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.ScheduledFuture;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.function.Supplier;
34+
import java.util.stream.Collectors;
35+
36+
/**
37+
* Periodically checks OPEN KvTablets and releases idle ones back to LAZY state. Tablets are sorted
38+
* by last access time (LRU). Operates directly on {@link KvTablet} — no dependency on Replica
39+
* layer.
40+
*/
41+
public class KvIdleReleaseController implements Closeable {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(KvIdleReleaseController.class);
44+
45+
private final ScheduledExecutorService scheduler;
46+
private final Supplier<Collection<KvTablet>> tabletSupplier;
47+
private final Clock clock;
48+
49+
private final long checkIntervalMs;
50+
private final long idleIntervalMs;
51+
52+
private volatile ScheduledFuture<?> scheduledTask;
53+
54+
public KvIdleReleaseController(
55+
ScheduledExecutorService scheduler,
56+
Supplier<Collection<KvTablet>> tabletSupplier,
57+
Clock clock,
58+
long checkIntervalMs,
59+
long idleIntervalMs) {
60+
this.scheduler = scheduler;
61+
this.tabletSupplier = tabletSupplier;
62+
this.clock = clock;
63+
this.checkIntervalMs = checkIntervalMs;
64+
this.idleIntervalMs = idleIntervalMs;
65+
}
66+
67+
public void start() {
68+
scheduledTask =
69+
scheduler.scheduleWithFixedDelay(
70+
this::checkAndRelease,
71+
checkIntervalMs,
72+
checkIntervalMs,
73+
TimeUnit.MILLISECONDS);
74+
LOG.info(
75+
"KvIdleReleaseController started: checkInterval={}ms, idleInterval={}ms",
76+
checkIntervalMs,
77+
idleIntervalMs);
78+
}
79+
80+
@VisibleForTesting
81+
void checkAndRelease() {
82+
try {
83+
Collection<KvTablet> tablets = tabletSupplier.get();
84+
long now = clock.milliseconds();
85+
86+
// Collect only idle tablets first, then sort by coldest (LRU)
87+
List<ReleaseCandidate> idleCandidates =
88+
tablets.stream()
89+
.filter(t -> now - t.getLastAccessTimestamp() > idleIntervalMs)
90+
.map(t -> new ReleaseCandidate(t, t.getLastAccessTimestamp()))
91+
.sorted(Comparator.comparingLong(c -> c.lastAccessTimestamp))
92+
.collect(Collectors.toList());
93+
94+
if (idleCandidates.isEmpty()) {
95+
LOG.debug("Idle release round: no idle tablets found");
96+
return;
97+
}
98+
99+
LOG.info("Idle release round: open={}, idle={}", tablets.size(), idleCandidates.size());
100+
101+
int released = 0;
102+
for (ReleaseCandidate candidate : idleCandidates) {
103+
KvTablet tablet = candidate.tablet;
104+
105+
if (tablet.canRelease(idleIntervalMs, now)) {
106+
try {
107+
boolean success = tablet.releaseKv();
108+
if (success) {
109+
released++;
110+
LOG.debug(
111+
"Released KvTablet for {} (idle {}ms)",
112+
tablet.getTableBucket(),
113+
now - candidate.lastAccessTimestamp);
114+
}
115+
} catch (Exception e) {
116+
LOG.warn("Failed to release KvTablet for {}", tablet.getTableBucket(), e);
117+
}
118+
}
119+
}
120+
121+
if (released > 0) {
122+
LOG.info(
123+
"Idle release round complete: released={}/{}",
124+
released,
125+
idleCandidates.size());
126+
}
127+
} catch (Exception e) {
128+
LOG.error("Error during idle release check", e);
129+
}
130+
}
131+
132+
@Override
133+
public void close() {
134+
if (scheduledTask != null) {
135+
scheduledTask.cancel(false);
136+
}
137+
}
138+
139+
private static class ReleaseCandidate {
140+
final KvTablet tablet;
141+
final long lastAccessTimestamp;
142+
143+
ReleaseCandidate(KvTablet tablet, long lastAccessTimestamp) {
144+
this.tablet = tablet;
145+
this.lastAccessTimestamp = lastAccessTimestamp;
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)