From f1168042d6516abc320e3ee1b7fc9830c35081fc Mon Sep 17 00:00:00 2001 From: gogy4 Date: Sun, 8 Mar 2026 02:21:24 +0500 Subject: [PATCH 1/2] feat(client): add Parallel, RoundRobin and Smart replica request strategies --- .../Clients/ClusterClientBase.cs | 71 +++++++++++++++++++ .../Clients/ParallelClusterClient.cs | 23 ++++-- .../Clients/RoundRobinClusterClient.cs | 39 +++++++++- .../Clients/SmartClusterClient.cs | 64 +++++++++++++++-- 4 files changed, 185 insertions(+), 12 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..bd43ba1 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -40,5 +42,74 @@ protected async Task ProcessRequestAsync(WebRequest request) return result; } } + + protected async Task> WaitForReplicaAndLogAsync( + List> requests, + TimeSpan timeout, + bool throwIfTimeout = false) + { + ArgumentNullException.ThrowIfNull(requests); + + Log.Debug($"Waiting for replica response. Pending replicas: {requests.Count}"); + var completedRequest = await WaitWithTimeout(requests, timeout); + + if (completedRequest == null) + { + Log.Debug("Global timeout reached. No replica responded in time"); + return throwIfTimeout + ? throw new TimeoutException() + : null; + } + + requests.Remove(completedRequest); + Log.Debug($"Replica task completed. Remaining replicas: {requests.Count}"); + + if (completedRequest.IsCompletedSuccessfully) + { + Log.Info("Replica responded successfully"); + return completedRequest; + } + + Log.Error("Replica failed", completedRequest.Exception?.InnerException); + return completedRequest; + } + + protected async Task HandleWhenAnyResultAsync(Task requestTask, Task completedTask, string replicaUri) + { + if (completedTask == requestTask) + { + if (!requestTask.IsCompletedSuccessfully) + { + Log.Error($"Replica {replicaUri} failed", requestTask.Exception?.InnerException); + return null; + } + + Log.Info($"Replica {replicaUri} responded successfully"); + return await requestTask; + } + + Log.Debug($"Replica {replicaUri} timed out"); + return null; + } + + protected async Task ProcessReplicaAsync(string uri, string query) + { + var request = CreateRequest($"{uri}?query={query}"); + Log.Info($"Processing {request.RequestUri}"); + return await ProcessRequestAsync(request); + } + + private async Task> WaitWithTimeout(IEnumerable> tasks, TimeSpan timeout) + { + var delay = Task.Delay(timeout); + var completed = await Task.WhenAny(tasks.Append(delay)); + + if (completed == delay) + { + return null; + } + + return (Task)completed; + } } } \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..c892352 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,7 +1,5 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -13,11 +11,26 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var requests = ReplicaAddresses + .Select(uri => ProcessReplicaAsync(uri, query)) + .ToList(); + + while (requests.Count > 0) + { + var completedRequest = await WaitForReplicaAndLogAsync(requests, timeout, throwIfTimeout: true); + + if (completedRequest is { IsCompletedSuccessfully: true }) + { + return await completedRequest; + } + } + + Log.Error("All replicas failed"); + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..444803a 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,7 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,13 +10,45 @@ namespace ClusterClient.Clients { public class RoundRobinClusterClient : ClusterClientBase { + private readonly ConcurrentDictionary _replicaTimings = new(); + public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var replicaAddress in replicaAddresses) + { + _replicaTimings.TryAdd(replicaAddress, 0); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var replicasLeft = ReplicaAddresses.Length; + + var sortedReplicas = ReplicaAddresses + .OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)); + + foreach (var replicaUri in sortedReplicas) + { + var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--; + + Log.Debug($"Sending request to replica {replicaUri}. Timeout per replica: {timeoutPerReplica.TotalMilliseconds} ms"); + + var requestTask = ProcessReplicaAsync(replicaUri, query); + var delay = Task.Delay(timeoutPerReplica); + + var completedTask = await Task.WhenAny(requestTask, delay); + _replicaTimings[replicaUri] = stopwatch.ElapsedMilliseconds; + + var result = await HandleWhenAnyResultAsync(requestTask, completedTask, replicaUri); + if (result != null) + { + return result; + } + } + + Log.Debug("All replicas failed or timed out"); + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..e13a8f4 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,7 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,15 +10,70 @@ namespace ClusterClient.Clients { public class SmartClusterClient : ClusterClientBase { + private readonly ConcurrentDictionary _replicaTimings = new(); + public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var replicaAddress in replicaAddresses) + { + _replicaTimings.TryAdd(replicaAddress, 0); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var replicasLeft = ReplicaAddresses.Length; + + var sortedReplicas = ReplicaAddresses + .OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)) + .ToList(); + + Log.Debug($"Replica order: {string.Join(", ", sortedReplicas)}"); + + var pendingRequests = new List>(); + + foreach (var replicaAddress in sortedReplicas) + { + var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--; + + Log.Debug($"Starting request to {replicaAddress}. Timeout per replica: {timeoutPerReplica.TotalMilliseconds} ms"); + + pendingRequests.Add(ProcessReplicaAsync(replicaAddress, query)); + + var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, timeoutPerReplica, throwIfTimeout: false); + + _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; + + if (completedTask is not { IsCompletedSuccessfully: true }) continue; + Log.Info($"Replica responded successfully in {stopwatch.ElapsedMilliseconds} ms"); + return await completedTask; + } + + Log.Debug("Initial replica attempts finished. Waiting for remaining pending requests"); + + while (pendingRequests.Count > 0) + { + var remainingTime = timeout - stopwatch.Elapsed; + + if (remainingTime <= TimeSpan.Zero) + { + Log.Debug("Global timeout reached while waiting for pending replicas"); + break; + } + + Log.Debug($"Waiting for pending replicas. Remaining: {pendingRequests.Count}"); + var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, remainingTime, throwIfTimeout: false); + + if (completedTask is not { IsCompletedSuccessfully: true }) continue; + Log.Info($"Pending replica responded successfully in {stopwatch.ElapsedMilliseconds} ms"); + return await completedTask; + } + + Log.Error("All replicas failed or timed out"); + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file From 86ac8112ad7cbb682248f4f79938b89cda304490 Mon Sep 17 00:00:00 2001 From: gogy4 Date: Sun, 8 Mar 2026 02:32:52 +0500 Subject: [PATCH 2/2] chore: minor formatting --- .../Clients/ParallelClusterClient.cs | 2 +- .../Clients/RoundRobinClusterClient.cs | 3 ++- .../Clients/SmartClusterClient.cs | 23 ++++++++++++------- 3 files changed, 18 insertions(+), 10 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index c892352..c1dcc24 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -19,7 +19,7 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti while (requests.Count > 0) { - var completedRequest = await WaitForReplicaAndLogAsync(requests, timeout, throwIfTimeout: true); + var completedRequest = await WaitForReplicaAndLogAsync(requests, timeout, true); if (completedRequest is { IsCompletedSuccessfully: true }) { diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 444803a..932c002 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -32,7 +32,8 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti { var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--; - Log.Debug($"Sending request to replica {replicaUri}. Timeout per replica: {timeoutPerReplica.TotalMilliseconds} ms"); + Log.Debug($"Sending request to replica {replicaUri}. Timeout per replica: " + + $"{timeoutPerReplica.TotalMilliseconds} ms"); var requestTask = ProcessReplicaAsync(replicaUri, query); var delay = Task.Delay(timeoutPerReplica); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index e13a8f4..3bcb0fa 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -37,15 +37,18 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti { var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--; - Log.Debug($"Starting request to {replicaAddress}. Timeout per replica: {timeoutPerReplica.TotalMilliseconds} ms"); + Log.Debug($"Starting request to {replicaAddress}. Timeout per replica: " + + $"{timeoutPerReplica.TotalMilliseconds} ms"); pendingRequests.Add(ProcessReplicaAsync(replicaAddress, query)); - - var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, timeoutPerReplica, throwIfTimeout: false); - + var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, timeoutPerReplica); _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; - if (completedTask is not { IsCompletedSuccessfully: true }) continue; + if (completedTask is not { IsCompletedSuccessfully: true }) + { + continue; + } + Log.Info($"Replica responded successfully in {stopwatch.ElapsedMilliseconds} ms"); return await completedTask; } @@ -63,9 +66,13 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti } Log.Debug($"Waiting for pending replicas. Remaining: {pendingRequests.Count}"); - var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, remainingTime, throwIfTimeout: false); - - if (completedTask is not { IsCompletedSuccessfully: true }) continue; + var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, remainingTime); + + if (completedTask is not { IsCompletedSuccessfully: true }) + { + continue; + } + Log.Info($"Pending replica responded successfully in {stopwatch.ElapsedMilliseconds} ms"); return await completedTask; }