diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..5268c34 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -10,12 +12,48 @@ namespace ClusterClient.Clients { public abstract class ClusterClientBase { - protected string[] ReplicaAddresses { get; set; } + protected string[] ReplicaAddresses { get; } + private readonly object statsLock = new (); + private readonly Dictionary stats = new(); + private readonly Dictionary originalIndex; + protected ClusterClientBase(string[] replicaAddresses) { ReplicaAddresses = replicaAddresses; + originalIndex = replicaAddresses + .Select((a, i) => (Addr: a, Index: i)) + .ToDictionary(x => x.Addr, x => x.Index); + } + + protected string[] GetReplicasOrderedBySpeed() + { + lock (statsLock) + { + return ReplicaAddresses + .OrderBy(a => stats.TryGetValue(a, out var s) ? s.AvgMs : double.MaxValue) + .ThenBy(a => originalIndex[a]) + .ToArray(); + } } + + protected void RecordReplicaTime(string address, long elapsedMs) + { + lock (statsLock) + { + if (!stats.TryGetValue(address, out var s)) + { + stats[address] = (elapsedMs, 1); + return; + } + var newAvg = (s.AvgMs * s.Count + elapsedMs) / (s.Count + 1); + stats[address] = (newAvg, s.Count + 1); + } + } + + protected void RecordReplicaPenalty(string address, long penaltyMs) + => RecordReplicaTime(address, penaltyMs); + public abstract Task ProcessRequestAsync(string query, TimeSpan timeout); protected abstract ILog Log { get; } diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..37045db 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,23 +1,44 @@ using System; -using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class ParallelClusterClient : ClusterClientBase + public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var stopwatch = Stopwatch.StartNew(); + var tasks = ReplicaAddresses + .Select(addr => + { + var request = CreateRequest(addr + "?query=" + query); + return ProcessRequestAsync(request); + }) + .ToList(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + Exception lastError = null; + while (tasks.Count > 0) + { + var remaining = timeout - stopwatch.Elapsed; + var timeoutTask = Task.Delay(remaining); + var completed = await Task.WhenAny(tasks.Append(timeoutTask)); + if (completed == timeoutTask) + throw new TimeoutException(); + + var finished = (Task)completed; + if (finished.Status == TaskStatus.RanToCompletion) + return finished.Result; + + try { await finished; } + catch (Exception ex) { lastError = ex; } + tasks.Remove(finished); + } + throw lastError ?? new Exception("Ни одна реплика не ответила"); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RandomClusterClient.cs b/homework 2/ClusterClient/Clients/RandomClusterClient.cs index 4de13f3..9e64a46 100644 --- a/homework 2/ClusterClient/Clients/RandomClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RandomClusterClient.cs @@ -18,8 +18,6 @@ public override async Task ProcessRequestAsync(string query, TimeSpan ti var uri = ReplicaAddresses[random.Next(ReplicaAddresses.Length)]; var webRequest = CreateRequest(uri + "?query=" + query); - - Log.InfoFormat($"Processing {webRequest.RequestUri}"); var resultTask = ProcessRequestAsync(webRequest); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..9e69bcc 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,23 +1,51 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Diagnostics; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class RoundRobinClusterClient : ClusterClientBase + public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var stopwatch = Stopwatch.StartNew(); + Exception lastError = null; + var replicas = GetReplicasOrderedBySpeed(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + for (var i = 0; i < replicas.Length; i++) + { + var remaining = timeout - stopwatch.Elapsed; + var remainingReplicas = replicas.Length - i; + var slice = TimeSpan + .FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas)); + var addr = replicas[i]; + var request = CreateRequest(addr+ "?query=" + query); + var attemptSw = Stopwatch.StartNew(); + var task = ProcessRequestAsync(request); + var completed = await Task.WhenAny(task, Task.Delay(slice)); + attemptSw.Stop(); + if (completed != task) + { + RecordReplicaPenalty(addr, (long)slice.TotalMilliseconds); + continue; + } + + try + { + var result = await task; + RecordReplicaTime(addr, attemptSw.ElapsedMilliseconds); + return result; + } + catch (Exception ex) + { + lastError = ex; + RecordReplicaPenalty(addr, Math.Max(1, attemptSw.ElapsedMilliseconds * 2)); + } + } + throw lastError ?? new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..8e8236b 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,23 +1,75 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class SmartClusterClient : ClusterClientBase + public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var stopwatch = Stopwatch.StartNew(); + var runningTasks = new List<(string Addr, Task Task, Stopwatch Timer)>(); + Exception lastError = null; + var replicas = GetReplicasOrderedBySpeed(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + for (var i = 0; i < replicas.Length; i++) + { + var remaining = timeout - stopwatch.Elapsed; + var remainingReplicas = replicas.Length - i; + var slice = TimeSpan + .FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas)); + var addr = replicas[i]; + var request = CreateRequest(addr + "?query=" + query); + + var tmr = Stopwatch.StartNew(); + var task = ProcessRequestAsync(request); + runningTasks.Add((addr, task, tmr)); + var sliceEnd = stopwatch.Elapsed + slice; + + while (true) + { + for (var k = runningTasks.Count - 1; k >= 0; k--) + { + var runningTask = runningTasks[k]; + if (!runningTask.Task.IsCompleted) continue; + + runningTasks.RemoveAt(k); + runningTask.Timer.Stop(); + try + { + var result = await runningTask.Task; + RecordReplicaTime(runningTask.Addr, runningTask.Timer.ElapsedMilliseconds); + return result; + } + catch (Exception ex) + { + lastError = ex; + RecordReplicaPenalty(runningTask.Addr, Math.Max(1, runningTask.Timer.ElapsedMilliseconds * 2)); + } + } + + var now = stopwatch.Elapsed; + var leftTotal = timeout - now; + var leftSlice = sliceEnd - now; + if (leftSlice <= TimeSpan.Zero) + break; + + if (runningTasks.Count == 0) + break; + + var delay = Task.Delay(leftSlice < leftTotal ? leftSlice : leftTotal); + var any = await Task.WhenAny(runningTasks.Select(x => x.Task).Append(delay)); + if (any == delay) + break; + } + } + throw lastError ?? new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file