Skip to content

Commit 2fe8b8b

Browse files
committed
Improved named pipe capability, and spool bug fix.
1 parent 2722fc3 commit 2fe8b8b

7 files changed

Lines changed: 223 additions & 36 deletions

File tree

src/main/java/info/fetter/logstashforwarder/FileReader.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@
1919
*/
2020

2121
import info.fetter.logstashforwarder.util.AdapterException;
22+
import info.fetter.logstashforwarder.util.LogFile;
2223
import info.fetter.logstashforwarder.util.RandomAccessFile;
24+
import info.fetter.logstashforwarder.util.NamedPipe;
2325

2426
import java.io.File;
2527
import java.io.IOException;
26-
//import java.io.RandomAccessFile;
2728
import java.nio.ByteBuffer;
2829
import java.util.Arrays;
2930
import java.util.Collection;
@@ -75,11 +76,11 @@ private int readFile(FileState state, int spaceLeftInSpool) {
7576
long pointer = state.getPointer();
7677
int numberOfEvents = 0;
7778
try {
78-
if(state.isDeleted() || state.getRandomAccessFile() == null) { // Don't try to read this file
79+
if(state.isDeleted() || state.getLogFile() == null) { // Don't try to read this file
7980
if(logger.isTraceEnabled()) {
8081
logger.trace("File : " + file + " has been deleted");
8182
}
82-
} else if(state.getRandomAccessFile().isEmpty()) {
83+
} else if(state.getLogFile().isEmpty()) {
8384
if(logger.isTraceEnabled()) {
8485
logger.trace("File : " + file + " is empty");
8586
}
@@ -104,8 +105,10 @@ private int readFile(FileState state, int spaceLeftInSpool) {
104105
}
105106

106107
private boolean isCompressedFile(FileState state) {
107-
RandomAccessFile reader = state.getRandomAccessFile();
108-
108+
LogFile logFile = state.getLogFile();
109+
if (!(logFile instanceof RandomAccessFile)) return false;
110+
RandomAccessFile reader = (RandomAccessFile) logFile;
111+
109112
try {
110113
for(byte[] magic : MAGICS) {
111114
byte[] fileBytes = new byte[magic.length];
@@ -143,14 +146,15 @@ private static void copyLineToBuffer(byte[] line, ByteBuffer byteBuffer) {
143146
}
144147

145148
private long readLines(FileState state, int spaceLeftInSpool) {
146-
RandomAccessFile reader = state.getRandomAccessFile();
149+
LogFile reader = state.getLogFile();
147150
long pos = state.getPointer();
148151
Multiline multiline = state.getMultiline();
152+
if (spaceLeftInSpool < 1) return pos;
149153
try {
150154
reader.seek(pos);
151155
byte[] line = readLine(reader);
152156
bufferedLines.clear();
153-
157+
154158
if(multiline != null && multiline.isPrevious()) {
155159
spaceLeftInSpool--;
156160
}
@@ -201,7 +205,7 @@ private long readLines(FileState state, int spaceLeftInSpool) {
201205
}
202206
}
203207
}
204-
line = readLine(reader);
208+
if (spaceLeftInSpool > 0) line = readLine(reader);
205209
}
206210
if(bufferedLines.position() > 0) {
207211
addEvent(state, pos, extractBytes(bufferedLines)); // send any buffered lines left
@@ -213,7 +217,7 @@ private long readLines(FileState state, int spaceLeftInSpool) {
213217
return pos;
214218
}
215219

216-
private byte[] readLine(RandomAccessFile reader) throws IOException {
220+
private byte[] readLine(LogFile reader) throws IOException {
217221
byteBuffer.clear();
218222
int ch;
219223
boolean seenCR = false;

src/main/java/info/fetter/logstashforwarder/FileSigner.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
package info.fetter.logstashforwarder;
22

3-
import info.fetter.logstashforwarder.util.RandomAccessFile;
4-
53
import java.io.IOException;
6-
//import java.io.RandomAccessFile;
74
import java.util.zip.Adler32;
85

6+
import info.fetter.logstashforwarder.util.LogFile;
7+
import info.fetter.logstashforwarder.util.RandomAccessFile;
8+
99

1010
public class FileSigner {
1111
private static final Adler32 adler32 = new Adler32();
12-
13-
public static long computeSignature(RandomAccessFile file, int signatureLength) throws IOException {
12+
13+
public static long computeSignature(LogFile logFile, int signatureLength) throws IOException {
14+
if (!(logFile instanceof RandomAccessFile)) return 0;
15+
16+
RandomAccessFile reader = (RandomAccessFile) logFile;
1417
adler32.reset();
1518
byte[] input = new byte[signatureLength];
16-
file.seek(0);
17-
file.read(input);
19+
reader.seek(0);
20+
reader.read(input);
1821
adler32.update(input);
1922
return adler32.getValue();
2023
}

src/main/java/info/fetter/logstashforwarder/FileState.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
*
1818
*/
1919

20+
import info.fetter.logstashforwarder.util.LogFile;
2021
import info.fetter.logstashforwarder.util.RandomAccessFile;
22+
import info.fetter.logstashforwarder.util.NamedPipe;
2123

2224
import java.io.File;
2325
import java.io.FileNotFoundException;
@@ -47,7 +49,7 @@ public class FileState {
4749
@JsonIgnore
4850
private boolean changed = false;
4951
@JsonIgnore
50-
private RandomAccessFile randomAccessFile;
52+
private LogFile logFile;
5153
private long pointer = 0;
5254
@JsonIgnore
5355
private FileState oldFileState;
@@ -67,15 +69,19 @@ public FileState(File file) throws IOException {
6769
this.file = file;
6870
directory = file.getCanonicalFile().getParent();
6971
fileName = file.getName();
70-
randomAccessFile = new RandomAccessFile(file.getPath(), "r");
72+
if (file.isFile()) {
73+
logFile = new RandomAccessFile(file.getPath(), "r");
74+
} else {
75+
logFile = new NamedPipe(file);
76+
}
7177
lastModified = file.lastModified();
7278
size = file.length();
7379
}
7480

7581
private void setFileFromDirectoryAndName() throws FileNotFoundException {
7682
file = new File(directory + File.separator + fileName);
7783
if(file.exists()) {
78-
randomAccessFile = null;
84+
logFile = null;
7985
lastModified = file.lastModified();
8086
size = file.length();
8187
} else {
@@ -141,8 +147,8 @@ public void setSignature(long signature) {
141147
this.signature = signature;
142148
}
143149

144-
public RandomAccessFile getRandomAccessFile() {
145-
return randomAccessFile;
150+
public LogFile getLogFile() {
151+
return logFile;
146152
}
147153

148154
public long getPointer() {
@@ -172,7 +178,7 @@ public void setOldFileState(FileState oldFileState) {
172178

173179
public void deleteOldFileState() {
174180
try {
175-
oldFileState.getRandomAccessFile().close();
181+
oldFileState.getLogFile().close();
176182
oldFileState = null;
177183
} catch(Exception e) {}
178184
}

src/main/java/info/fetter/logstashforwarder/FileWatcher.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
/*
44
* Copyright 2015 Didier Fetter
5+
* Copyright 2017 Alberto González Palomo https://sentido-labs.com
56
*
67
* Licensed under the Apache License, Version 2.0 (the "License");
78
* you may not use this file except in compliance with the License.
@@ -136,7 +137,7 @@ private void processModifications() throws IOException {
136137
logger.trace("Same signature size and value : file is the same");
137138
continue;
138139
} else if(oldState.getSignatureLength() < state.getSignatureLength()){
139-
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), oldState.getSignatureLength());
140+
long signature = FileSigner.computeSignature(state.getLogFile(), oldState.getSignatureLength());
140141
if(signature == oldState.getSignature()) {
141142
state.setOldFileState(oldState);
142143
logger.trace("Same signature : file is the same");
@@ -163,7 +164,7 @@ private void processModifications() throws IOException {
163164
logger.trace("Same signature size and value : file is the same");
164165
break;
165166
} else if(otherState.getSignatureLength() < state.getSignatureLength()){
166-
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), otherState.getSignatureLength());
167+
long signature = FileSigner.computeSignature(state.getLogFile(), otherState.getSignatureLength());
167168
if(signature == otherState.getSignature()) {
168169
state.setOldFileState(otherState);
169170
logger.trace("Same signature : file is the same");
@@ -194,7 +195,7 @@ private void processModifications() throws IOException {
194195
logger.debug("File " + state.getFile() + " has been replaced and not renamed, removing from watchMap");
195196
}
196197
try {
197-
oldState.getRandomAccessFile().close();
198+
oldState.getLogFile().close();
198199
} catch(Exception e) {}
199200
oldWatchMap.remove(state.getFile());
200201
}
@@ -223,12 +224,33 @@ private void processModifications() throws IOException {
223224
removeMarkedFilesFromWatchMap();
224225
}
225226

227+
// This filter will accept anything that is not a directory,
228+
// including named pipes (FIFOs), sockets and device files.
229+
// The standard org.apache.commons.io.filefilter.FileFileFilter excludes
230+
// them even if their documentation says
231+
// "This filter accepts Files that are files (not directories)."
232+
protected class FileFileFilter implements IOFileFilter
233+
{
234+
@Override
235+
public boolean accept(File file) {
236+
return !file.isDirectory();
237+
}
238+
239+
@Override
240+
public boolean accept(File dir, String name) {
241+
return accept(new File(dir, name));
242+
}
243+
}
244+
protected IOFileFilter fileFileFilter() {
245+
return new FileFileFilter();
246+
}
247+
226248
private void addSingleFile(String fileToWatch, Event fields, long deadTime, Multiline multiline, Filter filter) throws Exception {
227249
logger.info("Watching file : " + new File(fileToWatch).getCanonicalPath());
228250
String directory = FilenameUtils.getFullPath(fileToWatch);
229251
String fileName = FilenameUtils.getName(fileToWatch);
230252
IOFileFilter fileFilter = FileFilterUtils.and(
231-
FileFilterUtils.fileFileFilter(),
253+
fileFileFilter(),
232254
FileFilterUtils.nameFileFilter(fileName),
233255
new LastModifiedFileFilter(deadTime));
234256
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
@@ -240,7 +262,7 @@ private void addWildCardFiles(String filesToWatch, Event fields, long deadTime,
240262
String wildcard = FilenameUtils.getName(filesToWatch);
241263
logger.trace("Directory : " + new File(directory).getCanonicalPath() + ", wildcard : " + wildcard);
242264
IOFileFilter fileFilter = FileFilterUtils.and(
243-
FileFilterUtils.fileFileFilter(),
265+
fileFileFilter(),
244266
new WildcardFileFilter(wildcard),
245267
new LastModifiedFileFilter(deadTime));
246268
initializeWatchMap(new File(directory), fileFilter, fields, multiline, filter);
@@ -273,7 +295,7 @@ private void addFileToWatchMap(Map<File,FileState> map, File file, Event fields,
273295
state.setFields(fields);
274296
int signatureLength = (int) (state.getSize() > maxSignatureLength ? maxSignatureLength : state.getSize());
275297
state.setSignatureLength(signatureLength);
276-
long signature = FileSigner.computeSignature(state.getRandomAccessFile(), signatureLength);
298+
long signature = FileSigner.computeSignature(state.getLogFile(), signatureLength);
277299
state.setSignature(signature);
278300
logger.trace("Setting signature of size : " + signatureLength + " on file : " + file + " : " + signature);
279301
state.setMultiline(multiline);
@@ -331,7 +353,7 @@ private void removeMarkedFilesFromWatchMap() throws IOException {
331353
List<File> markedList = null;
332354
for(File file : oldWatchMap.keySet()) {
333355
FileState state = oldWatchMap.get(file);
334-
if(state.getRandomAccessFile() == null) {
356+
if(state.getLogFile() == null) {
335357
state.setDeleted();
336358
}
337359
if(state.isDeleted()) {
@@ -342,7 +364,7 @@ private void removeMarkedFilesFromWatchMap() throws IOException {
342364
markedList.add(file);
343365
}
344366
try {
345-
state.getRandomAccessFile().close();
367+
state.getLogFile().close();
346368
} catch(Exception e) {}
347369
}
348370
}
@@ -358,7 +380,7 @@ public void close() throws IOException {
358380
logger.debug("Closing all files");
359381
for(File file : oldWatchMap.keySet()) {
360382
FileState state = oldWatchMap.get(file);
361-
state.getRandomAccessFile().close();
383+
state.getLogFile().close();
362384
}
363385
}
364386

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package info.fetter.logstashforwarder.util;
2+
3+
/*
4+
* Copyright 2018 Alberto González Palomo https://sentido-labs.com
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
*/
19+
20+
import java.io.File;
21+
import java.io.FileInputStream;
22+
import java.io.InputStream;
23+
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
26+
import org.apache.log4j.Logger;
27+
28+
public interface LogFile {
29+
/**
30+
* Check whether the file is empty: normal files are empty
31+
* when their size is zero, but other kinds of files like
32+
* named pipes / FIFOs do not report a size.
33+
*/
34+
public abstract boolean isEmpty() throws IOException;
35+
public abstract void seek(long pos) throws IOException;
36+
public abstract long getFilePointer() throws IOException;
37+
public abstract int read() throws IOException;
38+
public abstract void close() throws IOException;
39+
}

0 commit comments

Comments
 (0)