diff --git a/lib/typesense/api_call.rb b/lib/typesense/api_call.rb index e41fff5..8c10477 100644 --- a/lib/typesense/api_call.rb +++ b/lib/typesense/api_call.rb @@ -25,6 +25,8 @@ def initialize(configuration) @logger = @configuration.logger + @nodes_mutex = Mutex.new + # Per-instance key for the thread-local connection cache so multiple # Typesense::Client instances in the same process do not share sockets. @thread_connections_key = :"_typesense_api_call_connections_#{object_id}" @@ -189,32 +191,34 @@ def build_keep_alive_connection(node) # But if no healthy nodes are found, it will just return the next node, even if it's unhealthy # so we can try the request for good measure, in case that node has become healthy since def next_node - # Check if nearest_node is set and is healthy, if so return it - unless @nearest_node.nil? - @logger.debug "Nodes health: Node #{@nearest_node[:index]} is #{@nearest_node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" - if @nearest_node[:is_healthy] == true || node_due_for_healthcheck?(@nearest_node) - @logger.debug "Updated current node to Node #{@nearest_node[:index]}" - return @nearest_node + @nodes_mutex.synchronize do + # Check if nearest_node is set and is healthy, if so return it + unless @nearest_node.nil? + @logger.debug "Nodes health: Node #{@nearest_node[:index]} is #{@nearest_node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" + if @nearest_node[:is_healthy] == true || node_due_for_healthcheck?(@nearest_node) + @logger.debug "Updated current node to Node #{@nearest_node[:index]}" + return @nearest_node + end + @logger.debug 'Falling back to individual nodes' end - @logger.debug 'Falling back to individual nodes' - end - # Fallback to nodes as usual - @logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" }.join(' || ')}" - candidate_node = nil - (0..@nodes.length).each do |_i| - @current_node_index = (@current_node_index + 1) % @nodes.length - candidate_node = @nodes[@current_node_index] - if candidate_node[:is_healthy] == true || node_due_for_healthcheck?(candidate_node) - @logger.debug "Updated current node to Node #{candidate_node[:index]}" - return candidate_node + # Fallback to nodes as usual + @logger.debug "Nodes health: #{@nodes.each_with_index.map { |node, i| "Node #{i} is #{node[:is_healthy] == true ? 'Healthy' : 'Unhealthy'}" }.join(' || ')}" + candidate_node = nil + (0..@nodes.length).each do |_i| + @current_node_index = (@current_node_index + 1) % @nodes.length + candidate_node = @nodes[@current_node_index] + if candidate_node[:is_healthy] == true || node_due_for_healthcheck?(candidate_node) + @logger.debug "Updated current node to Node #{candidate_node[:index]}" + return candidate_node + end end - end - # None of the nodes are marked healthy, but some of them could have become healthy since last health check. - # So we will just return the next node. - @logger.debug "No healthy nodes were found. Returning the next node, Node #{candidate_node[:index]}" - candidate_node + # None of the nodes are marked healthy, but some of them could have become healthy since last health check. + # So we will just return the next node. + @logger.debug "No healthy nodes were found. Returning the next node, Node #{candidate_node[:index]}" + candidate_node + end end def node_due_for_healthcheck?(node) @@ -235,8 +239,10 @@ def initialize_metadata_for_nodes end def set_node_healthcheck(node, is_healthy:) - node[:is_healthy] = is_healthy - node[:last_access_timestamp] = Time.now.to_i + @nodes_mutex.synchronize do + node[:is_healthy] = is_healthy + node[:last_access_timestamp] = Time.now.to_i + end end def custom_exception_klass_for(response) diff --git a/spec/typesense/api_call_spec.rb b/spec/typesense/api_call_spec.rb index 959d901..040e665 100644 --- a/spec/typesense/api_call_spec.rb +++ b/spec/typesense/api_call_spec.rb @@ -259,6 +259,185 @@ it_behaves_like 'Node selection', :delete end + describe 'concurrent node rotation' do + it 'preserves round-robin ordering across nodes under concurrent calls' do + thread_count = 16 + iterations_per_thread = 90 + num_nodes = typesense.configuration.nodes.length + + counts = Array.new(num_nodes, 0) + counts_mutex = Mutex.new + + threads = Array.new(thread_count) do + Thread.new do + local_counts = Array.new(num_nodes, 0) + iterations_per_thread.times do + node = api_call.send(:next_node) + local_counts[node[:index]] += 1 + end + counts_mutex.synchronize do + local_counts.each_with_index { |c, i| counts[i] += c } + end + end + end + threads.each(&:join) + + expected_per_node = (thread_count * iterations_per_thread) / num_nodes + expect(counts).to all(eq(expected_per_node)) + end + + it 'never returns a node held unhealthy while next_node is called concurrently' do + unhealthy_node = api_call.instance_variable_get(:@nodes)[1] + api_call.send(:set_node_healthcheck, unhealthy_node, is_healthy: false) + + threads = Array.new(8) do + Thread.new do + Array.new(200) { api_call.send(:next_node)[:index] } + end + end + + results = threads.flat_map(&:value) + expect(results).not_to include(1) + expect(results).to include(0).and include(2) + end + + it 'still returns a node when every node is unhealthy under concurrent calls' do + nodes = api_call.instance_variable_get(:@nodes) + nodes.each { |node| api_call.send(:set_node_healthcheck, node, is_healthy: false) } + + threads = Array.new(8) do + Thread.new do + Array.new(50) { api_call.send(:next_node) } + end + end + + results = threads.flat_map(&:value) + expect(results.length).to eq(8 * 50) + expect(results).to all(be_a(Hash)) + expect(results.map { |n| n[:index] }).to all(be_between(0, nodes.length - 1).inclusive) + end + + it 'maintains invariants when health is toggled on multiple nodes concurrently with reads' do + nodes = api_call.instance_variable_get(:@nodes) + + writer_threads = nodes.map.with_index do |node, idx| + Thread.new do + 200.times { |i| api_call.send(:set_node_healthcheck, node, is_healthy: (i + idx).even?) } + end + end + + reader_threads = Array.new(8) do + Thread.new do + Array.new(200) { api_call.send(:next_node) } + end + end + + writer_threads.each(&:join) + reader_results = reader_threads.flat_map(&:value) + + expect(reader_results.length).to eq(8 * 200) + expect(reader_results).to all(be_a(Hash)) + expect(reader_results.map { |n| n[:index] }).to all(be_between(0, nodes.length - 1).inclusive) + nodes.each do |node| + expect(node[:is_healthy]).to be(true).or be(false) + expect(node[:last_access_timestamp]).to be_a(Integer) + end + end + + it 'picks a recovering node back up under concurrent reads after it is marked healthy again' do + nodes = api_call.instance_variable_get(:@nodes) + api_call.send(:set_node_healthcheck, nodes[1], is_healthy: false) + + stop = false + reader_threads = Array.new(4) do + Thread.new do + collected = [] + collected << api_call.send(:next_node)[:index] until stop + collected + end + end + + # Let readers spin while node 1 is unhealthy, then mark it healthy again. + sleep 0.05 + api_call.send(:set_node_healthcheck, nodes[1], is_healthy: true) + sleep 0.05 + stop = true + + results = reader_threads.flat_map(&:value) + expect(results).to include(1) + expect(results).to all(be_between(0, nodes.length - 1).inclusive) + end + + context 'with a single node' do + let(:typesense) do + Typesense::Client.new( + api_key: 'abcd', + nodes: [{ host: 'node0', port: 8108, protocol: 'http' }], + connection_timeout_seconds: 10, + retry_interval_seconds: 0.01, + log_level: Logger::ERROR + ) + end + + it 'returns the single node and keeps health state consistent under concurrent writes' do + node = typesense.configuration.nodes[0] + + threads = Array.new(8) do |i| + Thread.new do + 50.times do + api_call.send(:set_node_healthcheck, node, is_healthy: i.even?) + api_call.send(:next_node) + end + end + end + threads.each(&:join) + + expect(node[:is_healthy]).to be(true).or be(false) + expect(node[:last_access_timestamp]).to be_a(Integer) + end + end + + context 'with nearest_node configured' do + let(:typesense) do + Typesense::Client.new( + api_key: 'abcd', + nearest_node: { host: 'nearestNode', port: 6108, protocol: 'http' }, + nodes: [ + { host: 'node0', port: 8108, protocol: 'http' }, + { host: 'node1', port: 8108, protocol: 'http' }, + { host: 'node2', port: 8108, protocol: 'http' } + ], + connection_timeout_seconds: 10, + retry_interval_seconds: 0.01, + log_level: Logger::ERROR + ) + end + + it 'serializes reads and writes of nearest_node health state under concurrent access' do + nearest_node = api_call.instance_variable_get(:@nearest_node) + + writer_threads = Array.new(4) do |i| + Thread.new do + 100.times { api_call.send(:set_node_healthcheck, nearest_node, is_healthy: i.even?) } + end + end + + reader_threads = Array.new(8) do + Thread.new do + Array.new(100) { api_call.send(:next_node) } + end + end + + writer_threads.each(&:join) + reader_results = reader_threads.flat_map(&:value) + + expect(reader_results).to all(be_a(Hash)) + expect(nearest_node[:is_healthy]).to be(true).or be(false) + expect(nearest_node[:last_access_timestamp]).to be_a(Integer) + end + end + end + describe 'keep-alive connection caching' do subject(:api_call) { described_class.new(keep_alive_typesense.configuration) }