Skip to content

Commit 16f7904

Browse files
committed
caught threading bug in a unit test and fixed it by exposing the initial seed for the round robin on the GetNext() call. This allows callee's to reseed themselves and will ensure all the indivual nodes are tried when max retries equals number of known hosts
1 parent 6aecc34 commit 16f7904

File tree

12 files changed

+250
-41
lines changed

12 files changed

+250
-41
lines changed

src/Connections/Elasticsearch.Net.Connection.Thrift/ThriftConnection.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ public ThriftConnection(IConnectionConfigurationValues connectionSettings)
2828
this._poolSize = Math.Max(1, connectionSettings.MaximumAsyncConnections);
2929

3030
this._resourceLock = new Semaphore(_poolSize, _poolSize);
31-
31+
int seed;
3232
for (var i = 0; i <= connectionSettings.MaximumAsyncConnections; i++)
3333
{
34-
var uri = this._connectionSettings.ConnectionPool.GetNext();
34+
var uri = this._connectionSettings.ConnectionPool.GetNext(null, out seed);
3535
var host = uri.Host;
3636
var port = uri.Port;
3737
var tsocket = new TSocket(host, port);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Net;
5+
using System.Runtime.InteropServices;
6+
using System.Text;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Autofac;
10+
using Autofac.Core.Activators.Reflection;
11+
using Autofac.Extras.FakeItEasy;
12+
using Elasticsearch.Net.Connection;
13+
using Elasticsearch.Net.ConnectionPool;
14+
using Elasticsearch.Net.Exceptions;
15+
using Elasticsearch.Net.Providers;
16+
using Elasticsearch.Net.Tests.Unit.Stubs;
17+
using FakeItEasy;
18+
using FluentAssertions;
19+
using NUnit.Framework;
20+
21+
namespace Elasticsearch.Net.Tests.Unit.Connection
22+
{
23+
[TestFixture]
24+
public class ConcurrencyTests
25+
{
26+
private static Uri[] _uris = new[]
27+
{
28+
new Uri("http://localhost:9200"),
29+
new Uri("http://localhost:9201"),
30+
new Uri("http://localhost:9202"),
31+
new Uri("http://localhost:9203"),
32+
};
33+
private static readonly int _retries = _uris.Count() - 1;
34+
private readonly StaticConnectionPool _connectionPool;
35+
private readonly ConnectionConfiguration _config;
36+
37+
public ConcurrencyTests()
38+
{
39+
_connectionPool = new SniffingConnectionPool(_uris);
40+
_config = new ConnectionConfiguration(_connectionPool)
41+
.SnifsOnConnectionFault()
42+
.SniffOnStartup();
43+
}
44+
45+
private void ProvideTransport(AutoFake fake)
46+
{
47+
var param = new TypedParameter(typeof(IDateTimeProvider), null);
48+
fake.Provide<ITransport, Transport>(param);
49+
}
50+
[Test]
51+
public void CallInfo40000TimesOnMultipleThreads()
52+
{
53+
using (var fake = new AutoFake(callsDoNothing: true))
54+
{
55+
//set up connection configuration that holds a connection pool
56+
//with '_uris' (see the constructor)
57+
fake.Provide<IConnectionConfigurationValues>(_config);
58+
//prove a real HttpTransport with its unspecified dependencies
59+
//as fakes
60+
61+
//set up fake for a call on IConnection.GetSync so that it always throws
62+
//an exception
63+
var connection = fake.Provide<IConnection>(new ConcurrencyTestConnection(this._config));
64+
this.ProvideTransport(fake);
65+
//create a real ElasticsearchClient with it unspecified dependencies
66+
//as fakes
67+
var client = fake.Resolve<ElasticsearchClient>();
68+
int seen = 0;
69+
Assert.DoesNotThrow(()=>
70+
{
71+
Action a = () =>
72+
{
73+
for(var i=0;i<10000;i++)
74+
{
75+
client.Info();
76+
Interlocked.Increment(ref seen);
77+
}
78+
};
79+
var thread1 = new Thread(()=>a());
80+
var thread2 = new Thread(()=>a());
81+
var thread3 = new Thread(()=>a());
82+
var thread4 = new Thread(()=>a());
83+
thread1.Start();
84+
thread2.Start();
85+
thread3.Start();
86+
thread4.Start();
87+
thread1.Join();
88+
thread2.Join();
89+
thread3.Join();
90+
thread4.Join();
91+
92+
});
93+
seen.Should().Be(40000);
94+
}
95+
}
96+
97+
public class ConcurrencyTestConnection : InMemoryConnection
98+
{
99+
private static Uri[] _uris = new[]
100+
{
101+
new Uri("http://localhost:9200"),
102+
new Uri("http://localhost:9201"),
103+
new Uri("http://localhost:9202"),
104+
new Uri("http://localhost:9203"),
105+
};
106+
private readonly Random _rnd = new Random();
107+
public ConcurrencyTestConnection(IConnectionConfigurationValues settings) : base(settings)
108+
{
109+
}
110+
111+
public override IList<Uri> Sniff(Uri uri, int connectTimeout)
112+
{
113+
return _uris;
114+
}
115+
116+
public override ElasticsearchResponse GetSync(Uri uri)
117+
{
118+
var statusCode = _rnd.Next(1, 11) % 7 == 0 ? 503 : 200;
119+
if (uri.Port == 9202)
120+
statusCode = 200;
121+
122+
return ElasticsearchResponse.Create(this._ConnectionSettings, statusCode, "GET", "/", null, null);
123+
124+
}
125+
}
126+
127+
}
128+
}

src/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,66 @@ public void SniffOnConnectionFaultCausesSniffOn503()
166166
sniffCall.MustHaveHappened(Repeated.Exactly.Once);
167167
nowCall.MustHaveHappened(Repeated.Exactly.Times(7));
168168

169+
//var nowCall = A.CallTo(() => fake.Resolve<IDateTimeProvider>().Sniff(A<Uri>._, A<int>._));
170+
}
171+
}
172+
[Test]
173+
public void HostsReturnedBySniffAreVisited()
174+
{
175+
using (var fake = new AutoFake())
176+
{
177+
var dateTimeProvider = fake.Resolve<IDateTimeProvider>();
178+
var nowCall = A.CallTo(()=>dateTimeProvider.Now());
179+
nowCall.Returns(DateTime.UtcNow);
180+
181+
var connectionPool = new SniffingConnectionPool(new[]
182+
{
183+
new Uri("http://localhost:9200"),
184+
new Uri("http://localhost:9201")
185+
}, randomizeOnStartup: false);
186+
var config = new ConnectionConfiguration(connectionPool)
187+
.SnifsOnConnectionFault();
188+
fake.Provide<IConnectionConfigurationValues>(config);
189+
fake.Provide<ITransport>(fake.Resolve<Transport>());
190+
var connection = fake.Resolve<IConnection>();
191+
var sniffCall = A.CallTo(() => connection.Sniff(A<Uri>._, A<int>._));
192+
sniffCall.Returns(new List<Uri>()
193+
{
194+
new Uri("http://localhost:9204"),
195+
new Uri("http://localhost:9203"),
196+
new Uri("http://localhost:9202"),
197+
new Uri("http://localhost:9201")
198+
});
199+
200+
var seenNodes = new List<Uri>();
201+
var getCall = A.CallTo(() => connection.GetSync(A<Uri>._));
202+
getCall.ReturnsNextFromSequence(
203+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 1
204+
ElasticsearchResponse.Create(config, 503, "GET", "/", null, null), //info 2
205+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 2 retry
206+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 3
207+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null), //info 4
208+
ElasticsearchResponse.Create(config, 200, "GET", "/", null, null) //info 5
209+
);
210+
getCall.Invokes((Uri u) => seenNodes.Add(u));
211+
212+
var client1 = fake.Resolve<ElasticsearchClient>();
213+
client1.Info(); //info call 1
214+
client1.Info(); //info call 2
215+
client1.Info(); //info call 3
216+
client1.Info(); //info call 4
217+
client1.Info(); //info call 5
218+
219+
sniffCall.MustHaveHappened(Repeated.Exactly.Once);
220+
seenNodes.Should().NotBeEmpty().And.HaveCount(6);
221+
seenNodes[0].Port.Should().Be(9200);
222+
seenNodes[1].Port.Should().Be(9201);
223+
//after sniff
224+
seenNodes[2].Port.Should().Be(9204);
225+
seenNodes[3].Port.Should().Be(9203);
226+
seenNodes[4].Port.Should().Be(9202);
227+
seenNodes[5].Port.Should().Be(9201);
228+
169229
//var nowCall = A.CallTo(() => fake.Resolve<IDateTimeProvider>().Sniff(A<Uri>._, A<int>._));
170230
}
171231
}

src/Elasticsearch.Net.Tests.Unit/Connection/StaticConnectionPoolRetryTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
using FluentAssertions;
1818
using NUnit.Framework;
1919

20-
namespace Elasticsearch.Net.Tests.Unit.Connection
20+
namespace Elasticsearch.Net.Tests.Unit.ConnectionA
2121
{
2222
[TestFixture]
2323
public class StaticConnectionPoolRetryTests

src/Elasticsearch.Net.Tests.Unit/Elasticsearch.Net.Tests.Unit.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
</ItemGroup>
6262
<ItemGroup>
6363
<Compile Include="Connection\SniffingConnectionPoolTests.cs" />
64+
<Compile Include="Connection\ConcurrencyTests.cs" />
6465
<Compile Include="Connection\StaticConnectionPoolRetryTests.cs" />
6566
<Compile Include="Connection\RetryTests.cs" />
6667
<Compile Include="Properties\AssemblyInfo.cs" />

src/Elasticsearch.Net/Connection/HttpConnection.cs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,35 +39,35 @@ public HttpConnection(IConnectionConfigurationValues settings)
3939
this._enableTrace = settings.TraceEnabled;
4040
}
4141

42-
public ElasticsearchResponse GetSync(Uri uri)
42+
public virtual ElasticsearchResponse GetSync(Uri uri)
4343
{
4444
return this.HeaderOnlyRequest(uri, "GET");
4545
}
46-
public ElasticsearchResponse HeadSync(Uri uri)
46+
public virtual ElasticsearchResponse HeadSync(Uri uri)
4747
{
4848
return this.HeaderOnlyRequest(uri, "HEAD");
4949
}
5050

51-
public ElasticsearchResponse PostSync(Uri uri, byte[] data)
51+
public virtual ElasticsearchResponse PostSync(Uri uri, byte[] data)
5252
{
5353
return this.BodyRequest(uri, data, "POST");
5454
}
55-
public ElasticsearchResponse PutSync(Uri uri, byte[] data)
55+
public virtual ElasticsearchResponse PutSync(Uri uri, byte[] data)
5656
{
5757
return this.BodyRequest(uri, data, "PUT");
5858
}
59-
public ElasticsearchResponse DeleteSync(Uri uri)
59+
public virtual ElasticsearchResponse DeleteSync(Uri uri)
6060
{
6161
var connection = this.CreateHttpWebRequest(uri, "DELETE");
6262
return this.DoSynchronousRequest(connection);
6363
}
64-
public ElasticsearchResponse DeleteSync(Uri uri, byte[] data)
64+
public virtual ElasticsearchResponse DeleteSync(Uri uri, byte[] data)
6565
{
6666
var connection = this.CreateHttpWebRequest(uri, "DELETE");
6767
return this.DoSynchronousRequest(connection, data);
6868
}
6969

70-
public bool Ping(Uri uri, int connectTimeout)
70+
public virtual bool Ping(Uri uri, int connectTimeout)
7171
{
7272
var request = this.CreateHttpWebRequest(uri, "HEAD");
7373
request.Timeout = connectTimeout;
@@ -78,7 +78,7 @@ public bool Ping(Uri uri, int connectTimeout)
7878
}
7979
}
8080

81-
public IList<Uri> Sniff(Uri uri, int connectTimeout)
81+
public virtual IList<Uri> Sniff(Uri uri, int connectTimeout)
8282
{
8383
uri = new Uri(uri, "_nodes/_all/clear?timeout=" + connectTimeout);
8484
var request = this.CreateHttpWebRequest(uri, "GET");
@@ -93,34 +93,34 @@ public IList<Uri> Sniff(Uri uri, int connectTimeout)
9393
}
9494
}
9595

96-
public Task<ElasticsearchResponse> Get(Uri uri)
96+
public virtual Task<ElasticsearchResponse> Get(Uri uri)
9797
{
9898
var r = this.CreateHttpWebRequest(uri, "GET");
9999
return this.DoAsyncRequest(r);
100100
}
101-
public Task<ElasticsearchResponse> Head(Uri uri)
101+
public virtual Task<ElasticsearchResponse> Head(Uri uri)
102102
{
103103
var r = this.CreateHttpWebRequest(uri, "HEAD");
104104
return this.DoAsyncRequest(r);
105105
}
106-
public Task<ElasticsearchResponse> Post(Uri uri, byte[] data)
106+
public virtual Task<ElasticsearchResponse> Post(Uri uri, byte[] data)
107107
{
108108
var r = this.CreateHttpWebRequest(uri, "POST");
109109
return this.DoAsyncRequest(r, data);
110110
}
111111

112-
public Task<ElasticsearchResponse> Put(Uri uri, byte[] data)
112+
public virtual Task<ElasticsearchResponse> Put(Uri uri, byte[] data)
113113
{
114114
var r = this.CreateHttpWebRequest(uri, "PUT");
115115
return this.DoAsyncRequest(r, data);
116116
}
117117

118-
public Task<ElasticsearchResponse> Delete(Uri uri, byte[] data)
118+
public virtual Task<ElasticsearchResponse> Delete(Uri uri, byte[] data)
119119
{
120120
var r = this.CreateHttpWebRequest(uri, "DELETE");
121121
return this.DoAsyncRequest(r, data);
122122
}
123-
public Task<ElasticsearchResponse> Delete(Uri uri)
123+
public virtual Task<ElasticsearchResponse> Delete(Uri uri)
124124
{
125125
var r = this.CreateHttpWebRequest(uri, "DELETE");
126126
return this.DoAsyncRequest(r);
@@ -371,17 +371,20 @@ public void Iterate(HttpWebRequest request, byte[] data, IEnumerable<Task> async
371371

372372
private Uri _CreateUriString(string path)
373373
{
374-
var s = this._ConnectionSettings;
375-
var uri = s.ConnectionPool.GetNext();
374+
//TODO reapply this
375+
return null;
376376

377-
if (s.QueryStringParameters != null)
378-
{
379-
var tempUri = new Uri(uri, path);
380-
var qs = s.QueryStringParameters.ToQueryString(tempUri.Query.IsNullOrEmpty() ? "?" : "&");
381-
path += qs;
382-
}
383-
uri = path.IsNullOrEmpty() ? uri : new Uri(uri, path);
384-
return uri.Purify();
377+
//var s = this._ConnectionSettings;
378+
////var uri = s.ConnectionPool.GetNext();
379+
380+
//if (s.QueryStringParameters != null)
381+
//{
382+
// var tempUri = new Uri(uri, path);
383+
// var qs = s.QueryStringParameters.ToQueryString(tempUri.Query.IsNullOrEmpty() ? "?" : "&");
384+
// path += qs;
385+
//}
386+
//uri = path.IsNullOrEmpty() ? uri : new Uri(uri, path);
387+
//return uri.Purify();
385388
}
386389

387390
}

src/Elasticsearch.Net/Connection/ITransport.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace Elasticsearch.Net.Connection
55
{
66
public interface ITransport
77
{
8-
ElasticsearchResponse DoRequest(string method, string path, object data = null, NameValueCollection queryString = null, int retried = 0);
8+
ElasticsearchResponse DoRequest(string method, string path, object data = null, NameValueCollection queryString = null, int retried = 0, int? seed = null);
99

1010
Task<ElasticsearchResponse> DoRequestAsync(
1111
string method,

0 commit comments

Comments
 (0)