From 4e9c4ac567ad5d85fa2bd3e993fab33c9e591d5d Mon Sep 17 00:00:00 2001 From: Krotkaya Date: Wed, 25 Feb 2026 21:35:39 +0500 Subject: [PATCH 1/2] =?UTF-8?q?=D0=A0=D0=B5=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0?= =?UTF-8?q?=D1=86=D0=B8=D1=8F=20=D0=B1=D0=B5=D0=B7=20=D0=B7=D0=B0=D0=B4?= =?UTF-8?q?=D0=B0=D1=87=D0=B8=20=D1=81=D0=BE=20=D0=B7=D0=B2=D0=B5=D0=B7?= =?UTF-8?q?=D0=B4=D0=BE=D1=87=D0=BA=D0=BE=D0=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Clients/ParallelClusterClient.cs | 39 +++++++++--- .../Clients/RoundRobinClusterClient.cs | 36 +++++++---- .../Clients/SmartClusterClient.cs | 59 ++++++++++++++++--- 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..37045db 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,23 +1,44 @@ using System; -using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class ParallelClusterClient : ClusterClientBase + public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var stopwatch = Stopwatch.StartNew(); + var tasks = ReplicaAddresses + .Select(addr => + { + var request = CreateRequest(addr + "?query=" + query); + return ProcessRequestAsync(request); + }) + .ToList(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + Exception lastError = null; + while (tasks.Count > 0) + { + var remaining = timeout - stopwatch.Elapsed; + var timeoutTask = Task.Delay(remaining); + var completed = await Task.WhenAny(tasks.Append(timeoutTask)); + if (completed == timeoutTask) + throw new TimeoutException(); + + var finished = (Task)completed; + if (finished.Status == TaskStatus.RanToCompletion) + return finished.Result; + + try { await finished; } + catch (Exception ex) { lastError = ex; } + tasks.Remove(finished); + } + throw lastError ?? new Exception("Ни одна реплика не ответила"); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..30bb6d4 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,23 +1,39 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Diagnostics; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class RoundRobinClusterClient : ClusterClientBase + public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var stopwatch = Stopwatch.StartNew(); + Exception lastError = null; - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + for (var i = 0; i < ReplicaAddresses.Length; i++) + { + var remaining = timeout - stopwatch.Elapsed; + var remainingReplicas = ReplicaAddresses.Length - i; + var slice = TimeSpan + .FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas)); + + var request = CreateRequest(ReplicaAddresses[i] + "?query=" + query); + var task = ProcessRequestAsync(request); + var completed = await Task.WhenAny(task, Task.Delay(slice)); + if (completed != task) + continue; + + if (task.Status == TaskStatus.RanToCompletion) + return task.Result; + + try { await task; } + catch (Exception ex) { lastError = ex; } + } + throw lastError ?? new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..a3396ef 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,23 +1,66 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class SmartClusterClient : ClusterClientBase + public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var stopwatch = Stopwatch.StartNew(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + var runningTasks = new List>(); + Exception lastError = null; + + for (var i = 0; i < ReplicaAddresses.Length; i++) + { + var remaining = timeout - stopwatch.Elapsed; + var remainingReplicas = ReplicaAddresses.Length - i; + var slice = TimeSpan + .FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas)); + var request = CreateRequest(ReplicaAddresses[i] + "?query=" + query); + + var requestTask = ProcessRequestAsync(request); + runningTasks.Add(requestTask); + var sliceEnd = stopwatch.Elapsed + slice; + + while (true) + { + for (var k = runningTasks.Count - 1; k >= 0; k--) + { + var runningTask = runningTasks[k]; + if (!runningTask.IsCompleted) continue; + if (runningTask.Status == TaskStatus.RanToCompletion) + return runningTask.Result; + + try { await runningTask; } + catch (Exception ex) { lastError = ex; } + runningTasks.RemoveAt(k); + } + + var now = stopwatch.Elapsed; + var leftTotal = timeout - now; + + var leftSlice = sliceEnd - now; + if (leftSlice <= TimeSpan.Zero) + break; + + if (runningTasks.Count == 0) + break; + + var delay = Task.Delay(leftSlice < leftTotal ? leftSlice : leftTotal); + var any = await Task.WhenAny(runningTasks.Append(delay)); + if (any == delay) + break; + } + } + throw lastError ?? new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file From aae9de9b84ff7f4aef9007e8468fc215251dff58 Mon Sep 17 00:00:00 2001 From: Krotkaya Date: Wed, 25 Feb 2026 22:10:23 +0500 Subject: [PATCH 2/2] =?UTF-8?q?=D0=A0=D0=B5=D0=B0=D0=BB=D0=B8=D0=B7=D0=BE?= =?UTF-8?q?=D0=B2=D0=B0=D0=BD=D0=B0=20=D0=B7=D0=B0=D0=B4=D0=B0=D1=87=D0=B0?= =?UTF-8?q?=20=D1=81=D0=BE=20=D0=B7=D0=B2=D0=B5=D0=B7=D0=B4=D0=BE=D1=87?= =?UTF-8?q?=D0=BA=D0=BE=D0=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Clients/ClusterClientBase.cs | 40 ++++++++++++++++++- .../Clients/RandomClusterClient.cs | 2 - .../Clients/RoundRobinClusterClient.cs | 32 ++++++++++----- .../Clients/SmartClusterClient.cs | 39 +++++++++++------- 4 files changed, 85 insertions(+), 28 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..5268c34 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -10,12 +12,48 @@ namespace ClusterClient.Clients { public abstract class ClusterClientBase { - protected string[] ReplicaAddresses { get; set; } + protected string[] ReplicaAddresses { get; } + private readonly object statsLock = new (); + private readonly Dictionary stats = new(); + private readonly Dictionary originalIndex; + protected ClusterClientBase(string[] replicaAddresses) { ReplicaAddresses = replicaAddresses; + originalIndex = replicaAddresses + .Select((a, i) => (Addr: a, Index: i)) + .ToDictionary(x => x.Addr, x => x.Index); + } + + protected string[] GetReplicasOrderedBySpeed() + { + lock (statsLock) + { + return ReplicaAddresses + .OrderBy(a => stats.TryGetValue(a, out var s) ? s.AvgMs : double.MaxValue) + .ThenBy(a => originalIndex[a]) + .ToArray(); + } } + + protected void RecordReplicaTime(string address, long elapsedMs) + { + lock (statsLock) + { + if (!stats.TryGetValue(address, out var s)) + { + stats[address] = (elapsedMs, 1); + return; + } + var newAvg = (s.AvgMs * s.Count + elapsedMs) / (s.Count + 1); + stats[address] = (newAvg, s.Count + 1); + } + } + + protected void RecordReplicaPenalty(string address, long penaltyMs) + => RecordReplicaTime(address, penaltyMs); + public abstract Task ProcessRequestAsync(string query, TimeSpan timeout); protected abstract ILog Log { get; } diff --git a/homework 2/ClusterClient/Clients/RandomClusterClient.cs b/homework 2/ClusterClient/Clients/RandomClusterClient.cs index 4de13f3..9e64a46 100644 --- a/homework 2/ClusterClient/Clients/RandomClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RandomClusterClient.cs @@ -18,8 +18,6 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti var uri = ReplicaAddresses[random.Next(ReplicaAddresses.Length)]; var webRequest = CreateRequest(uri + "?query=" + query); - - Log.InfoFormat($"Processing {webRequest.RequestUri}"); var resultTask = ProcessRequestAsync(webRequest); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 30bb6d4..9e69bcc 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -11,25 +11,37 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti { var stopwatch = Stopwatch.StartNew(); Exception lastError = null; + var replicas = GetReplicasOrderedBySpeed(); - for (var i = 0; i < ReplicaAddresses.Length; i++) + for (var i = 0; i < replicas.Length; i++) { var remaining = timeout - stopwatch.Elapsed; - var remainingReplicas = ReplicaAddresses.Length - i; + var remainingReplicas = replicas.Length - i; var slice = TimeSpan .FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas)); - - var request = CreateRequest(ReplicaAddresses[i] + "?query=" + query); + var addr = replicas[i]; + var request = CreateRequest(addr+ "?query=" + query); + var attemptSw = Stopwatch.StartNew(); var task = ProcessRequestAsync(request); var completed = await Task.WhenAny(task, Task.Delay(slice)); + attemptSw.Stop(); if (completed != task) - continue; - - if (task.Status == TaskStatus.RanToCompletion) - return task.Result; + { + RecordReplicaPenalty(addr, (long)slice.TotalMilliseconds); + continue; + } - try { await task; } - catch (Exception ex) { lastError = ex; } + try + { + var result = await task; + RecordReplicaTime(addr, attemptSw.ElapsedMilliseconds); + return result; + } + catch (Exception ex) + { + lastError = ex; + RecordReplicaPenalty(addr, Math.Max(1, attemptSw.ElapsedMilliseconds * 2)); + } } throw lastError ?? new TimeoutException(); } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index a3396ef..8e8236b 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -12,20 +12,22 @@ public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(r public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { var stopwatch = Stopwatch.StartNew(); - - var runningTasks = new List>(); + var runningTasks = new List<(string Addr, Task Task, Stopwatch Timer)>(); Exception lastError = null; + var replicas = GetReplicasOrderedBySpeed(); - for (var i = 0; i < ReplicaAddresses.Length; i++) + for (var i = 0; i < replicas.Length; i++) { var remaining = timeout - stopwatch.Elapsed; - var remainingReplicas = ReplicaAddresses.Length - i; + var remainingReplicas = replicas.Length - i; var slice = TimeSpan .FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas)); - var request = CreateRequest(ReplicaAddresses[i] + "?query=" + query); + var addr = replicas[i]; + var request = CreateRequest(addr + "?query=" + query); - var requestTask = ProcessRequestAsync(request); - runningTasks.Add(requestTask); + var tmr = Stopwatch.StartNew(); + var task = ProcessRequestAsync(request); + runningTasks.Add((addr, task, tmr)); var sliceEnd = stopwatch.Elapsed + slice; while (true) @@ -33,18 +35,25 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti for (var k = runningTasks.Count - 1; k >= 0; k--) { var runningTask = runningTasks[k]; - if (!runningTask.IsCompleted) continue; - if (runningTask.Status == TaskStatus.RanToCompletion) - return runningTask.Result; + if (!runningTask.Task.IsCompleted) continue; - try { await runningTask; } - catch (Exception ex) { lastError = ex; } runningTasks.RemoveAt(k); + runningTask.Timer.Stop(); + try + { + var result = await runningTask.Task; + RecordReplicaTime(runningTask.Addr, runningTask.Timer.ElapsedMilliseconds); + return result; + } + catch (Exception ex) + { + lastError = ex; + RecordReplicaPenalty(runningTask.Addr, Math.Max(1, runningTask.Timer.ElapsedMilliseconds * 2)); + } } - + var now = stopwatch.Elapsed; var leftTotal = timeout - now; - var leftSlice = sliceEnd - now; if (leftSlice <= TimeSpan.Zero) break; @@ -53,7 +62,7 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti break; var delay = Task.Delay(leftSlice < leftTotal ? leftSlice : leftTotal); - var any = await Task.WhenAny(runningTasks.Append(delay)); + var any = await Task.WhenAny(runningTasks.Select(x => x.Task).Append(delay)); if (any == delay) break; }