Skip to content

Commit 3da23a1

Browse files
committed
[CLIENT] Synchronize access to the connections collection and mutation of @current instance variable
1 parent 0071be9 commit 3da23a1

File tree

2 files changed

+27
-12
lines changed

2 files changed

+27
-12
lines changed

elasticsearch-transport/lib/elasticsearch/transport/transport/connections/selector.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,30 @@ def select(options={})
6161
class RoundRobin
6262
include Base
6363

64+
# @option arguments [Connections::Collection] :connections Collection with connections.
65+
#
66+
def initialize(arguments = {})
67+
super
68+
@mutex = Mutex.new
69+
@current = nil
70+
end
71+
6472
# Returns the next connection from the collection, rotating them in round-robin fashion.
6573
#
6674
# @return [Connections::Connection]
6775
#
6876
def select(options={})
69-
# On Ruby 1.9, Array#rotate could be used instead
70-
@current = !defined?(@current) || @current.nil? ? 0 : @current+1
71-
@current = 0 if @current >= connections.size
72-
connections[@current]
77+
@mutex.synchronize do
78+
conns = connections
79+
if @current && (@current < conns.size-1)
80+
@current += 1
81+
else
82+
@current = 0
83+
end
84+
conns[@current]
85+
end
7386
end
7487
end
75-
7688
end
7789
end
7890
end

elasticsearch-transport/spec/elasticsearch/connections/collection_spec.rb

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@
228228
context 'when multiple threads are used' do
229229

230230
let(:connections) do
231-
100.times.collect do |i|
231+
20.times.collect do |i|
232232
Elasticsearch::Transport::Transport::Connections::Connection.new(host: { host: i })
233233
end
234234
end
@@ -238,13 +238,16 @@
238238
end
239239

240240
it 'allows threads to select connections in parallel' do
241-
threads = []
242-
threads << Thread.new do
243-
2000.times do |i|
244-
collection.get_connection
241+
expect(10.times.collect do
242+
threads = []
243+
20.times do
244+
threads << Thread.new do
245+
collection.get_connection
246+
end
245247
end
246-
end
247-
threads.collect {|t| t.join}
248+
threads.collect { |t| t.join }
249+
collection.get_connection.host[:host]
250+
end).to all(eq(0))
248251
end
249252
end
250253
end

0 commit comments

Comments
 (0)