diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 492d1296331a85..f16d371c31895d 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -744,6 +744,28 @@ public class Config extends ConfigBase { "Txn manager will reject coming txns."}) public static int max_running_txn_num_per_db = 10000; + @ConfField(mutable = true, masterOnly = true, description = { + "是否将事务的 edit log 写入移到写锁之外以减少锁竞争。" + + "开启后,edit log 条目在写锁内入队(FIFO 保证顺序)," + + "在写锁外等待持久化完成,从而降低写锁持有时间,提高并发事务吞吐量。" + + "默认开启。关闭后使用传统的锁内同步写入模式。", + "Whether to move transaction edit log writes outside the write lock to reduce lock contention. " + + "When enabled, edit log entries are enqueued inside the write lock (FIFO preserves ordering) " + + "and awaited outside the lock, reducing write lock hold time " + + "and improving concurrent transaction throughput. " + + "Default is true. Set to false to use the traditional in-lock synchronous write mode."}) + public static boolean enable_txn_log_outside_lock = true; + + @ConfField(mutable = true, description = { + "是否启用按事务级别并行发布。开启后,同一数据库内的不同事务可以在不同的执行器线程上并行完成发布," + + "而不是按数据库顺序执行。关闭后回退到按数据库路由(旧行为),同一数据库内的事务顺序发布。", + "Whether to enable per-transaction parallel publish. When enabled, different transactions " + + "in the same database can finish publishing in parallel across executor threads, " + + "instead of being serialized per database. " + + "When disabled, falls back to per-database routing (old behavior) " + + "where transactions within a DB are published sequentially."}) + public static boolean enable_per_txn_publish = true; + @ConfField(masterOnly = true, description = {"pending load task 执行线程数。这个配置可以限制当前等待的导入作业数。" + "并且应小于 `max_running_txn_num_per_db`。", "The pending load task executor pool size. " diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java index a81f259dd5c0b7..200243d4af3aa2 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java @@ -66,6 +66,7 @@ import java.nio.file.StandardOpenOption; import java.time.LocalDate; import java.util.List; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -593,6 +594,11 @@ private static void fuzzyConfigs() { LOG.info("fuzzy set random_add_cluster_keys_for_mow={}", Config.random_add_cluster_keys_for_mow); } + Config.enable_txn_log_outside_lock = new Random().nextBoolean(); + LOG.info("fuzzy set enable_txn_log_outside_lock={}", Config.enable_txn_log_outside_lock); + Config.enable_batch_editlog = new Random().nextBoolean(); + LOG.info("fuzzy set enable_batch_editlog={}", Config.enable_batch_editlog); + setFuzzyForCatalog(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 4f126fcf21422c..d506b474ed70b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -124,13 +124,14 @@ public class EditLog { public static final Logger LOG = LogManager.getLogger(EditLog.class); - // Helper class to hold log edit requests - private static class EditLogItem { + // Helper class to hold log edit requests. + // Public so that callers can enqueue inside a lock and await outside it. + public static class EditLogItem { static AtomicLong nextUid = new AtomicLong(0); final short op; final Writable writable; final Object lock = new Object(); - boolean finished = false; + volatile boolean finished = false; long logId = -1; long uid = -1; @@ -139,6 +140,24 @@ private static class EditLogItem { this.writable = writable; uid = nextUid.getAndIncrement(); } + + /** + * Wait for this edit log entry to be flushed to persistent storage. + * Returns the assigned log ID. + */ + public long await() { + synchronized (lock) { + while (!finished) { + try { + lock.wait(); + } catch (InterruptedException e) { + LOG.error("Fatal Error : write stream Exception"); + System.exit(-1); + } + } + } + return logId; + } } private final BlockingQueue logEditQueue = new LinkedBlockingQueue<>(); @@ -1534,6 +1553,49 @@ public long logEditWithQueue(short op, Writable writable) { return req.logId; } + /** + * Submit an edit log entry to the batch queue without waiting for it to be flushed. + * The entry is enqueued in FIFO order, so calling this inside a write lock guarantees + * that edit log entries are ordered by lock acquisition order. + * + *

The caller MUST call {@link EditLogItem#await()} after releasing the lock to ensure + * the entry is persisted before proceeding. + * + *

If batch edit log is disabled, this falls back to a synchronous direct write + * and the returned item is already completed. + * + * @return an {@link EditLogItem} handle to await completion + */ + public EditLogItem submitEdit(short op, Writable writable) { + if (this.getNumEditStreams() == 0) { + LOG.error("Fatal Error : no editLog stream", new Exception()); + throw new Error("Fatal Error : no editLog stream"); + } + + EditLogItem req = new EditLogItem(op, writable); + if (Config.enable_batch_editlog && op != OperationType.OP_TIMESTAMP) { + while (true) { + try { + logEditQueue.put(req); + break; + } catch (InterruptedException e) { + LOG.warn("Interrupted during put, will sleep and retry."); + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + LOG.warn("interrupted during sleep, will retry.", ex); + } + } + } + } else { + // Non-batch mode: write directly (synchronous) + long logId = logEditDirectly(op, writable); + req.logId = logId; + req.finished = true; + } + return req; + } + private synchronized long logEditDirectly(short op, Writable writable) { long logId = -1; try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java index ade04e95b156f1..eb905f3eda155c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java @@ -18,6 +18,7 @@ package org.apache.doris.transaction; import org.apache.doris.alter.AlterJobV2; +import org.apache.doris.binlog.UpsertRecord; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; @@ -55,6 +56,7 @@ import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.CleanLabelOperationLog; import org.apache.doris.persist.EditLog; +import org.apache.doris.persist.OperationType; import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.AnalysisManager; import org.apache.doris.task.AgentBatchTask; @@ -78,7 +80,6 @@ import java.io.DataOutput; import java.io.IOException; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Comparator; @@ -90,7 +91,9 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -134,8 +137,10 @@ private enum PublishResult { // The "Short" queue is used to store the txns of the expire time // controlled by Config.streaming_label_keep_max_second. // The "Long" queue is used to store the txns of the expire time controlled by Config.label_keep_max_second. - private final ArrayDeque finalStatusTransactionStateDequeShort = new ArrayDeque<>(); - private final ArrayDeque finalStatusTransactionStateDequeLong = new ArrayDeque<>(); + private final ConcurrentLinkedDeque finalStatusTransactionStateDequeShort + = new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque finalStatusTransactionStateDequeLong + = new ConcurrentLinkedDeque<>(); // label -> txn ids // this is used for checking if label already used. a label may correspond to multiple txns, @@ -150,7 +155,7 @@ private enum PublishResult { private Long lastCommittedTxnCountUpdateTime = 0L; // count the number of running txns of database - private volatile int runningTxnNums = 0; + private final AtomicInteger runningTxnNums = new AtomicInteger(0); private final Env env; @@ -222,7 +227,7 @@ protected Set unprotectedGetTxnIdsByLabel(String label) { } protected int getRunningTxnNums() { - return runningTxnNums; + return runningTxnNums.get(); } @VisibleForTesting @@ -331,6 +336,8 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req FeNameFormat.checkLabel(label); long tid = 0L; + TransactionState transactionState = null; + EditLog.EditLogItem logItem = null; writeLock(); try { /* @@ -368,10 +375,16 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req checkRunningTxnExceedLimit(tableIdList); tid = idGenerator.getNextTransactionId(); - TransactionState transactionState = new TransactionState(dbId, tableIdList, + transactionState = new TransactionState(dbId, tableIdList, tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000); transactionState.setPrepareTime(System.currentTimeMillis()); - unprotectUpsertTransactionState(transactionState, false); + unprotectUpdateInMemoryState(transactionState, false); + updateTxnLabels(transactionState); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } if (MetricRepo.isInit) { MetricRepo.COUNTER_TXN_BEGIN.increase(1L); @@ -379,6 +392,7 @@ public long beginTransaction(List tableIdList, String label, TUniqueId req } finally { writeUnlock(); } + awaitTransactionState(logItem, transactionState); LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}", tid, label, coordinator, listenerId); return tid; @@ -448,13 +462,17 @@ public void preCommitTransaction2PC(List tableList, long transactionId, checkCommitStatus(tableList, transactionState, tabletCommitInfos, txnCommitAttachment, errorReplicaIds, tableToPartition, totalInvolvedBackends); - writeLock(); - try { + EditLog.EditLogItem logItem = null; + synchronized (transactionState) { unprotectedPreCommitTransaction2PC(transactionState, errorReplicaIds, tableToPartition, totalInvolvedBackends, db); - } finally { - writeUnlock(); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } } + awaitTransactionState(logItem, transactionState); LOG.info("transaction:[{}] successfully pre-committed", transactionState); } @@ -802,23 +820,29 @@ public void commitTransaction(List
tableList, long transactionId, List tableList, transactionState.beforeStateTransform(TransactionStatus.COMMITTED); // transaction state transform boolean txnOperated = false; - writeLock(); - try { + EditLog.EditLogItem logItem = null; + synchronized (transactionState) { unprotectedCommitTransaction(transactionState, errorReplicaIds, subTxnToPartition, totalInvolvedBackends, subTransactionStates, db); - txnOperated = true; - } finally { - writeUnlock(); - // after state transform - try { - transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated); - } catch (Throwable e) { - LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); } + txnOperated = true; + } + // after state transform + try { + transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated); + } catch (Throwable e) { + LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); + } + if (txnOperated) { + awaitTransactionState(logItem, transactionState); } // update nextVersion because of the failure of persistent transaction resulting in error version @@ -930,15 +960,15 @@ public void replayDeleteTransaction(TransactionState transactionState) { // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque, // it must at the front of the finalStatusTransactionStateDeque. // check both "short" and "long" queue. - if (!finalStatusTransactionStateDequeShort.isEmpty() - && transactionState.getTransactionId() - == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeShort.pop(); + TransactionState shortHead = finalStatusTransactionStateDequeShort.peekFirst(); + TransactionState longHead = finalStatusTransactionStateDequeLong.peekFirst(); + if (shortHead != null + && transactionState.getTransactionId() == shortHead.getTransactionId()) { + finalStatusTransactionStateDequeShort.pollFirst(); clearTransactionState(transactionState.getTransactionId()); - } else if (!finalStatusTransactionStateDequeLong.isEmpty() - && transactionState.getTransactionId() - == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeLong.pop(); + } else if (longHead != null + && transactionState.getTransactionId() == longHead.getTransactionId()) { + finalStatusTransactionStateDequeLong.pollFirst(); clearTransactionState(transactionState.getTransactionId()); } } finally { @@ -953,13 +983,13 @@ public void replayBatchRemoveTransaction(List txnIds) { // here we only delete the oldest element, so if element exist in finalStatusTransactionStateDeque, // it must at the front of the finalStatusTransactionStateDeque // check both "short" and "long" queue. - if (!finalStatusTransactionStateDequeShort.isEmpty() - && txnId == finalStatusTransactionStateDequeShort.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeShort.pop(); + TransactionState shortHead = finalStatusTransactionStateDequeShort.peekFirst(); + TransactionState longHead = finalStatusTransactionStateDequeLong.peekFirst(); + if (shortHead != null && txnId == shortHead.getTransactionId()) { + finalStatusTransactionStateDequeShort.pollFirst(); clearTransactionState(txnId); - } else if (!finalStatusTransactionStateDequeLong.isEmpty() - && txnId == finalStatusTransactionStateDequeLong.getFirst().getTransactionId()) { - finalStatusTransactionStateDequeLong.pop(); + } else if (longHead != null && txnId == longHead.getTransactionId()) { + finalStatusTransactionStateDequeLong.pollFirst(); clearTransactionState(txnId); } } @@ -972,8 +1002,8 @@ public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 oper writeLock(); try { if (operation.getLatestTxnIdForShort() != -1) { - while (!finalStatusTransactionStateDequeShort.isEmpty()) { - TransactionState transactionState = finalStatusTransactionStateDequeShort.pop(); + TransactionState transactionState; + while ((transactionState = finalStatusTransactionStateDequeShort.pollFirst()) != null) { clearTransactionState(transactionState.getTransactionId()); if (operation.getLatestTxnIdForShort() == transactionState.getTransactionId()) { break; @@ -982,8 +1012,8 @@ public void replayBatchRemoveTransaction(BatchRemoveTransactionsOperationV2 oper } if (operation.getLatestTxnIdForLong() != -1) { - while (!finalStatusTransactionStateDequeLong.isEmpty()) { - TransactionState transactionState = finalStatusTransactionStateDequeLong.pop(); + TransactionState transactionState; + while ((transactionState = finalStatusTransactionStateDequeLong.pollFirst()) != null) { clearTransactionState(transactionState.getTransactionId()); if (operation.getLatestTxnIdForLong() == transactionState.getTransactionId()) { break; @@ -1169,14 +1199,19 @@ public void finishTransaction(long transactionId, Map partitionVisib } } boolean txnOperated = false; - writeLock(); - try { + EditLog.EditLogItem logItem = null; + synchronized (transactionState) { transactionState.setErrorReplicas(errorReplicaIds); transactionState.setFinishTime(System.currentTimeMillis()); transactionState.clearErrorMsg(); transactionState.setTransactionStatus(TransactionStatus.VISIBLE); setTableVersion(transactionState, db); - unprotectUpsertTransactionState(transactionState, false); + unprotectUpdateInMemoryState(transactionState, false); + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } txnOperated = true; // TODO(cmy): We found a very strange problem. When delete-related transactions are processed here, // subsequent `updateCatalogAfterVisible()` is called, but it does not seem to be executed here @@ -1185,13 +1220,14 @@ public void finishTransaction(long transactionId, Map partitionVisib if (LOG.isDebugEnabled()) { LOG.debug("after set transaction {} to visible", transactionState); } - } finally { - writeUnlock(); - try { - transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated); - } catch (Throwable e) { - LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); - } + } + try { + transactionState.afterStateTransform(TransactionStatus.VISIBLE, txnOperated); + } catch (Throwable e) { + LOG.warn("afterStateTransform txn {} failed. exception: ", transactionState, e); + } + if (txnOperated) { + awaitTransactionState(logItem, transactionState); } updateCatalogAfterVisible(transactionState, db, partitionVisibleVersions, backendPartitions); } finally { @@ -1527,8 +1563,8 @@ protected void unprotectedPreCommitTransaction2PC(TransactionState transactionSt } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // persist transactionState - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state only; caller handles edit log persistence + unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1565,8 +1601,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S } transactionState.putIdToTableCommitInfo(tableId, tableCommitInfo); } - // persist transactionState - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state only; caller handles edit log persistence + unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1626,8 +1662,8 @@ protected void unprotectedCommitTransaction(TransactionState transactionState, S transactionState.addSubTxnTableCommitInfo(subTransactionState, tableCommitInfo); } } - // persist transactionState - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state only; caller handles edit log persistence + unprotectUpdateInMemoryState(transactionState, false); transactionState.setInvolvedBackends(totalInvolvedBackends); } @@ -1674,26 +1710,34 @@ protected void unprotectedCommitTransaction2PC(TransactionState transactionState partitionCommitInfo.setVersionTime(System.currentTimeMillis()); } } - // persist transactionState - editLog.logInsertTransactionState(transactionState); + // Update in-memory state only; caller handles edit log persistence + unprotectUpdateInMemoryState(transactionState, false); } - // for add/update/delete TransactionState + /** + * Replays a transaction state update from the edit log. + * Only called during replay (follower sync or restart), so no edit log persistence is needed. + * + * @param transactionState the transaction state to replay + * @param isReplay must be true (only used in replay path) + */ protected void unprotectUpsertTransactionState(TransactionState transactionState, boolean isReplay) { - // if this is a replay operation, we should not log it - if (!isReplay) { - if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE - || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) { - // if this is a prepare txn, and load source type is not FRONTEND - // no need to persist it. if prepare txn lost, the following commit will just be failed. - // user only need to retry this txn. - // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. - editLog.logInsertTransactionState(transactionState); - } - } + unprotectUpdateInMemoryState(transactionState, isReplay); + } + + /** + * Updates only in-memory transaction state without persisting to edit log. + * This method must be called while holding the write lock. + * Use this method in combination with {@link #persistTransactionState} to reduce lock contention + * by moving edit log writes outside the lock. + * + * @param transactionState the transaction state to update + * @param isReplay true if this is a replay operation (edit log already contains this state) + */ + protected void unprotectUpdateInMemoryState(TransactionState transactionState, boolean isReplay) { if (!transactionState.getTransactionStatus().isFinalStatus()) { if (idToRunningTransactionState.put(transactionState.getTransactionId(), transactionState) == null) { - runningTxnNums++; + runningTxnNums.incrementAndGet(); } if (isReplay && transactionState.getSubTxnIds() != null) { LOG.info("add sub transactions for txn_id={}, status={}, sub_txn_ids={}", @@ -1705,7 +1749,7 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState } } else { if (idToRunningTransactionState.remove(transactionState.getTransactionId()) != null) { - runningTxnNums--; + runningTxnNums.decrementAndGet(); } idToFinalStatusTransactionState.put(transactionState.getTransactionId(), transactionState); if (transactionState.isShortTxn()) { @@ -1719,18 +1763,71 @@ protected void unprotectUpsertTransactionState(TransactionState transactionState cleanSubTransactions(transactionState.getTransactionId()); } } - updateTxnLabels(transactionState); + if (isReplay) { + updateTxnLabels(transactionState); + } } - public int getRunningTxnNumsWithLock() { - readLock(); - try { - return runningTxnNums; - } finally { - readUnlock(); + /** + * Persists the transaction state to edit log synchronously. + * For PREPARE transactions with non-FRONTEND source type, persistence is skipped + * because losing them only requires the client to retry. + */ + protected void persistTransactionState(TransactionState transactionState) { + if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE + || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) { + // if this is a prepare txn, and load source type is not FRONTEND + // no need to persist it. if prepare txn lost, the following commit will just be failed. + // user only need to retry this txn. + // The FRONTEND type txn is committed and running asynchronously, so we have to persist it. + editLog.logInsertTransactionState(transactionState); + } + } + + /** + * Enqueue a transaction state edit log entry without waiting for persistence. + * Must be called inside the write lock to preserve ordering via the FIFO queue. + * + * @return an {@link EditLog.EditLogItem} handle to await completion, or null if persistence is skipped + */ + protected EditLog.EditLogItem enqueueTransactionState(TransactionState transactionState) { + if (transactionState.getTransactionStatus() != TransactionStatus.PREPARE + || transactionState.getSourceType() == TransactionState.LoadJobSourceType.FRONTEND) { + return editLog.submitEdit(OperationType.OP_UPSERT_TRANSACTION_STATE, transactionState); + } + return null; + } + + /** + * Await completion of a previously enqueued transaction state edit log entry. + * Should be called outside the write lock. Handles binlog and timing logic. + * + * @param item the handle returned by {@link #enqueueTransactionState}, may be null + * @param transactionState the transaction state (for binlog and timing) + */ + protected void awaitTransactionState(EditLog.EditLogItem item, TransactionState transactionState) { + if (item == null) { + return; + } + long start = System.currentTimeMillis(); + long logId = item.await(); + long logEditEnd = System.currentTimeMillis(); + long end = logEditEnd; + if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) { + UpsertRecord record = new UpsertRecord(logId, transactionState); + Env.getCurrentEnv().getBinlogManager().addUpsertRecord(record); + end = System.currentTimeMillis(); + } + if (end - start > Config.lock_reporting_threshold_ms) { + LOG.warn("edit log insert transaction take a lot time, write bdb {} ms, write binlog {} ms", + logEditEnd - start, end - logEditEnd); } } + public int getRunningTxnNumsWithLock() { + return runningTxnNums.get(); + } + private void updateTxnLabels(TransactionState transactionState) { Set txnIds = labelToTxnIds.computeIfAbsent(transactionState.getLabel(), k -> Sets.newHashSet()); txnIds.add(transactionState.getTransactionId()); @@ -1770,12 +1867,20 @@ public void abortTransaction(long transactionId, String reason, TxnCommitAttachm // before state transform transactionState.beforeStateTransform(TransactionStatus.ABORTED); boolean txnOperated = false; - writeLock(); - try { + EditLog.EditLogItem logItem = null; + synchronized (transactionState) { txnOperated = unprotectAbortTransaction(transactionId, reason); - } finally { - writeUnlock(); - transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); + if (txnOperated) { + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } + } + } + transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason); + if (txnOperated) { + awaitTransactionState(logItem, transactionState); } // send clear txn task to BE to clear the transactions on BE. @@ -1812,12 +1917,20 @@ public void abortTransaction2PC(long transactionId) throws UserException { // before state transform transactionState.beforeStateTransform(TransactionStatus.ABORTED); boolean txnOperated = false; - writeLock(); - try { + EditLog.EditLogItem logItem = null; + synchronized (transactionState) { txnOperated = unprotectAbortTransaction(transactionId, "User Abort"); - } finally { - writeUnlock(); - transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); + if (txnOperated) { + if (Config.enable_txn_log_outside_lock) { + logItem = enqueueTransactionState(transactionState); + } else { + persistTransactionState(transactionState); + } + } + } + transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, "User Abort"); + if (txnOperated) { + awaitTransactionState(logItem, transactionState); } // send clear txn task to BE to clear the transactions on BE. @@ -1849,7 +1962,8 @@ private boolean unprotectAbortTransaction(long transactionId, String reason) transactionState.setFinishTime(System.currentTimeMillis()); transactionState.setReason(reason); transactionState.setTransactionStatus(TransactionStatus.ABORTED); - unprotectUpsertTransactionState(transactionState, false); + // Update in-memory state only; caller handles edit log persistence + unprotectUpdateInMemoryState(transactionState, false); return true; } @@ -1953,6 +2067,9 @@ protected List> getPartitionTransInfo(long txnId, long tableId) public void removeUselessTxns(long currentMillis) { // delete expired txns + BatchRemoveTransactionsOperationV2 op = null; + EditLog.EditLogItem logItem = null; + int numOfClearedTransaction = 0; writeLock(); try { Pair expiredTxnsInfoForShort = unprotectedRemoveUselessTxns(currentMillis, @@ -1960,28 +2077,36 @@ public void removeUselessTxns(long currentMillis) { Pair expiredTxnsInfoForLong = unprotectedRemoveUselessTxns(currentMillis, finalStatusTransactionStateDequeLong, MAX_REMOVE_TXN_PER_ROUND - expiredTxnsInfoForShort.second); - int numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; + numOfClearedTransaction = expiredTxnsInfoForShort.second + expiredTxnsInfoForLong.second; if (numOfClearedTransaction > 0) { - BatchRemoveTransactionsOperationV2 op = new BatchRemoveTransactionsOperationV2(dbId, + op = new BatchRemoveTransactionsOperationV2(dbId, expiredTxnsInfoForShort.first, expiredTxnsInfoForLong.first); - editLog.logBatchRemoveTransactions(op); - if (LOG.isDebugEnabled()) { - LOG.debug("Remove {} expired transactions", numOfClearedTransaction); + if (Config.enable_txn_log_outside_lock) { + logItem = editLog.submitEdit(OperationType.OP_BATCH_REMOVE_TXNS_V2, op); + } else { + editLog.logBatchRemoveTransactions(op); } } } finally { writeUnlock(); } + if (logItem != null) { + logItem.await(); + } + if (op != null && LOG.isDebugEnabled()) { + LOG.debug("Remove {} expired transactions", numOfClearedTransaction); + } } private Pair unprotectedRemoveUselessTxns(long currentMillis, - ArrayDeque finalStatusTransactionStateDeque, int left) { + ConcurrentLinkedDeque finalStatusTransactionStateDeque, int left) { long latestTxnId = -1; int numOfClearedTransaction = 0; - while (!finalStatusTransactionStateDeque.isEmpty() && numOfClearedTransaction < left) { - TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); + TransactionState transactionState; + while ((transactionState = finalStatusTransactionStateDeque.peekFirst()) != null + && numOfClearedTransaction < left) { if (transactionState.isExpired(currentMillis)) { - finalStatusTransactionStateDeque.pop(); + finalStatusTransactionStateDeque.pollFirst(); clearTransactionState(transactionState.getTransactionId()); latestTxnId = transactionState.getTransactionId(); numOfClearedTransaction++; @@ -1991,9 +2116,9 @@ private Pair unprotectedRemoveUselessTxns(long currentMillis, } while ((Config.label_num_threshold > 0 && finalStatusTransactionStateDeque.size() > Config.label_num_threshold) && numOfClearedTransaction < left) { - TransactionState transactionState = finalStatusTransactionStateDeque.getFirst(); - if (transactionState.getFinishTime() != -1) { - finalStatusTransactionStateDeque.pop(); + transactionState = finalStatusTransactionStateDeque.peekFirst(); + if (transactionState != null && transactionState.getFinishTime() != -1) { + finalStatusTransactionStateDeque.pollFirst(); clearTransactionState(transactionState.getTransactionId()); latestTxnId = transactionState.getTransactionId(); numOfClearedTransaction++; @@ -2105,9 +2230,9 @@ protected void checkRunningTxnExceedLimit(List tableIdList) throws BeginTransactionException, MetaNotFoundException { long txnQuota = env.getInternalCatalog().getDbOrMetaException(dbId).getTransactionQuotaSize(); - if (runningTxnNums >= txnQuota) { + if (runningTxnNums.get() >= txnQuota) { throw new BeginTransactionException("current running txns on db " + dbId + " is " - + runningTxnNums + ", larger than limit " + txnQuota); + + runningTxnNums.get() + ", larger than limit " + txnQuota); } // Check if committed txn count on any table exceeds the configured limit @@ -2524,7 +2649,7 @@ public List> getDbTransStateInfo() { readLock(); try { infos.add(Lists.newArrayList("running", String.valueOf( - runningTxnNums))); + runningTxnNums.get()))); long finishedNum = getFinishedTxnNums(); infos.add(Lists.newArrayList("finished", String.valueOf(finishedNum))); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java index 4200e0c5e8961d..fcbd6705ac248e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/PublishVersionDaemon.java @@ -61,7 +61,7 @@ public class PublishVersionDaemon extends MasterDaemon { private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class); - private static ArrayList dbExecutors = new ArrayList(Config.publish_thread_pool_num); + private static ArrayList publishExecutors = new ArrayList(Config.publish_thread_pool_num); private Set publishingTxnIds = Sets.newConcurrentHashSet(); @@ -72,7 +72,7 @@ public class PublishVersionDaemon extends MasterDaemon { public PublishVersionDaemon() { super("PUBLISH_VERSION", Config.publish_version_interval_ms); for (int i = 0; i < Config.publish_thread_pool_num; i++) { - dbExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size, + publishExecutors.add(ThreadPoolManager.newDaemonFixedThreadPool(1, Config.publish_queue_size, "PUBLISH_VERSION_EXEC-" + i, true)); } } @@ -245,7 +245,13 @@ private void tryFinishTxnAsync(TransactionState transactionState, GlobalTransact LOG.info("try to finish transaction {}, dbId: {}, txnId: {}", transactionState.getTransactionId(), transactionState.getDbId(), transactionState.getTransactionId()); try { - dbExecutors.get((int) (transactionState.getDbId() % Config.publish_thread_pool_num)).execute(() -> { + // When enable_per_txn_publish is true, route by transactionId so different + // transactions in the same database can finish in parallel across executor threads. + // When false, route by dbId (old behavior) so transactions within a DB are sequential. + long routingKey = Config.enable_per_txn_publish + ? transactionState.getTransactionId() + : transactionState.getDbId(); + publishExecutors.get((int) (routingKey % Config.publish_thread_pool_num)).execute(() -> { try { tryFinishTxnSync(transactionState, globalTransactionMgr); } catch (Throwable e) { @@ -269,12 +275,12 @@ private void tryFinishTxnSync(TransactionState transactionState, GlobalTransacti } try { - partitionVisibleVersions = Maps.newHashMap(); - backendPartitions = Maps.newHashMap(); + Map txnPartitionVersions = Maps.newHashMap(); + Map> txnBackendPartitions = Maps.newHashMap(); // one transaction exception should not affect other transaction globalTransactionMgr.finishTransaction(transactionState.getDbId(), - transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions); - addBackendVisibleVersions(partitionVisibleVersions, backendPartitions); + transactionState.getTransactionId(), txnPartitionVersions, txnBackendPartitions); + addBackendVisibleVersions(txnPartitionVersions, txnBackendPartitions); } catch (Exception e) { LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java index e2b113958366c8..73ed9f255cd78b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/FakeEditLog.java @@ -20,9 +20,11 @@ import org.apache.doris.alter.AlterJobV2; import org.apache.doris.alter.BatchAlterJobPersistInfo; import org.apache.doris.cluster.Cluster; +import org.apache.doris.common.io.Writable; import org.apache.doris.persist.BatchRemoveTransactionsOperationV2; import org.apache.doris.persist.EditLog; import org.apache.doris.persist.ModifyTablePropertyOperationLog; +import org.apache.doris.persist.OperationType; import org.apache.doris.persist.RoutineLoadOperation; import org.apache.doris.persist.TableInfo; import org.apache.doris.system.Backend; @@ -110,6 +112,15 @@ public int getNumEditStreams() { return 1; // fake that we have streams } + @Mock + public EditLog.EditLogItem submitEdit(short op, Writable writable) { + if (op == OperationType.OP_UPSERT_TRANSACTION_STATE && writable instanceof TransactionState) { + TransactionState transactionState = (TransactionState) writable; + allTransactionState.put(transactionState.getTransactionId(), transactionState); + } + return null; + } + public TransactionState getTransaction(long transactionId) { return allTransactionState.get(transactionId); }