Skip to content

Commit 3a98c49

Browse files
committed
Release SemaphoreSlim after blocking
Fix for issue where multiple threads could timeout waiting for the semaphore: 1. Thread A blocks, sniffs, and releases. 2. Thread B blocks and returns. 3. Thread C is blocked forever.
1 parent 4be57c3 commit 3a98c49

File tree

8 files changed

+171
-46
lines changed

8 files changed

+171
-46
lines changed

src/Elasticsearch.Net/Configuration/ConnectionConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public ConnectionConfiguration(IConnectionPool connectionPool, IConnection conne
6464
public abstract class ConnectionConfiguration<T> : IConnectionConfigurationValues, IHideObjectMembers
6565
where T : ConnectionConfiguration<T>
6666
{
67-
private SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
67+
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);
6868
SemaphoreSlim IConnectionConfigurationValues.BootstrapLock => this._semaphore;
6969

7070
private TimeSpan _requestTimeout;

src/Elasticsearch.Net/Transport/Pipeline/RequestPipeline.cs

Lines changed: 54 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -122,45 +122,63 @@ public void MarkDead(Node node)
122122

123123
public void FirstPoolUsage(SemaphoreSlim semaphore)
124124
{
125-
if (!this.FirstPoolUsageNeedsSniffing) return;
126-
if (!semaphore.Wait(this._settings.RequestTimeout))
127-
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, (Exception)null);
128-
if (!this.FirstPoolUsageNeedsSniffing) return;
129-
try
130-
{
131-
using (this.Audit(SniffOnStartup))
132-
{
133-
this.Sniff();
134-
this._connectionPool.SniffedOnStartup = true;
135-
}
136-
}
137-
finally
138-
{
139-
semaphore.Release();
140-
}
141-
}
125+
if (!this.FirstPoolUsageNeedsSniffing) return;
126+
if (!semaphore.Wait(this._settings.RequestTimeout))
127+
{
128+
if (this.FirstPoolUsageNeedsSniffing)
129+
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
130+
return;
131+
}
132+
133+
if (!this.FirstPoolUsageNeedsSniffing)
134+
{
135+
semaphore.Release();
136+
return;
137+
}
138+
139+
try
140+
{
141+
using (this.Audit(SniffOnStartup))
142+
{
143+
this.Sniff();
144+
this._connectionPool.SniffedOnStartup = true;
145+
}
146+
}
147+
finally
148+
{
149+
semaphore.Release();
150+
}
151+
}
142152

143153
public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore)
144154
{
145-
if (!this.FirstPoolUsageNeedsSniffing) return;
146-
var success = await semaphore.WaitAsync(this._settings.RequestTimeout, this._cancellationToken).ConfigureAwait(false);
147-
if (!success)
148-
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, (Exception)null);
149-
150-
if (!this.FirstPoolUsageNeedsSniffing) return;
151-
try
152-
{
153-
using (this.Audit(SniffOnStartup))
154-
{
155-
await this.SniffAsync().ConfigureAwait(false);
156-
this._connectionPool.SniffedOnStartup = true;
157-
}
158-
}
159-
finally
160-
{
161-
semaphore.Release();
162-
}
163-
}
155+
if (!this.FirstPoolUsageNeedsSniffing) return;
156+
var success = await semaphore.WaitAsync(this._settings.RequestTimeout, _cancellationToken).ConfigureAwait(false);
157+
if (!success)
158+
{
159+
if (this.FirstPoolUsageNeedsSniffing)
160+
throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null);
161+
return;
162+
}
163+
164+
if (!this.FirstPoolUsageNeedsSniffing)
165+
{
166+
semaphore.Release();
167+
return;
168+
}
169+
try
170+
{
171+
using (this.Audit(SniffOnStartup))
172+
{
173+
await this.SniffAsync().ConfigureAwait(false);
174+
this._connectionPool.SniffedOnStartup = true;
175+
}
176+
}
177+
finally
178+
{
179+
semaphore.Release();
180+
}
181+
}
164182

165183
public void SniffOnStaleCluster()
166184
{

src/Tests/ClientConcepts/ConnectionPooling/BuildingBlocks/RequestPipelines.doc.cs

Lines changed: 79 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Linq;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
47
using Elasticsearch.Net;
58
using FluentAssertions;
69
using Nest;
10+
using Newtonsoft.Json;
11+
using Newtonsoft.Json.Linq;
712
using Tests.Framework;
13+
using Tests.Framework.VirtualClustering;
14+
using Xunit;
815

916
namespace Tests.ClientConcepts.ConnectionPooling.BuildingBlocks
1017
{
@@ -53,11 +60,15 @@ public void RequestPipeline()
5360
new MemoryStreamFactory());
5461
}
5562

63+
//hide
5664
private IRequestPipeline CreatePipeline(
57-
Func<IEnumerable<Uri>, IConnectionPool> setupPool, Func<ConnectionSettings, ConnectionSettings> settingsSelector = null, IDateTimeProvider dateTimeProvider = null)
65+
Func<IEnumerable<Uri>, IConnectionPool> setupPool,
66+
Func<ConnectionSettings, ConnectionSettings> settingsSelector = null,
67+
IDateTimeProvider dateTimeProvider = null,
68+
InMemoryConnection connection = null)
5869
{
5970
var pool = setupPool(new[] { TestClient.CreateUri(), TestClient.CreateUri(9201) });
60-
var settings = new ConnectionSettings(pool, TestClient.CreateConnection());
71+
var settings = new ConnectionSettings(pool, connection ?? new InMemoryConnection());
6172
settings = settingsSelector?.Invoke(settings) ?? settings;
6273
return new FixedPipelineFactory(settings, dateTimeProvider ?? DateTimeProvider.Default).Pipeline;
6374
}
@@ -84,10 +95,72 @@ public void FirstUsageCheck()
8495
*/
8596
sniffingPipeline = CreatePipeline(uris => new SniffingConnectionPool(uris), s => s.SniffOnStartup(false)); //<1> Disable sniffing on startup
8697
sniffingPipeline.FirstPoolUsageNeedsSniffing.Should().BeFalse();
87-
}
88-
89-
/**==== Sniffing on Connection Failure */
90-
[U]
98+
}
99+
100+
/**==== Wait for first Sniff
101+
*
102+
* All threads wait for the sniff on startup to finish, waiting the request timeout period. A
103+
* https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim(v=vs.110).aspx[`SemaphoreSlim`]
104+
* is used to block threads until the sniff finishes and waiting threads release the `SemaphoreSlim` appropriately.
105+
*/
106+
[U]
107+
public void FirstUsageCheckConcurrentThreads()
108+
{
109+
var response = new
110+
{
111+
cluster_name = "elasticsearch",
112+
nodes = new
113+
{
114+
node1 = new
115+
{
116+
name = "Node Name 1",
117+
transport_address = "127.0.0.1:9300",
118+
host = "127.0.0.1",
119+
ip = "127.0.01",
120+
version = "5.0.0-alpha3",
121+
build = "e455fd0",
122+
http_address = "127.0.0.1:9200",
123+
settings = new JObject
124+
{
125+
{"client.type", "node"},
126+
{"cluster.name", "elasticsearch"},
127+
{"config.ignore_system_properties", "true"},
128+
{"name", "Node Name 1"},
129+
{"path.home", "c:\\elasticsearch\\elasticsearch"},
130+
{"path.logs", "c:/ elasticsearch/logs"}
131+
}
132+
}
133+
}
134+
};
135+
136+
var responseBody = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(response));
137+
138+
var inMemoryConnection = new WaitingInMemoryConnection(
139+
TimeSpan.FromSeconds(1),
140+
responseBody);
141+
142+
var sniffingPipeline = CreatePipeline(
143+
uris => new SniffingConnectionPool(uris),
144+
connection: inMemoryConnection,
145+
settingsSelector: s => s.RequestTimeout(TimeSpan.FromSeconds(2)));
146+
147+
var semaphoreSlim = new SemaphoreSlim(1, 1);
148+
149+
/**
150+
* start three tasks that will initiate a sniff on startup. The first task will successfully
151+
* sniff on startup with the remaining two waiting tasks exiting without exception and releasing
152+
* the `SemaphoreSlim`.
153+
*/
154+
var task1 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
155+
var task2 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
156+
var task3 = Task.Run(() => sniffingPipeline.FirstPoolUsage(semaphoreSlim));
157+
158+
var exception = Record.Exception(() => Task.WaitAll(task1, task2, task3));
159+
exception.Should().BeNull();
160+
}
161+
162+
/**==== Sniffing on Connection Failure */
163+
[U]
91164
public void SniffsOnConnectionFailure()
92165
{
93166
var singleNodePipeline = CreatePipeline(uris => new SingleNodeConnectionPool(uris.First()));

src/Tests/ClientConcepts/ConnectionPooling/RoundRobin/VolatileUpdates.doc.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class VolatileUpdates
1212
{
1313
/**== Volatile Updates */
1414
protected int NumberOfNodes = 10;
15-
private Random Random = new Random();
15+
private readonly Random Random = new Random();
1616

1717
private List<Node> Update = Enumerable.Range(9200, 10).Select(p => new Uri("http://localhost:" + p)).Select(u => new Node(u)).ToList();
1818

src/Tests/ClientConcepts/HighLevel/CovariantHits/CovariantSearchResults.doc.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ [U] public void UsingConcreteTypeSelector()
100100
);
101101

102102
/**
103-
* here for each hit we'll call the delegate passed to `ConcreteTypeSelector where
103+
* here for each hit we'll call the delegate passed to `ConcreteTypeSelector` where
104104
* - `d` is a representation of the `_source` exposed as a `dynamic` type
105105
* - a typed `h` which represents the encapsulating hit of the source i.e. `Hit<dynamic>`
106106
*/
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Elasticsearch.Net;
5+
6+
namespace Tests.Framework.VirtualClustering
7+
{
8+
public class WaitingInMemoryConnection : InMemoryConnection
9+
{
10+
private readonly TimeSpan _waitTime;
11+
12+
public WaitingInMemoryConnection(TimeSpan waitTime, byte[] responseBody, int statusCode = 200, Exception exception = null)
13+
: base(responseBody, statusCode, exception)
14+
{
15+
this._waitTime = waitTime;
16+
}
17+
18+
public override ElasticsearchResponse<TReturn> Request<TReturn>(RequestData requestData)
19+
{
20+
Thread.Sleep(_waitTime);
21+
return base.Request<TReturn>(requestData);
22+
}
23+
24+
public override async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData)
25+
{
26+
await Task.Delay(_waitTime);
27+
return await base.RequestAsync<TReturn>(requestData);
28+
}
29+
}
30+
}

src/Tests/Framework/Xunit/TestAssemblyRunner.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ orderby g.Count() descending
6868
{
6969
var type = group.Key?.GetType();
7070
var clusterName = type?.Name.Replace("Cluster", "") ?? "UNKNOWN";
71-
var dop = group.Key.MaxConcurrency == 0 ? defaultMaxConcurrency : group.Key.MaxConcurrency;
71+
var dop = group.Key != null && group.Key.MaxConcurrency > 0
72+
? group.Key.MaxConcurrency
73+
: defaultMaxConcurrency;
74+
7275
//if (type != typeof(ReadOnlyCluster)) continue;
7376

7477
clusterTotals.Add(clusterName, Stopwatch.StartNew());

src/Tests/Tests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@
573573
<Compile Include="Framework\Integration\Process\ElasticsearchMessage.cs" />
574574
<Compile Include="Framework\Integration\Process\ElasticsearchNodeInfo.cs" />
575575
<Compile Include="Framework\Integration\Process\ElasticsearchVersionInfo.cs" />
576+
<Compile Include="Framework\VirtualClustering\WaitingInMemoryConnection.cs" />
576577
<Compile Include="Framework\Xunit\ForEachAsyncExtensions.cs" />
577578
<Compile Include="Framework\Xunit\IClusterFixture.cs" />
578579
<Compile Include="Framework\Xunit\TestAssemblyRunner.cs" />

0 commit comments

Comments
 (0)