diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..fcea1de 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,23 +1,44 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class ParallelClusterClient : ClusterClientBase + public class ParallelClusterClient(string[] replicaAddresses) + : ClusterClientBase(replicaAddresses) { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var tasks = ReplicaAddresses + .Select(address => + { + var request = CreateRequest(address + "?query=" + query); + Log.InfoFormat($"Processing {request.RequestUri}"); + return ProcessRequestAsync(request); + }) + .ToList(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + var timeoutTask = Task.Delay(timeout); + + while (tasks.Count > 0) + { + var completed = await Task.WhenAny(tasks.Append(timeoutTask)); + + if (completed == timeoutTask) + throw new TimeoutException(); + + var finishedTask = (Task)completed; + tasks.Remove(finishedTask); + + if (finishedTask.Status == TaskStatus.RanToCompletion) + return finishedTask.Result; + } + + throw new TimeoutException(); } - protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); + protected override ILog Log => + LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/ReplicaStatisticsEntry.cs b/homework 2/ClusterClient/Clients/ReplicaStatisticsEntry.cs new file mode 100644 index 0000000..af04a12 --- /dev/null +++ b/homework 2/ClusterClient/Clients/ReplicaStatisticsEntry.cs @@ -0,0 +1,19 @@ +using System.Collections.Generic; +using System.Linq; + +namespace ClusterClient.Clients; + +public class ReplicaStatisticsEntry(string address, int maxHistory = 10) +{ + public string Address { get; } = address; + private readonly Queue lastTimes = new(); + + public void RecordResponseTime(long ms) + { + lastTimes.Enqueue(ms); + if (lastTimes.Count > maxHistory) + lastTimes.Dequeue(); + } + + public double AverageTime => lastTimes.Any() ? lastTimes.Average() : double.MaxValue; +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..4694833 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -7,15 +8,54 @@ namespace ClusterClient.Clients { - public class RoundRobinClusterClient : ClusterClientBase + public class RoundRobinClusterClient: ClusterClientBase { + private readonly Dictionary replicaStats; + public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + replicaStats = replicaAddresses.ToDictionary( + addr => addr, + addr => new ReplicaStatisticsEntry(addr) + ); } - - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var start = Stopwatch.StartNew(); + + var orderedReplicas = replicaStats.Values + .OrderBy(x => x.AverageTime) + .Select(x => x.Address) + .ToArray(); + + for (var i = 0; i < orderedReplicas.Length; i++) + { + var remainingTime = timeout - start.Elapsed; + + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException(); + + var remainingReplicas = orderedReplicas.Length - i; + var replicaTimeout = TimeSpan.FromMilliseconds( + remainingTime.TotalMilliseconds / remainingReplicas); + + var uri = orderedReplicas[i] + "?query=" + query; + var webRequest = CreateRequest(uri); + + Log.InfoFormat($"Processing {webRequest.RequestUri}, timeout {replicaTimeout}"); + + var requestTask = ProcessRequestAsync(webRequest); + var delayTask = Task.Delay(replicaTimeout); + + var completed = await Task.WhenAny(requestTask, delayTask); + + if (completed != requestTask) continue; + if (!requestTask.IsFaulted) + return requestTask.Result; + } + + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..f931a38 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,15 +9,95 @@ namespace ClusterClient.Clients { public class SmartClusterClient : ClusterClientBase { + private readonly Dictionary replicaStats; + public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + replicaStats = replicaAddresses.ToDictionary( + addr => addr, + addr => new ReplicaStatisticsEntry(addr) + ); } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var deadline = DateTime.UtcNow + timeout; + var runningTasks = new List<(Task task, string address, Stopwatch timer)>(); + + var orderedReplicas = replicaStats.Values + .OrderBy(x => x.AverageTime) + .Select(x => x.Address) + .ToArray(); + + var addressesLeft = orderedReplicas.Length; + + foreach (var replica in orderedReplicas) + { + var remaining = deadline - DateTime.UtcNow; + if (remaining <= TimeSpan.Zero) + throw new TimeoutException(); + + var replicaTimeout = remaining / addressesLeft; + + var request = CreateRequest($"{replica}?query={query}"); + Log.InfoFormat($"Processing {request.RequestUri}"); + + var timer = Stopwatch.StartNew(); + var task = ProcessRequestAsync(request); + runningTasks.Add((task, replica, timer)); + + var finished = await WaitOneAsync(runningTasks, replicaTimeout); + if (finished != null) + { + var (resultTask, resultReplica, resultTimer) = finished.Value; + resultTimer.Stop(); + replicaStats[resultReplica].RecordResponseTime(resultTimer.ElapsedMilliseconds); + return await resultTask; + } + + addressesLeft--; + } + + while (runningTasks.Count > 0) + { + var remaining = deadline - DateTime.UtcNow; + if (remaining <= TimeSpan.Zero) + throw new TimeoutException(); + + var finished = await WaitOneAsync(runningTasks, remaining); + if (finished == null) continue; + + var (resultTask, resultReplica, resultTimer) = finished.Value; + resultTimer.Stop(); + replicaStats[resultReplica].RecordResponseTime(resultTimer.ElapsedMilliseconds); + return await resultTask; + } + + throw new TimeoutException(); } - protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + private async Task<(Task, string, Stopwatch)?> WaitOneAsync( + List<(Task task, string address, Stopwatch timer)> runningTasks, + TimeSpan timeout) + { + if (timeout <= TimeSpan.Zero) + return null; + + var delayTask = Task.Delay(timeout); + var tasksList = runningTasks.Select(x => x.task).Append(delayTask).ToList(); + + var completed = await Task.WhenAny(tasksList); + + if (completed == delayTask) + return null; + + var finishedTask = (Task)completed; + var result = runningTasks.First(x => x.task == finishedTask); + runningTasks.Remove(result); + + return finishedTask.IsCompletedSuccessfully ? (finishedTask, result.address, result.timer) : null; + } } -} +} \ No newline at end of file