Skip to content

Commit 4f8af8b

Browse files
ghelyardeleteLater
andauthored
fix: always use ConnectTimeout when connecting the WebSocket (#27)
* fix: always use ConnectTimeout when connecting the WebSocket * wrap exception in TimeoutException * nit changes --------- Co-authored-by: deleteLater <mikcczhang@gmail.com>
1 parent 0cfb263 commit 4f8af8b

5 files changed

Lines changed: 49 additions & 32 deletions

File tree

src/FeatBit.ServerSdk/DataSynchronizer/WebSocketDataSynchronizer.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Net.WebSockets;
44
using System.Text;
55
using System.Text.Json;
6-
using System.Threading;
76
using System.Threading.Tasks;
87
using FeatBit.Sdk.Server.Concurrent;
98
using FeatBit.Sdk.Server.Options;
@@ -67,11 +66,7 @@ private static FbWebSocket DefaultFbWebSocketFactory(FbOptions options)
6766

6867
public Task<bool> StartAsync()
6968
{
70-
Task.Run(() =>
71-
{
72-
var cts = new CancellationTokenSource(_options.ConnectTimeout);
73-
return _webSocket.ConnectAsync(cts.Token);
74-
});
69+
Task.Run(() => _webSocket.ConnectAsync());
7570

7671
return _initTcs.Task;
7772
}

src/FeatBit.ServerSdk/Transport/FbWebSocket.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ internal sealed partial class FbWebSocket
2424
new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("{\"messageType\":\"ping\",\"data\":{}}"));
2525

2626
private readonly FbOptions _options;
27-
private readonly Func<WebSocketTransport> _transportFactory;
27+
private readonly Func<FbOptions, WebSocketTransport> _transportFactory;
2828
private readonly Func<FbOptions, Uri> _webSocketUriResolver;
2929
private WebSocketTransport _transport;
3030
private Task _receiveTask;
@@ -38,7 +38,7 @@ internal sealed partial class FbWebSocket
3838

3939
internal FbWebSocket(
4040
FbOptions options,
41-
Func<WebSocketTransport> transportFactory = null,
41+
Func<FbOptions, WebSocketTransport> transportFactory = null,
4242
Func<FbOptions, Uri> webSocketUriResolver = null)
4343
{
4444
_options = options;
@@ -52,7 +52,7 @@ internal FbWebSocket(
5252
_logger = _loggerFactory.CreateLogger<FbWebSocket>();
5353
}
5454

55-
public async Task ConnectAsync(CancellationToken cancellationToken = default, bool isReconnecting = false)
55+
public async Task ConnectAsync(bool isReconnecting = false, CancellationToken cancellationToken = default)
5656
{
5757
Log.Starting(_logger);
5858

@@ -64,7 +64,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo
6464
}
6565

6666
var transportFactory = _transportFactory ?? DefaultWebSocketTransportFactory;
67-
var transport = transportFactory();
67+
var transport = transportFactory(_options);
6868
if (transport == null)
6969
{
7070
throw new InvalidOperationException("Configured WebSocketTransportFactory did not return a value.");
@@ -75,7 +75,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo
7575
{
7676
// starts the transport
7777
Log.StartingTransport(_logger, "WebSockets", webSocketUri);
78-
await transport.StartAsync(webSocketUri, _options.CloseTimeout, cancellationToken);
78+
await transport.StartAsync(webSocketUri, cancellationToken);
7979
}
8080
catch (Exception ex)
8181
{
@@ -108,9 +108,9 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo
108108
Log.Started(_logger);
109109
}
110110

111-
private WebSocketTransport DefaultWebSocketTransportFactory()
111+
private WebSocketTransport DefaultWebSocketTransportFactory(FbOptions options)
112112
{
113-
return new WebSocketTransport(_loggerFactory);
113+
return new WebSocketTransport(options, _loggerFactory);
114114
}
115115

116116
private static Uri DefaultWebSocketUriResolver(FbOptions options)
@@ -254,7 +254,7 @@ private async Task ReconnectAsync()
254254

255255
try
256256
{
257-
await ConnectAsync(_stopCts.Token, isReconnecting: true).ConfigureAwait(false);
257+
await ConnectAsync(isReconnecting: true, _stopCts.Token).ConfigureAwait(false);
258258

259259
Log.Reconnected(_logger, retryTimes, DateTime.UtcNow - reconnectStartTime);
260260

src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Threading;
77
using System.Threading.Tasks;
88
using FeatBit.Sdk.Server.Http;
9+
using FeatBit.Sdk.Server.Options;
910
using Microsoft.Extensions.Logging;
1011
using Microsoft.Extensions.Logging.Abstractions;
1112

@@ -21,20 +22,18 @@ internal sealed partial class WebSocketTransport : IDuplexPipe
2122
public WebSocketCloseStatus? CloseStatus => _webSocket?.CloseStatus;
2223
public string CloseDescription => _webSocket?.CloseStatusDescription;
2324

25+
private readonly FbOptions _options;
2426
private readonly Func<Uri, CancellationToken, Task<WebSocket>> _webSocketFactory;
2527
private WebSocket _webSocket;
2628

2729
private IDuplexPipe _transport;
2830
private IDuplexPipe _application;
2931

30-
private TimeSpan _closeTimeout;
3132
private readonly CancellationTokenSource _stopCts = new CancellationTokenSource();
3233
private volatile bool _aborted;
3334
private Task Running { get; set; } = Task.CompletedTask;
3435
private readonly ILogger<WebSocketTransport> _logger;
3536

36-
private static readonly TimeSpan DefaultCloseTimeout = TimeSpan.FromSeconds(5);
37-
3837
// 1MB
3938
private const long DefaultBufferSize = 1024 * 1024;
4039

@@ -49,20 +48,17 @@ internal sealed partial class WebSocketTransport : IDuplexPipe
4948
);
5049

5150
public WebSocketTransport(
51+
FbOptions options,
5252
ILoggerFactory loggerFactory = null,
5353
Func<Uri, CancellationToken, Task<WebSocket>> webSocketFactory = null)
5454
{
55+
_options = options;
5556
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketTransport>();
5657
_webSocketFactory = webSocketFactory;
5758
}
5859

59-
public async Task StartAsync(
60-
Uri uri,
61-
TimeSpan? closeTimeout = null,
62-
CancellationToken cancellationToken = default)
60+
public async Task StartAsync(Uri uri, CancellationToken cancellationToken = default)
6361
{
64-
_closeTimeout = closeTimeout ?? DefaultCloseTimeout;
65-
6662
// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
6763
var pair = DuplexPipe.CreateConnectionPair(DefaultPipeOptions, DefaultPipeOptions);
6864

@@ -88,7 +84,7 @@ public async Task StartAsync(
8884
Log.StartedTransport(_logger);
8985
}
9086

91-
private static async Task<WebSocket> DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken)
87+
private async Task<WebSocket> DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken)
9288
{
9389
var webSocket = new ClientWebSocket();
9490

@@ -104,11 +100,20 @@ private static async Task<WebSocket> DefaultWebSocketFactory(Uri uri, Cancellati
104100

105101
try
106102
{
107-
await webSocket.ConnectAsync(uri, cancellationToken);
103+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
104+
cts.CancelAfter(_options.ConnectTimeout);
105+
106+
await webSocket.ConnectAsync(uri, cts.Token).ConfigureAwait(false);
108107
}
109-
catch
108+
catch (Exception ex)
110109
{
111110
webSocket.Dispose();
111+
112+
if (ex is OperationCanceledException && !cancellationToken.IsCancellationRequested)
113+
{
114+
throw new TimeoutException("Connect timed out.", ex);
115+
}
116+
112117
throw;
113118
}
114119

@@ -128,7 +133,7 @@ private async Task ProcessSocketAsync(WebSocket socket)
128133
// Wait for send or receive to complete
129134
var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false);
130135

131-
_stopCts.CancelAfter(_closeTimeout);
136+
_stopCts.CancelAfter(_options.CloseTimeout);
132137

133138
if (trigger == receiving)
134139
{
@@ -139,7 +144,7 @@ private async Task ProcessSocketAsync(WebSocket socket)
139144
// Cancel the application so that ReadAsync yields
140145
_application.Input.CancelPendingRead();
141146

142-
var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token))
147+
var resultTask = await Task.WhenAny(sending, Task.Delay(_options.CloseTimeout, _stopCts.Token))
143148
.ConfigureAwait(false);
144149
if (resultTask != sending)
145150
{
@@ -359,7 +364,7 @@ public async Task StopAsync()
359364
_application.Input.CancelPendingRead();
360365

361366
// Start ungraceful close timer
362-
_stopCts.CancelAfter(_closeTimeout);
367+
_stopCts.CancelAfter(_options.CloseTimeout);
363368

364369
try
365370
{

tests/FeatBit.ServerSdk.Tests/TestApp.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,12 @@ internal Uri GetWsUri(string op)
2222
return wsUri;
2323
}
2424

25-
internal WebSocketTransport CreateWebSocketTransport()
25+
internal WebSocketTransport CreateWebSocketTransport(FbOptions options)
2626
{
2727
var client = Server.CreateWebSocketClient();
2828

2929
return new WebSocketTransport(
30+
options,
3031
webSocketFactory: (uri, cancellationToken) => client.ConnectAsync(uri, cancellationToken)
3132
);
3233
}

tests/FeatBit.ServerSdk.Tests/Transport/WebSocketsTransportTests.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
using System.Net.WebSockets;
22
using System.Text;
33

4+
using FeatBit.Sdk.Server.Options;
5+
46
namespace FeatBit.Sdk.Server.Transport;
57

68
[Collection(nameof(TestApp))]
@@ -16,7 +18,8 @@ public WebSocketsTransportTests(TestApp app)
1618
[Fact]
1719
public async Task StartAndStopAsync()
1820
{
19-
var transport = _app.CreateWebSocketTransport();
21+
var options = new FbOptionsBuilder().Build();
22+
var transport = _app.CreateWebSocketTransport(options);
2023
var uri = _app.GetWsUri("echo");
2124

2225
await transport.StartAsync(uri);
@@ -32,7 +35,8 @@ public async Task StartAndStopAsync()
3235
[Fact]
3336
public async Task SendReceiveAsync()
3437
{
35-
var transport = _app.CreateWebSocketTransport();
38+
var options = new FbOptionsBuilder().Build();
39+
var transport = _app.CreateWebSocketTransport(options);
3640
var uri = _app.GetWsUri("echo");
3741

3842
await transport.StartAsync(uri);
@@ -57,4 +61,16 @@ public async Task SendReceiveAsync()
5761

5862
await transport.StopAsync();
5963
}
64+
65+
[Fact]
66+
public async Task StartAsyncShouldThrowTimeoutExceptionWhenConnectTimesOut()
67+
{
68+
var options = new FbOptionsBuilder()
69+
.ConnectTimeout(TimeSpan.FromTicks(1))
70+
.Build();
71+
var transport = new WebSocketTransport(options);
72+
var uri = _app.GetWsUri("echo");
73+
74+
await Assert.ThrowsAsync<TimeoutException>(() => transport.StartAsync(uri));
75+
}
6076
}

0 commit comments

Comments
 (0)