diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..bd7e7da 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -9,15 +9,51 @@ namespace ClusterClient.Clients { public class ParallelClusterClient : ClusterClientBase { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public ParallelClusterClient(string[] replicaAddresses) + : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(address => ProcessReplica(address, query)) + .ToList(); + + var timeoutTask = Task.Delay(timeout); + + while (tasks.Any()) + { + var completed = await Task.WhenAny(tasks.Cast().Append(timeoutTask)); + + if (completed == timeoutTask) + throw new TimeoutException(); + + var task = (Task)completed; + tasks.Remove(task); + + if (task.Result != null) + return task.Result; + } + + throw new TimeoutException(); + } + + private async Task ProcessReplica(string address, string query) + { + try + { + var request = CreateRequest(address + "?query=" + query); + Log.InfoFormat($"Processing {request.RequestUri}"); + return await ProcessRequestAsync(request); + } + catch + { + return null; + } } - protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); + protected override ILog Log => + LogManager.GetLogger(typeof(ParallelClusterClient)); } } diff --git a/homework 2/ClusterClient/Clients/ReplicaStats.cs b/homework 2/ClusterClient/Clients/ReplicaStats.cs new file mode 100644 index 0000000..2e85f9b --- /dev/null +++ b/homework 2/ClusterClient/Clients/ReplicaStats.cs @@ -0,0 +1,7 @@ +namespace ClusterClient.Clients; + +public class ReplicaStats +{ + public double AverageResponseTime; + public int Count; +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..d2da93e 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -9,15 +10,105 @@ namespace ClusterClient.Clients { public class RoundRobinClusterClient : ClusterClientBase { - public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) + private readonly Dictionary statistics = new(); + private readonly object statsLock = new(); + + public RoundRobinClusterClient(string[] replicaAddresses) + : base(replicaAddresses) + { + } + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) + { + var startTime = DateTime.UtcNow; + + var orderedAddresses = GetOrderedReplicas(); + + for (int i = 0; i < orderedAddresses.Length; i++) + { + var elapsed = DateTime.UtcNow - startTime; + var remaining = timeout - elapsed; + + if (remaining <= TimeSpan.Zero) + throw new TimeoutException(); + + var remainingReplicas = orderedAddresses.Length - i; + var perReplicaTimeout = TimeSpan.FromMilliseconds( + remaining.TotalMilliseconds / remainingReplicas); + + var address = orderedAddresses[i]; + + var stopwatch = Stopwatch.StartNew(); + var requestTask = ProcessReplica(address, query); + var delayTask = Task.Delay(perReplicaTimeout); + + var completed = await Task.WhenAny(requestTask, delayTask); + + if (completed == requestTask) + { + var result = await requestTask; + if (result != null) + { + stopwatch.Stop(); + UpdateStatistics(address, stopwatch.ElapsedMilliseconds); + return result; + } + } + } + + throw new TimeoutException(); + } + + private async Task ProcessReplica(string address, string query) { + try + { + var request = CreateRequest(address + "?query=" + query); + Log.InfoFormat($"Processing {request.RequestUri}"); + return await ProcessRequestAsync(request); + } + catch + { + return null; + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + private string[] GetOrderedReplicas() { - throw new NotImplementedException(); + lock (statsLock) + { + return ReplicaAddresses + .OrderBy(addr => + statistics.TryGetValue(addr, out var stat) + ? stat.AverageResponseTime + : double.MaxValue) + .ToArray(); + } + } + + private void UpdateStatistics(string address, long elapsedMs) + { + lock (statsLock) + { + if (!statistics.TryGetValue(address, out var stat)) + { + statistics[address] = new ReplicaStats + { + AverageResponseTime = elapsedMs, + Count = 1 + }; + } + else + { + stat.AverageResponseTime = + (stat.AverageResponseTime * stat.Count + elapsedMs) / (stat.Count + 1); + + stat.Count++; + } + } } - protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + protected override ILog Log => + LogManager.GetLogger(typeof(RoundRobinClusterClient)); } } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..72f68f5 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,23 +1,155 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using log4net; - -namespace ClusterClient.Clients -{ - public class SmartClusterClient : ClusterClientBase + using System; + using System.Collections.Generic; + using System.Diagnostics; + using System.Linq; + using System.Text; + using System.Threading.Tasks; + using log4net; + + namespace ClusterClient.Clients { - public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public class SmartClusterClient : ClusterClientBase { - } + private readonly Dictionary statistics = new(); + private readonly object statsLock = new(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); - } + public SmartClusterClient(string[] replicaAddresses) + : base(replicaAddresses) + { + } + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) + { + var startTime = DateTime.UtcNow; + var tasks = new List<(string Address, Task Task, Stopwatch Timer)>(); + + var orderedAddresses = GetOrderedReplicas(); + + for (int i = 0; i < orderedAddresses.Length; i++) + { + var elapsed = DateTime.UtcNow - startTime; + var remaining = timeout - elapsed; + + if (remaining <= TimeSpan.Zero) + throw new TimeoutException(); + + var remainingReplicas = orderedAddresses.Length - i; + var perReplicaTimeout = TimeSpan.FromMilliseconds( + remaining.TotalMilliseconds / remainingReplicas); + + var address = orderedAddresses[i]; + + var stopwatch = Stopwatch.StartNew(); + var task = ProcessReplica(address, query); + + tasks.Add((address, task, stopwatch)); + + var delayTask = Task.Delay(perReplicaTimeout); + var completed = await Task.WhenAny( + tasks.Select(t => t.Task).Cast().Append(delayTask)); + + if (completed != delayTask) + { + var finished = tasks.First(t => t.Task == completed); + var result = await finished.Task; + + if (result != null) + { + finished.Timer.Stop(); + UpdateStatistics(finished.Address, finished.Timer.ElapsedMilliseconds); + return result; + } - protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + tasks.Remove(finished); + } + } + + var remainingFinal = timeout - (DateTime.UtcNow - startTime); + if (remainingFinal <= TimeSpan.Zero) + throw new TimeoutException(); + + var timeoutTask = Task.Delay(remainingFinal); + + while (tasks.Any()) + { + var completed = await Task.WhenAny( + tasks.Select(t => t.Task).Cast().Append(timeoutTask)); + + if (completed == timeoutTask) + throw new TimeoutException(); + + var finished = tasks.First(t => t.Task == completed); + var result = await finished.Task; + + if (result != null) + { + finished.Timer.Stop(); + UpdateStatistics(finished.Address, finished.Timer.ElapsedMilliseconds); + return result; + } + + tasks.Remove(finished); + } + + throw new TimeoutException(); + } + + private async Task ProcessReplica(string address, string query) + { + try + { + var request = CreateRequest(address + "?query=" + query); + Log.InfoFormat($"Processing {request.RequestUri}"); + return await ProcessRequestAsync(request); + } + catch + { + return null; + } + } + + private string[] GetOrderedReplicas() + { + lock (statsLock) + { + return ReplicaAddresses + .OrderBy(addr => + statistics.TryGetValue(addr, out var stat) + ? stat.AverageResponseTime + : double.MaxValue) + .ToArray(); + } + } + + private void UpdateStatistics(string address, long elapsedMs) + { + lock (statsLock) + { + if (!statistics.TryGetValue(address, out var stat)) + { + statistics[address] = new ReplicaStats + { + AverageResponseTime = elapsedMs, + Count = 1 + }; + } + else + { + stat.AverageResponseTime = + (stat.AverageResponseTime * stat.Count + elapsedMs) / (stat.Count + 1); + + stat.Count++; + } + } + } + + protected override ILog Log => + LogManager.GetLogger(typeof(SmartClusterClient)); + + private class ReplicaStats + { + public double AverageResponseTime; + public int Count; + } + } } -}