diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java index 69878609ce..f7d8187c8e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java @@ -193,7 +193,7 @@ public void processEventBatch(List events) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; @@ -204,9 +204,7 @@ public void processEventBatch(List events) { "channel: " + reqChannel, t); } } finally { - if (tx != null) { - tx.close(); - } + tx.close(); } } @@ -225,15 +223,13 @@ public void processEventBatch(List events) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); LOG.error("Unable to put batch on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } } finally { - if (tx != null) { - tx.close(); - } + tx.close(); } } } @@ -269,7 +265,7 @@ public void processEvent(Event event) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; @@ -298,7 +294,7 @@ public void processEvent(Event event) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); LOG.error("Unable to put event on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; @@ -310,4 +306,14 @@ public void processEvent(Event event) { } } } + + private void tryRollbackForException(Transaction tx, Throwable cause) { + try { + tx.rollback(); + } catch (RuntimeException | Error e) { + LOG.error("Exception occurred during transaction rollback. " + + "Logging original exception and propagating this one", cause); + throw e; + } + } }