Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void processEventBatch(List<Event> events) {

tx.commit();
} catch (Throwable t) {
tx.rollback();
tryRollbackForException(tx, t);
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
Expand All @@ -204,9 +204,7 @@ public void processEventBatch(List<Event> events) {
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close();
}
tx.close();
}
}

Expand All @@ -225,15 +223,13 @@ public void processEventBatch(List<Event> events) {

tx.commit();
} catch (Throwable t) {
tx.rollback();
tryRollbackForException(tx, t);
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close();
}
tx.close();
}
}
}
Expand Down Expand Up @@ -269,7 +265,7 @@ public void processEvent(Event event) {

tx.commit();
} catch (Throwable t) {
tx.rollback();
tryRollbackForException(tx, t);
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
Expand Down Expand Up @@ -298,7 +294,7 @@ public void processEvent(Event event) {

tx.commit();
} catch (Throwable t) {
tx.rollback();
tryRollbackForException(tx, t);
LOG.error("Unable to put event on optional channel: " + optChannel, t);
if (t instanceof Error) {
throw (Error) t;
Expand All @@ -310,4 +306,14 @@ public void processEvent(Event event) {
}
}
}

private void tryRollbackForException(Transaction tx, Throwable cause) {
try {
tx.rollback();
} catch (RuntimeException | Error e) {
LOG.error("Exception occurred during transaction rollback. " +
"Logging original exception and propagating this one", cause);
throw e;
}
}
}