diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..a1406d8 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,9 @@ using System; +using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -12,11 +15,25 @@ public abstract class ClusterClientBase { protected string[] ReplicaAddresses { get; set; } + private static readonly ConcurrentDictionary _responseTimes + = new ConcurrentDictionary(); + protected ClusterClientBase(string[] replicaAddresses) { ReplicaAddresses = replicaAddresses; } + protected IEnumerable SortedReplicas => + ReplicaAddresses.OrderBy(r => _responseTimes.GetOrAdd(r, _ => 1000)); + + protected void RecordResponse(string replica, long elapsedMs, bool success) + { + if (success) + _responseTimes.AddOrUpdate(replica, elapsedMs, (_, avg) => (avg + elapsedMs) / 2); + else + _responseTimes.AddOrUpdate(replica, 2000, (_, avg) => Math.Min(avg * 2, 30000)); + } + public abstract Task ProcessRequestAsync(string query, TimeSpan timeout); protected abstract ILog Log { get; } diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..bfd0809 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,9 +15,31 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(addr => CreateRequest(addr + "?query=" + query)) + .Select(req => ProcessRequestAsync(req)) + .ToList(); + + var overallTimeoutTask = Task.Delay(timeout); + + while (tasks.Count > 0) + { + var completedTask = await Task.WhenAny(tasks.Concat(new[] { overallTimeoutTask })); + + if (completedTask == overallTimeoutTask) + throw new TimeoutException("No replica responded within timeout"); + + tasks.Remove((Task)completedTask); + + if (completedTask.IsCompletedSuccessfully) + return await (Task)completedTask; + + Log.ErrorFormat("Replica failed: {0}", completedTask.Exception?.InnerException?.Message); + } + + throw new Exception("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..e4714e1 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,9 +15,50 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var timeoutPerRequest = new TimeSpan(); + var stopwatch = new Stopwatch(); + var remainingReplicas = ReplicaAddresses.Length; + + stopwatch.Start(); + + foreach (var replica in SortedReplicas) + { + var remainingTime = timeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException("No replica responded within overall timeout"); + + timeoutPerRequest = remainingTime.Divide(remainingReplicas); + + remainingReplicas--; + + var webRequest = CreateRequest(replica + "?query=" + query); + Log.InfoFormat($"Processing {webRequest.RequestUri}"); + + var requestTask = ProcessRequestAsync(webRequest); + + var completedTask = await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest)); + + if (completedTask != requestTask) + { + RecordResponse(replica, 0, false); + continue; + } + + try + { + RecordResponse(replica, stopwatch.ElapsedMilliseconds, true); + return requestTask.Result; + } + catch (Exception ex) + { + RecordResponse(replica, 0, false); + Log.ErrorFormat($"Replica {replica} failed: {ex.Message}"); + } + } + + throw new TimeoutException("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..026a0c5 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; +using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,9 +15,86 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var tasks = new Dictionary, string>(); + var replicasToCall = ReplicaAddresses.Length; + + var ordered = SortedReplicas.ToList(); + var timeoutPerReplica = timeout.Divide(ordered.Count); + var replicasLeft = ordered.Count; + + foreach (var replica in ordered) + { + if (stopwatch.Elapsed >= timeout) + break; + + var webRequest = CreateRequest(replica + "?query=" + query); + Log.InfoFormat($"Processing {webRequest.RequestUri}"); + + var requestTask = ProcessRequestAsync(webRequest); + tasks[requestTask] = replica; + + + var weight = ReplicaAddresses.Length - replicasToCall + 1; + var targetTimeForNextStart = timeoutPerReplica * weight; + + var waitTime = targetTimeForNextStart - stopwatch.Elapsed; + var waitTask = Task.Delay(waitTime); + + replicasToCall--; + + if (waitTime > TimeSpan.Zero) + { + var completedTask = await Task.WhenAny(tasks.Keys.Concat(new[] { waitTask })); + + if(completedTask == waitTask) + continue; + + var addr = tasks[(Task)completedTask]; + tasks.Remove((Task)completedTask); + + try + { + RecordResponse(addr, stopwatch.ElapsedMilliseconds, true); + return await (Task)completedTask; + } + catch (Exception ex) + { + RecordResponse(addr, 0, false); + Log.ErrorFormat($"Replica request failed: {ex.Message}"); + } + } + } + + while (tasks.Count > 0 && stopwatch.Elapsed < timeout) + { + var remainingTime = timeout - stopwatch.Elapsed; + var waitTask = Task.Delay(remainingTime); + + var completedTask = await Task.WhenAny(tasks.Keys.Concat(new[] { waitTask })); + + if (completedTask == waitTask) + break; + + var addr = tasks[(Task)completedTask]; + tasks.Remove((Task)completedTask); + + + try + { + RecordResponse(addr, stopwatch.ElapsedMilliseconds, true); + return await (Task)completedTask; + } + catch (Exception ex) + { + RecordResponse(addr, 0, false); + Log.ErrorFormat($"Replica request failed: {ex.Message}"); + } + } + + throw new TimeoutException("All replicas failed"); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));