From 997d8a682c08b7ca9b8ddd93d5bc63ee1812a41d Mon Sep 17 00:00:00 2001 From: alex_the_nugget <2005kan@mail.ru> Date: Wed, 25 Feb 2026 23:15:52 +0500 Subject: [PATCH 1/2] =?UTF-8?q?=D0=A1=D0=B0=D0=BC=D0=BE=D1=80=D0=BE=D0=B4?= =?UTF-8?q?=D0=BE=D0=B2=D0=B0=20=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B0=D0=BD?= =?UTF-8?q?=D0=B4=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Clients/ParallelClusterClient.cs | 30 ++++++++++++- .../Clients/RoundRobinClusterClient.cs | 32 +++++++++++++- .../Clients/SmartClusterClient.cs | 42 ++++++++++++++++++- 3 files changed, 98 insertions(+), 6 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..312e45d 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -13,9 +13,35 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(addr => CreateRequest($"{addr}?query={query}")) + .Select(ProcessRequestAsync) + .ToList(); + + var timeoutTask = Task.Delay(timeout); + + var tasksToWait = new List(tasks); + tasksToWait.Add(timeoutTask); + + while (tasksToWait.Count > 1) + { + var winner = await Task.WhenAny(tasksToWait); + if (winner == timeoutTask) break; + + var completedTask = (Task)winner; + tasksToWait.Remove(completedTask); + try + { + return await completedTask; + } + catch + { + Log.Warn("Failed to process replica."); + } + } + throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..e7a8cb9 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -13,9 +14,36 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var timer = Stopwatch.StartNew(); + + for (var i = 0; i < ReplicaAddresses.Length; i++) + { + var timeRemain = timeout - timer.Elapsed; + if (timeRemain <= TimeSpan.Zero) + break; + + var replicaTimeout = timeRemain / (ReplicaAddresses.Length - i); + var uri = $"{ReplicaAddresses[i]}?query={query}"; + var request = CreateRequest(uri); + var completedTask = ProcessRequestAsync(request); + + await Task.WhenAny(completedTask, Task.Delay(replicaTimeout)); + + if (!completedTask.IsCompleted) + continue; + + try + { + return await completedTask; + } + catch + { + Log.Warn("Failed to process replica."); + } + } + throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..4261b54 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -13,9 +14,46 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = new List>(); + var timer = Stopwatch.StartNew(); + + for (var i = 0; i < ReplicaAddresses.Length; i++) + { + var timeRemain = timeout - timer.Elapsed; + if (timeRemain <= TimeSpan.Zero) + break; + + var replicaTimeout = timeRemain / (ReplicaAddresses.Length - i); + var uri = $"{ReplicaAddresses[i]}?query={query}"; + var request = CreateRequest(uri); + tasks.Add(ProcessRequestAsync(request)); + + var timeoutTask = Task.Delay(replicaTimeout); + + while (tasks.Count > 0) + { + var tasksToWait = new List(tasks); + tasksToWait.Add(timeoutTask); + + var winner = await Task.WhenAny(tasksToWait); + if (winner == timeoutTask) break; + + var completedTask = (Task)winner; + tasks.Remove(completedTask); + try + { + return await completedTask; + } + catch + { + Log.Warn("Failed to process replica."); + if (tasks.Count == 0) break; + } + } + } + throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); From a40f81a2808fc356fa541fd3ae3317aa7760dbc1 Mon Sep 17 00:00:00 2001 From: alex_the_nugget <2005kan@mail.ru> Date: Thu, 26 Feb 2026 01:10:32 +0500 Subject: [PATCH 2/2] =?UTF-8?q?=D0=97=D0=B0=D0=B4=D0=B0=D1=87=D0=B0=20?= =?UTF-8?q?=D1=81=D0=BE=20=D0=B7=D0=B2=D0=B5=D0=B7=D0=B4=D0=BE=D1=87=D0=BA?= =?UTF-8?q?=D0=BE=D0=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Clients/RoundRobinClusterClient.cs | 39 +++++++++++++++-- .../Clients/SmartClusterClient.cs | 43 ++++++++++++++++--- 2 files changed, 72 insertions(+), 10 deletions(-) diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index e7a8cb9..9bf6f14 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,7 +1,9 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -16,18 +18,23 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { + var sortedReplicas = ReplicaAddresses + .OrderBy(addr => ReplicaTimes.TryGetValue(addr, out var time) ? time : 0) + .ToArray(); + var timer = Stopwatch.StartNew(); - for (var i = 0; i < ReplicaAddresses.Length; i++) + for (var i = 0; i < sortedReplicas.Length; i++) { var timeRemain = timeout - timer.Elapsed; if (timeRemain <= TimeSpan.Zero) break; - var replicaTimeout = timeRemain / (ReplicaAddresses.Length - i); - var uri = $"{ReplicaAddresses[i]}?query={query}"; + var currentReplica = sortedReplicas[i]; + var replicaTimeout = timeRemain / (sortedReplicas.Length - i); + var uri = $"{currentReplica}?query={query}"; var request = CreateRequest(uri); - var completedTask = ProcessRequestAsync(request); + var completedTask = ProcessAndMeasureAsync(currentReplica, request); await Task.WhenAny(completedTask, Task.Delay(replicaTimeout)); @@ -45,7 +52,31 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti } throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); } + + private async Task ProcessAndMeasureAsync(string replicaAddress, WebRequest request) + { + var timer = Stopwatch.StartNew(); + try + { + var result = await base.ProcessRequestAsync(request); + timer.Stop(); + + ReplicaTimes.AddOrUpdate( + replicaAddress, + timer.ElapsedMilliseconds, + (key, oldTime) => oldTime == double.MaxValue ? timer.ElapsedMilliseconds : (oldTime * 0.8 + timer.ElapsedMilliseconds * 0.2) + ); + + return result; + } + catch + { + ReplicaTimes[replicaAddress] = double.MaxValue; + throw; + } + } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + private readonly ConcurrentDictionary ReplicaTimes = new ConcurrentDictionary(); } } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index 4261b54..81dec3d 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,7 +1,9 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -16,19 +18,24 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { + var sortedReplicas = ReplicaAddresses + .OrderBy(addr => ReplicaTimes.TryGetValue(addr, out var time) ? time : 0) + .ToArray(); + var tasks = new List>(); var timer = Stopwatch.StartNew(); - for (var i = 0; i < ReplicaAddresses.Length; i++) + for (var i = 0; i < sortedReplicas.Length; i++) { var timeRemain = timeout - timer.Elapsed; if (timeRemain <= TimeSpan.Zero) break; - - var replicaTimeout = timeRemain / (ReplicaAddresses.Length - i); - var uri = $"{ReplicaAddresses[i]}?query={query}"; + + var currentReplica = sortedReplicas[i]; + var replicaTimeout = timeRemain / (sortedReplicas.Length - i); + var uri = $"{sortedReplicas[i]}?query={query}"; var request = CreateRequest(uri); - tasks.Add(ProcessRequestAsync(request)); + tasks.Add(ProcessAndMeasureAsync(currentReplica, request)); var timeoutTask = Task.Delay(replicaTimeout); @@ -55,7 +62,31 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti } throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); } - + + private async Task ProcessAndMeasureAsync(string replicaAddress, WebRequest request) + { + var timer = Stopwatch.StartNew(); + try + { + var result = await base.ProcessRequestAsync(request); + timer.Stop(); + + ReplicaTimes.AddOrUpdate( + replicaAddress, + timer.ElapsedMilliseconds, + (key, oldTime) => oldTime == double.MaxValue ? timer.ElapsedMilliseconds : (oldTime * 0.8 + timer.ElapsedMilliseconds * 0.2) + ); + + return result; + } + catch + { + ReplicaTimes[replicaAddress] = double.MaxValue; + throw; + } + } + + private readonly ConcurrentDictionary ReplicaTimes = new ConcurrentDictionary(); protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } }