From 4933a8c41a130e0808bec670333db87783ccfeae Mon Sep 17 00:00:00 2001 From: Balashov2004 Date: Wed, 25 Feb 2026 11:35:18 +0500 Subject: [PATCH 1/2] =?UTF-8?q?=D0=B2=D1=81=D0=B5=20=D0=BA=D1=80=D0=BE?= =?UTF-8?q?=D0=BC=D0=B5=20=D0=B7=D0=B2=D0=B5=D0=B7=D0=B4=D0=BE=D1=87=D0=BA?= =?UTF-8?q?=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Clients/ParallelClusterClient.cs | 30 ++++++++++- .../Clients/RoundRobinClusterClient.cs | 28 +++++++++- .../Clients/SmartClusterClient.cs | 53 +++++++++++++++++-- 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..4f8af8f 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -13,9 +13,35 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(uri => CreateRequest(uri + "?query=" + query)) + .Select(ProcessRequestAsync) + .ToList(); + + var delayTask = Task.Delay(timeout); + while (tasks.Count != 0) + { + var processTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask); + await Task.WhenAny(processTask, delayTask); + if (delayTask.IsCompleted) + { + throw new TimeoutException(); + } + var completedTask = await (Task>)processTask; + tasks.Remove(completedTask); + + try + { + return await completedTask; + } + catch (Exception) + { + Log.Error("Task failed"); + } + } + return null; } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..92d45f8 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -13,9 +13,33 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var start = DateTime.Now; + var i = 0; + + foreach (var replicaAddress in ReplicaAddresses) + { + var elapsed = DateTime.Now - start; + var timeLeft = timeout - elapsed; + var perReplicaTimeout = TimeSpan.FromMilliseconds(timeLeft.TotalMilliseconds / (ReplicaAddresses.Length - i)); + i++; + var request = CreateRequest(replicaAddress + "?query=" + query); + var task = ProcessRequestAsync(request); + var delayTask = Task.Delay(perReplicaTimeout); + var completedTask = await Task.WhenAny(task, delayTask); + if (completedTask == task) + { + try + { + return await task; + } + catch (Exception) + { + } + } + } + throw new TimeoutException($"Request {query} timed out"); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..584bb11 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -13,11 +14,57 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = new List>(); + var perReplicaTimeout = TimeSpan.FromMilliseconds(timeout.TotalMilliseconds / ReplicaAddresses.Length); + var sw = Stopwatch.StartNew(); + + for (int i = 0; i < ReplicaAddresses.Length; i++) + { + var request = CreateRequest(ReplicaAddresses[i] + "?query=" + query); + var currentTask = ProcessRequestAsync(request); + tasks.Add(currentTask); + + while (true) + { + var timePassed = sw.Elapsed; + var timeToNextLaunch = perReplicaTimeout * (i + 1) - timePassed; + + if (i == ReplicaAddresses.Length - 1) + timeToNextLaunch = timeout - timePassed; + + var delayTask = Task.Delay(timeToNextLaunch); + var completedTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask); + + if (completedTask != delayTask) + { + var finishedTask = await (Task>)completedTask; + try + { + return await finishedTask; + } + catch (Exception) + { + tasks.Remove(finishedTask); + if (tasks.Count == 0 || i < ReplicaAddresses.Length - 1) + { + goto next; + } + } + } + else + { + break; + } + } + + next: ; + } + + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file From b733787d99e2bf37662819414bec79da45761299 Mon Sep 17 00:00:00 2001 From: Balashov2004 Date: Wed, 25 Feb 2026 23:51:41 +0500 Subject: [PATCH 2/2] =?UTF-8?q?=D0=B4=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB?= =?UTF-8?q?=20=D0=B8=D1=81=D1=82=D0=BE=D1=80=D0=B8=D1=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Clients/RoundRobinClusterClient.cs | 36 +++++++++++----- .../Clients/SmartClusterClient.cs | 41 +++++++++++++++---- 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 92d45f8..64d8494 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -12,35 +14,49 @@ public class RoundRobinClusterClient : ClusterClientBase public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } + + private static readonly ConcurrentDictionary ReplicaStats = new(); public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - var start = DateTime.Now; - var i = 0; + var sortedReplicas = ReplicaAddresses + .OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0)) + .ToList(); + var sw = Stopwatch.StartNew(); - foreach (var replicaAddress in ReplicaAddresses) + for (int i = 0; i < sortedReplicas.Count; i++) { - var elapsed = DateTime.Now - start; - var timeLeft = timeout - elapsed; - var perReplicaTimeout = TimeSpan.FromMilliseconds(timeLeft.TotalMilliseconds / (ReplicaAddresses.Length - i)); - i++; - var request = CreateRequest(replicaAddress + "?query=" + query); + var remainingReplicas = sortedReplicas.Count - i; + var timeLeft = timeout - sw.Elapsed; + var currentReplicaTimeout = TimeSpan.FromMilliseconds(timeLeft.TotalMilliseconds / remainingReplicas); + var uri = sortedReplicas[i]; + var request = CreateRequest(uri + "?query=" + query); + var requestTimer = Stopwatch.StartNew(); var task = ProcessRequestAsync(request); - var delayTask = Task.Delay(perReplicaTimeout); + var delayTask = Task.Delay(currentReplicaTimeout); var completedTask = await Task.WhenAny(task, delayTask); + if (completedTask == task) { try { - return await task; + var result = await task; + UpdateStats(uri, requestTimer.ElapsedMilliseconds); + return result; } catch (Exception) { + UpdateStats(uri, (long)timeout.TotalMilliseconds); } } } throw new TimeoutException($"Request {query} timed out"); } + + private void UpdateStats(string uri, long time) + { + ReplicaStats.AddOrUpdate(uri, time, (key, oldVal) => (oldVal + time) / 2); + } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index 584bb11..a786381 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -13,18 +14,23 @@ public class SmartClusterClient : ClusterClientBase public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } + + private static readonly ConcurrentDictionary ReplicaStats = new(); public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - var tasks = new List>(); + var sortedReplicas = ReplicaAddresses + .OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0)) + .ToList(); + + var tasks = new List>(); var perReplicaTimeout = TimeSpan.FromMilliseconds(timeout.TotalMilliseconds / ReplicaAddresses.Length); var sw = Stopwatch.StartNew(); - for (int i = 0; i < ReplicaAddresses.Length; i++) + for (int i = 0; i < sortedReplicas.Count; i++) { - var request = CreateRequest(ReplicaAddresses[i] + "?query=" + query); - var currentTask = ProcessRequestAsync(request); - tasks.Add(currentTask); + var request = CreateRequest(sortedReplicas[i] + "?query=" + query); + tasks.Add(ProcessAndMeasureAsync(sortedReplicas[i], request)); while (true) { @@ -39,13 +45,16 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti if (completedTask != delayTask) { - var finishedTask = await (Task>)completedTask; + var finishedTask = await (Task>)completedTask; try { - return await finishedTask; + var response = await finishedTask; + UpdateStats(response.Uri, response.ElapsedMs); + return response.Content; } catch (Exception) { + UpdateStats(sortedReplicas[i], (long)timeout.TotalMilliseconds); tasks.Remove(finishedTask); if (tasks.Count == 0 || i < ReplicaAddresses.Length - 1) { @@ -64,6 +73,24 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti throw new TimeoutException(); } + + private async Task ProcessAndMeasureAsync(string uri, System.Net.WebRequest request) + { + var timer = Stopwatch.StartNew(); + var content = await ProcessRequestAsync(request); + return new ResponseData { Uri = uri, Content = content, ElapsedMs = timer.ElapsedMilliseconds }; + } + private void UpdateStats(string uri, long elapsedMs) + { + ReplicaStats.AddOrUpdate(uri, elapsedMs, (key, oldVal) => (oldVal + elapsedMs) / 2); + } + + private class ResponseData + { + public string Uri { get; set; } + public string Content { get; set; } + public long ElapsedMs { get; set; } + } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); }