33require "logstash/inputs/base"
44require "logstash/inputs/threadable"
55require 'redis'
6+ require 'concurrent/atomics'
67
78# This input will read events from a Redis instance; it supports both Redis channels and lists.
89# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -229,16 +230,27 @@ def list_pattern_stop
229230
230231 @redis . quit rescue nil
231232 @redis = nil
233+ @worker_threads . each do |key , thread |
234+ if thread . alive?
235+ thread . join ( 5 )
236+ @logger . warn ( "Thread #{ key } joined" )
237+ end
238+ end
232239 end
233240
234- def launch_worker ( key )
241+ def launch_worker ( output_queue , key )
235242 Thread . new do
236243 redis = new_redis_instance
237244 begin
238245 ( 0 ..MAX_ITEMS_PER_WORKER ) . each do
246+ if stop?
247+ @logger . warn ( "Breaking from thread #{ key } as it was requested to stop" )
248+ break
249+ end
239250 value = redis . lpop ( key )
240251 break if value . nil?
241- @events << value
252+ queue_event ( value , output_queue )
253+ @items_processed . increment
242254 end
243255 ensure
244256 redis . quit rescue nil
@@ -255,45 +267,40 @@ def clear_finished_workers
255267 finished_threads . each { |key | @worker_threads . delete ( key ) }
256268 end
257269
258- def ensure_workers
270+ def ensure_workers ( output_queue )
259271 free_slots = @worker_thread_count - @worker_threads . length
260272 return if free_slots == 0
261273 keys = @redis . keys ( @key )
262274 keys . shuffle
263275 keys . each do |key |
264276 next if @worker_threads . has_key? ( key )
265- @worker_threads [ key ] = launch_worker ( key )
277+ @worker_threads [ key ] = launch_worker ( output_queue , key )
266278 free_slots -= 1
267279 break if free_slots == 0
268280 end
269281 end
270282
271- def init_list_pattern_runner
272- @events ||= Queue . new
283+ def init_list_pattern_runner ( output_queue )
273284 @worker_threads ||= { }
274285 @redis ||= connect
286+ @items_processed ||= Concurrent ::AtomicFixnum . new
275287 clear_finished_workers
276- ensure_workers
288+ ensure_workers ( output_queue )
277289 end
278290
279291 # private
280292 def list_pattern_runner ( output_queue )
281293 items = 0
282294 while !stop?
283295 begin
284- init_list_pattern_runner
285- item = @events . pop ( true )
286- next unless item
287- queue_event ( item , output_queue )
288- items += 1
289- rescue ThreadError => e
290- @logger . warn ( "Items processed: #{ items } " )
296+ init_list_pattern_runner ( output_queue )
291297 sleep ( 0.1 )
292298 rescue ::Redis ::BaseError => e
293299 @logger . warn ( "Redis connection problem" , :exception => e )
294300 @redis = nil
295301 end
296302 end
303+ @logger . warn ( "Total items processed: #{ @items_processed } " )
297304 end
298305
299306 def list_batch_listener ( redis , output_queue )
0 commit comments