feat: XA mode 'busy buffer' and 'bad connection' errors causing rollbacks#1052
feat: XA mode 'busy buffer' and 'bad connection' errors causing rollbacks#1052YuZhangLarry wants to merge 13 commits intoapache:masterfrom
Conversation
This change optimizes XA transaction handling when autoCommit=true, allowing multiple SQL statements to reuse the same XA branch instead of creating a new branch for each statement. This resolves the "busy buffer" issue reported in apache#904. Changes: - Add xaRegistry to manage XA branch lifecycle and support reuse - BeginTx: save original autoCommit state to GlobalLockRequire - createNewTxOnExecIfNeed: implement XA branch reuse logic - Commit: skip actual XA commit in autoCommit mode (defer to TC Phase2) - Rollback: add conditional rollback and registry cleanup - BuildExecutor: filter out undoLogSQLHook for XA mode - undoLogSQLHook: skip undo log generation for XA mode - Tx.Rollback: handle nil target case for XA mode Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
Change the PR title to "Feat: " format. |
|
Fix the CI problem. |
thunguo
left a comment
There was a problem hiding this comment.
Consider whether these suggestions need modification. Then clean up non-standard debug logs and other unnecessary logs, ensuring only logs for critical action nodes are maintained.
| c.txCtx.XID = tm.GetXID(ctx) | ||
| c.txCtx.TransactionMode = types.XAMode | ||
| // Store the original autoCommit state for later use in commit logic | ||
| c.txCtx.GlobalLockRequire = wasAutoCommit |
There was a problem hiding this comment.
Is there any semantic confusion here? Would defining a new field like IsAutoCommitXABranch be better?
| // On error (but not ErrSkip), rollback the entire branch | ||
| isErrSkip := err == driver.ErrSkip || err.Error() == "driver: skip fast-path; continue as if unimplemented" | ||
| if !isErrSkip && c.xaActive { | ||
| if rollbackErr := c.Rollback(ctx); rollbackErr != nil { |
There was a problem hiding this comment.
c.Rollback(ctx) performs an XA-level rollback (XA END + XA ROLLBACK), but does not set c.tx to nil. This causes the defer statement at the top of the function to subsequently call c.tx.Rollback() again, resulting in a duplicate rollback of the already terminated underlying connection. Could this potentially lead to a bad connection?
|
|
||
| // For XA mode, use a plain executor without AT-specific hooks (like undo log generation) | ||
| // XA transactions don't need undo logs - they use the two-phase commit protocol managed by TC | ||
| if transactionMode == types.XAMode { |
There was a problem hiding this comment.
Could you double-check if this change is necessary? The undo_log_hook.go file handles this conditional branch, meaning the hook should occur earlier than the logic in the executor. It's likely that this part won't actually be executed. I understand that the current changes in undo_log_hook.go are sufficient.
| registry := getXARegistry() | ||
| registry.setState(c.txCtx.XID, xaStateEnded) |
There was a problem hiding this comment.
Should the status modification operation be performed after the XA END action?
| * limitations under the License. | ||
| */ | ||
|
|
||
| package sql |
There was a problem hiding this comment.
We need to consider whether the registry is truly necessary. My understanding is that as long as there is an active XA branch on the current connection, it can be reused. Perhaps it would be better to check this situation?
- Fix apache#904: busy buffer error when using SELECT FOR UPDATE followed by UPDATE in XA mode - Add IsAutoCommitXABranch field to replace GlobalLockRequire for better semantics - Fix duplicate rollback issue by tracking xaRollbacked flag - Remove global xaRegistry, use DBResource holder instead - Simplify executor.go for XA mode (undo_log_hook.go already handles this) - Clean up non-standard debug logs, keep only critical error logs Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
Fix CI problem. |
|
Please fix the CI issue. |
thunguo
left a comment
There was a problem hiding this comment.
I left some comments, you fix the ci issue when you have time, and handle the content related to the comments.
| * limitations under the License. | ||
| */ | ||
|
|
||
| package test |
There was a problem hiding this comment.
The following issues need to be paid attention to:
- The test case should be organized in a standardized table-driven manner as much as possible.
- Any comments on the warehouse code should be in English.
- The name of the test file should have a specific meaning, and consider whether this test should be moved to an existing test file.
There was a problem hiding this comment.
Pull request overview
This PR aims to fix XA-mode “busy buffer / bad connection” failures (issue #904) by reusing the same XA branch across multiple SQL statements when running inside a global transaction in auto-commit mode.
Changes:
- Add XA branch reuse tracking (xid → XAConn) and propagate “autoCommit-created branch” state via
TransactionContext. - Adjust XA execution/commit/rollback behavior to support multi-statement auto-commit flows and avoid AT-specific behavior (undo logs).
- Add a reproducer test and update dev change log.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
test/xa_issue904_test.go |
Adds an issue #904 reproducer test (currently requires a real MySQL + Seata runtime). |
pkg/datasource/sql/types/types.go |
Adds IsAutoCommitXABranch flag to the transaction context. |
pkg/datasource/sql/tx.go |
Makes rollback a no-op when target is nil (XA path). |
pkg/datasource/sql/hook/undo_log_hook.go |
Skips undo-log generation for XA mode. |
pkg/datasource/sql/exec/executor.go |
Returns a plain executor for XA mode to avoid AT-specific executor requirements. |
pkg/datasource/sql/db.go |
Adds xaConnsByXID registry for XA branch reuse. |
pkg/datasource/sql/conn_xa.go |
Implements the XA branch reuse logic and alters XA start/commit/rollback flow. |
changes/dev.md |
Notes the new XA branch reuse feature. |
Comments suppressed due to low confidence (1)
pkg/datasource/sql/conn_xa.go:208
- The panic/error rollback path in the deferred cleanup only calls
c.tx.Rollback()(local driver tx) and does not perform XA cleanup (XA END/XA ROLLBACK) nor unregister the branch fromxaConnsByXID. Iff()panics, this can leave an active XA branch open and leak the registry entry. Consider invokingc.Rollback(ctx)(or equivalent XA cleanup +UnregisterXABranch) whenrecoverErr != nil, and ensure it’s safe to call only once (you already trackxaRollbacked).
defer func() {
recoverErr := recover()
// Check if error is ErrSkip - don't rollback for this special error
isErrSkip := err != nil && (err == driver.ErrSkip || err.Error() == "driver: skip fast-path; continue as if unimplemented")
if (err != nil && !isErrSkip) || recoverErr != nil {
// Don't try to rollback if tx is nil or XA rollback was already done
if c.tx != nil && !xaRollbacked {
rollbackErr := c.tx.Rollback()
if rollbackErr != nil {
log.Errorf("conn at rollback error:%v", rollbackErr)
}
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Check if we already have an active XA branch for this transaction | ||
| heldConn := c.res.GetXABranch(xid) | ||
| if heldConn != nil && heldConn.xaActive && heldConn.txCtx.XID == xid { | ||
| if heldConn != c { | ||
| // Delegate to the connection that holds the XA branch | ||
| return heldConn.createNewTxOnExecIfNeed(ctx, f) | ||
| } | ||
| // Current connection already has the XA branch, execute SQL directly | ||
| // Skip creating a new branch | ||
| } else { | ||
| // Create new XA branch |
There was a problem hiding this comment.
The delegation path for XA branch reuse is incorrect: when heldConn != c, you call heldConn.createNewTxOnExecIfNeed(ctx, f) but f is a closure created in QueryContext/ExecContext that executes on the original receiver c.Conn, not on heldConn.Conn. This means the SQL still runs on the wrong physical connection while the XA branch is held on heldConn, defeating branch reuse and potentially breaking transaction semantics. Restructure this so the SQL execution is performed on heldConn (e.g., make createNewTxOnExecIfNeed accept a callback that receives the connection to use, or handle the heldConn selection before constructing the callback).
| // Check if we already have an active XA branch for this transaction | |
| heldConn := c.res.GetXABranch(xid) | |
| if heldConn != nil && heldConn.xaActive && heldConn.txCtx.XID == xid { | |
| if heldConn != c { | |
| // Delegate to the connection that holds the XA branch | |
| return heldConn.createNewTxOnExecIfNeed(ctx, f) | |
| } | |
| // Current connection already has the XA branch, execute SQL directly | |
| // Skip creating a new branch | |
| } else { | |
| // Create new XA branch | |
| // Check if we already have an active XA branch for this transaction on the *same* connection | |
| heldConn := c.res.GetXABranch(xid) | |
| if heldConn != nil && heldConn == c && heldConn.xaActive && heldConn.txCtx.XID == xid { | |
| // Current connection already has the XA branch, execute SQL directly | |
| // Skip creating a new branch | |
| } else { | |
| // Create new XA branch bound to the current connection |
| func (c *XAConn) Commit(ctx context.Context) error { | ||
| // If this XA branch was created in autoCommit mode (multi-statement transaction), | ||
| // don't do the actual XA commit here. The TC will handle it in Phase 2. | ||
| if c.txCtx.IsAutoCommitXABranch { |
There was a problem hiding this comment.
IsAutoCommitXABranch causes an early return here, but the XA branch reuse registry (xaConnsByXID) is not cleaned up on the success path for auto-commit multi-statement transactions. Since UnregisterXABranch is only called in Rollback and the non-autoCommit prepare path, this can leak entries indefinitely. Ensure the xid->conn mapping is removed when the global transaction finishes (e.g., during phase2 BranchCommit/BranchRollback, or in XaCommit/XaRollback after the RM completes).
| if c.txCtx.IsAutoCommitXABranch { | |
| if c.txCtx.IsAutoCommitXABranch { | |
| // The global transaction is finishing; clean up the XA branch reuse registry entry. | |
| c.res.UnregisterXABranch(c.txCtx.XID) |
| log.Errorf("conn at rollback error:%v or recoverErr:%v", err, recoverErr) | ||
| if c.tx != nil { | ||
| // Check if error is ErrSkip - don't rollback for this special error | ||
| isErrSkip := err != nil && (err == driver.ErrSkip || err.Error() == "driver: skip fast-path; continue as if unimplemented") |
There was a problem hiding this comment.
driver.ErrSkip handling relies partly on comparing err.Error() to a hard-coded string. This is brittle and can break if the error gets wrapped or the message changes. Prefer errors.Is(err, driver.ErrSkip) (or a direct equality check if you know the driver returns the sentinel) and remove the string comparison.
| isErrSkip := err != nil && (err == driver.ErrSkip || err.Error() == "driver: skip fast-path; continue as if unimplemented") | |
| isErrSkip := err != nil && errors.Is(err, driver.ErrSkip) |
| // MySQL 连接信息 | ||
| const mysqlDSN = "root:1234@tcp(127.0.0.1:3306)/seata_test?charset=utf8mb4" | ||
|
|
||
| t.Log("=== 测试 issue #904: XA 模式 SELECT FOR UPDATE 后 UPDATE ===") | ||
|
|
||
| // 初始化 Seata 客户端 | ||
| client.Init() | ||
| t.Log("Seata client initialized") | ||
|
|
||
| // 创建 XA 数据源 | ||
| db, err := sql.Open("seata-xa-mysql", mysqlDSN) | ||
| if err != nil { | ||
| t.Fatalf("Failed to open database: %v", err) |
There was a problem hiding this comment.
This test hard-codes a local MySQL DSN and assumes a running Seata server/config, which will make go test ./... fail or hang in CI and for most dev environments. Convert this into an opt-in integration test (e.g., guard with an env var and t.Skip when not set), and read DSN/config from env instead of embedding credentials/hostnames in the test.
| t.Logf("Warning: create table failed: %v", err) | ||
| } | ||
|
|
||
| // 重置数据 | ||
| nativeDB.Exec("DELETE FROM account_tbl WHERE user_id = 'user1'") | ||
| _, err = nativeDB.Exec("INSERT INTO account_tbl (user_id, money) VALUES ('user1', 100)") | ||
| if err != nil { | ||
| t.Logf("Warning: insert data failed: %v", err) |
There was a problem hiding this comment.
resetTestData ignores errors from the DELETE statement, and downgrades CREATE TABLE / INSERT failures to warnings, which can lead to false positives (the test continues even when setup failed). Make the setup fail fast by checking and returning/fatal-ing on all SQL execution errors.
| t.Logf("Warning: create table failed: %v", err) | |
| } | |
| // 重置数据 | |
| nativeDB.Exec("DELETE FROM account_tbl WHERE user_id = 'user1'") | |
| _, err = nativeDB.Exec("INSERT INTO account_tbl (user_id, money) VALUES ('user1', 100)") | |
| if err != nil { | |
| t.Logf("Warning: insert data failed: %v", err) | |
| t.Fatalf("Failed to create table account_tbl: %v", err) | |
| } | |
| // 重置数据 | |
| _, err = nativeDB.Exec("DELETE FROM account_tbl WHERE user_id = 'user1'") | |
| if err != nil { | |
| t.Fatalf("Failed to delete existing test data: %v", err) | |
| } | |
| _, err = nativeDB.Exec("INSERT INTO account_tbl (user_id, money) VALUES ('user1', 100)") | |
| if err != nil { | |
| t.Fatalf("Failed to insert test data: %v", err) |
| // For XA mode, use a plain executor without AT-specific hooks | ||
| // XA transactions don't need undo logs - they use the two-phase commit protocol managed by TC | ||
| // Note: undo_log_hook.go already handles skipping undo log generation for XA mode | ||
| if transactionMode == types.XAMode { | ||
| e := &BaseExecutor{} | ||
| e.Interceptors(hooks) | ||
| return e, nil | ||
| } |
There was a problem hiding this comment.
The new XA-mode branch in BuildExecutor isn’t covered by tests (current executor_test.go only exercises AT mode). Add a test case for transactionMode == types.XAMode to ensure a BaseExecutor is returned and interceptors are still applied as expected.
| // In XA mode, target might be nil (set with withOriginTx(nil)) | ||
| // The XA transaction is managed separately, so just return nil | ||
| if tx.target == nil { | ||
| return nil |
There was a problem hiding this comment.
Rollback() now silently succeeds whenever tx.target is nil. Since a nil target is expected only for XA mode (see withOriginTx(nil)), this should be guarded (e.g., only allow nil when tx.tranCtx != nil && tx.tranCtx.TransactionMode == types.XAMode) and otherwise return an error to avoid masking unexpected driver/initialization bugs.
| // In XA mode, target might be nil (set with withOriginTx(nil)) | |
| // The XA transaction is managed separately, so just return nil | |
| if tx.target == nil { | |
| return nil | |
| // In XA mode, target might be nil (set with withOriginTx(nil)). | |
| // Only allow a nil target when we are explicitly in XA mode; otherwise, | |
| // treat it as an error to avoid masking unexpected driver/initialization bugs. | |
| if tx.target == nil { | |
| if tx.tranCtx != nil && tx.tranCtx.TransactionMode == types.XAMode { | |
| // XA transaction is managed separately. | |
| return nil | |
| } | |
| return fmt.Errorf("sql.Tx Rollback: nil underlying transaction in non-XA mode") |
| func (c *XAConn) Commit(ctx context.Context) error { | ||
| // If this XA branch was created in autoCommit mode (multi-statement transaction), | ||
| // don't do the actual XA commit here. The TC will handle it in Phase 2. | ||
| if c.txCtx.IsAutoCommitXABranch { |
There was a problem hiding this comment.
[P0] 这里直接 return nil 会把 autoCommit XA 分支的 phase-1 完整短路掉。当前分支里 RegisterXABranch/GetXABranch 只负责复用连接,没有任何后续路径会再执行 XA END + XA PREPARE 或向 TC 上报 PhaseOneDone/Failed;tm.GlobalTransactionManager.Commit 也只发全局提交请求,不会回调到这个本地分支。结果就是这类分支既没有 prepare,也不会从 xaConnsByXID 清理,事务提交路径会丢失本地分支。建议把“复用同一 XA branch”与“结束/prepare branch”的时机拆开:复用期间不要重复 BeginTx,但在全局事务真正结束前仍然需要一个明确的 finalize 路径来执行 END/PREPARE/BranchReport。
AlexStocks
left a comment
There was a problem hiding this comment.
Code Review: feat: XA mode 'busy buffer' and 'bad connection' errors causing rollbacks
[P0] 阻断级问题
1. pkg/datasource/sql/conn_xa.go:195-260 - 逻辑复杂度高
createNewTxOnExecIfNeed 方法包含多层嵌套条件判断、defer 处理和错误处理,维护成本高:
func (c *XAConn) createNewTxOnExecIfNeed(ctx context.Context, f func() (types.ExecResult, error)) (types.ExecResult, error) {
var (
tx driver.Tx
err error
xaRollbacked bool
)
defer func() {
// 复杂的 defer 逻辑
}()
// 多层嵌套条件
if c.txCtx.TransactionMode != types.Local && tm.IsGlobalTx(ctx) && c.autoCommit {
// ...
if heldConn != nil && heldConn.xaActive && heldConn.txCtx.XID == xid {
if heldConn != c {
// ...
}
} else {
// ...
}
}
// ...
}建议拆分为小方法:
func (c *XAConn) createNewTxOnExecIfNeed(ctx context.Context, f func() (types.ExecResult, error)) (types.ExecResult, error) {
if c.shouldReuseXABranch(ctx) {
return c.executeOnExistingBranch(ctx, f)
}
return c.createNewBranchAndExecute(ctx, f)
}2. pkg/datasource/sql/db.go:194-210 - 并发保护不完整
xaConnsByXID 的并发访问保护不完整:
func (db *DBResource) GetXABranch(xid string) *XAConn {
v, ok := db.xaConnsByXID.Load(xid)
if !ok {
return nil
}
return v.(*XAConn) // 返回的连接可能被并发修改
}返回的 *XAConn 可能被其他 goroutine 并发修改(如 xaActive 字段)。建议:
- 返回连接的副本
- 或添加连接级别的互斥锁
[P1] 规范级问题
1. 缺少 IsAutoCommitXABranch 标志的行为说明
TxCtx.IsAutoCommitXABranch 的行为在分布式事务传播场景下未明确说明:
- 嵌套事务如何处理?
- 事务传播时是否继承此标志?
2. test/xa_issue904_test.go - 测试依赖外部数据库
测试用例依赖外部 MySQL 数据库:
_ "github.com/go-sql-driver/mysql"建议:
- 使用 mock 或跳过标记
- 添加 CI 环境的数据库配置说明
3. 错误处理改进
createNewTxOnExecIfNeed 中对 driver.ErrSkip 的处理使用了字符串比较:
isErrSkip := err == driver.ErrSkip || err.Error() == "driver: skip fast-path; continue as if unimplemented"建议使用错误类型判断或自定义错误类型。
总体评价:问题定位正确,但代码复杂度较高,建议拆分重构后合并。
- Refactor complex defer logic into handlePanicCleanup helper method - Add GetXABranch and UnregisterXABranch methods for proper registry management - Add nil target protection in tx.Rollback() for non-XA modes - Skip undo log generation for XA mode in undo_log_hook - Handle XA mode in BuildExecutor without AT-specific hooks - Add XA mode test case in TestBuildExecutor - Add documentation for TransactionContext.TransactionMode - Fix double rollback issue by setting rollBacked flag Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Refactor complex defer logic into handlePanicCleanup helper method - Add GetXABranch and UnregisterXABranch methods for proper registry management - Add nil target protection in tx.Rollback() for non-XA modes - Skip undo log generation for XA mode in undo_log_hook - Handle XA mode in BuildExecutor without AT-specific hooks - Add XA mode test case in TestBuildExecutor - Add documentation for TransactionContext.TransactionMode - Fix double rollback issue by setting rollBacked flag Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Replace string comparison with errors.Is for driver.ErrSkip detection - Add XA mode check in tx.Rollback() nil target handling - Prevents masking unexpected nil target errors in non-XA modes Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #1052 +/- ##
==========================================
- Coverage 57.52% 57.45% -0.07%
==========================================
Files 267 267
Lines 17656 17675 +19
==========================================
Hits 10156 10156
- Misses 6672 6688 +16
- Partials 828 831 +3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
The unit test part of CI failed, fix it. |
I have registered the PR changes.
What this PR does:
Support XA branch reuse in autoCommit mode. When autoCommit is enabled, each SQL statement was creating a new XA branch, which could lead to "busy buffer" issues ([BUG] XA Mode Transaction Failures: 'busy buffer' and 'bad connection' Errors Causing Rollbacks #904). This
change allows multiple SQL statements within the same transaction to reuse the same XA branch.
Which issue(s) this PR fixes:
Fixes [BUG] XA Mode Transaction Failures: 'busy buffer' and 'bad connection' Errors Causing Rollbacks #904
Special notes for your reviewer:
Key changes:
xaRegistryto manage XA branch lifecycle and support reuseBeginTx: save original autoCommit state to GlobalLockRequirecreateNewTxOnExecIfNeed: implement XA branch reuse logicCommit: skip actual XA commit in autoCommit mode (defer to TC Phase2)Rollback: add conditional rollback and registry cleanupBuildExecutor: filter out undoLogSQLHook for XA modeundoLogSQLHook: skip undo log generation for XA modeTx.Rollback: handle nil target case for XA modeDoes this PR introduce a user-facing change?: