Skip to content
This repository was archived by the owner on Oct 15, 2025. It is now read-only.

Commit 67924f2

Browse files
author
Juuso Mäyränen
committed
Refactoring and fixes
1 parent 5988ce7 commit 67924f2

File tree

1 file changed

+19
-13
lines changed

1 file changed

+19
-13
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ def launch_worker(key)
246246
end
247247
end
248248

249-
def clear_workers
249+
def clear_finished_workers
250250
finished_threads = []
251251
@worker_threads.each do |key, thread|
252252
next if thread.alive?
@@ -260,32 +260,38 @@ def ensure_workers
260260
return if free_slots == 0
261261
keys = @redis.keys(@key)
262262
keys.shuffle
263-
keys = keys[0..free_slots - 1]
264263
keys.each do |key|
264+
next if @worker_threads.has_key?(key)
265265
@worker_threads[key] = launch_worker(key)
266+
free_slots -= 1
267+
break if free_slots == 0
266268
end
267269
end
268270

271+
def init_list_pattern_runner
272+
@events ||= Queue.new
273+
@worker_threads ||= {}
274+
@redis ||= connect
275+
clear_finished_workers
276+
ensure_workers
277+
end
278+
269279
# private
270280
def list_pattern_runner(output_queue)
271-
begin
272-
@events = Queue.new
273-
@worker_threads = {}
274-
@redis ||= connect
275-
ensure_workers
276-
rescue ::Redis::BaseError => e
277-
@logger.warn("Redis connection problem", :exception => e)
278-
@redis = nil
279-
end
281+
items = 0
280282
while !stop?
281283
begin
282-
clear_workers
283-
ensure_workers
284+
init_list_pattern_runner
284285
item = @events.pop(true)
285286
next unless item
286287
queue_event(item, output_queue)
288+
items += 1
287289
rescue ThreadError => e
290+
@logger.warn("Items processed: #{items}")
288291
sleep(0.1)
292+
rescue ::Redis::BaseError => e
293+
@logger.warn("Redis connection problem", :exception => e)
294+
@redis = nil
289295
end
290296
end
291297
end

0 commit comments

Comments
 (0)