[fix][io] JDBC sink: prevent OOM from unbounded queue on connection failure#9
Merged
david-streamlio merged 2 commits intoapache:masterfrom Apr 3, 2026
Conversation
3 tasks
c64e0a3 to
5f178e7
Compare
There was a problem hiding this comment.
Pull request overview
This PR addresses a production OOM scenario in the Pulsar JDBC sink when the database connection is slow/broken by adding queue back-pressure, improving sink state handling, and adding connection validation/reconnect behavior.
Changes:
- Add
maxQueueSizeto bound the internal record queue (with auto-sizing behavior) and reject records when full. - Fail records immediately when the sink is not
OPEN, and cancel periodic flush tasks onfatal()/close(). - Add
ensureConnection()to validate the JDBC connection before flushing and reconnect when invalid; add SQLite tests for the new behaviors.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java |
Adds bounded-queue back-pressure, state checks in write(), scheduled flush cancellation, and connection validation/reconnect in flush(). |
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java |
Introduces maxQueueSize configuration and its FieldDoc. |
jdbc/sqlite/src/test/java/org/apache/pulsar/io/jdbc/SqliteJdbcSinkTest.java |
Adds tests covering failing writes after fatal, bounded-queue back-pressure, and reconnection on invalid connection. |
Comments suppressed due to low confidence (1)
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java:233
- There is a race between the initial state check and enqueuing/scheduling: close()/fatal() can flip state to CLOSED/FAILED and shut down the flushExecutor after the check at line 208, causing write() to enqueue a record and then throw RejectedExecutionException when scheduling flush (or leave a queued record that will never be acked/failed because flush() returns when CLOSED). Re-check state (and executor availability) under the same synchronization used for enqueuing, and handle schedule rejection by failing the record (or flushing synchronously).
public void write(Record<T> record) throws Exception {
if (state.get() != State.OPEN) {
log.warn("Sink is not in OPEN state (current: {}), failing record", state.get());
record.fail();
return;
}
int number;
synchronized (incomingList) {
if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
if (!queueFullLogged) {
log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
incomingList.size(), maxQueueSize);
queueFullLogged = true;
}
record.fail();
return;
}
incomingList.add(record);
number = incomingList.size();
}
if (batchSize > 0 && number >= batchSize) {
if (log.isDebugEnabled()) {
log.debug("flushing by batches, hit batch size {}", batchSize);
}
flushExecutor.schedule(this::flush, 0, TimeUnit.MILLISECONDS);
}
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
Outdated
Show resolved
Hide resolved
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
Outdated
Show resolved
Hide resolved
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
Outdated
Show resolved
Hide resolved
jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
Outdated
Show resolved
Hide resolved
…ailure The JDBC sink's internal queue (incomingList) is unbounded. When the database connection drops, executeBatch() hangs until the TCP socket times out. During this period, isFlushing stays true, preventing any draining, while write() continues accepting records without limit. This causes OutOfMemoryError in production. This commit fixes 4 issues: 1. Bounded internal queue: write() now rejects records when queue exceeds maxQueueSize (configurable, defaults to 10x batchSize), applying Pulsar-level back-pressure via negative acknowledgment. 2. State check in write(): records are failed immediately when the sink state is not OPEN (after fatal() or close()). 3. Connection validation and reconnection: ensureConnection() validates the JDBC connection before each flush and reconnects automatically on failure, allowing recovery from transient database outages. 4. Scheduled flush cancellation: fatal() and close() now cancel the periodic flush task to prevent repeated failures on a broken connection. Fixes apache/pulsar#25030
- Move record.fail() outside synchronized(incomingList) to avoid holding the lock during framework callbacks - Move incomingList.size() check inside synchronized in flush() to fix data race on non-thread-safe LinkedList - Change maxQueueSize default from 0 (auto-bounded) to -1 (unbounded) to preserve backwards-compatible legacy behavior; users opt-in to bounded queue by setting maxQueueSize=0 (auto) or a positive value - Add overflow-safe auto-sizing (long arithmetic capped at MAX_VALUE) - Validate maxQueueSize in JdbcSinkConfig.validate() — reject < -1 - Add test for invalid maxQueueSize rejection
fb9f552 to
c141a2d
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Fixes apache/pulsar#25030
The JDBC sink's internal queue (
incomingList) is an unboundedLinkedList. When the database connection drops:executeBatch()hangs until TCP socket timeout (can be minutes)isFlushingstaystrue, blocking all queue drainingwrite()keeps accepting records without any limitincomingListgrows untilOutOfMemoryError: Java heap spaceThis is a production issue observed with both PostgreSQL and MariaDB JDBC sinks under Pulsar FunctionMesh.
Modifications
JdbcAbstractSink.java:write()rejects records when queue exceedsmaxQueueSize, applying Pulsar-level back-pressure viarecord.fail()(negative ack → consumer redelivers later)write(): records are failed immediately when state != OPEN (afterfatal()orclose())ensureConnection()validates the JDBC connection viaConnection.isValid()before each flush and reconnects automatically, allowing recovery from transient DB outages without pod restartfatal()andclose()cancel the periodic flushScheduledFutureto stop repeated failures on a broken connectionJdbcSinkConfig.java:maxQueueSizeconfig (default: 0 = auto, which resolves tobatchSize * 10). Users can override for their workload.Verifying this change
testWriteRejectsRecordsAfterFatal— verifieswrite()fails records when state is FAILEDtestBoundedQueueBackPressure— verifies records are failed when internal queue is fullDoes this pull request potentially affect one of the following parts of the Pulsar project?
maxQueueSize=0preserves legacy unbounded behavior)Documentation
doc-not-needed— Bug fix with backward-compatible config addition