From f5a304f13ff1a79b6b776d4ff6b8d7116913cf8e Mon Sep 17 00:00:00 2001 From: AM K Date: Fri, 5 Jul 2019 20:54:50 +0200 Subject: [PATCH] FLUME-3337: Log original exception when rollback fails If a transaction's rollback method throwed an unchecked exception, it propagated up the stack and suppressed the exception that caused the transaction to fail. This change adds logging of the original exception but does not change behavior otherwise. --- .../flume/channel/ChannelProcessor.java | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java index 69878609ce..f7d8187c8e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java @@ -193,7 +193,7 @@ public void processEventBatch(List events) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; @@ -204,9 +204,7 @@ public void processEventBatch(List events) { "channel: " + reqChannel, t); } } finally { - if (tx != null) { - tx.close(); - } + tx.close(); } } @@ -225,15 +223,13 @@ public void processEventBatch(List events) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); LOG.error("Unable to put batch on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; } } finally { - if (tx != null) { - tx.close(); - } + tx.close(); } } } @@ -269,7 +265,7 @@ public void processEvent(Event event) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); if (t instanceof Error) { LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; @@ -298,7 +294,7 @@ public void processEvent(Event event) { tx.commit(); } catch (Throwable t) { - tx.rollback(); + tryRollbackForException(tx, t); LOG.error("Unable to put event on optional channel: " + optChannel, t); if (t instanceof Error) { throw (Error) t; @@ -310,4 +306,14 @@ public void processEvent(Event event) { } } } + + private void tryRollbackForException(Transaction tx, Throwable cause) { + try { + tx.rollback(); + } catch (RuntimeException | Error e) { + LOG.error("Exception occurred during transaction rollback. " + + "Logging original exception and propagating this one", cause); + throw e; + } + } }