Skip to content
This repository was archived by the owner on Oct 15, 2025. It is now read-only.

Commit 5988ce7

Browse files
author
Juuso Mäyränen
committed
Support list patterns
1 parent 73f0577 commit 5988ce7

File tree

1 file changed

+74
-1
lines changed

1 file changed

+74
-1
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#
1919
module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
2020
BATCH_EMPTY_SLEEP = 0.25
21+
MAX_ITEMS_PER_WORKER = 1000
2122

2223
config_name "redis"
2324

@@ -51,14 +52,16 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
5152
# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
5253
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
5354
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
54-
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
55+
config :data_type, :validate => [ "list", "list_pattern", "channel", "pattern_channel" ], :required => true
5556

5657
# The number of events to return from Redis using EVAL.
5758
config :batch_count, :validate => :number, :default => 125
5859

5960
# Redefined Redis commands to be passed to the Redis client.
6061
config :command_map, :validate => :hash, :default => {}
6162

63+
config :worker_thread_count, :validate => :number, :default => 20
64+
6265
public
6366
# public API
6467
# use to store a proc that can provide a Redis instance or mock
@@ -86,6 +89,9 @@ def register
8689
if @data_type == 'list' || @data_type == 'dummy'
8790
@run_method = method(:list_runner)
8891
@stop_method = method(:list_stop)
92+
elsif @data_type == 'list_pattern'
93+
@run_method = method(:list_pattern_runner)
94+
@stop_method = method(:list_pattern_stop)
8995
elsif @data_type == 'channel'
9096
@run_method = method(:channel_runner)
9197
@stop_method = method(:subscribe_stop)
@@ -217,6 +223,73 @@ def list_runner(output_queue)
217223
end
218224
end
219225

226+
# private
227+
def list_pattern_stop
228+
return if @redis.nil? || !@redis.connected?
229+
230+
@redis.quit rescue nil
231+
@redis = nil
232+
end
233+
234+
def launch_worker(key)
235+
Thread.new do
236+
redis = new_redis_instance
237+
begin
238+
(0..MAX_ITEMS_PER_WORKER).each do
239+
value = redis.lpop(key)
240+
break if value.nil?
241+
@events << value
242+
end
243+
ensure
244+
redis.quit rescue nil
245+
end
246+
end
247+
end
248+
249+
def clear_workers
250+
finished_threads = []
251+
@worker_threads.each do |key, thread|
252+
next if thread.alive?
253+
finished_threads << key
254+
end
255+
finished_threads.each { |key| @worker_threads.delete(key) }
256+
end
257+
258+
def ensure_workers
259+
free_slots = @worker_thread_count - @worker_threads.length
260+
return if free_slots == 0
261+
keys = @redis.keys(@key)
262+
keys.shuffle
263+
keys = keys[0..free_slots - 1]
264+
keys.each do |key|
265+
@worker_threads[key] = launch_worker(key)
266+
end
267+
end
268+
269+
# private
270+
def list_pattern_runner(output_queue)
271+
begin
272+
@events = Queue.new
273+
@worker_threads = {}
274+
@redis ||= connect
275+
ensure_workers
276+
rescue ::Redis::BaseError => e
277+
@logger.warn("Redis connection problem", :exception => e)
278+
@redis = nil
279+
end
280+
while !stop?
281+
begin
282+
clear_workers
283+
ensure_workers
284+
item = @events.pop(true)
285+
next unless item
286+
queue_event(item, output_queue)
287+
rescue ThreadError => e
288+
sleep(0.1)
289+
end
290+
end
291+
end
292+
220293
def list_batch_listener(redis, output_queue)
221294
begin
222295
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])

0 commit comments

Comments
 (0)