From a1058283e3cd3a28986a1c6e61a62ec9950bcfc7 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Fri, 16 Jan 2026 22:12:30 -0800 Subject: [PATCH 1/3] [Enhancement] Reduce lock contention in DatabaseTransactionMgr Move edit log operations outside the write lock to improve transaction throughput for concurrent operations on different tables within the same database. Changes: - Add unprotectUpdateInMemoryState() for in-memory updates inside lock - Add persistTransactionState() for edit log writes outside lock - Refactor beginTransaction, commitTransaction, finishTransaction, abortTransaction, and removeUselessTxns to use the new pattern - Refactor unprotectUpsertTransactionState to delegate to new methods This reduces lock hold time from milliseconds (I/O bound) to microseconds (memory only), enabling higher concurrency for multi-table workloads. Safety is maintained because: - In-memory state is updated atomically within the write lock - Edit log failures call System.exit(-1), preventing inconsistent state - Replay path remains unchanged (uses isReplay=true) Fixes: https://github.com/apache/doris/issues/53642 --- .../transaction/DatabaseTransactionMgr.java | 130 ++++++++++++++---- 1 file changed, 102 insertions(+), 28 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index ade04e95b156f1..802508211c9b83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -331,6 +331,7 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req FeNameFormat.checkLabel(label); long tid = 0L; + TransactionState transactionState = null; writeLock(); try { /* @@ -368,10 +369,10 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req checkRunningTxnExceedLimit(tableIdList); tid = idGenerator.getNextTransactionId(); - TransactionState transactionState = new TransactionState(dbId, tableIdList, + transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); transactionState.setPrepareTime(System.currentTimeMillis()); - unprotectUpsertTransactionState(transactionState, false); + unprotectUpdateInMemoryState(transactionState, false); if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_BEGIN.increase(1L); @@ -379,6 +380,8 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req } finally { writeUnlock(); } + // Persist edit log outside lock to reduce lock contention + persistTransactionState(transactionState); LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}", tid, label, coordinator, listenerId); return tid; @@ -455,6 +458,8 @@ public void preCommitTransaction2PC(List tableList, long transactionId, } finally { writeUnlock(); } + // Persist edit log outside lock to reduce lock contention + persistTransactionState(transactionState); LOG.info("transaction:[{}] successfully pre-committed", transactionState); } @@ -820,6 +825,10 @@ public void commitTransaction(List
tableList, long transactionId, List tableList, LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } + // Persist edit log outside lock to reduce lock contention + if (txnOperated) { + persistTransactionState(transactionState); + } // update nextVersion because of the failure of persistent transaction resulting in error version updateCatalogAfterCommitted(transactionState, db, false); @@ -1176,7 +1189,7 @@ public void finishTransaction(long transactionId, Map partitionVisib transactionState.clearErrorMsg(); transactionState.setTransactionStatus(TransactionStatus.VISIBLE); setTableVersion(transactionState, db); - unprotectUpsertTransactionState(transactionState, false); + unprotectUpdateInMemoryState(transactionState, false); txnOperated = true; // TODO(cmy): We found a very strange problem. When delete-related transactions are processed here, // subsequent `updateCatalogAfterVisible()` is called, but it does not seem to be executed here @@ -1193,6 +1206,11 @@ public void finishTransaction(long transactionId, Map partitionVisib LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } + // Persist edit log outside transaction lock to reduce lock contention + // (still inside table locks for atomicity) + if (txnOperated) { + persistTransactionState(transactionState); + } updateCatalogAfterVisible(transactionState, db, partitionVisibleVersions, backendPartitions); } finally { MetaLockUtils.writeUnlockTables(tableList); @@ -1527,8 +1545,8 @@ protected void unprotectedPreCommitTransaction2PC(TransactionState transactionSt } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // persist transactionState - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state; edit log will be persisted by caller outside the lock + unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1565,8 +1583,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // persist transactionState - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state; edit log will be persisted by caller outside the lock + unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1626,8 +1644,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S transactionState.addSubTxnTableCommitInfo(subTransactionState, tableCommitInfo); } } - // persist transactionState - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state; edit log will be persisted by caller outside the lock + unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1674,23 +1692,37 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState partitionCommitInfo.setVersionTime(System.currentTimeMillis()); } } - // persist transactionState - editLog.logInsertTransactionState(transactionState); + // Update in-memory state; edit log will be persisted by caller outside the lock + unprotectUpdateInMemoryState(transactionState, false); } - // for add/update/delete TransactionState + /** + * Updates transaction state both in-memory and persists to edit log (if not replay). + * This is the legacy method that performs both operations inside the lock. + * For new code that needs reduced lock contention, use {@link #unprotectUpdateInMemoryState} + * inside the lock and {@link #persistTransactionState} outside the lock. + * + * @param transactionState the transaction state to upsert + * @param isReplay true if this is a replay operation (edit log already contains this state) + */ protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) { - // if this is a replay operation, we should not log it + // Persist to edit log first (if not replay), then update in-memory state if (!isReplay) { - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE - || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) { - // if this is a prepare txn, and load source type is not FRONTEND - // no need to persist it. if prepare txn lost, the following commit will just be failed. - // user only need to retry this txn. - // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. - editLog.logInsertTransactionState(transactionState); - } + persistTransactionState(transactionState); } + unprotectUpdateInMemoryState(transactionState, isReplay); + } + + /** + * Updates only in-memory transaction state without persisting to edit log. + * This method must be called while holding the write lock. + * Use this method in combination with {@link #persistTransactionState} to reduce lock contention + * by moving edit log writes outside the lock. + * + * @param transactionState the transaction state to update + * @param isReplay true if this is a replay operation (edit log already contains this state) + */ + protected void unprotectUpdateInMemoryState(TransactionState transactionState, boolean isReplay) { if (!transactionState.getTransactionStatus().isFinalStatus()) { if (idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState) == null) { runningTxnNums++; @@ -1722,6 +1754,34 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState updateTxnLabels(transactionState); } + /** + * Persists the transaction state to edit log. + * This method should be called OUTSIDE the write lock to reduce lock contention. + * + *

SAFETY GUARANTEES: + *

    + *
  • In-memory state must be updated atomically within the write lock before calling this method
  • + *
  • If edit log write fails, System.exit(-1) is called, preventing inconsistent state
  • + *
  • Concurrent readers will see the new in-memory state immediately (correct behavior)
  • + *
  • Follower nodes will receive the edit log entry and replay it to update their state
  • + *
+ * + *

Note: For PREPARE transactions with non-FRONTEND source type, persistence is skipped + * because losing them only requires the client to retry the transaction. + * + * @param transactionState the transaction state to persist + */ + protected void persistTransactionState(TransactionState transactionState) { + if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE + || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) { + // if this is a prepare txn, and load source type is not FRONTEND + // no need to persist it. if prepare txn lost, the following commit will just be failed. + // user only need to retry this txn. + // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. + editLog.logInsertTransactionState(transactionState); + } + } + public int getRunningTxnNumsWithLock() { readLock(); try { @@ -1777,6 +1837,10 @@ public void abortTransaction(long transactionId, String reason, TxnCommitAttachm writeUnlock(); transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); } + // Persist edit log outside lock to reduce lock contention + if (txnOperated) { + persistTransactionState(transactionState); + } // send clear txn task to BE to clear the transactions on BE. // This is because parts of a txn may succeed in some BE, and these parts of txn should be cleared @@ -1819,6 +1883,10 @@ public void abortTransaction2PC(long transactionId) throws UserException { writeUnlock(); transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); } + // Persist edit log outside lock to reduce lock contention + if (txnOperated) { + persistTransactionState(transactionState); + } // send clear txn task to BE to clear the transactions on BE. // This is because parts of a txn may succeed in some BE, and these parts of txn should be cleared @@ -1849,7 +1917,8 @@ private boolean unprotectAbortTransaction(long transactionId, String reason) transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setReason(reason); transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state; edit log will be persisted by caller outside the lock + unprotectUpdateInMemoryState(transactionState, false); return true; } @@ -1953,6 +2022,8 @@ protected List> getPartitionTransInfo(long txnId, long tableId) public void removeUselessTxns(long currentMillis) { // delete expired txns + BatchRemoveTransactionsOperationV2 op = null; + int numOfClearedTransaction = 0; writeLock(); try { Pair expiredTxnsInfoForShort = unprotectedRemoveUselessTxns(currentMillis, @@ -1960,18 +2031,21 @@ public void removeUselessTxns(long currentMillis) { Pair expiredTxnsInfoForLong = unprotectedRemoveUselessTxns(currentMillis, finalStatusTransactionStateDequeLong, MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second); - int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; + numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; if (numOfClearedTransaction > 0) { - BatchRemoveTransactionsOperationV2 op = new BatchRemoveTransactionsOperationV2(dbId, + op = new BatchRemoveTransactionsOperationV2(dbId, expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first); - editLog.logBatchRemoveTransactions(op); - if (LOG.isDebugEnabled()) { - LOG.debug("Remove {} expired transactions", numOfClearedTransaction); - } } } finally { writeUnlock(); } + // Persist edit log outside lock to reduce lock contention + if (op != null) { + editLog.logBatchRemoveTransactions(op); + if (LOG.isDebugEnabled()) { + LOG.debug("Remove {} expired transactions", numOfClearedTransaction); + } + } } private Pair unprotectedRemoveUselessTxns(long currentMillis, From 1bc315693abdf7f73472432a9a23af26c4bd5dd3 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Fri, 6 Feb 2026 23:37:05 -0800 Subject: [PATCH 2/3] [Enhancement](txn) Add config flag for transaction edit log outside-lock optimization Add enable_txn_log_outside_lock config (default true) to control whether transaction edit log writes happen inside or outside the write lock. When enabled, edit log entries are enqueued (FIFO) inside the write lock and awaited outside it, preserving ordering via the batch queue while reducing lock hold time. This resolves the potential out-of-order edit log issue from the previous outside-lock optimization. - Add submitEdit() split API to EditLog for enqueue-only writes - Make EditLogItem public with await() method - Add enqueueTransactionState/awaitTransactionState helpers - Update all 8 persist call sites in DatabaseTransactionMgr --- .../java/org/apache/doris/common/Config.java | 12 ++ .../main/java/org/apache/doris/DorisFE.java | 6 + .../org/apache/doris/persist/EditLog.java | 68 ++++++- .../transaction/DatabaseTransactionMgr.java | 169 ++++++++++++------ .../org/apache/doris/catalog/FakeEditLog.java | 11 ++ 5 files changed, 212 insertions(+), 54 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 492d1296331a85..b632501ede539d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -744,6 +744,18 @@ public class Config extends ConfigBase { "Txn manager will reject coming txns."}) public static int max_running_txn_num_per_db = 10000; + @ConfField(mutable = true, masterOnly = true, description = { + "是否将事务的 edit log 写入移到写锁之外以减少锁竞争。" + + "开启后,edit log 条目在写锁内入队(FIFO 保证顺序)," + + "在写锁外等待持久化完成,从而降低写锁持有时间,提高并发事务吞吐量。" + + "默认开启。关闭后使用传统的锁内同步写入模式。", + "Whether to move transaction edit log writes outside the write lock to reduce lock contention. " + + "When enabled, edit log entries are enqueued inside the write lock (FIFO preserves ordering) " + + "and awaited outside the lock, reducing write lock hold time " + + "and improving concurrent transaction throughput. " + + "Default is true. Set to false to use the traditional in-lock synchronous write mode."}) + public static boolean enable_txn_log_outside_lock = true; + @ConfField(masterOnly = true, description = {"pending load task 执行线程数。这个配置可以限制当前等待的导入作业数。" + "并且应小于 `max_running_txn_num_per_db`。", "The pending load task executor pool size. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index a81f259dd5c0b7..200243d4af3aa2 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -66,6 +66,7 @@ import java.nio.file.StandardOpenOption; import java.time.LocalDate; import java.util.List; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -593,6 +594,11 @@ private static void fuzzyConfigs() { LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", Config.random_add_cluster_keys_for_mow); } + Config.enable_txn_log_outside_lock = new Random().nextBoolean(); + LOG.info("fuzzy set enable_txn_log_outside_lock={}", Config.enable_txn_log_outside_lock); + Config.enable_batch_editlog = new Random().nextBoolean(); + LOG.info("fuzzy set enable_batch_editlog={}", Config.enable_batch_editlog); + setFuzzyForCatalog(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 4f126fcf21422c..d506b474ed70b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -124,13 +124,14 @@ public class EditLog { public static final Logger LOG = LogManager.getLogger(EditLog.class); - // Helper class to hold log edit requests - private static class EditLogItem { + // Helper class to hold log edit requests. + // Public so that callers can enqueue inside a lock and await outside it. + public static class EditLogItem { static AtomicLong nextUid = new AtomicLong(0); final short op; final Writable writable; final Object lock = new Object(); - boolean finished = false; + volatile boolean finished = false; long logId = -1; long uid = -1; @@ -139,6 +140,24 @@ private static class EditLogItem { this.writable = writable; uid = nextUid.getAndIncrement(); } + + /** + * Wait for this edit log entry to be flushed to persistent storage. + * Returns the assigned log ID. + */ + public long await() { + synchronized (lock) { + while (!finished) { + try { + lock.wait(); + } catch (InterruptedException e) { + LOG.error("Fatal Error : write stream Exception"); + System.exit(-1); + } + } + } + return logId; + } } private final BlockingQueue logEditQueue = new LinkedBlockingQueue<>(); @@ -1534,6 +1553,49 @@ public long logEditWithQueue(short op, Writable writable) { return req.logId; } + /** + * Submit an edit log entry to the batch queue without waiting for it to be flushed. + * The entry is enqueued in FIFO order, so calling this inside a write lock guarantees + * that edit log entries are ordered by lock acquisition order. + * + *

The caller MUST call {@link EditLogItem#await()} after releasing the lock to ensure + * the entry is persisted before proceeding. + * + *

If batch edit log is disabled, this falls back to a synchronous direct write + * and the returned item is already completed. + * + * @return an {@link EditLogItem} handle to await completion + */ + public EditLogItem submitEdit(short op, Writable writable) { + if (this.getNumEditStreams() == 0) { + LOG.error("Fatal Error : no editLog stream", new Exception()); + throw new Error("Fatal Error : no editLog stream"); + } + + EditLogItem req = new EditLogItem(op, writable); + if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) { + while (true) { + try { + logEditQueue.put(req); + break; + } catch (InterruptedException e) { + LOG.warn("Interrupted during put, will sleep and retry."); + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + LOG.warn("interrupted during sleep, will retry.", ex); + } + } + } + } else { + // Non-batch mode: write directly (synchronous) + long logId = logEditDirectly(op, writable); + req.logId = logId; + req.finished = true; + } + return req; + } + private synchronized long logEditDirectly(short op, Writable writable) { long logId = -1; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 802508211c9b83..720f96fb6422ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -18,6 +18,7 @@ package org.apache.doris.transaction; import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.binlog.UpsertRecord; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -55,6 +56,7 @@ import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.CleanLabelOperationLog; import org.apache.doris.persist.EditLog; +import org.apache.doris.persist.OperationType; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.task.AgentBatchTask; @@ -332,6 +334,7 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req long tid = 0L; TransactionState transactionState = null; + EditLog.EditLogItem logItem = null; writeLock(); try { /* @@ -373,6 +376,11 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); transactionState.setPrepareTime(System.currentTimeMillis()); unprotectUpdateInMemoryState(transactionState, false); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_BEGIN.increase(1L); @@ -380,8 +388,7 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req } finally { writeUnlock(); } - // Persist edit log outside lock to reduce lock contention - persistTransactionState(transactionState); + awaitTransactionState(logItem, transactionState); LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}", tid, label, coordinator, listenerId); return tid; @@ -451,15 +458,20 @@ public void preCommitTransaction2PC(List

tableList, long transactionId, checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds, tableToPartition, totalInvolvedBackends); + EditLog.EditLogItem logItem = null; writeLock(); try { unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } } finally { writeUnlock(); } - // Persist edit log outside lock to reduce lock contention - persistTransactionState(transactionState); + awaitTransactionState(logItem, transactionState); LOG.info("transaction:[{}] successfully pre-committed", transactionState); } @@ -807,6 +819,7 @@ public void commitTransaction(List
tableList, long transactionId, List tableList, long transactionId, List tableList, long transactionId, List tableList, transactionState.beforeStateTransform(TransactionStatus.COMMITTED); // transaction state transform boolean txnOperated = false; + EditLog.EditLogItem logItem = null; writeLock(); try { unprotectedCommitTransaction(transactionState, errorReplicaIds, subTxnToPartition, totalInvolvedBackends, subTransactionStates, db); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } txnOperated = true; } finally { writeUnlock(); @@ -892,9 +915,8 @@ protected void commitTransaction(long transactionId, List
tableList, LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } - // Persist edit log outside lock to reduce lock contention if (txnOperated) { - persistTransactionState(transactionState); + awaitTransactionState(logItem, transactionState); } // update nextVersion because of the failure of persistent transaction resulting in error version @@ -1182,6 +1204,7 @@ public void finishTransaction(long transactionId, Map partitionVisib } } boolean txnOperated = false; + EditLog.EditLogItem logItem = null; writeLock(); try { transactionState.setErrorReplicas(errorReplicaIds); @@ -1190,6 +1213,11 @@ public void finishTransaction(long transactionId, Map partitionVisib transactionState.setTransactionStatus(TransactionStatus.VISIBLE); setTableVersion(transactionState, db); unprotectUpdateInMemoryState(transactionState, false); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } txnOperated = true; // TODO(cmy): We found a very strange problem. When delete-related transactions are processed here, // subsequent `updateCatalogAfterVisible()` is called, but it does not seem to be executed here @@ -1206,10 +1234,8 @@ public void finishTransaction(long transactionId, Map partitionVisib LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } } - // Persist edit log outside transaction lock to reduce lock contention - // (still inside table locks for atomicity) if (txnOperated) { - persistTransactionState(transactionState); + awaitTransactionState(logItem, transactionState); } updateCatalogAfterVisible(transactionState, db, partitionVisibleVersions, backendPartitions); } finally { @@ -1545,7 +1571,7 @@ protected void unprotectedPreCommitTransaction2PC(TransactionState transactionSt } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // Update in-memory state; edit log will be persisted by caller outside the lock + // Update in-memory state only; caller handles edit log persistence unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1583,7 +1609,7 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // Update in-memory state; edit log will be persisted by caller outside the lock + // Update in-memory state only; caller handles edit log persistence unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1644,7 +1670,7 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S transactionState.addSubTxnTableCommitInfo(subTransactionState, tableCommitInfo); } } - // Update in-memory state; edit log will be persisted by caller outside the lock + // Update in-memory state only; caller handles edit log persistence unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1692,24 +1718,18 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState partitionCommitInfo.setVersionTime(System.currentTimeMillis()); } } - // Update in-memory state; edit log will be persisted by caller outside the lock + // Update in-memory state only; caller handles edit log persistence unprotectUpdateInMemoryState(transactionState, false); } /** - * Updates transaction state both in-memory and persists to edit log (if not replay). - * This is the legacy method that performs both operations inside the lock. - * For new code that needs reduced lock contention, use {@link #unprotectUpdateInMemoryState} - * inside the lock and {@link #persistTransactionState} outside the lock. + * Replays a transaction state update from the edit log. + * Only called during replay (follower sync or restart), so no edit log persistence is needed. * - * @param transactionState the transaction state to upsert - * @param isReplay true if this is a replay operation (edit log already contains this state) + * @param transactionState the transaction state to replay + * @param isReplay must be true (only used in replay path) */ protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) { - // Persist to edit log first (if not replay), then update in-memory state - if (!isReplay) { - persistTransactionState(transactionState); - } unprotectUpdateInMemoryState(transactionState, isReplay); } @@ -1755,21 +1775,9 @@ protected void unprotectUpdateInMemoryState(TransactionState transactionState, b } /** - * Persists the transaction state to edit log. - * This method should be called OUTSIDE the write lock to reduce lock contention. - * - *

SAFETY GUARANTEES: - *

    - *
  • In-memory state must be updated atomically within the write lock before calling this method
  • - *
  • If edit log write fails, System.exit(-1) is called, preventing inconsistent state
  • - *
  • Concurrent readers will see the new in-memory state immediately (correct behavior)
  • - *
  • Follower nodes will receive the edit log entry and replay it to update their state
  • - *
- * - *

Note: For PREPARE transactions with non-FRONTEND source type, persistence is skipped - * because losing them only requires the client to retry the transaction. - * - * @param transactionState the transaction state to persist + * Persists the transaction state to edit log synchronously. + * For PREPARE transactions with non-FRONTEND source type, persistence is skipped + * because losing them only requires the client to retry. */ protected void persistTransactionState(TransactionState transactionState) { if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE @@ -1782,6 +1790,46 @@ protected void persistTransactionState(TransactionState transactionState) { } } + /** + * Enqueue a transaction state edit log entry without waiting for persistence. + * Must be called inside the write lock to preserve ordering via the FIFO queue. + * + * @return an {@link EditLog.EditLogItem} handle to await completion, or null if persistence is skipped + */ + protected EditLog.EditLogItem enqueueTransactionState(TransactionState transactionState) { + if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE + || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) { + return editLog.submitEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState); + } + return null; + } + + /** + * Await completion of a previously enqueued transaction state edit log entry. + * Should be called outside the write lock. Handles binlog and timing logic. + * + * @param item the handle returned by {@link #enqueueTransactionState}, may be null + * @param transactionState the transaction state (for binlog and timing) + */ + protected void awaitTransactionState(EditLog.EditLogItem item, TransactionState transactionState) { + if (item == null) { + return; + } + long start = System.currentTimeMillis(); + long logId = item.await(); + long logEditEnd = System.currentTimeMillis(); + long end = logEditEnd; + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + UpsertRecord record = new UpsertRecord(logId, transactionState); + Env.getCurrentEnv().getBinlogManager().addUpsertRecord(record); + end = System.currentTimeMillis(); + } + if (end - start > Config.lock_reporting_threshold_ms) { + LOG.warn("edit log insert transaction take a lot time, write bdb {} ms, write binlog {} ms", + logEditEnd - start, end - logEditEnd); + } + } + public int getRunningTxnNumsWithLock() { readLock(); try { @@ -1830,16 +1878,23 @@ public void abortTransaction(long transactionId, String reason, TxnCommitAttachm // before state transform transactionState.beforeStateTransform(TransactionStatus.ABORTED); boolean txnOperated = false; + EditLog.EditLogItem logItem = null; writeLock(); try { txnOperated = unprotectAbortTransaction(transactionId, reason); + if (txnOperated) { + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } + } } finally { writeUnlock(); transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); } - // Persist edit log outside lock to reduce lock contention if (txnOperated) { - persistTransactionState(transactionState); + awaitTransactionState(logItem, transactionState); } // send clear txn task to BE to clear the transactions on BE. @@ -1876,16 +1931,23 @@ public void abortTransaction2PC(long transactionId) throws UserException { // before state transform transactionState.beforeStateTransform(TransactionStatus.ABORTED); boolean txnOperated = false; + EditLog.EditLogItem logItem = null; writeLock(); try { txnOperated = unprotectAbortTransaction(transactionId, "User Abort"); + if (txnOperated) { + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } + } } finally { writeUnlock(); transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); } - // Persist edit log outside lock to reduce lock contention if (txnOperated) { - persistTransactionState(transactionState); + awaitTransactionState(logItem, transactionState); } // send clear txn task to BE to clear the transactions on BE. @@ -1917,7 +1979,7 @@ private boolean unprotectAbortTransaction(long transactionId, String reason) transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setReason(reason); transactionState.setTransactionStatus(TransactionStatus.ABORTED); - // Update in-memory state; edit log will be persisted by caller outside the lock + // Update in-memory state only; caller handles edit log persistence unprotectUpdateInMemoryState(transactionState, false); return true; } @@ -2023,6 +2085,7 @@ protected List> getPartitionTransInfo(long txnId, long tableId) public void removeUselessTxns(long currentMillis) { // delete expired txns BatchRemoveTransactionsOperationV2 op = null; + EditLog.EditLogItem logItem = null; int numOfClearedTransaction = 0; writeLock(); try { @@ -2035,16 +2098,20 @@ public void removeUselessTxns(long currentMillis) { if (numOfClearedTransaction > 0) { op = new BatchRemoveTransactionsOperationV2(dbId, expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first); + if (Config.enable_txn_log_outside_lock) { + logItem = editLog.submitEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op); + } else { + editLog.logBatchRemoveTransactions(op); + } } } finally { writeUnlock(); } - // Persist edit log outside lock to reduce lock contention - if (op != null) { - editLog.logBatchRemoveTransactions(op); - if (LOG.isDebugEnabled()) { - LOG.debug("Remove {} expired transactions", numOfClearedTransaction); - } + if (logItem != null) { + logItem.await(); + } + if (op != null && LOG.isDebugEnabled()) { + LOG.debug("Remove {} expired transactions", numOfClearedTransaction); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java index e2b113958366c8..73ed9f255cd78b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -20,9 +20,11 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.BatchAlterJobPersistInfo; import org.apache.doris.cluster.Cluster; +import org.apache.doris.common.io.Writable; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; +import org.apache.doris.persist.OperationType; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.persist.TableInfo; import org.apache.doris.system.Backend; @@ -110,6 +112,15 @@ public int getNumEditStreams() { return 1; // fake that we have streams } + @Mock + public EditLog.EditLogItem submitEdit(short op, Writable writable) { + if (op == OperationType.OP_UPSERT_TRANSACTION_STATE && writable instanceof TransactionState) { + TransactionState transactionState = (TransactionState) writable; + allTransactionState.put(transactionState.getTransactionId(), transactionState); + } + return null; + } + public TransactionState getTransaction(long transactionId) { return allTransactionState.get(transactionId); } From db3454055403966237c33ef0b6bad0216f8fa109 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG Date: Sat, 7 Feb 2026 13:56:17 -0800 Subject: [PATCH 3/3] [Enhancement](txn) Per-transaction locking and parallel publish within a database Replace the database-wide write lock with synchronized(transactionState) for commit, preCommit, abort, and finish paths, so independent transactions can proceed concurrently instead of being serialized by the db write lock. Key changes in DatabaseTransactionMgr: - Extract updateTxnLabels from unprotectUpdateInMemoryState (only needed at beginTransaction and replay time) - Convert runningTxnNums from volatile int to AtomicInteger - Replace ArrayDeque with ConcurrentLinkedDeque for final-status deques - Replace writeLock/writeUnlock with synchronized(transactionState) in preCommitTransaction2PC, commitTransaction, finishTransaction, abortTransaction, and abortTransaction2PC Key changes in PublishVersionDaemon: - Route publish executor by transactionId instead of dbId when enable_per_txn_publish=true (default), enabling intra-DB parallel publish - Fix race condition: use local variables in tryFinishTxnSync instead of shared instance fields for partitionVisibleVersions/backendPartitions - Rename dbExecutors to publishExecutors New config flag: - enable_per_txn_publish: controls publish routing (true=parallel within DB, false=sequential per DB fallback). Mutable at runtime. Co-Authored-By: Claude Opus 4.6 --- .../java/org/apache/doris/common/Config.java | 10 ++ .../transaction/DatabaseTransactionMgr.java | 154 ++++++++---------- .../transaction/PublishVersionDaemon.java | 20 ++- 3 files changed, 92 insertions(+), 92 deletions(-) diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index b632501ede539d..f16d371c31895d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -756,6 +756,16 @@ public class Config extends ConfigBase { + "Default is true. Set to false to use the traditional in-lock synchronous write mode."}) public static boolean enable_txn_log_outside_lock = true; + @ConfField(mutable = true, description = { + "是否启用按事务级别并行发布。开启后,同一数据库内的不同事务可以在不同的执行器线程上并行完成发布," + + "而不是按数据库顺序执行。关闭后回退到按数据库路由(旧行为),同一数据库内的事务顺序发布。", + "Whether to enable per-transaction parallel publish. When enabled, different transactions " + + "in the same database can finish publishing in parallel across executor threads, " + + "instead of being serialized per database. " + + "When disabled, falls back to per-database routing (old behavior) " + + "where transactions within a DB are published sequentially."}) + public static boolean enable_per_txn_publish = true; + @ConfField(masterOnly = true, description = {"pending load task 执行线程数。这个配置可以限制当前等待的导入作业数。" + "并且应小于 `max_running_txn_num_per_db`。", "The pending load task executor pool size. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index 720f96fb6422ef..eb905f3eda155c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -80,7 +80,6 @@ import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -92,7 +91,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -136,8 +137,10 @@ private enum PublishResult { // The "Short" queue is used to store the txns of the expire time // controlled by Config.streaming_label_keep_max_second. // The "Long" queue is used to store the txns of the expire time controlled by Config.label_keep_max_second. - private final ArrayDeque finalStatusTransactionStateDequeShort = new ArrayDeque<>(); - private final ArrayDeque finalStatusTransactionStateDequeLong = new ArrayDeque<>(); + private final ConcurrentLinkedDeque finalStatusTransactionStateDequeShort + = new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque finalStatusTransactionStateDequeLong + = new ConcurrentLinkedDeque<>(); // label -> txn ids // this is used for checking if label already used. a label may correspond to multiple txns, @@ -152,7 +155,7 @@ private enum PublishResult { private Long lastCommittedTxnCountUpdateTime = 0L; // count the number of running txns of database - private volatile int runningTxnNums = 0; + private final AtomicInteger runningTxnNums = new AtomicInteger(0); private final Env env; @@ -224,7 +227,7 @@ protected Set unprotectedGetTxnIdsByLabel(String label) { } protected int getRunningTxnNums() { - return runningTxnNums; + return runningTxnNums.get(); } @VisibleForTesting @@ -376,6 +379,7 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); transactionState.setPrepareTime(System.currentTimeMillis()); unprotectUpdateInMemoryState(transactionState, false); + updateTxnLabels(transactionState); if (Config.enable_txn_log_outside_lock) { logItem = enqueueTransactionState(transactionState); } else { @@ -459,8 +463,7 @@ public void preCommitTransaction2PC(List

tableList, long transactionId, tableToPartition, totalInvolvedBackends); EditLog.EditLogItem logItem = null; - writeLock(); - try { + synchronized (transactionState) { unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db); if (Config.enable_txn_log_outside_lock) { @@ -468,8 +471,6 @@ public void preCommitTransaction2PC(List
tableList, long transactionId, } else { persistTransactionState(transactionState); } - } finally { - writeUnlock(); } awaitTransactionState(logItem, transactionState); LOG.info("transaction:[{}] successfully pre-committed", transactionState); @@ -820,8 +821,7 @@ public void commitTransaction(List
tableList, long transactionId, List tableList, long transactionId, List tableList, // transaction state transform boolean txnOperated = false; EditLog.EditLogItem logItem = null; - writeLock(); - try { + synchronized (transactionState) { unprotectedCommitTransaction(transactionState, errorReplicaIds, subTxnToPartition, totalInvolvedBackends, subTransactionStates, db); if (Config.enable_txn_log_outside_lock) { @@ -906,14 +903,12 @@ protected void commitTransaction(long transactionId, List
tableList, persistTransactionState(transactionState); } txnOperated = true; - } finally { - writeUnlock(); - // after state transform - try { - transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated); - } catch (Throwable e) { - LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); - } + } + // after state transform + try { + transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated); + } catch (Throwable e) { + LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } if (txnOperated) { awaitTransactionState(logItem, transactionState); @@ -965,15 +960,15 @@ public void replayDeleteTransaction(TransactionState transactionState) { // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque, // it must at the front of the finalStatusTransactionStateDeque. // check both "short" and "long" queue. - if (!finalStatusTransactionStateDequeShort.isEmpty() - && transactionState.getTransactionId() - == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeShort.pop(); + TransactionState shortHead = finalStatusTransactionStateDequeShort.peekFirst(); + TransactionState longHead = finalStatusTransactionStateDequeLong.peekFirst(); + if (shortHead != null + && transactionState.getTransactionId() == shortHead.getTransactionId()) { + finalStatusTransactionStateDequeShort.pollFirst(); clearTransactionState(transactionState.getTransactionId()); - } else if (!finalStatusTransactionStateDequeLong.isEmpty() - && transactionState.getTransactionId() - == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeLong.pop(); + } else if (longHead != null + && transactionState.getTransactionId() == longHead.getTransactionId()) { + finalStatusTransactionStateDequeLong.pollFirst(); clearTransactionState(transactionState.getTransactionId()); } } finally { @@ -988,13 +983,13 @@ public void replayBatchRemoveTransaction(List txnIds) { // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque, // it must at the front of the finalStatusTransactionStateDeque // check both "short" and "long" queue. - if (!finalStatusTransactionStateDequeShort.isEmpty() - && txnId == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeShort.pop(); + TransactionState shortHead = finalStatusTransactionStateDequeShort.peekFirst(); + TransactionState longHead = finalStatusTransactionStateDequeLong.peekFirst(); + if (shortHead != null && txnId == shortHead.getTransactionId()) { + finalStatusTransactionStateDequeShort.pollFirst(); clearTransactionState(txnId); - } else if (!finalStatusTransactionStateDequeLong.isEmpty() - && txnId == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeLong.pop(); + } else if (longHead != null && txnId == longHead.getTransactionId()) { + finalStatusTransactionStateDequeLong.pollFirst(); clearTransactionState(txnId); } } @@ -1007,8 +1002,8 @@ public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 oper writeLock(); try { if (operation.getLatestTxnIdForShort() != -1) { - while (!finalStatusTransactionStateDequeShort.isEmpty()) { - TransactionState transactionState = finalStatusTransactionStateDequeShort.pop(); + TransactionState transactionState; + while ((transactionState = finalStatusTransactionStateDequeShort.pollFirst()) != null) { clearTransactionState(transactionState.getTransactionId()); if (operation.getLatestTxnIdForShort() == transactionState.getTransactionId()) { break; @@ -1017,8 +1012,8 @@ public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 oper } if (operation.getLatestTxnIdForLong() != -1) { - while (!finalStatusTransactionStateDequeLong.isEmpty()) { - TransactionState transactionState = finalStatusTransactionStateDequeLong.pop(); + TransactionState transactionState; + while ((transactionState = finalStatusTransactionStateDequeLong.pollFirst()) != null) { clearTransactionState(transactionState.getTransactionId()); if (operation.getLatestTxnIdForLong() == transactionState.getTransactionId()) { break; @@ -1205,8 +1200,7 @@ public void finishTransaction(long transactionId, Map partitionVisib } boolean txnOperated = false; EditLog.EditLogItem logItem = null; - writeLock(); - try { + synchronized (transactionState) { transactionState.setErrorReplicas(errorReplicaIds); transactionState.setFinishTime(System.currentTimeMillis()); transactionState.clearErrorMsg(); @@ -1226,13 +1220,11 @@ public void finishTransaction(long transactionId, Map partitionVisib if (LOG.isDebugEnabled()) { LOG.debug("after set transaction {} to visible", transactionState); } - } finally { - writeUnlock(); - try { - transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated); - } catch (Throwable e) { - LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); - } + } + try { + transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated); + } catch (Throwable e) { + LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); } if (txnOperated) { awaitTransactionState(logItem, transactionState); @@ -1745,7 +1737,7 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState protected void unprotectUpdateInMemoryState(TransactionState transactionState, boolean isReplay) { if (!transactionState.getTransactionStatus().isFinalStatus()) { if (idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState) == null) { - runningTxnNums++; + runningTxnNums.incrementAndGet(); } if (isReplay && transactionState.getSubTxnIds() != null) { LOG.info("add sub transactions for txn_id={}, status={}, sub_txn_ids={}", @@ -1757,7 +1749,7 @@ protected void unprotectUpdateInMemoryState(TransactionState transactionState, b } } else { if (idToRunningTransactionState.remove(transactionState.getTransactionId()) != null) { - runningTxnNums--; + runningTxnNums.decrementAndGet(); } idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState); if (transactionState.isShortTxn()) { @@ -1771,7 +1763,9 @@ protected void unprotectUpdateInMemoryState(TransactionState transactionState, b cleanSubTransactions(transactionState.getTransactionId()); } } - updateTxnLabels(transactionState); + if (isReplay) { + updateTxnLabels(transactionState); + } } /** @@ -1831,12 +1825,7 @@ protected void awaitTransactionState(EditLog.EditLogItem item, TransactionState } public int getRunningTxnNumsWithLock() { - readLock(); - try { - return runningTxnNums; - } finally { - readUnlock(); - } + return runningTxnNums.get(); } private void updateTxnLabels(TransactionState transactionState) { @@ -1879,8 +1868,7 @@ public void abortTransaction(long transactionId, String reason, TxnCommitAttachm transactionState.beforeStateTransform(TransactionStatus.ABORTED); boolean txnOperated = false; EditLog.EditLogItem logItem = null; - writeLock(); - try { + synchronized (transactionState) { txnOperated = unprotectAbortTransaction(transactionId, reason); if (txnOperated) { if (Config.enable_txn_log_outside_lock) { @@ -1889,10 +1877,8 @@ public void abortTransaction(long transactionId, String reason, TxnCommitAttachm persistTransactionState(transactionState); } } - } finally { - writeUnlock(); - transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); } + transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); if (txnOperated) { awaitTransactionState(logItem, transactionState); } @@ -1932,8 +1918,7 @@ public void abortTransaction2PC(long transactionId) throws UserException { transactionState.beforeStateTransform(TransactionStatus.ABORTED); boolean txnOperated = false; EditLog.EditLogItem logItem = null; - writeLock(); - try { + synchronized (transactionState) { txnOperated = unprotectAbortTransaction(transactionId, "User Abort"); if (txnOperated) { if (Config.enable_txn_log_outside_lock) { @@ -1942,10 +1927,8 @@ public void abortTransaction2PC(long transactionId) throws UserException { persistTransactionState(transactionState); } } - } finally { - writeUnlock(); - transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); } + transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); if (txnOperated) { awaitTransactionState(logItem, transactionState); } @@ -2116,13 +2099,14 @@ public void removeUselessTxns(long currentMillis) { } private Pair unprotectedRemoveUselessTxns(long currentMillis, - ArrayDeque finalStatusTransactionStateDeque, int left) { + ConcurrentLinkedDeque finalStatusTransactionStateDeque, int left) { long latestTxnId = -1; int numOfClearedTransaction = 0; - while (!finalStatusTransactionStateDeque.isEmpty() && numOfClearedTransaction < left) { - TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); + TransactionState transactionState; + while ((transactionState = finalStatusTransactionStateDeque.peekFirst()) != null + && numOfClearedTransaction < left) { if (transactionState.isExpired(currentMillis)) { - finalStatusTransactionStateDeque.pop(); + finalStatusTransactionStateDeque.pollFirst(); clearTransactionState(transactionState.getTransactionId()); latestTxnId = transactionState.getTransactionId(); numOfClearedTransaction++; @@ -2132,9 +2116,9 @@ private Pair unprotectedRemoveUselessTxns(long currentMillis, } while ((Config.label_num_threshold > 0 && finalStatusTransactionStateDeque.size() > Config.label_num_threshold) && numOfClearedTransaction < left) { - TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); - if (transactionState.getFinishTime() != -1) { - finalStatusTransactionStateDeque.pop(); + transactionState = finalStatusTransactionStateDeque.peekFirst(); + if (transactionState != null && transactionState.getFinishTime() != -1) { + finalStatusTransactionStateDeque.pollFirst(); clearTransactionState(transactionState.getTransactionId()); latestTxnId = transactionState.getTransactionId(); numOfClearedTransaction++; @@ -2246,9 +2230,9 @@ protected void checkRunningTxnExceedLimit(List tableIdList) throws BeginTransactionException, MetaNotFoundException { long txnQuota = env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize(); - if (runningTxnNums >= txnQuota) { + if (runningTxnNums.get() >= txnQuota) { throw new BeginTransactionException("current running txns on db " + dbId + " is " - + runningTxnNums + ", larger than limit " + txnQuota); + + runningTxnNums.get() + ", larger than limit " + txnQuota); } // Check if committed txn count on any table exceeds the configured limit @@ -2665,7 +2649,7 @@ public List> getDbTransStateInfo() { readLock(); try { infos.add(Lists.newArrayList("running", String.valueOf( - runningTxnNums))); + runningTxnNums.get()))); long finishedNum = getFinishedTxnNums(); infos.add(Lists.newArrayList("finished", String.valueOf(finishedNum))); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 4200e0c5e8961d..fcbd6705ac248e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -61,7 +61,7 @@ public class PublishVersionDaemon extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class); - private static ArrayList dbExecutors = new ArrayList(Config.publish_thread_pool_num); + private static ArrayList publishExecutors = new ArrayList(Config.publish_thread_pool_num); private Set publishingTxnIds = Sets.newConcurrentHashSet(); @@ -72,7 +72,7 @@ public class PublishVersionDaemon extends MasterDaemon { public PublishVersionDaemon() { super("PUBLISH_VERSION", Config.publish_version_interval_ms); for (int i = 0; i < Config.publish_thread_pool_num; i++) { - dbExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size, + publishExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size, "PUBLISH_VERSION_EXEC-" + i, true)); } } @@ -245,7 +245,13 @@ private void tryFinishTxnAsync(TransactionState transactionState, GlobalTransact LOG.info("try to finish transaction {}, dbId: {}, txnId: {}", transactionState.getTransactionId(), transactionState.getDbId(), transactionState.getTransactionId()); try { - dbExecutors.get((int) (transactionState.getDbId() % Config.publish_thread_pool_num)).execute(() -> { + // When enable_per_txn_publish is true, route by transactionId so different + // transactions in the same database can finish in parallel across executor threads. + // When false, route by dbId (old behavior) so transactions within a DB are sequential. + long routingKey = Config.enable_per_txn_publish + ? transactionState.getTransactionId() + : transactionState.getDbId(); + publishExecutors.get((int) (routingKey % Config.publish_thread_pool_num)).execute(() -> { try { tryFinishTxnSync(transactionState, globalTransactionMgr); } catch (Throwable e) { @@ -269,12 +275,12 @@ private void tryFinishTxnSync(TransactionState transactionState, GlobalTransacti } try { - partitionVisibleVersions = Maps.newHashMap(); - backendPartitions = Maps.newHashMap(); + Map txnPartitionVersions = Maps.newHashMap(); + Map> txnBackendPartitions = Maps.newHashMap(); // one transaction exception should not affect other transaction globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); - addBackendVisibleVersions(partitionVisibleVersions, backendPartitions); + transactionState.getTransactionId(), txnPartitionVersions, txnBackendPartitions); + addBackendVisibleVersions(txnPartitionVersions, txnBackendPartitions); } catch (Exception e) { LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); }