diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..9bc479a 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -40,5 +40,15 @@ protected async Task ProcessRequestAsync(WebRequest request) return result; } } + + protected Task CallRequest(string address, string query) + { + var uri = new UriBuilder(address) + { + Query = $"query={query}" + }.Uri.AbsoluteUri; + + return ProcessRequestAsync(CreateRequest(uri)); + } } } \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..2e659a7 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using log4net; @@ -11,11 +12,44 @@ public class ParallelClusterClient : ClusterClientBase { public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + ReplicaAddresses = replicaAddresses; } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(address => CallRequest(address, query)) + .ToList(); + + var timeoutTask = Task.Delay(timeout).ContinueWith(_ => throw new TimeoutException()); + tasks.Add(timeoutTask); + + while (tasks.Count > 1) + { + var completedTask = await Task.WhenAny(tasks); + if (completedTask == timeoutTask) + await timeoutTask; + try + { + return await completedTask; + } + catch + { + tasks.Remove(completedTask); + if (tasks.Count == 1) throw new Exception(); + } + } + throw new TimeoutException(); + } + + private Task CallRequest(string address, string query) + { + var uri = new UriBuilder(address) + { + Query = $"query={query}" + }.Uri.AbsoluteUri; + + return ProcessRequestAsync(CreateRequest(uri)); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..e81a620 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -9,15 +11,52 @@ namespace ClusterClient.Clients { public class RoundRobinClusterClient : ClusterClientBase { + private ConcurrentDictionary _stats; + public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + ReplicaAddresses = replicaAddresses; + _stats = new (); + foreach (var addr in replicaAddresses) + { + _stats.TryAdd(addr, TimeSpan.MaxValue); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); - } + var orderedReplicas = ReplicaAddresses + .OrderBy(addr => _stats.GetValueOrDefault(addr, TimeSpan.MaxValue)) + .ToArray(); + + for (int i = 0; i < ReplicaAddresses.Length; i++) + { + var replicaTimeout = timeout / (ReplicaAddresses.Length - i); + var address = orderedReplicas[i]; + var task = CallRequest(address, query); + var timeoutTask = Task.Delay(replicaTimeout); + var sw = Stopwatch.StartNew(); + var completedTask = await Task.WhenAny(task, timeoutTask); + + sw.Stop(); + _stats[address] = sw.Elapsed; + timeout -= sw.Elapsed; + if (completedTask == timeoutTask) + { + continue; + } + try + { + return await task; + } + catch + { + _stats[address] = TimeSpan.MaxValue; + } + } + throw new TimeoutException(); + } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); } } diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..c0f1341 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -9,13 +11,58 @@ namespace ClusterClient.Clients { public class SmartClusterClient : ClusterClientBase { + private ConcurrentDictionary _stats; + public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + ReplicaAddresses = replicaAddresses; + _stats = new (); + foreach (var addr in replicaAddresses) + { + _stats.TryAdd(addr, TimeSpan.MaxValue); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var runningTasks = new Dictionary, string>(); + + var orderedReplicas = ReplicaAddresses + .OrderBy(addr => _stats.GetValueOrDefault(addr, TimeSpan.MaxValue)) + .ToArray(); + + for (int i = 0; i < ReplicaAddresses.Length; i++) + { + var replicaTimeout = timeout / (ReplicaAddresses.Length - i); + var address = orderedReplicas[i]; + var task = CallRequest(address, query); + var timeoutTask = Task.Delay(replicaTimeout).ContinueWith(_ => throw new TimeoutException()); + runningTasks.Add(task, address); + runningTasks.Add(timeoutTask, "timeout"); + var sw = Stopwatch.StartNew(); + var completedTask = await Task.WhenAny(runningTasks.Keys); + sw.Stop(); + + runningTasks.Remove(timeoutTask); + + timeout -= sw.Elapsed; + if (completedTask == timeoutTask) + { + continue; + } + + try + { + _stats[runningTasks[completedTask]] = sw.Elapsed; + return await completedTask; + } + catch + { + _stats[runningTasks[completedTask]] = TimeSpan.MaxValue; + } + runningTasks.Remove(completedTask); + } + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));