From 19cc5929c8e48fbe1c56a2dd48ee0d4a03ccb5ae Mon Sep 17 00:00:00 2001 From: edricming Date: Mon, 19 Aug 2019 19:34:13 +0800 Subject: [PATCH] FLUME-3341 Fix:Taildir source may cause file handle leak and data duplication --- .../source/taildir/ReliableTaildirEventReader.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 633d3c1903..5b51957e72 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -244,9 +244,17 @@ public List updateTailFiles(boolean skipToEnd) throws IOException { for (File f : taildir.getMatchingFiles()) { long inode = getInode(f); TailFile tf = tailFiles.get(inode); - if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { + if (tf == null) { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos); + } else if (!tf.getPath().equals(f.getAbsolutePath())) { + if (tf.getRaf() != null) { + tf.close(); + logger.info("Close file at " + tf.getPath() + ", it no longer exists."); + } + long endPos = f.length() < tf.getPos() ? 0 : tf.getPos(); + long startPos = skipToEnd ? f.length() : endPos; + tf = openFile(f, headers, inode, startPos); } else { boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length(); if (updated) {