From 052adeae8bb97b5c4ff047a3b57a62b7fa42ecc1 Mon Sep 17 00:00:00 2001 From: Anton Savitskikh Date: Mon, 23 Feb 2026 23:01:58 +0500 Subject: [PATCH 1/2] Implemented all clients without advanced scheduling --- .../Clients/ParallelClusterClient.cs | 28 +++++++- .../Clients/RoundRobinClusterClient.cs | 42 ++++++++++- .../Clients/SmartClusterClient.cs | 72 ++++++++++++++++++- 3 files changed, 136 insertions(+), 6 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..bfd0809 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,9 +15,31 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(addr => CreateRequest(addr + "?query=" + query)) + .Select(req => ProcessRequestAsync(req)) + .ToList(); + + var overallTimeoutTask = Task.Delay(timeout); + + while (tasks.Count > 0) + { + var completedTask = await Task.WhenAny(tasks.Concat(new[] { overallTimeoutTask })); + + if (completedTask == overallTimeoutTask) + throw new TimeoutException("No replica responded within timeout"); + + tasks.Remove((Task)completedTask); + + if (completedTask.IsCompletedSuccessfully) + return await (Task)completedTask; + + Log.ErrorFormat("Replica failed: {0}", completedTask.Exception?.InnerException?.Message); + } + + throw new Exception("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..ec5b14f 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,9 +15,45 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var timeoutPerRequest = new TimeSpan(); + var stopwatch = new Stopwatch(); + var remainingReplicas = ReplicaAddresses.Length; + + stopwatch.Start(); + + foreach (var replica in ReplicaAddresses) + { + var remainingTime = timeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException("No replica responded within overall timeout"); + + timeoutPerRequest = remainingTime.Divide(remainingReplicas); + + remainingReplicas--; + + var webRequest = CreateRequest(replica + "?query=" + query); + Log.InfoFormat($"Processing {webRequest.RequestUri}"); + + var requestTask = ProcessRequestAsync(webRequest); + + await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest)); + + if (!requestTask.IsCompleted) + continue; + + try + { + return requestTask.Result; + } + catch (Exception ex) + { + Log.ErrorFormat($"Replica {replica} failed: {ex.Message}"); + } + } + + throw new TimeoutException("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..643e189 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,9 +15,75 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var tasks = new List>(); + var replicasToCall = ReplicaAddresses.Length; + + var timeoutPerReplica = timeout.Divide(replicasToCall); + + foreach(var replica in ReplicaAddresses) + { + if (stopwatch.Elapsed >= timeout) + break; + + var webRequest = CreateRequest(replica + "?query=" + query); + Log.InfoFormat($"Processing {webRequest.RequestUri}"); + + var requestTask = ProcessRequestAsync(webRequest); + tasks.Add(requestTask); + + + var weight = ReplicaAddresses.Length - replicasToCall + 1; + var targetTimeForNextStart = timeoutPerReplica * weight; + + var waitTime = targetTimeForNextStart - stopwatch.Elapsed; + var waitTask = Task.Delay(waitTime); + + replicasToCall--; + + if (waitTime > TimeSpan.Zero) + { + var completedTask = await Task.WhenAny(tasks.Concat(new[] { waitTask })); + + if(completedTask == waitTask) + continue; + + tasks.Remove((Task)completedTask); + + try + { + return await (Task)completedTask; + } + catch (Exception ex) + { + Log.ErrorFormat($"Replica request failed: {ex.Message}"); + } + } + } + + while (tasks.Count > 0 && stopwatch.Elapsed < timeout) + { + var remainingTime = timeout - stopwatch.Elapsed; + var waitTask = Task.Delay(remainingTime); + + var completedTask = await Task.WhenAny(tasks.Concat(new[] { waitTask })); + + if (completedTask == waitTask) + break; + + try + { + return await (Task)completedTask; + } + catch (Exception ex) + { + Log.ErrorFormat($"Replica request failed: {ex.Message}"); + } + } + + throw new TimeoutException("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); From f513e4d3f9732e8815a02fa3b8dd512c1f7646ab Mon Sep 17 00:00:00 2001 From: Anton Savitskikh Date: Fri, 27 Feb 2026 22:21:51 +0500 Subject: [PATCH 2/2] Implemented advanced scheduling --- .../Clients/ClusterClientBase.cs | 17 ++++++++++++++ .../Clients/RoundRobinClusterClient.cs | 11 ++++++--- .../Clients/SmartClusterClient.cs | 23 ++++++++++++++----- 3 files changed, 42 insertions(+), 9 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..a1406d8 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -12,11 +15,25 @@ public abstract class ClusterClientBase { protected string[] ReplicaAddresses { get; set; } + private static readonly ConcurrentDictionary _responseTimes + = new ConcurrentDictionary(); + protected ClusterClientBase(string[] replicaAddresses) { ReplicaAddresses = replicaAddresses; } + protected IEnumerable SortedReplicas => + ReplicaAddresses.OrderBy(r => _responseTimes.GetOrAdd(r, _ => 1000)); + + protected void RecordResponse(string replica, long elapsedMs, bool success) + { + if (success) + _responseTimes.AddOrUpdate(replica, elapsedMs, (_, avg) => (avg + elapsedMs) / 2); + else + _responseTimes.AddOrUpdate(replica, 2000, (_, avg) => Math.Min(avg * 2, 30000)); + } + public abstract Task ProcessRequestAsync(string query, TimeSpan timeout); protected abstract ILog Log { get; } diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index ec5b14f..e4714e1 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -23,7 +23,7 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti stopwatch.Start(); - foreach (var replica in ReplicaAddresses) + foreach (var replica in SortedReplicas) { var remainingTime = timeout - stopwatch.Elapsed; if (remainingTime <= TimeSpan.Zero) @@ -38,17 +38,22 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti var requestTask = ProcessRequestAsync(webRequest); - await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest)); + var completedTask = await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest)); - if (!requestTask.IsCompleted) + if (completedTask != requestTask) + { + RecordResponse(replica, 0, false); continue; + } try { + RecordResponse(replica, stopwatch.ElapsedMilliseconds, true); return requestTask.Result; } catch (Exception ex) { + RecordResponse(replica, 0, false); Log.ErrorFormat($"Replica {replica} failed: {ex.Message}"); } } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index 643e189..026a0c5 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -18,12 +18,14 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { var stopwatch = Stopwatch.StartNew(); - var tasks = new List>(); + var tasks = new Dictionary, string>(); var replicasToCall = ReplicaAddresses.Length; - var timeoutPerReplica = timeout.Divide(replicasToCall); + var ordered = SortedReplicas.ToList(); + var timeoutPerReplica = timeout.Divide(ordered.Count); + var replicasLeft = ordered.Count; - foreach(var replica in ReplicaAddresses) + foreach (var replica in ordered) { if (stopwatch.Elapsed >= timeout) break; @@ -32,7 +34,7 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti Log.InfoFormat($"Processing {webRequest.RequestUri}"); var requestTask = ProcessRequestAsync(webRequest); - tasks.Add(requestTask); + tasks[requestTask] = replica; var weight = ReplicaAddresses.Length - replicasToCall + 1; @@ -45,19 +47,22 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti if (waitTime > TimeSpan.Zero) { - var completedTask = await Task.WhenAny(tasks.Concat(new[] { waitTask })); + var completedTask = await Task.WhenAny(tasks.Keys.Concat(new[] { waitTask })); if(completedTask == waitTask) continue; + var addr = tasks[(Task)completedTask]; tasks.Remove((Task)completedTask); try { + RecordResponse(addr, stopwatch.ElapsedMilliseconds, true); return await (Task)completedTask; } catch (Exception ex) { + RecordResponse(addr, 0, false); Log.ErrorFormat($"Replica request failed: {ex.Message}"); } } @@ -68,17 +73,23 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti var remainingTime = timeout - stopwatch.Elapsed; var waitTask = Task.Delay(remainingTime); - var completedTask = await Task.WhenAny(tasks.Concat(new[] { waitTask })); + var completedTask = await Task.WhenAny(tasks.Keys.Concat(new[] { waitTask })); if (completedTask == waitTask) break; + var addr = tasks[(Task)completedTask]; + tasks.Remove((Task)completedTask); + + try { + RecordResponse(addr, stopwatch.ElapsedMilliseconds, true); return await (Task)completedTask; } catch (Exception ex) { + RecordResponse(addr, 0, false); Log.ErrorFormat($"Replica request failed: {ex.Message}"); } }