Skip to content

Commit 1c97da4

Browse files
fix LocalJumpError exception
1 parent 6c68674 commit 1c97da4

File tree

2 files changed

+91
-55
lines changed

2 files changed

+91
-55
lines changed

lib/logstash/outputs/redis.rb

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
# The RPUSH command is supported in Redis v0.0.7+. Using
88
# PUBLISH to a channel requires at least v1.3.8+.
99
# While you may be able to make these Redis versions work,
10-
# the best performance and stability will be found in more
10+
# the best performance and stability will be found in more
1111
# recent stable versions. Versions 2.6.0+ are recommended.
1212
#
1313
# For more information, see http://redis.io/[the Redis homepage]
@@ -144,14 +144,17 @@ def register
144144
end # def register
145145

146146
def receive(event)
147-
148-
149147
# TODO(sissel): We really should not drop an event, but historically
150148
# we have dropped events that fail to be converted to json.
151149
# TODO(sissel): Find a way to continue passing events through even
152150
# if they fail to convert properly.
153151
begin
154152
@codec.encode(event)
153+
rescue LocalJumpError
154+
# This LocalJumpError rescue clause is required to test for regressions
155+
# for https://github.com/logstash-plugins/logstash-output-redis/issues/26
156+
# see specs. Without it the LocalJumpError is rescued by the StandardError
157+
raise
155158
rescue StandardError => e
156159
@logger.warn("Error encoding event", :exception => e,
157160
:event => event)
@@ -229,11 +232,11 @@ def identity
229232
def send_to_redis(event, payload)
230233
# How can I do this sort of thing with codecs?
231234
key = event.sprintf(@key)
232-
233-
if @batch and @data_type == 'list' # Don't use batched method for pubsub.
235+
236+
if @batch && @data_type == 'list' # Don't use batched method for pubsub.
234237
# Stud::Buffer
235238
buffer_receive(payload, key)
236-
next
239+
return
237240
end
238241

239242
begin

spec/outputs/redis_spec.rb

Lines changed: 82 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,70 +4,103 @@
44
require "redis"
55
require "flores/random"
66

7-
describe LogStash::Outputs::Redis, :redis => true do
7+
describe LogStash::Outputs::Redis do
88

9-
shared_examples_for "writing to redis list" do |extra_config|
10-
let(:key) { 10.times.collect { rand(10).to_s }.join("") }
11-
let(:event_count) { Flores::Random.integer(0..10000) }
12-
let(:message) { Flores::Random.text(0..100) }
13-
let(:default_config) {
14-
{
15-
"key" => key,
16-
"data_type" => "list",
17-
"host" => "localhost"
9+
context "integration tests", :redis => true do
10+
shared_examples_for "writing to redis list" do |extra_config|
11+
let(:key) { 10.times.collect { rand(10).to_s }.join("") }
12+
let(:event_count) { Flores::Random.integer(0..10000) }
13+
let(:message) { Flores::Random.text(0..100) }
14+
let(:default_config) {
15+
{
16+
"key" => key,
17+
"data_type" => "list",
18+
"host" => "localhost"
19+
}
1820
}
19-
}
20-
let(:redis_config) {
21-
default_config.merge(extra_config || {})
22-
}
23-
let(:redis_output) { described_class.new(redis_config) }
21+
let(:redis_config) {
22+
default_config.merge(extra_config || {})
23+
}
24+
let(:redis_output) { described_class.new(redis_config) }
2425

25-
before do
26-
redis_output.register
27-
event_count.times do |i|
28-
event = LogStash::Event.new("sequence" => i, "message" => message)
29-
redis_output.receive(event)
26+
before do
27+
redis_output.register
28+
event_count.times do |i|
29+
event = LogStash::Event.new("sequence" => i, "message" => message)
30+
redis_output.receive(event)
31+
end
32+
redis_output.close
3033
end
31-
redis_output.close
32-
end
3334

34-
it "should successfully send all events to redis" do
35-
redis = Redis.new(:host => "127.0.0.1")
36-
37-
# The list should contain the number of elements our agent pushed up.
38-
insist { redis.llen(key) } == event_count
35+
it "should successfully send all events to redis" do
36+
redis = Redis.new(:host => "127.0.0.1")
37+
38+
# The list should contain the number of elements our agent pushed up.
39+
insist { redis.llen(key) } == event_count
3940

40-
# Now check all events for order and correctness.
41-
event_count.times do |value|
42-
id, element = redis.blpop(key, 0)
43-
event = LogStash::Event.new(LogStash::Json.load(element))
44-
insist { event["sequence"] } == value
45-
insist { event["message"] } == message
41+
# Now check all events for order and correctness.
42+
event_count.times do |value|
43+
id, element = redis.blpop(key, 0)
44+
event = LogStash::Event.new(LogStash::Json.load(element))
45+
insist { event["sequence"] } == value
46+
insist { event["message"] } == message
47+
end
48+
49+
# The list should now be empty
50+
insist { redis.llen(key) } == 0
4651
end
52+
end
4753

48-
# The list should now be empty
49-
insist { redis.llen(key) } == 0
54+
context "when batch_mode is false" do
55+
include_examples "writing to redis list"
5056
end
51-
end
5257

53-
context "when batch_mode is false" do
54-
include_examples "writing to redis list"
58+
context "when batch_mode is true" do
59+
batch_events = Flores::Random.integer(1..1000)
60+
batch_settings = {
61+
"batch" => true,
62+
"batch_events" => batch_events
63+
}
64+
65+
include_examples "writing to redis list", batch_settings do
66+
67+
# A canary to make sure we're actually enabling batch mode
68+
# in this shared example.
69+
it "should have batch mode enabled" do
70+
expect(redis_config).to include("batch")
71+
expect(redis_config["batch"]).to be_truthy
72+
end
73+
end
74+
end
5575
end
5676

57-
context "when batch_mode is true" do
58-
batch_events = Flores::Random.integer(1..1000)
59-
batch_settings = {
60-
"batch" => true,
61-
"batch_events" => batch_events
77+
context "Redis#receive in batch mode" do
78+
# this is a regression test harness to verify fix for https://github.com/logstash-plugins/logstash-output-redis/issues/26
79+
# TODO: refactor specs above and probably rely on a Redis mock to correctly test the code expected behaviour, the actual
80+
# tests agains Redis should be moved into integration tests.
81+
let(:key) { "thekey" }
82+
let(:payload) { "somepayload"}
83+
let(:event) { LogStash::Event.new({"message" => "test"}) }
84+
let(:config) {
85+
{
86+
"key" => key,
87+
"data_type" => "list",
88+
"batch" => true,
89+
"batch_events" => 50,
90+
}
6291
}
92+
let(:redis) { described_class.new(config) }
6393

64-
include_examples "writing to redis list", batch_settings do
94+
it "should call buffer_receive" do
95+
redis.register
96+
expect(redis).to receive(:buffer_receive).exactly(10000).times.and_call_original
97+
expect(redis).to receive(:flush).exactly(200).times
6598

66-
# A canary to make sure we're actually enabling batch mode
67-
# in this shared example.
68-
it "should have batch mode enabled" do
69-
expect(redis_config).to include("batch")
70-
expect(redis_config["batch"]).to be_truthy
99+
# I was able to reproduce the LocalJumpError: unexpected next exception at around 50
100+
# consicutive invocations. setting to 10000 should reproduce it for any environment
101+
# I have no clue at this point why this problem does not happen at every invocation
102+
1.upto(10000) do
103+
expect{redis.receive(event)}.to_not raise_error
71104
end
72105
end
73106
end

0 commit comments

Comments
 (0)