Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/Websocket.Client/IWebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
/// </summary>
IObservable<DisconnectionInfo> DisconnectionHappened { get; }

/// <summary>
/// Time range for how long to wait while connecting a new client.
/// Default: 2 seconds
/// </summary>
TimeSpan ConnectTimeout { get; set; }

/// <summary>
/// Time range for how long to wait before reconnecting if no message comes from server.
/// Set null to disable this feature.
Expand Down Expand Up @@ -158,6 +164,21 @@
/// </summary>
bool IsRunning { get; }

/// <summary>
/// Returns whether text message sender is running.
/// </summary>
bool TextSenderRunning { get; }

/// <summary>
/// Returns whether the binary message sender is running.
/// </summary>
bool BinarySenderRunning { get; }

/// <summary>
/// Indicates whether any thread has entered the lock.
/// </summary>
bool IsInsideLock { get; }

/// <summary>
/// Enable or disable text message conversion from binary to string (via 'MessageEncoding' property).
/// Default: true
Expand Down Expand Up @@ -186,7 +207,7 @@
/// <summary>
/// Terminate the websocket connection and cleanup everything
/// </summary>
void Dispose();

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

Check warning on line 210 in src/Websocket.Client/IWebsocketClient.cs

View workflow job for this annotation

GitHub Actions / build

'IWebsocketClient.Dispose()' hides inherited member 'IDisposable.Dispose()'. Use the new keyword if hiding was intended.

/// <summary>
/// Start listening to the websocket stream on the background thread.
Expand Down
5 changes: 5 additions & 0 deletions src/Websocket.Client/Threading/WebsocketAsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public WebsocketAsyncLock()
_releaserTask = Task.FromResult(_releaser);
}

/// <summary>
/// True if the lock is currently taken
/// </summary>
public bool IsLocked => _semaphore.CurrentCount == 0;

/// <summary>
/// Use inside 'using' block
/// </summary>
Expand Down
5 changes: 3 additions & 2 deletions src/Websocket.Client/WebsocketClient.Reconnecting.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -67,7 +68,7 @@ private async Task Reconnect(ReconnectionType type, bool failFast, Exception? ca

var disType = TranslateTypeToDisconnection(type);
var disInfo = DisconnectionInfo.Create(disType, _client, causedException);
if (type != ReconnectionType.Error)
if (type != ReconnectionType.Error && _client?.State != WebSocketState.CloseReceived && _client?.State != WebSocketState.Closed)
{
_disconnectedSubject.OnNext(disInfo);
if (disInfo.CancelReconnection)
Expand All @@ -88,7 +89,7 @@ private async Task Reconnect(ReconnectionType type, bool failFast, Exception? ca
}
_client?.Dispose();

if (!IsReconnectionEnabled || disInfo.CancelReconnection)
if (type != ReconnectionType.Error && (!IsReconnectionEnabled || disInfo.CancelReconnection))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this change, it could affect existing implementations that rely on auto reconnection. I will test it locally

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this, the unit test Starting_WithServerDelay_RetriesAfterConnectionTimeout fails. With it, all the unit test pass.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was a little worried it could stop the reconnection, but actually, it is the opposite; in case of error, it will always reconnect.
Looks good to me

{
// reconnection disabled, do nothing
IsStarted = false;
Expand Down
32 changes: 24 additions & 8 deletions src/Websocket.Client/WebsocketClient.Sending.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ public partial class WebsocketClient

private static readonly byte[] _emptyArray = { };

/// <inheritdoc />
public bool TextSenderRunning { get; private set; }

/// <inheritdoc />
public bool BinarySenderRunning { get; private set; }

/// <summary>
/// Send text message to the websocket channel.
/// It inserts the message to the queue and actual sending is done on another thread
Expand Down Expand Up @@ -159,6 +165,7 @@ public void StreamFakeMessage(ResponseMessage message)

private async Task SendTextFromQueue()
{
TextSenderRunning = true;
try
{
while (await _messagesTextToSendQueue.Reader.WaitToReadAsync())
Expand All @@ -176,30 +183,35 @@ private async Task SendTextFromQueue()
}
}
}
catch (TaskCanceledException)
catch (TaskCanceledException e)
{
// task was canceled, ignore
_logger.LogDebug(e, L("Sending text thread failed, error: {error}. Shutting down."), Name, e.Message);
}
catch (OperationCanceledException)
catch (OperationCanceledException e)
{
// operation was canceled, ignore
_logger.LogDebug(e, L("Sending text thread failed, error: {error}. Shutting down."), Name, e.Message);
}
catch (Exception e)
{
if (_cancellationTotal?.IsCancellationRequested == true || _disposing)
{
// disposing/canceling, do nothing and exit
_logger.LogDebug(e, L("Sending text thread failed, error: {error}. Shutting down."), Name, e.Message);
TextSenderRunning = false;
return;
}

_logger.LogTrace(L("Sending text thread failed, error: {error}. Creating a new sending thread."), Name, e.Message);
_logger.LogDebug(e, L("Sending text thread failed, error: {error}. Creating a new sending thread."), Name, e.Message);
StartBackgroundThreadForSendingText();
}

TextSenderRunning = false;
}

private async Task SendBinaryFromQueue()
{
BinarySenderRunning = true;
try
{
while (await _messagesBinaryToSendQueue.Reader.WaitToReadAsync())
Expand All @@ -217,26 +229,30 @@ private async Task SendBinaryFromQueue()
}
}
}
catch (TaskCanceledException)
catch (TaskCanceledException e)
{
// task was canceled, ignore
_logger.LogDebug(e, L("Sending binary thread failed, error: {error}. Shutting down."), Name, e.Message);
}
catch (OperationCanceledException)
catch (OperationCanceledException e)
{
// operation was canceled, ignore
_logger.LogDebug(e, L("Sending binary thread failed, error: {error}. Shutting down."), Name, e.Message);
}
catch (Exception e)
{
if (_cancellationTotal?.IsCancellationRequested == true || _disposing)
{
// disposing/canceling, do nothing and exit
_logger.LogDebug(e, L("Sending binary thread failed, error: {error}. Shutting down."), Name, e.Message);
BinarySenderRunning = false;
return;
}

_logger.LogTrace(L("Sending binary thread failed, error: {error}. Creating a new sending thread."), Name, e.Message);
_logger.LogDebug(e, L("Sending binary thread failed, error: {error}. Creating a new sending thread."), Name, e.Message);
StartBackgroundThreadForSendingBinary();
}

BinarySenderRunning = false;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably better put it into finally block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but you already merged it now.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, np, I will update it

}

private void StartBackgroundThreadForSendingText()
Expand Down
35 changes: 27 additions & 8 deletions src/Websocket.Client/WebsocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public partial class WebsocketClient : IWebsocketClient
private bool _isReconnectionEnabled = true;
private WebSocket? _client;
private CancellationTokenSource? _cancellation;
private CancellationTokenSource? _cancellationConnection;
private CancellationTokenSource? _cancellationTotal;

private readonly Subject<ResponseMessage> _messageReceivedSubject = new Subject<ResponseMessage>();
Expand Down Expand Up @@ -123,6 +124,12 @@ public Uri Url
/// </summary>
public IObservable<DisconnectionInfo> DisconnectionHappened => _disconnectedSubject.AsObservable();

/// <summary>
/// Time range for how long to wait while connecting a new client.
/// Default: 2 seconds
/// </summary>
public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(2);

/// <summary>
/// Time range for how long to wait before reconnecting if no message comes from server.
/// Set null to disable this feature.
Expand Down Expand Up @@ -189,6 +196,9 @@ public bool IsReconnectionEnabled
/// Default: true
/// </summary>
public bool IsTextMessageConversionEnabled { get; set; } = true;

/// <inheritdoc />
public bool IsInsideLock => _locker.IsLocked;

/// <summary>
/// Enable or disable automatic <see cref="MemoryStream.Dispose(bool)"/> of the <see cref="MemoryStream"/>
Expand All @@ -210,19 +220,24 @@ public bool IsReconnectionEnabled
/// </summary>
public void Dispose()
{
if (_disposing)
return;

_disposing = true;
_logger.LogDebug(L("Disposing.."), Name);
try
{
_messagesTextToSendQueue.Writer.Complete();
_messagesBinaryToSendQueue.Writer.Complete();
_messagesTextToSendQueue.Writer.TryComplete();
_messagesBinaryToSendQueue.Writer.TryComplete();
_lastChanceTimer?.Dispose();
_errorReconnectTimer?.Dispose();
_cancellation?.Cancel();
_cancellationConnection?.Cancel();
_cancellationTotal?.Cancel();
_client?.Abort();
_client?.Dispose();
_cancellation?.Dispose();
_cancellationConnection?.Dispose();
_cancellationTotal?.Dispose();
_messageReceivedSubject.OnCompleted();
_reconnectionSubject.OnCompleted();
Expand Down Expand Up @@ -402,7 +417,9 @@ private async Task StartClient(Uri uri, CancellationToken token, ReconnectionTyp

try
{
_client = await _connectionFactory(uri, token).ConfigureAwait(false);
_cancellationConnection = CancellationTokenSource.CreateLinkedTokenSource(token);
_cancellationConnection.CancelAfter(ConnectTimeout);
_client = await _connectionFactory(uri, _cancellationConnection.Token).ConfigureAwait(false);
_ = Listen(_client, token);
IsRunning = true;
IsStarted = true;
Expand All @@ -412,6 +429,7 @@ private async Task StartClient(Uri uri, CancellationToken token, ReconnectionTyp
}
catch (Exception e)
{
IsRunning = _client?.State == WebSocketState.Open;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this possible to happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the exception could get thrown unexpectedly (perhaps even in future after other changes happen to this code). This is just here to be extra safe and make sure that we set the IsRunning property to false in case it is true at this point. I was just being pessimistic with this line of code. It can be removed if you want.

var info = DisconnectionInfo.Create(DisconnectionType.Error, _client, e);
_disconnectedSubject.OnNext(info);

Expand Down Expand Up @@ -447,7 +465,9 @@ private async Task StartClient(Uri uri, CancellationToken token, ReconnectionTyp

private void ReconnectOnError(object? state)
{
// await Task.Delay(timeout, token).ConfigureAwait(false);
if (_client != null && ShouldIgnoreReconnection(_client))
return;

_ = Reconnect(ReconnectionType.Error, false, state as Exception).ConfigureAwait(false);
}

Expand Down Expand Up @@ -491,7 +511,7 @@ private async Task Listen(WebSocket client, CancellationToken token)
}
else if (result.MessageType == WebSocketMessageType.Close)
{
_logger.LogTrace(L("Received close message"), Name);
_logger.LogDebug(L("Received close message"), Name);

if (!IsStarted || _stopping)
{
Expand All @@ -512,13 +532,12 @@ private async Task Listen(WebSocket client, CancellationToken token)
continue;
}

await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing",
token, false, true);
await StopInternal(client, WebSocketCloseStatus.NormalClosure, "Closing", token, false, true);

// reconnect if enabled
if (IsReconnectionEnabled && !ShouldIgnoreReconnection(client))
{
_ = ReconnectSynchronized(ReconnectionType.Lost, false, null);
_ = ReconnectSynchronized(ReconnectionType.ByServer, false, null);
}

return;
Expand Down
Loading