Skip to content

Commit 5db1ce4

Browse files
committed
instead of reseting round robin after a sniff let it count onwards
1 parent 16f7904 commit 5db1ce4

File tree

4 files changed

+39
-24
lines changed

4 files changed

+39
-24
lines changed

src/Elasticsearch.Net.Tests.Unit/Connection/ConcurrencyTests.cs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ public ConcurrencyTests()
3939
_connectionPool = new SniffingConnectionPool(_uris);
4040
_config = new ConnectionConfiguration(_connectionPool)
4141
.SnifsOnConnectionFault()
42-
.SniffOnStartup();
42+
.SniffOnStartup()
43+
.SetMaxRetries(5);
4344
}
4445

4546
private void ProvideTransport(AutoFake fake)
@@ -58,12 +59,9 @@ public void CallInfo40000TimesOnMultipleThreads()
5859
//prove a real HttpTransport with its unspecified dependencies
5960
//as fakes
6061

61-
//set up fake for a call on IConnection.GetSync so that it always throws
62-
//an exception
6362
var connection = fake.Provide<IConnection>(new ConcurrencyTestConnection(this._config));
6463
this.ProvideTransport(fake);
65-
//create a real ElasticsearchClient with it unspecified dependencies
66-
//as fakes
64+
//create a real ElasticsearchClient with it unspecified dependencies as fakes
6765
var client = fake.Resolve<ElasticsearchClient>();
6866
int seen = 0;
6967
Assert.DoesNotThrow(()=>
@@ -93,7 +91,14 @@ public void CallInfo40000TimesOnMultipleThreads()
9391
seen.Should().Be(40000);
9492
}
9593
}
96-
94+
/// <summary>
95+
/// This simulates a super flakey elasticsearch cluster.
96+
/// - if random 1-9 is a muliple of 3 throw a 503
97+
/// - never throws on node 9202 though so that all calls can be expected to always succeed.
98+
/// - Sniff can either get back the full cluster or a sufficient subset of it.
99+
/// - Our cluster have 5 nodes the recommendation is to have N/2+1 masters so we should atleast see 3 nodes
100+
/// - anything less would cause a node to be unavailable which is covered in other tests
101+
/// </summary>
97102
public class ConcurrencyTestConnection : InMemoryConnection
98103
{
99104
private static Uri[] _uris = new[]
@@ -102,6 +107,14 @@ public class ConcurrencyTestConnection : InMemoryConnection
102107
new Uri("http://localhost:9201"),
103108
new Uri("http://localhost:9202"),
104109
new Uri("http://localhost:9203"),
110+
new Uri("http://localhost:9206"),
111+
};
112+
113+
private static Uri[] _uris2 = new[]
114+
{
115+
new Uri("http://localhost:9202"),
116+
new Uri("http://localhost:9201"),
117+
new Uri("http://localhost:9206"),
105118
};
106119
private readonly Random _rnd = new Random();
107120
public ConcurrencyTestConnection(IConnectionConfigurationValues settings) : base(settings)
@@ -110,12 +123,12 @@ public ConcurrencyTestConnection(IConnectionConfigurationValues settings) : base
110123

111124
public override IList<Uri> Sniff(Uri uri, int connectTimeout)
112125
{
113-
return _uris;
126+
return _rnd.Next(1, 11) % 3 == 0 ? _uris : _uris2;
114127
}
115128

116129
public override ElasticsearchResponse GetSync(Uri uri)
117130
{
118-
var statusCode = _rnd.Next(1, 11) % 7 == 0 ? 503 : 200;
131+
var statusCode = _rnd.Next(1, 9) % 3 == 0 ? 503 : 200;
119132
if (uri.Port == 9202)
120133
statusCode = 200;
121134

src/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,11 @@ public void HostsReturnedBySniffAreVisited()
205205
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 2 retry
206206
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 3
207207
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 4
208-
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null) //info 5
208+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 5
209+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 6
210+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 7
211+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 8
212+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null) //info 9
209213
);
210214
getCall.Invokes((Uri u) => seenNodes.Add(u));
211215

@@ -215,16 +219,21 @@ public void HostsReturnedBySniffAreVisited()
215219
client1.Info(); //info call 3
216220
client1.Info(); //info call 4
217221
client1.Info(); //info call 5
222+
client1.Info(); //info call 6
223+
client1.Info(); //info call 7
224+
client1.Info(); //info call 8
225+
client1.Info(); //info call 9
218226

219227
sniffCall.MustHaveHappened(Repeated.Exactly.Once);
220-
seenNodes.Should().NotBeEmpty().And.HaveCount(6);
228+
seenNodes.Should().NotBeEmpty().And.HaveCount(10);
221229
seenNodes[0].Port.Should().Be(9200);
222230
seenNodes[1].Port.Should().Be(9201);
223231
//after sniff
224-
seenNodes[2].Port.Should().Be(9204);
225-
seenNodes[3].Port.Should().Be(9203);
226-
seenNodes[4].Port.Should().Be(9202);
227-
seenNodes[5].Port.Should().Be(9201);
232+
seenNodes[2].Port.Should().Be(9202);
233+
seenNodes[3].Port.Should().Be(9204, string.Join(",", seenNodes.Select(n=>n.Port)));
234+
seenNodes[4].Port.Should().Be(9203);
235+
seenNodes[5].Port.Should().Be(9202);
236+
seenNodes[6].Port.Should().Be(9201);
228237

229238
//var nowCall = A.CallTo(() => fake.Resolve<IDateTimeProvider>().Sniff(A<Uri>._, A<int>._));
230239
}

src/Elasticsearch.Net/ConnectionPool/SniffingConnectionPool.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public override void Sniff(IConnection connection, bool fromStartupHint = false)
4040
this._uriLookup = nodes.ToDictionary(k => k, v => new EndpointState());
4141
if (fromStartupHint)
4242
this._seenStartup = true;
43-
this._current = -1;
4443

4544
}
4645
finally

src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,15 @@ public StaticConnectionPool(
3333

3434
public virtual Uri GetNext(int? initialSeed, out int seed)
3535
{
36-
//if _current has been reset make sure we ignore any seed
37-
//that was given out before the reset
38-
if (_current == -1 && initialSeed.HasValue)
39-
initialSeed = 0;
40-
//always increment the initialSeed so we advance further
41-
else if (initialSeed.HasValue)
36+
var count = _nodeUris.Count;
37+
if (initialSeed.HasValue)
4238
initialSeed += 1;
4339

4440
//always increment our round robin counter
4541
int increment = Interlocked.Increment(ref _current);
4642
var initialOffset = initialSeed ?? increment;
47-
seed = initialOffset;
48-
var count = _nodeUris.Count;
4943
int i = initialOffset % count, attempts = 0;
44+
seed = i;
5045
Uri uri = null;
5146
do
5247
{
@@ -94,7 +89,6 @@ public virtual void MarkAlive(Uri uri)
9489

9590
public virtual void Sniff(IConnection connection, bool fromStartupHint = false)
9691
{
97-
this._current = -1;
9892
//NOOP on static connection class
9993
}
10094
}

0 commit comments

Comments
 (0)