Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 115 additions & 24 deletions jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -78,8 +79,12 @@ private enum State {
private Deque<Record<T>> incomingList;
private AtomicBoolean isFlushing;
private int batchSize;
private int maxQueueSize;
private ScheduledExecutorService flushExecutor;
private ScheduledFuture<?> scheduledFlushTask;
private SinkContext sinkContext;
private Properties connectionProperties;
private volatile boolean queueFullLogged = false;
private final AtomicReference<State> state = new AtomicReference<>(State.OPEN);

@Override
Expand All @@ -93,17 +98,17 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
throw new IllegalArgumentException("Required jdbc Url not set.");
}

Properties properties = new Properties();
connectionProperties = new Properties();
String username = jdbcSinkConfig.getUserName();
String password = jdbcSinkConfig.getPassword();
if (username != null) {
properties.setProperty("user", username);
connectionProperties.setProperty("user", username);
}
if (password != null) {
properties.setProperty("password", password);
connectionProperties.setProperty("password", password);
}

connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties);
connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());

Expand All @@ -114,12 +119,21 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc

int timeoutMs = jdbcSinkConfig.getTimeoutMs();
batchSize = jdbcSinkConfig.getBatchSize();
maxQueueSize = jdbcSinkConfig.getMaxQueueSize();
if (maxQueueSize == 0) {
// Auto-size: default to 10x batch size (overflow-safe)
long calculated = batchSize > 0 ? (long) batchSize * 10L : 10000L;
maxQueueSize = calculated > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) calculated;
}
// maxQueueSize < 0 (i.e. -1) means unbounded (legacy behavior)
log.info("JDBC sink queue capacity: {}", maxQueueSize > 0 ? maxQueueSize : "unbounded");
incomingList = new LinkedList<>();
isFlushing = new AtomicBoolean(false);

flushExecutor = Executors.newScheduledThreadPool(1);
if (timeoutMs > 0) {
flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
scheduledFlushTask = flushExecutor.scheduleAtFixedRate(
this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -158,6 +172,10 @@ private static List<String> getListFromConfig(String jdbcSinkConfig) {
@Override
public void close() throws Exception {
state.set(State.CLOSED);
if (scheduledFlushTask != null) {
scheduledFlushTask.cancel(false);
scheduledFlushTask = null;
}
if (flushExecutor != null) {
int timeoutMs = jdbcSinkConfig.getTimeoutMs() * 2;
flushExecutor.shutdown();
Expand Down Expand Up @@ -188,10 +206,35 @@ public void close() throws Exception {

@Override
public void write(Record<T> record) throws Exception {
int number;
if (state.get() != State.OPEN) {
log.warn("Sink is not in OPEN state (current: {}), failing record", state.get());
record.fail();
return;
}
int number = 0;
boolean shouldFail = false;
boolean shouldLogQueueFull = false;
int queueSizeSnapshot = 0;
synchronized (incomingList) {
incomingList.add(record);
number = incomingList.size();
if (maxQueueSize > 0 && incomingList.size() >= maxQueueSize) {
if (!queueFullLogged) {
queueFullLogged = true;
shouldLogQueueFull = true;
queueSizeSnapshot = incomingList.size();
}
shouldFail = true;
} else {
incomingList.add(record);
number = incomingList.size();
}
}
if (shouldFail) {
if (shouldLogQueueFull) {
log.warn("Internal queue is full ({} >= {}), failing records to apply back-pressure",
queueSizeSnapshot, maxQueueSize);
}
record.fail();
return;
}
Comment thread
harangozop marked this conversation as resolved.
if (batchSize > 0 && number >= batchSize) {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -239,26 +282,42 @@ protected enum MutationType {


private void flush() {
if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) {
boolean needAnotherRound;
final Deque<Record<T>> swapList = new LinkedList<>();

synchronized (incomingList) {
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
if (state.get() == State.CLOSED) {
return;
}
if (!isFlushing.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
synchronized (incomingList) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}
final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
incomingList.size();
}
return;
}
boolean needAnotherRound;
final Deque<Record<T>> swapList = new LinkedList<>();

for (int i = 0; i < actualBatchSize; i++) {
swapList.add(incomingList.removeFirst());
}
needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
synchronized (incomingList) {
if (incomingList.isEmpty()) {
isFlushing.set(false);
return;
}
if (log.isDebugEnabled()) {
log.debug("Starting flush, queue size: {}", incomingList.size());
}
final int actualBatchSize = batchSize > 0 ? Math.min(incomingList.size(), batchSize) :
incomingList.size();

for (int i = 0; i < actualBatchSize; i++) {
swapList.add(incomingList.removeFirst());
}
needAnotherRound = batchSize > 0 && !incomingList.isEmpty() && incomingList.size() >= batchSize;
}
long start = System.nanoTime();

int count = 0;
try {
ensureConnection();

PreparedStatement currentBatch = null;
final List<Mutation> mutations = swapList
.stream()
Expand Down Expand Up @@ -308,6 +367,7 @@ private void flush() {
} else {
internalFlush(swapList);
}
queueFullLogged = false;
} catch (Exception e) {
log.error("Got exception {} after {} ms, failing {} messages",
e.getMessage(),
Expand All @@ -329,10 +389,37 @@ private void flush() {
if (needAnotherRound) {
flush();
}
} else {
if (log.isDebugEnabled()) {
log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size());
}

/**
 * Makes sure {@code connection} is usable, reconnecting when it is not.
 *
 * <p>A non-null connection that passes {@code isValid(2)} is kept untouched. Otherwise the
 * current connection is closed, a new one is opened from {@code connectionProperties}, the
 * auto-commit mode is re-applied, {@code tableId} is looked up again and
 * {@code initStatement()} is re-run against the new connection.
 *
 * <p>If any step after opening the new connection fails, that connection is closed and the
 * field is cleared before the exception is rethrown. Without this, a live but
 * half-initialized connection would pass the validity check on the next call and the
 * re-initialization would never be retried.
 *
 * @throws Exception if the connection cannot be opened or re-initialized
 */
private void ensureConnection() throws Exception {
    try {
        if (connection != null && connection.isValid(2)) {
            return;
        }
    } catch (SQLException e) {
        // A failing validity probe is treated the same as an invalid connection.
        log.warn("Connection validation failed: {}", e.getMessage());
    }

    log.info("JDBC connection is invalid, attempting to reconnect to: {}", jdbcUrl);
    closeConnectionQuietly();

    connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), connectionProperties);
    try {
        connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());

        tableId = JdbcUtils.getTableId(connection, tableName);
        initStatement();
    } catch (Exception e) {
        // Do not keep a connection whose setup did not complete; force a full retry next time.
        closeConnectionQuietly();
        throw e;
    }

    log.info("Successfully reconnected to: {}", jdbcUrl);
}

/**
 * Closes the current JDBC connection, if there is one, and clears the field.
 * An exception thrown by {@code close()} is logged at debug level and not propagated.
 */
private void closeConnectionQuietly() {
    if (connection == null) {
        return;
    }
    try {
        connection.close();
    } catch (Exception closeFailure) {
        log.debug("Error closing stale connection", closeFailure);
    }
    // Cleared whether or not close() succeeded.
    connection = null;
}

Expand Down Expand Up @@ -404,6 +491,10 @@ private static boolean isBatchItemFailed(int returnCode) {
*/
private void fatal(Exception e) {
    // Only the first caller that moves the sink from OPEN to FAILED reports the error.
    if (sinkContext == null || !state.compareAndSet(State.OPEN, State.FAILED)) {
        return;
    }
    log.error("Fatal error in JDBC sink, signaling framework for shutdown", e);
    // Read the field once: close() sets it to null, so checking and then dereferencing
    // the field directly could throw a NullPointerException and skip sinkContext.fatal(e).
    final ScheduledFuture<?> flushTask = scheduledFlushTask;
    if (flushTask != null) {
        flushTask.cancel(false);
    }
    sinkContext.fatal(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ public class JdbcSinkConfig implements Serializable {
)
private NullValueAction nullValueAction = NullValueAction.FAIL;

// Queue capacity setting: -1 = unbounded, 0 = auto-size, positive = explicit limit.
@FieldDoc(
    required = false,
    defaultValue = "-1",
    help = "Maximum number of records to buffer in the internal queue before applying back-pressure. "
            + "When the queue is full, incoming records will be failed (negatively acknowledged) so that "
            + "the Pulsar consumer can redeliver them later. This prevents out-of-memory errors when the "
            + "database connection is slow or broken. "
            + "A value of 0 auto-sizes to batchSize * 10, or to 10000 when batchSize is not positive. "
            + "A value of -1 (default) disables the limit (unbounded, legacy behavior)."
)
private int maxQueueSize = -1;

public enum InsertMode {
INSERT,
UPSERT,
Expand All @@ -155,6 +167,10 @@ public void validate() {
if (timeoutMs <= 0 && batchSize <= 0) {
throw new IllegalArgumentException("timeoutMs or batchSize must be set to a positive value.");
}
if (maxQueueSize < -1) {
throw new IllegalArgumentException("maxQueueSize must be -1 (unbounded), 0 (auto-size), "
+ "or a positive value.");
}
}

}
Loading