Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using FeatBit.Sdk.Server.Concurrent;
using FeatBit.Sdk.Server.Options;
Expand Down Expand Up @@ -67,11 +66,7 @@ private static FbWebSocket DefaultFbWebSocketFactory(FbOptions options)

public Task<bool> StartAsync()
{
Task.Run(() =>
{
var cts = new CancellationTokenSource(_options.ConnectTimeout);
return _webSocket.ConnectAsync(cts.Token);
});
Task.Run(() => _webSocket.ConnectAsync());
Comment thread
deleteLater marked this conversation as resolved.

return _initTcs.Task;
}
Expand Down
16 changes: 8 additions & 8 deletions src/FeatBit.ServerSdk/Transport/FbWebSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ internal sealed partial class FbWebSocket
new ReadOnlyMemory<byte>(Encoding.UTF8.GetBytes("{\"messageType\":\"ping\",\"data\":{}}"));

private readonly FbOptions _options;
private readonly Func<WebSocketTransport> _transportFactory;
private readonly Func<FbOptions, WebSocketTransport> _transportFactory;
private readonly Func<FbOptions, Uri> _webSocketUriResolver;
private WebSocketTransport _transport;
private Task _receiveTask;
Expand All @@ -38,7 +38,7 @@ internal sealed partial class FbWebSocket

internal FbWebSocket(
FbOptions options,
Func<WebSocketTransport> transportFactory = null,
Func<FbOptions, WebSocketTransport> transportFactory = null,
Func<FbOptions, Uri> webSocketUriResolver = null)
{
_options = options;
Expand All @@ -52,7 +52,7 @@ internal FbWebSocket(
_logger = _loggerFactory.CreateLogger<FbWebSocket>();
}

public async Task ConnectAsync(CancellationToken cancellationToken = default, bool isReconnecting = false)
public async Task ConnectAsync(bool isReconnecting = false, CancellationToken cancellationToken = default)
{
Log.Starting(_logger);

Expand All @@ -64,7 +64,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo
}

var transportFactory = _transportFactory ?? DefaultWebSocketTransportFactory;
var transport = transportFactory();
var transport = transportFactory(_options);
if (transport == null)
{
throw new InvalidOperationException("Configured WebSocketTransportFactory did not return a value.");
Expand All @@ -75,7 +75,7 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo
{
// starts the transport
Log.StartingTransport(_logger, "WebSockets", webSocketUri);
await transport.StartAsync(webSocketUri, _options.CloseTimeout, cancellationToken);
await transport.StartAsync(webSocketUri, cancellationToken);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -108,9 +108,9 @@ public async Task ConnectAsync(CancellationToken cancellationToken = default, bo
Log.Started(_logger);
}

private WebSocketTransport DefaultWebSocketTransportFactory()
private WebSocketTransport DefaultWebSocketTransportFactory(FbOptions options)
{
return new WebSocketTransport(_loggerFactory);
return new WebSocketTransport(options, _loggerFactory);
}

private static Uri DefaultWebSocketUriResolver(FbOptions options)
Expand Down Expand Up @@ -254,7 +254,7 @@ private async Task ReconnectAsync()

try
{
await ConnectAsync(_stopCts.Token, isReconnecting: true).ConfigureAwait(false);
await ConnectAsync(isReconnecting: true, _stopCts.Token).ConfigureAwait(false);

Log.Reconnected(_logger, retryTimes, DateTime.UtcNow - reconnectStartTime);

Expand Down
35 changes: 20 additions & 15 deletions src/FeatBit.ServerSdk/Transport/WebSocketTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading;
using System.Threading.Tasks;
using FeatBit.Sdk.Server.Http;
using FeatBit.Sdk.Server.Options;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;

Expand All @@ -21,20 +22,18 @@ internal sealed partial class WebSocketTransport : IDuplexPipe
public WebSocketCloseStatus? CloseStatus => _webSocket?.CloseStatus;
public string CloseDescription => _webSocket?.CloseStatusDescription;

private readonly FbOptions _options;
private readonly Func<Uri, CancellationToken, Task<WebSocket>> _webSocketFactory;
private WebSocket _webSocket;

private IDuplexPipe _transport;
private IDuplexPipe _application;

private TimeSpan _closeTimeout;
private readonly CancellationTokenSource _stopCts = new CancellationTokenSource();
private volatile bool _aborted;
private Task Running { get; set; } = Task.CompletedTask;
private readonly ILogger<WebSocketTransport> _logger;

private static readonly TimeSpan DefaultCloseTimeout = TimeSpan.FromSeconds(5);

// 1MB
private const long DefaultBufferSize = 1024 * 1024;

Expand All @@ -49,20 +48,17 @@ internal sealed partial class WebSocketTransport : IDuplexPipe
);

public WebSocketTransport(
FbOptions options,
ILoggerFactory loggerFactory = null,
Func<Uri, CancellationToken, Task<WebSocket>> webSocketFactory = null)
{
_options = options;
_logger = (loggerFactory ?? NullLoggerFactory.Instance).CreateLogger<WebSocketTransport>();
_webSocketFactory = webSocketFactory;
}

public async Task StartAsync(
Uri uri,
TimeSpan? closeTimeout = null,
CancellationToken cancellationToken = default)
public async Task StartAsync(Uri uri, CancellationToken cancellationToken = default)
{
_closeTimeout = closeTimeout ?? DefaultCloseTimeout;

// Create the pipe pair (Application's writer is connected to Transport's reader, and vice versa)
var pair = DuplexPipe.CreateConnectionPair(DefaultPipeOptions, DefaultPipeOptions);

Expand All @@ -88,7 +84,7 @@ public async Task StartAsync(
Log.StartedTransport(_logger);
}

private static async Task<WebSocket> DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken)
private async Task<WebSocket> DefaultWebSocketFactory(Uri uri, CancellationToken cancellationToken)
{
var webSocket = new ClientWebSocket();

Expand All @@ -104,11 +100,20 @@ private static async Task<WebSocket> DefaultWebSocketFactory(Uri uri, Cancellati

try
{
await webSocket.ConnectAsync(uri, cancellationToken);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(_options.ConnectTimeout);

await webSocket.ConnectAsync(uri, cts.Token).ConfigureAwait(false);
}
catch
catch (Exception ex)
{
webSocket.Dispose();

if (ex is OperationCanceledException && !cancellationToken.IsCancellationRequested)
{
throw new TimeoutException("Connect timed out.", ex);
}

throw;
}

Expand All @@ -128,7 +133,7 @@ private async Task ProcessSocketAsync(WebSocket socket)
// Wait for send or receive to complete
var trigger = await Task.WhenAny(receiving, sending).ConfigureAwait(false);

_stopCts.CancelAfter(_closeTimeout);
_stopCts.CancelAfter(_options.CloseTimeout);

if (trigger == receiving)
{
Expand All @@ -139,7 +144,7 @@ private async Task ProcessSocketAsync(WebSocket socket)
// Cancel the application so that ReadAsync yields
_application.Input.CancelPendingRead();

var resultTask = await Task.WhenAny(sending, Task.Delay(_closeTimeout, _stopCts.Token))
var resultTask = await Task.WhenAny(sending, Task.Delay(_options.CloseTimeout, _stopCts.Token))
.ConfigureAwait(false);
if (resultTask != sending)
{
Expand Down Expand Up @@ -359,7 +364,7 @@ public async Task StopAsync()
_application.Input.CancelPendingRead();

// Start ungraceful close timer
_stopCts.CancelAfter(_closeTimeout);
_stopCts.CancelAfter(_options.CloseTimeout);

try
{
Expand Down
3 changes: 2 additions & 1 deletion tests/FeatBit.ServerSdk.Tests/TestApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ internal Uri GetWsUri(string op)
return wsUri;
}

internal WebSocketTransport CreateWebSocketTransport()
internal WebSocketTransport CreateWebSocketTransport(FbOptions options)
{
var client = Server.CreateWebSocketClient();

return new WebSocketTransport(
options,
webSocketFactory: (uri, cancellationToken) => client.ConnectAsync(uri, cancellationToken)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System.Net.WebSockets;
using System.Text;

using FeatBit.Sdk.Server.Options;

namespace FeatBit.Sdk.Server.Transport;

[Collection(nameof(TestApp))]
Expand All @@ -16,7 +18,8 @@ public WebSocketsTransportTests(TestApp app)
[Fact]
public async Task StartAndStopAsync()
{
var transport = _app.CreateWebSocketTransport();
var options = new FbOptionsBuilder().Build();
var transport = _app.CreateWebSocketTransport(options);
var uri = _app.GetWsUri("echo");

await transport.StartAsync(uri);
Expand All @@ -32,7 +35,8 @@ public async Task StartAndStopAsync()
[Fact]
public async Task SendReceiveAsync()
{
var transport = _app.CreateWebSocketTransport();
var options = new FbOptionsBuilder().Build();
var transport = _app.CreateWebSocketTransport(options);
var uri = _app.GetWsUri("echo");

await transport.StartAsync(uri);
Expand All @@ -57,4 +61,16 @@ public async Task SendReceiveAsync()

await transport.StopAsync();
}

[Fact]
public async Task StartAsyncShouldThrowTimeoutExceptionWhenConnectTimesOut()
{
var options = new FbOptionsBuilder()
.ConnectTimeout(TimeSpan.FromTicks(1))
.Build();
var transport = new WebSocketTransport(options);
var uri = _app.GetWsUri("echo");

await Assert.ThrowsAsync<TimeoutException>(() => transport.StartAsync(uri));
}
}