diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..312e45d 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -13,9 +13,35 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(addr => CreateRequest($"{addr}?query={query}")) + .Select(ProcessRequestAsync) + .ToList(); + + var timeoutTask = Task.Delay(timeout); + + var tasksToWait = new List(tasks); + tasksToWait.Add(timeoutTask); + + while (tasksToWait.Count > 1) + { + var winner = await Task.WhenAny(tasksToWait); + if (winner == timeoutTask) break; + + var completedTask = (Task)winner; + tasksToWait.Remove(completedTask); + try + { + return await completedTask; + } + catch + { + Log.Warn("Failed to process replica."); + } + } + throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..9bf6f14 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,11 +16,67 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var sortedReplicas = ReplicaAddresses + .OrderBy(addr => ReplicaTimes.TryGetValue(addr, out var time) ? time : 0) + .ToArray(); + + var timer = Stopwatch.StartNew(); + + for (var i = 0; i < sortedReplicas.Length; i++) + { + var timeRemain = timeout - timer.Elapsed; + if (timeRemain <= TimeSpan.Zero) + break; + + var currentReplica = sortedReplicas[i]; + var replicaTimeout = timeRemain / (sortedReplicas.Length - i); + var uri = $"{currentReplica}?query={query}"; + var request = CreateRequest(uri); + var completedTask = ProcessAndMeasureAsync(currentReplica, request); + + await Task.WhenAny(completedTask, Task.Delay(replicaTimeout)); + + if (!completedTask.IsCompleted) + continue; + + try + { + return await completedTask; + } + catch + { + Log.Warn("Failed to process replica."); + } + } + throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); + } + + private async Task ProcessAndMeasureAsync(string replicaAddress, WebRequest request) + { + var timer = Stopwatch.StartNew(); + try + { + var result = await base.ProcessRequestAsync(request); + timer.Stop(); + + ReplicaTimes.AddOrUpdate( + replicaAddress, + timer.ElapsedMilliseconds, + (key, oldTime) => oldTime == double.MaxValue ? timer.ElapsedMilliseconds : (oldTime * 0.8 + timer.ElapsedMilliseconds * 0.2) + ); + + return result; + } + catch + { + ReplicaTimes[replicaAddress] = double.MaxValue; + throw; + } } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + private readonly ConcurrentDictionary ReplicaTimes = new ConcurrentDictionary(); } } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..81dec3d 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,11 +16,77 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); - } + var sortedReplicas = ReplicaAddresses + .OrderBy(addr => ReplicaTimes.TryGetValue(addr, out var time) ? time : 0) + .ToArray(); + + var tasks = new List>(); + var timer = Stopwatch.StartNew(); + + for (var i = 0; i < sortedReplicas.Length; i++) + { + var timeRemain = timeout - timer.Elapsed; + if (timeRemain <= TimeSpan.Zero) + break; + + var currentReplica = sortedReplicas[i]; + var replicaTimeout = timeRemain / (sortedReplicas.Length - i); + var uri = $"{sortedReplicas[i]}?query={query}"; + var request = CreateRequest(uri); + tasks.Add(ProcessAndMeasureAsync(currentReplica, request)); + + var timeoutTask = Task.Delay(replicaTimeout); + + while (tasks.Count > 0) + { + var tasksToWait = new List(tasks); + tasksToWait.Add(timeoutTask); + + var winner = await Task.WhenAny(tasksToWait); + if (winner == timeoutTask) break; + var completedTask = (Task)winner; + tasks.Remove(completedTask); + try + { + return await completedTask; + } + catch + { + Log.Warn("Failed to process replica."); + if (tasks.Count == 0) break; + } + } + } + throw new TimeoutException("Not a single replica was successfully answered within the allotted time."); + } + + private async Task ProcessAndMeasureAsync(string replicaAddress, WebRequest request) + { + var timer = Stopwatch.StartNew(); + try + { + var result = await base.ProcessRequestAsync(request); + timer.Stop(); + + ReplicaTimes.AddOrUpdate( + replicaAddress, + timer.ElapsedMilliseconds, + (key, oldTime) => oldTime == double.MaxValue ? timer.ElapsedMilliseconds : (oldTime * 0.8 + timer.ElapsedMilliseconds * 0.2) + ); + + return result; + } + catch + { + ReplicaTimes[replicaAddress] = double.MaxValue; + throw; + } + } + + private readonly ConcurrentDictionary ReplicaTimes = new ConcurrentDictionary(); protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } }