diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..b978679 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -10,6 +13,12 @@ namespace ClusterClient.Clients { public abstract class ClusterClientBase { + private class ReplicaStats + { + public double AverageMs; + public int Count; + } + protected string[] ReplicaAddresses { get; set; } protected ClusterClientBase(string[] replicaAddresses) @@ -19,7 +28,9 @@ protected ClusterClientBase(string[] replicaAddresses) public abstract Task ProcessRequestAsync(string query, TimeSpan timeout); protected abstract ILog Log { get; } - + + private readonly ConcurrentDictionary _stats = []; + protected static HttpWebRequest CreateRequest(string uriStr) { var request = WebRequest.CreateHttp(Uri.EscapeUriString(uriStr)); @@ -40,5 +51,42 @@ protected async Task ProcessRequestAsync(WebRequest request) return result; } } + + protected List OrderReplicas() + { + return ReplicaAddresses + .OrderBy(r => _stats.TryGetValue(r, out var s) ? s.AverageMs : double.MaxValue).ToList(); + } + + protected void UpdateStats(string replica, long elapsedMs) + { + _stats.AddOrUpdate( + replica, + _ => new ReplicaStats { AverageMs = elapsedMs, Count = 1 }, + (_, old) => + { + var newCount = old.Count + 1; + var newAvg = (old.AverageMs * old.Count + elapsedMs) / newCount; + + return new ReplicaStats + { + AverageMs = newAvg, + Count = newCount + }; + }); + } + + protected async Task ProcessWithStatsAsync(string replica, string query) + { + var request = CreateRequest($"{replica}?query={query}"); + var stopwatch = Stopwatch.StartNew(); + + var result = await ProcessRequestAsync(request); + + stopwatch.Stop(); + UpdateStats(replica, stopwatch.ElapsedMilliseconds); + + return result; + } } } \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..670af01 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -13,9 +13,33 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(address => + { + var request = CreateRequest($"{address}?query={query}"); + return ProcessRequestAsync(request); + }) + .ToList(); + + var timeoutTask = Task.Delay(timeout); + + while (tasks.Count != 0) + { + var completed = await Task.WhenAny(tasks.Append(timeoutTask)); + + if (completed == timeoutTask) + throw new TimeoutException(); + + var finishedTask = (Task)completed; + tasks.Remove(finishedTask); + + if (finishedTask.IsCompletedSuccessfully) + return finishedTask.Result; + } + + throw new Exception("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..c592d00 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -13,9 +13,20 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var replicas = OrderReplicas().ToList(); + foreach (var replica in replicas) + { + var task = ProcessWithStatsAsync(replica, query); + + await Task.WhenAny(task, Task.Delay(timeout / ReplicaAddresses.Length)); + + if (task.IsCompletedSuccessfully) + return task.Result; + } + + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..279c558 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -13,9 +13,29 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var replicas = OrderReplicas().ToArray(); + var tasks = new List>(); + var timeoutTask = Task.Delay(timeout); + + foreach (var replica in replicas) + { + var task = ProcessWithStatsAsync(replica, query); + tasks.Add(task); + + await Task.WhenAny(task, Task.Delay(timeout / replicas.Length), timeoutTask); + + if (task.IsCompletedSuccessfully) + return task.Result; + } + + var final = await Task.WhenAny(tasks.Append(timeoutTask)); + + if (final == timeoutTask) + throw new TimeoutException(); + + return await (Task)final; } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));