From 0fa2ee066c8b57f1b3ba59df75312cbbaff8a956 Mon Sep 17 00:00:00 2001 From: Adam Trzmiel Date: Wed, 10 Dec 2025 12:41:29 +0100 Subject: [PATCH 1/3] fix(webgl): Fixed js dyncalls, added support for Cookies --- .../MagicOnionSampleApp.Shared.csproj | 1 + .../GrpcWebSocketBridgeHandler.Net.cs | 2 +- .../GrpcWebSocketBridgeHandler.Unity.cs | 4 ++-- .../GrpcWebSocketBridgeHandler.cs | 2 ++ src/GrpcWebSocketBridge.Client/Unity/JsWebSocket.jslib | 10 +++++----- .../Unity/UnityWebRequestHttpHandler.cs | 2 ++ .../WebSockets/SystemNetWebSocketsClientWebSocket.cs | 5 +++-- src/GrpcWebSocketBridge.Client/package.json | 4 ++-- 8 files changed, 18 insertions(+), 12 deletions(-) diff --git a/samples/MagicOnionSampleApp.Shared/MagicOnionSampleApp.Shared.csproj b/samples/MagicOnionSampleApp.Shared/MagicOnionSampleApp.Shared.csproj index c7753f0..db305b2 100644 --- a/samples/MagicOnionSampleApp.Shared/MagicOnionSampleApp.Shared.csproj +++ b/samples/MagicOnionSampleApp.Shared/MagicOnionSampleApp.Shared.csproj @@ -2,6 +2,7 @@ netstandard2.1 + enable diff --git a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Net.cs b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Net.cs index 3eb5285..1d660e6 100644 --- a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Net.cs +++ b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Net.cs @@ -9,7 +9,7 @@ public partial class GrpcWebSocketBridgeHandler { private static PipeOptions PipeOptions { get; } = new PipeOptions(); - private static IClientWebSocket CreateClientWebSocket() => new SystemNetWebSocketsClientWebSocket(); + private IClientWebSocket CreateClientWebSocket() => new SystemNetWebSocketsClientWebSocket(CookieContainer); public GrpcWebSocketBridgeHandler(bool forceWebSocketMode = false) diff --git a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Unity.cs b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Unity.cs index fb56415..a259a8c 100644 --- a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Unity.cs +++ b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.Unity.cs @@ -13,11 +13,11 @@ public partial class GrpcWebSocketBridgeHandler { private static PipeOptions PipeOptions { get; } = new PipeOptions(readerScheduler: PipeScheduler.Inline, writerScheduler: PipeScheduler.Inline, useSynchronizationContext: false); - private static IClientWebSocket CreateClientWebSocket() => + private IClientWebSocket CreateClientWebSocket() => #if UNITY_WEBGL && !UNITY_EDITOR new JsWebSocketsClientWebSocket(); #else - new SystemNetWebSocketsClientWebSocket(); + new SystemNetWebSocketsClientWebSocket(CookieContainer); #endif public GrpcWebSocketBridgeHandler(bool forceWebSocketMode = false) diff --git a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs index 55eb739..ed4b9ea 100644 --- a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs +++ b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs @@ -30,6 +30,8 @@ namespace GrpcWebSocketBridge.Client { public partial class GrpcWebSocketBridgeHandler : DelegatingHandler { + public CookieContainer? CookieContainer { get; set; } + private readonly HashSet _ongoingWebSockets = new HashSet(); private readonly bool _forceWebSocketMode = false; diff --git a/src/GrpcWebSocketBridge.Client/Unity/JsWebSocket.jslib b/src/GrpcWebSocketBridge.Client/Unity/JsWebSocket.jslib index d2fe1c8..3949575 100644 --- a/src/GrpcWebSocketBridge.Client/Unity/JsWebSocket.jslib +++ b/src/GrpcWebSocketBridge.Client/Unity/JsWebSocket.jslib @@ -23,18 +23,18 @@ var connection = connections[handle]; connection.socket = new WebSocket(connections[handle].url, connections[handle].subProtocol); - + connection.socket.binaryType = 'arraybuffer'; connection.socket.onopen = function (e) { - dynCall('vi', connection.onConnected, [connection.id]); + getWasmTableEntry(connection.onConnected)(connection.id); }; connection.socket.onclose = function (e) { - dynCall('viii', connection.onClose, [connection.id, e.code, e.wasClean ? 1 : 0]); + getWasmTableEntry(connection.onClose)(connection.id, e.code, e.wasClean ? 1 : 0); }; connection.socket.onmessage = function (e) { var buffer = _malloc(e.data.byteLength); HEAPU8.set(new Uint8Array(e.data), buffer); - dynCall('viii', connection.onReceive, [connection.id, buffer, e.data.byteLength]); + getWasmTableEntry(connection.onReceive)(connection.id, buffer, e.data.byteLength); _free(buffer); }; }, @@ -72,4 +72,4 @@ autoAddDeps(JsWebSocketLibrary, '$connections'); autoAddDeps(JsWebSocketLibrary, '$connectionsSequence'); -mergeInto(LibraryManager.library, JsWebSocketLibrary); \ No newline at end of file +mergeInto(LibraryManager.library, JsWebSocketLibrary); diff --git a/src/GrpcWebSocketBridge.Client/Unity/UnityWebRequestHttpHandler.cs b/src/GrpcWebSocketBridge.Client/Unity/UnityWebRequestHttpHandler.cs index d8a6297..35d62e6 100644 --- a/src/GrpcWebSocketBridge.Client/Unity/UnityWebRequestHttpHandler.cs +++ b/src/GrpcWebSocketBridge.Client/Unity/UnityWebRequestHttpHandler.cs @@ -57,9 +57,11 @@ protected override async Task SendAsync(HttpRequestMessage }; var responseHeader = response.Headers; + var contentHeader = response.Content.Headers; foreach (var header in webRequest.GetResponseHeaders()) { responseHeader.TryAddWithoutValidation(header.Key, header.Value); + contentHeader.TryAddWithoutValidation(header.Key, header.Value); } return response; diff --git a/src/GrpcWebSocketBridge.Client/WebSockets/SystemNetWebSocketsClientWebSocket.cs b/src/GrpcWebSocketBridge.Client/WebSockets/SystemNetWebSocketsClientWebSocket.cs index 3a65ab3..2819b8c 100644 --- a/src/GrpcWebSocketBridge.Client/WebSockets/SystemNetWebSocketsClientWebSocket.cs +++ b/src/GrpcWebSocketBridge.Client/WebSockets/SystemNetWebSocketsClientWebSocket.cs @@ -1,4 +1,5 @@ using System; +using System.Net; using System.Net.WebSockets; using System.Threading; using System.Threading.Tasks; @@ -9,9 +10,9 @@ internal class SystemNetWebSocketsClientWebSocket : IClientWebSocket { private readonly System.Net.WebSockets.ClientWebSocket _clientWebSocket; - public SystemNetWebSocketsClientWebSocket() + public SystemNetWebSocketsClientWebSocket(CookieContainer? cookieContainer = null) { - _clientWebSocket = new ClientWebSocket(); + _clientWebSocket = new ClientWebSocket { Options = { Cookies = cookieContainer } }; } public WebSocketState State => _clientWebSocket.State; diff --git a/src/GrpcWebSocketBridge.Client/package.json b/src/GrpcWebSocketBridge.Client/package.json index 6645858..8b73282 100644 --- a/src/GrpcWebSocketBridge.Client/package.json +++ b/src/GrpcWebSocketBridge.Client/package.json @@ -2,8 +2,8 @@ "name": "com.cysharp.grpcwebsocketbridge", "displayName": "GrpcWebSocketBridge", "author": { "name": "Cysharp, Inc.", "url": "https://cysharp.co.jp/en/" }, - "version": "1.4.1", - "unity": "2022.3", + "version": "1.4.2", + "unity": "6000.0", "description": "", "keywords": ["WebSocket", "gRPC"], "license": "MIT", From 16ca4002d42d8787cb6bc000ee7d50f2e4eaa4b6 Mon Sep 17 00:00:00 2001 From: Adam Trzmiel Date: Wed, 25 Mar 2026 03:19:05 +0100 Subject: [PATCH 2/3] fix(websocket-termination): Fixed issue with websocket termination for Duplex streaming --- .../GrpcWebSocketBridgeHandler.cs | 65 ++++++++++++------- .../Unity/JsWebSocketClient.cs | 3 +- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs index ed4b9ea..e008634 100644 --- a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs +++ b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs @@ -1,4 +1,5 @@ // NET_STANDARD is .NET Standard 2.1 on Unity + #if NET_STANDARD_2_0 #define NETSTANDARD2_0 #endif @@ -12,12 +13,10 @@ using System.Buffers; using System.Collections.Generic; using System.IO.Pipelines; -using System.Linq; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Net.WebSockets; -using System.Text; using System.Threading; using System.Threading.Tasks; using Grpc.AspNetCore.Web.Internal; @@ -30,12 +29,15 @@ namespace GrpcWebSocketBridge.Client { public partial class GrpcWebSocketBridgeHandler : DelegatingHandler { - public CookieContainer? CookieContainer { get; set; } + private readonly bool _forceWebSocketMode; - private readonly HashSet _ongoingWebSockets = new HashSet(); - private readonly bool _forceWebSocketMode = false; + private readonly HashSet _ongoingWebSockets = new(); + public CookieContainer? CookieContainer { get; set; } - private static TaskCompletionSource CreateHeadersTaskCompletionSource() => new TaskCompletionSource(); + private static TaskCompletionSource CreateHeadersTaskCompletionSource() + { + return new TaskCompletionSource(); + } private void AddToOngoing(IClientWebSocket webSocket) { @@ -54,15 +56,17 @@ private void RemoveFromOngoing(IClientWebSocket webSocket) } } - protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + protected override Task SendAsync(HttpRequestMessage request, + CancellationToken cancellationToken) { // If the request invokes Unary method, use HTTP/1 transport instead of WebSocket. - return (request.Content.GetType().Name.Contains("Unary") && !_forceWebSocketMode) + return request.Content.GetType().Name.Contains("Unary") && !_forceWebSocketMode ? SendWithHttpHandlerAsync(request, cancellationToken) : SendWithWebSocketAsync(request, cancellationToken); } - private async Task SendWithHttpHandlerAsync(HttpRequestMessage request, CancellationToken cancellationToken) + private async Task SendWithHttpHandlerAsync(HttpRequestMessage request, + CancellationToken cancellationToken) { request.Version = HttpVersion.Version11; // NOTE: Force downgrade to HTTP/1.1 request.Content = new GrpcWebRequestContent(request.Content); @@ -85,7 +89,8 @@ private async Task SendWithHttpHandlerAsync(HttpRequestMess return response; } - private async Task SendWithWebSocketAsync(HttpRequestMessage request, CancellationToken cancellationToken) + private async Task SendWithWebSocketAsync(HttpRequestMessage request, + CancellationToken cancellationToken) { request.Content = new GrpcWebSocketRequestContent(request.Content); @@ -105,12 +110,12 @@ private async Task SendWithWebSocketAsync(HttpRequestMessag var arrayBufferWriter = new ArrayBufferWriter(); GrpcWebProtocolHelpers.WriteTrailers(request.Headers, arrayBufferWriter); - await clientWebSocket.SendAsync(new ArraySegment(arrayBufferWriter.WrittenMemory.ToArray()), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); + await clientWebSocket.SendAsync(new ArraySegment(arrayBufferWriter.WrittenMemory.ToArray()), + WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); var response = new HttpResponseMessage(HttpStatusCode.OK) { - Version = GrpcProtocolConstants.Http2Version, - RequestMessage = request, + Version = GrpcProtocolConstants.Http2Version, RequestMessage = request }; #if NETSTANDARD2_0 response.EnsureTrailingHeaders(); @@ -120,7 +125,8 @@ private async Task SendWithWebSocketAsync(HttpRequestMessag var requestPushTask = ProcessRequestAsync(clientWebSocket, request, ctx, cancellationToken); var responseTask = ProcessResponseAsync(clientWebSocket, response, ctx, cancellationToken); - var responseContent = new GrpcWebSocketResponseContent(ctx.ResponsePipe.Reader, () => RemoveFromOngoing(clientWebSocket)); + var responseContent = + new GrpcWebSocketResponseContent(ctx.ResponsePipe.Reader, () => RemoveFromOngoing(clientWebSocket)); responseContent.Headers.ContentType = new MediaTypeHeaderValue(GrpcProtocolConstants.GrpcContentType); response.Content = responseContent; @@ -130,9 +136,11 @@ private async Task SendWithWebSocketAsync(HttpRequestMessag } - private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpRequestMessage request, ConnectionContext ctx, CancellationToken cancellationToken) + private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpRequestMessage request, + ConnectionContext ctx, CancellationToken cancellationToken) { - cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; + cancellationToken = CancellationTokenSource + .CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; _ = request.Content.CopyToAsync(ctx.RequestPipe.Writer.AsStream()).ConfigureAwait(false); @@ -147,7 +155,9 @@ private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpReq try { result.Buffer.CopyTo(buffer); - await clientWebSocket.SendAsync(new ArraySegment(buffer, 0, (int)result.Buffer.Length), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); + await clientWebSocket + .SendAsync(new ArraySegment(buffer, 0, (int)result.Buffer.Length), + WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); } finally { @@ -171,8 +181,11 @@ private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpReq { try { + //TODO: Sending this packet breaks websocket graceful completion as the server stops listening to any incoming packets after receiving this one + //TODO: The actual change should happen server-side to continue listening for frames even after receiving this packet + //TODO: Removing this line fixes websocket termination for Client and Duplex streaming but brakes Server streaming because a completed RequestStream is required // Send a empty trailer for completion. - await clientWebSocket.SendAsync(new ArraySegment(new byte[] { 0b10000000, 0x00, 0x00, 0x00, 0x00 }), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); + //await clientWebSocket.SendAsync(new ArraySegment(new byte[] { 0b10000000, 0x00, 0x00, 0x00, 0x00 }), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); } catch (WebSocketException e) when (e.WebSocketErrorCode == WebSocketError.InvalidState) { @@ -185,19 +198,22 @@ private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpReq } } - private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpResponseMessage response, ConnectionContext ctx, CancellationToken cancellationToken) + private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpResponseMessage response, + ConnectionContext ctx, CancellationToken cancellationToken) { - cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; + cancellationToken = CancellationTokenSource + .CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; var reader = new GrpcWebSocketBufferReader(); - var buffer = ArrayPool.Shared.Rent(minimumLength: 32 * 1024); + var buffer = ArrayPool.Shared.Rent(32 * 1024); var bufferWriter = new ArrayBufferWriter(); var readOffset = 0; try { while (clientWebSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested) { - var result = await clientWebSocket.ReceiveAsync(new ArraySegment(buffer), cancellationToken).ConfigureAwait(false); + var result = await clientWebSocket.ReceiveAsync(new ArraySegment(buffer), cancellationToken) + .ConfigureAwait(false); if (result.Count > 0) { bufferWriter.Write(buffer.AsSpan(0, result.Count)); @@ -214,6 +230,7 @@ private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpRe { response.Headers.TryAddWithoutValidation(keyValue.Key, keyValue.Value); } + ctx.SignalResponseHeaderHasReceived(); break; case GrpcWebSocketBufferReader.BufferReadResultType.Trailer: @@ -227,7 +244,8 @@ private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpRe await ctx.CompleteResponseAsync().ConfigureAwait(false); return; case GrpcWebSocketBufferReader.BufferReadResultType.Content: - await ctx.ResponsePipe.Writer.WriteAsync(readResult.Data, cancellationToken).ConfigureAwait(false); + await ctx.ResponsePipe.Writer.WriteAsync(readResult.Data, cancellationToken) + .ConfigureAwait(false); break; } @@ -268,6 +286,7 @@ protected override void Dispose(bool disposing) { clientWebSocket.Dispose(); } + _ongoingWebSockets.Clear(); } } diff --git a/src/GrpcWebSocketBridge.Client/Unity/JsWebSocketClient.cs b/src/GrpcWebSocketBridge.Client/Unity/JsWebSocketClient.cs index 9522fcc..90c5df1 100644 --- a/src/GrpcWebSocketBridge.Client/Unity/JsWebSocketClient.cs +++ b/src/GrpcWebSocketBridge.Client/Unity/JsWebSocketClient.cs @@ -128,6 +128,7 @@ private void ThrowIfStateIsNotOpen() public void Dispose() { + CloseAsync(WebSocketCloseStatus.NormalClosure, "Disposed", CancellationToken.None); _ws.Dispose(); } @@ -159,7 +160,7 @@ public ValueWebSocketReceiveResult(int count, WebSocketMessageType messageType, public class JsWebSocket { private static Dictionary _instanceByHandle = new Dictionary(); - + private Queue _queue = new Queue(); private TaskCompletionSource _receiveTcs; private int _handle; From c74ad298526825b3d0f23b18565e44861687d14d Mon Sep 17 00:00:00 2001 From: Adam Trzmiel Date: Wed, 25 Mar 2026 04:01:15 +0100 Subject: [PATCH 3/3] fix(close-websockets): Fixed an issue causing web-sockets to not terminate gracefully --- .../GrpcWebSocketBridgeMiddleware.cs | 15 +++-- .../GrpcWebSocketBridgeHandler.cs | 65 +++++++------------ 2 files changed, 31 insertions(+), 49 deletions(-) diff --git a/src/GrpcWebSocketBridge.AspNetCore/GrpcWebSocketBridgeMiddleware.cs b/src/GrpcWebSocketBridge.AspNetCore/GrpcWebSocketBridgeMiddleware.cs index eec3158..596a222 100644 --- a/src/GrpcWebSocketBridge.AspNetCore/GrpcWebSocketBridgeMiddleware.cs +++ b/src/GrpcWebSocketBridge.AspNetCore/GrpcWebSocketBridgeMiddleware.cs @@ -93,7 +93,6 @@ private async Task RunReadFromClientLoopAsync(PipeWriter websocketPipeWriter, We await readyToRunTask; var bufferArray = new byte[ReaderWriterBufferSize]; - var isRequestCompleted = false; var isPipeCompleted = false; var reader = new GrpcWebSocketBufferReader(); @@ -101,14 +100,15 @@ private async Task RunReadFromClientLoopAsync(PipeWriter websocketPipeWriter, We var consumed = 0; try { - while (webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested && !isRequestCompleted) + while (webSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested) { var result = await webSocket.ReceiveAsync(bufferArray, cancellationToken); - if (result.MessageType != WebSocketMessageType.Binary) continue; + if (result.MessageType != WebSocketMessageType.Binary) + continue; bufferWriter.Write(bufferArray.AsSpan(0, result.Count)); - while (reader.TryRead(bufferWriter.WrittenMemory.Slice(consumed), out var readResult)) + while (!isPipeCompleted && reader.TryRead(bufferWriter.WrittenMemory.Slice(consumed), out var readResult)) { switch (readResult.Type) { @@ -125,7 +125,8 @@ private async Task RunReadFromClientLoopAsync(PipeWriter websocketPipeWriter, We await websocketPipeWriter.FlushAsync(cancellationToken); break; case GrpcWebSocketBufferReader.BufferReadResultType.Trailer: - isRequestCompleted = true; + await websocketPipeWriter.CompleteAsync(); + isPipeCompleted = true; break; } @@ -147,7 +148,7 @@ private async Task RunReadFromClientLoopAsync(PipeWriter websocketPipeWriter, We catch (Exception e) when (e is ConnectionAbortedException || e is WebSocketException) { // When the WebSocket connection has been closed, Ignore ConnectionAbortedException and WebSocketException. - if (!isRequestCompleted) + if (!isPipeCompleted) { await websocketPipeWriter.CompleteAsync(new IOException("The request was aborted.", e)); isPipeCompleted = true; @@ -169,7 +170,7 @@ private async Task RunWriteToClientLoopAsync(Pipe writerPipe, WebSocket webSocke // Wait until the features are ready to run. await readyToRunTask; - using var stream = writerPipe.Reader.AsStream(); + await using var stream = writerPipe.Reader.AsStream(); var bufferArray = new byte[ReaderWriterBufferSize]; var readLen = 0; while ((readLen = await stream.ReadAsync(bufferArray, cancellationToken)) > 0) diff --git a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs index e008634..ed4b9ea 100644 --- a/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs +++ b/src/GrpcWebSocketBridge.Client/GrpcWebSocketBridgeHandler.cs @@ -1,5 +1,4 @@ // NET_STANDARD is .NET Standard 2.1 on Unity - #if NET_STANDARD_2_0 #define NETSTANDARD2_0 #endif @@ -13,10 +12,12 @@ using System.Buffers; using System.Collections.Generic; using System.IO.Pipelines; +using System.Linq; using System.Net; using System.Net.Http; using System.Net.Http.Headers; using System.Net.WebSockets; +using System.Text; using System.Threading; using System.Threading.Tasks; using Grpc.AspNetCore.Web.Internal; @@ -29,15 +30,12 @@ namespace GrpcWebSocketBridge.Client { public partial class GrpcWebSocketBridgeHandler : DelegatingHandler { - private readonly bool _forceWebSocketMode; - - private readonly HashSet _ongoingWebSockets = new(); public CookieContainer? CookieContainer { get; set; } - private static TaskCompletionSource CreateHeadersTaskCompletionSource() - { - return new TaskCompletionSource(); - } + private readonly HashSet _ongoingWebSockets = new HashSet(); + private readonly bool _forceWebSocketMode = false; + + private static TaskCompletionSource CreateHeadersTaskCompletionSource() => new TaskCompletionSource(); private void AddToOngoing(IClientWebSocket webSocket) { @@ -56,17 +54,15 @@ private void RemoveFromOngoing(IClientWebSocket webSocket) } } - protected override Task SendAsync(HttpRequestMessage request, - CancellationToken cancellationToken) + protected override Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) { // If the request invokes Unary method, use HTTP/1 transport instead of WebSocket. - return request.Content.GetType().Name.Contains("Unary") && !_forceWebSocketMode + return (request.Content.GetType().Name.Contains("Unary") && !_forceWebSocketMode) ? SendWithHttpHandlerAsync(request, cancellationToken) : SendWithWebSocketAsync(request, cancellationToken); } - private async Task SendWithHttpHandlerAsync(HttpRequestMessage request, - CancellationToken cancellationToken) + private async Task SendWithHttpHandlerAsync(HttpRequestMessage request, CancellationToken cancellationToken) { request.Version = HttpVersion.Version11; // NOTE: Force downgrade to HTTP/1.1 request.Content = new GrpcWebRequestContent(request.Content); @@ -89,8 +85,7 @@ private async Task SendWithHttpHandlerAsync(HttpRequestMess return response; } - private async Task SendWithWebSocketAsync(HttpRequestMessage request, - CancellationToken cancellationToken) + private async Task SendWithWebSocketAsync(HttpRequestMessage request, CancellationToken cancellationToken) { request.Content = new GrpcWebSocketRequestContent(request.Content); @@ -110,12 +105,12 @@ private async Task SendWithWebSocketAsync(HttpRequestMessag var arrayBufferWriter = new ArrayBufferWriter(); GrpcWebProtocolHelpers.WriteTrailers(request.Headers, arrayBufferWriter); - await clientWebSocket.SendAsync(new ArraySegment(arrayBufferWriter.WrittenMemory.ToArray()), - WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); + await clientWebSocket.SendAsync(new ArraySegment(arrayBufferWriter.WrittenMemory.ToArray()), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); var response = new HttpResponseMessage(HttpStatusCode.OK) { - Version = GrpcProtocolConstants.Http2Version, RequestMessage = request + Version = GrpcProtocolConstants.Http2Version, + RequestMessage = request, }; #if NETSTANDARD2_0 response.EnsureTrailingHeaders(); @@ -125,8 +120,7 @@ await clientWebSocket.SendAsync(new ArraySegment(arrayBufferWriter.Written var requestPushTask = ProcessRequestAsync(clientWebSocket, request, ctx, cancellationToken); var responseTask = ProcessResponseAsync(clientWebSocket, response, ctx, cancellationToken); - var responseContent = - new GrpcWebSocketResponseContent(ctx.ResponsePipe.Reader, () => RemoveFromOngoing(clientWebSocket)); + var responseContent = new GrpcWebSocketResponseContent(ctx.ResponsePipe.Reader, () => RemoveFromOngoing(clientWebSocket)); responseContent.Headers.ContentType = new MediaTypeHeaderValue(GrpcProtocolConstants.GrpcContentType); response.Content = responseContent; @@ -136,11 +130,9 @@ await clientWebSocket.SendAsync(new ArraySegment(arrayBufferWriter.Written } - private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpRequestMessage request, - ConnectionContext ctx, CancellationToken cancellationToken) + private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpRequestMessage request, ConnectionContext ctx, CancellationToken cancellationToken) { - cancellationToken = CancellationTokenSource - .CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; _ = request.Content.CopyToAsync(ctx.RequestPipe.Writer.AsStream()).ConfigureAwait(false); @@ -155,9 +147,7 @@ private async Task ProcessRequestAsync(IClientWebSocket clientWebSocket, HttpReq try { result.Buffer.CopyTo(buffer); - await clientWebSocket - .SendAsync(new ArraySegment(buffer, 0, (int)result.Buffer.Length), - WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); + await clientWebSocket.SendAsync(new ArraySegment(buffer, 0, (int)result.Buffer.Length), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); } finally { @@ -181,11 +171,8 @@ await clientWebSocket { try { - //TODO: Sending this packet breaks websocket graceful completion as the server stops listening to any incoming packets after receiving this one - //TODO: The actual change should happen server-side to continue listening for frames even after receiving this packet - //TODO: Removing this line fixes websocket termination for Client and Duplex streaming but brakes Server streaming because a completed RequestStream is required // Send a empty trailer for completion. - //await clientWebSocket.SendAsync(new ArraySegment(new byte[] { 0b10000000, 0x00, 0x00, 0x00, 0x00 }), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); + await clientWebSocket.SendAsync(new ArraySegment(new byte[] { 0b10000000, 0x00, 0x00, 0x00, 0x00 }), WebSocketMessageType.Binary, true, cancellationToken).ConfigureAwait(false); } catch (WebSocketException e) when (e.WebSocketErrorCode == WebSocketError.InvalidState) { @@ -198,22 +185,19 @@ await clientWebSocket } } - private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpResponseMessage response, - ConnectionContext ctx, CancellationToken cancellationToken) + private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpResponseMessage response, ConnectionContext ctx, CancellationToken cancellationToken) { - cancellationToken = CancellationTokenSource - .CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; + cancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, ctx.ConnectionAborted).Token; var reader = new GrpcWebSocketBufferReader(); - var buffer = ArrayPool.Shared.Rent(32 * 1024); + var buffer = ArrayPool.Shared.Rent(minimumLength: 32 * 1024); var bufferWriter = new ArrayBufferWriter(); var readOffset = 0; try { while (clientWebSocket.State == WebSocketState.Open && !cancellationToken.IsCancellationRequested) { - var result = await clientWebSocket.ReceiveAsync(new ArraySegment(buffer), cancellationToken) - .ConfigureAwait(false); + var result = await clientWebSocket.ReceiveAsync(new ArraySegment(buffer), cancellationToken).ConfigureAwait(false); if (result.Count > 0) { bufferWriter.Write(buffer.AsSpan(0, result.Count)); @@ -230,7 +214,6 @@ private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpRe { response.Headers.TryAddWithoutValidation(keyValue.Key, keyValue.Value); } - ctx.SignalResponseHeaderHasReceived(); break; case GrpcWebSocketBufferReader.BufferReadResultType.Trailer: @@ -244,8 +227,7 @@ private async Task ProcessResponseAsync(IClientWebSocket clientWebSocket, HttpRe await ctx.CompleteResponseAsync().ConfigureAwait(false); return; case GrpcWebSocketBufferReader.BufferReadResultType.Content: - await ctx.ResponsePipe.Writer.WriteAsync(readResult.Data, cancellationToken) - .ConfigureAwait(false); + await ctx.ResponsePipe.Writer.WriteAsync(readResult.Data, cancellationToken).ConfigureAwait(false); break; } @@ -286,7 +268,6 @@ protected override void Dispose(bool disposing) { clientWebSocket.Dispose(); } - _ongoingWebSockets.Clear(); } }