Skip to content

Commit 7e0837b

Browse files
authored
Add an interface to support flushing by TsFileResource (#17203)
* Add an interface to support flushing by TsFileResource * Fix review * spotless
1 parent fe0d62e commit 7e0837b

File tree

3 files changed

+111
-63
lines changed

3 files changed

+111
-63
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1939,38 +1939,48 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
19391939
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
19401940
}
19411941

1942+
/**
1943+
* close the TsFile represented by the given resource, thread-safe
1944+
*
1945+
* @param tsFileResource TsFile to be closed
1946+
* @return a future related to the close task
1947+
*/
1948+
public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
1949+
writeLock("asyncCloseOneTsFileProcessor");
1950+
try {
1951+
return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
1952+
} finally {
1953+
writeUnlock();
1954+
}
1955+
}
1956+
19421957
/**
19431958
* close one tsfile processor, thread-safety should be ensured by caller
19441959
*
19451960
* @param sequence whether this tsfile processor is sequence or not
19461961
* @param tsFileProcessor tsfile processor
19471962
*/
19481963
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
1949-
// for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
1950-
// for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
1951-
if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
1952-
|| closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
1953-
|| tsFileProcessor.alreadyMarkedClosing()) {
1964+
if (tsFileProcessor == null) {
19541965
return CompletableFuture.completedFuture(null);
19551966
}
1956-
Future<?> future;
1957-
if (sequence) {
1958-
closingSequenceTsFileProcessor.add(tsFileProcessor);
1959-
future = tsFileProcessor.asyncClose();
1960-
if (future.isDone()) {
1961-
closingSequenceTsFileProcessor.remove(tsFileProcessor);
1962-
}
1967+
if (tsFileProcessor.getCloseFuture() != null) {
1968+
return tsFileProcessor.getCloseFuture();
1969+
}
19631970

1964-
workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1965-
} else {
1966-
closingUnSequenceTsFileProcessor.add(tsFileProcessor);
1967-
future = tsFileProcessor.asyncClose();
1968-
if (future.isDone()) {
1969-
closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
1970-
}
1971+
Future<?> future;
1972+
Set<TsFileProcessor> closingTsFileProcessors =
1973+
sequence ? closingSequenceTsFileProcessor : closingUnSequenceTsFileProcessor;
1974+
TreeMap<Long, TsFileProcessor> workTsFileProcessors =
1975+
sequence ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;
19711976

1972-
workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1977+
closingTsFileProcessors.add(tsFileProcessor);
1978+
future = tsFileProcessor.asyncClose();
1979+
if (future.isDone()) {
1980+
closingTsFileProcessors.remove(tsFileProcessor);
19731981
}
1982+
workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
1983+
19741984
TsFileResource resource = tsFileProcessor.getTsFileResource();
19751985
logger.info(
19761986
"Async close tsfile: {}, file start time: {}, file end time: {}",

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@ public class TsFileProcessor {
210210

211211
private int walEntryNum = 0;
212212

213+
private volatile Future<?> closeFuture;
214+
213215
@SuppressWarnings("squid:S107")
214216
public TsFileProcessor(
215217
String dataRegionName,
@@ -1250,10 +1252,13 @@ public void syncClose() throws ExecutionException {
12501252
logger.info(
12511253
"Sync close file: {}, will firstly async close it",
12521254
tsFileResource.getTsFile().getAbsolutePath());
1253-
if (shouldClose) {
1254-
return;
1255-
}
1255+
12561256
try {
1257+
if (closeFuture != null) {
1258+
closeFuture.get();
1259+
return;
1260+
}
1261+
12571262
asyncClose().get();
12581263
logger.info("Start to wait until file {} is closed", tsFileResource);
12591264
// if this TsFileProcessor is closing, asyncClose().get() of this thread will return quickly,
@@ -1273,6 +1278,10 @@ public Future<?> asyncClose() {
12731278
flushQueryLock.writeLock().lock();
12741279
logFlushQueryWriteLocked();
12751280
try {
1281+
if (closeFuture != null) {
1282+
return closeFuture;
1283+
}
1284+
12761285
if (logger.isDebugEnabled()) {
12771286
if (workMemTable != null) {
12781287
logger.debug(
@@ -1293,10 +1302,6 @@ public Future<?> asyncClose() {
12931302
tsFileResource.getTsFileSize());
12941303
}
12951304
}
1296-
1297-
if (shouldClose) {
1298-
return CompletableFuture.completedFuture(null);
1299-
}
13001305
// when a flush thread serves this TsFileProcessor (because the processor is submitted by
13011306
// registerTsFileProcessor()), the thread will seal the corresponding TsFile and
13021307
// execute other cleanup works if "shouldClose == true and flushingMemTables is empty".
@@ -1315,6 +1320,7 @@ public Future<?> asyncClose() {
13151320
// flushing memTable in System module.
13161321
Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
13171322
shouldClose = true;
1323+
closeFuture = future;
13181324
return future;
13191325
} catch (Exception e) {
13201326
logger.error(
@@ -1794,6 +1800,9 @@ public boolean isManagedByFlushManager() {
17941800

17951801
public void setManagedByFlushManager(boolean managedByFlushManager) {
17961802
this.managedByFlushManager = managedByFlushManager;
1803+
if (!managedByFlushManager) {
1804+
closeFuture = CompletableFuture.completedFuture(null);
1805+
}
17971806
}
17981807

17991808
/** Close this tsfile */
@@ -2384,4 +2393,12 @@ private void logFlushQueryReadUnlocked() {
23842393
public String toString() {
23852394
return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}';
23862395
}
2396+
2397+
public Future<?> getCloseFuture() {
2398+
return closeFuture;
2399+
}
2400+
2401+
public void setCloseFuture(Future<?> closeFuture) {
2402+
this.closeFuture = closeFuture;
2403+
}
23872404
}

0 commit comments

Comments
 (0)