@@ -38,6 +38,8 @@ public abstract class DirectoryPollingModule extends PollingModule {
3838 private Map <String , Long > trackedFiles = new HashMap <String , Long >();
3939 // Files that have been passed to a processor - key is the absolute file path and value is the processing start timestamp
4040 private Map <String , Long > processingFiles = new HashMap <String , Long >();
41+ // Files that have been processed in thread mode and need removing from the tracked files maps by the main thread
42+ Collection <String > threadProcessedFiles = Collections .synchronizedCollection (new ArrayList <String >());
4143
4244 private String errorDir = null ;
4345 private String sentDir = null ;
@@ -217,24 +219,37 @@ protected void processSingleFile(File file, String fileEntryKey) {
217219 //forceStop(e1);
218220 return ;
219221 }
220- } finally {
221- // Remove trackedFiles entry first to avoid race condition in parallel processing
222- trackedFiles .remove (fileEntryKey );
223- processingFiles .remove (fileEntryKey );
224- }
222+ }
225223 }
226224
227- protected void triggerFileProcessing (File file , String fileEntryKey ) {
228- // Add the in processing flag on the file now otherwise in threaded mode there is a race condition
229- processingFiles .put (fileEntryKey , System .currentTimeMillis ());
230- if (processFilesAsThreads ) {
231- processFileInThread (file , fileEntryKey );
232- } else {
233- processSingleFile (file , fileEntryKey );
225+ private void processFileInThread (File file , String fileEntryKey ) {
226+ if (logger .isDebugEnabled ()) {
227+ logger .debug ("Parallel processing mode handling file: " + file .getName ());
234228 }
229+ executorService .execute (new Runnable () {
230+ @ Override
231+ public void run () {
232+ try {
233+ processSingleFile (file , fileEntryKey );
234+ } finally {
235+ // Add to list for removal from tracking maps when updateTracking is called
236+ threadProcessedFiles .add (fileEntryKey );
237+ }
238+ }
239+ });
235240 }
236241
237242 private void updateTracking () {
243+ // first remove processed files if we are running in parallel processing mode to avoid concurrency issues
244+ if (processFilesAsThreads ) {
245+ synchronized (threadProcessedFiles ) {
246+ threadProcessedFiles .forEach ((fileEntryKey ) -> {
247+ trackedFiles .remove (fileEntryKey );
248+ processingFiles .remove (fileEntryKey );
249+ });
250+ threadProcessedFiles .clear ();
251+ }
252+ }
238253 // Use an iterator to be able to remove entries whilst iterating over the map.
239254 Iterator <Map .Entry <String , Long >> iter = trackedFiles .entrySet ().iterator ();
240255 while (iter .hasNext ()) {
@@ -264,24 +279,24 @@ private void updateTracking() {
264279 trackedFiles .put (fileEntryKey , Long .valueOf (newLength ));
265280 } else {
266281 // no change in file length so process the file
267- triggerFileProcessing (file , fileEntryKey );
282+ // Add the processing flag on the file now otherwise in threaded mode there is a race condition
283+ processingFiles .put (fileEntryKey , System .currentTimeMillis ());
284+ if (processFilesAsThreads ) {
285+ processFileInThread (file , fileEntryKey );
286+ } else {
287+ try {
288+ processSingleFile (file , fileEntryKey );
289+ } finally {
290+ iter .remove ();
291+ processingFiles .remove (fileEntryKey );
292+ }
293+ }
294+
268295 }
269296 }
270297 }
271298 }
272299
273- private void processFileInThread (File file , String fileEntryKey ) {
274- if (logger .isDebugEnabled ()) {
275- logger .debug ("Parallel processing mode handling file: " + file .getName ());
276- }
277- executorService .execute (new Runnable () {
278- @ Override
279- public void run () {
280- processSingleFile (file , fileEntryKey );
281- }
282- });
283- }
284-
285300 protected abstract Message createMessage ();
286301
287302 protected void processFile (File file ) throws OpenAS2Exception {
0 commit comments