diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..c91860a 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -12,6 +14,11 @@ public abstract class ClusterClientBase { protected string[] ReplicaAddresses { get; set; } + private readonly Dictionary> _responseHistory = + new Dictionary>(); + + private readonly object _historyLock = new object(); + protected ClusterClientBase(string[] replicaAddresses) { ReplicaAddresses = replicaAddresses; @@ -40,5 +47,34 @@ protected async Task ProcessRequestAsync(WebRequest request) return result; } } + + + protected void UpdateStatistics(string replicaAddress, + TimeSpan responseTime) + { + lock (_historyLock) + { + if (!_responseHistory.ContainsKey(replicaAddress)) + _responseHistory[replicaAddress] = []; + + var list = _responseHistory[replicaAddress]; + list.Add(responseTime); + if (list.Count > 10) + list.RemoveAt(0); + } + } + + protected IEnumerable GetOrderedReplicas() + { + lock (_historyLock) + { + return ReplicaAddresses.OrderBy(addr => + { + if (_responseHistory.TryGetValue(addr, out var times) && times.Count > 0) + return times.Average(t => t.TotalMilliseconds); + return double.MaxValue; + }); + } + } } } \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..556416a 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; +using System.Threading; using System.Threading.Tasks; using log4net; @@ -12,12 +12,75 @@ public class ParallelClusterClient : ClusterClientBase public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } + + protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + using var cts = new CancellationTokenSource(); + var tasks = CreateRequestTasks(query, cts.Token); + return await AwaitFirstSuccessAsync(tasks, timeout, cts); } - protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); + private List<(Task Task, string Address)> CreateRequestTasks(string query, CancellationToken token) + { + var tasks = new List<(Task, string)>(); + foreach (var addr in ReplicaAddresses) + { + var task = ProcessSingleRequestAsync(addr, query, token); + tasks.Add((task, addr)); + } + return tasks; + } + + private async Task AwaitFirstSuccessAsync(List<(Task Task, string Address)> tasks, TimeSpan timeout, CancellationTokenSource cts) + { + var timeoutTask = Task.Delay(timeout, cts.Token); + + while (tasks.Count > 0) + { + var completed = await Task.WhenAny(tasks.Select(t => t.Task).Append(timeoutTask)); + if (completed == timeoutTask) + { + cts.Cancel(); + throw new TimeoutException("Request timed out"); + } + + var index = tasks.FindIndex(t => t.Task == completed); + var (_, address) = tasks[index]; + tasks.RemoveAt(index); + + try + { + var result = await (Task)completed; + cts.Cancel(); + return result; + } + catch (Exception ex) + { + Log.InfoFormat("Request to {0} failed: {1}", address, ex.Message); + } + } + + throw new Exception("All replicas failed to return a successful response"); + } + + private async Task ProcessSingleRequestAsync(string replicaAddress, string query, CancellationToken token) + { + var uri = $"{replicaAddress}?query={Uri.EscapeDataString(query)}"; + var request = CreateRequest(uri); + + await using (token.Register(() => request.Abort())) + { + try + { + return await ProcessRequestAsync(request).ConfigureAwait(false); + } + catch (Exception ex) when (token.IsCancellationRequested) + { + throw new OperationCanceledException("Request was cancelled", ex, token); + } + } + } } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..8e0ceb3 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,7 +1,10 @@ using System; -using System.Collections.Generic; +using System.Diagnostics; +using System.IO; using System.Linq; +using System.Net; using System.Text; +using System.Threading; using System.Threading.Tasks; using log4net; @@ -13,11 +16,85 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var orderedReplicas = GetOrderedReplicas().ToArray(); + var totalTimeout = timeout; + var remainingTime = totalTimeout; + var stopwatch = Stopwatch.StartNew(); + + for (var i = 0; i < orderedReplicas.Length; i++) + { + var replicasLeft = orderedReplicas.Length - i; + var perReplicaTimeout = remainingTime / replicasLeft; + + var result = await TryRequestReplicaAsync(orderedReplicas[i], query, perReplicaTimeout).ConfigureAwait(false); + if (result != null) + return result; + + remainingTime = totalTimeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException("Request timed out"); + } + + throw new Exception("All replicas failed to return a successful response"); } + + private async Task TryRequestReplicaAsync(string replicaAddress, string query, TimeSpan timeout) + { + var uri = $"{replicaAddress}?query={Uri.EscapeDataString(query)}"; + var request = CreateRequest(uri); + using var timeoutCts = new CancellationTokenSource(); + var delayTask = Task.Delay(timeout, timeoutCts.Token); + var stopwatch = Stopwatch.StartNew(); - protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + try + { + var responseTask = request.GetResponseAsync(); + var completedTask = await Task.WhenAny(responseTask, delayTask).ConfigureAwait(false); + + if (completedTask == delayTask) + { + timeoutCts.Cancel(); + request.Abort(); + try + { + await responseTask; + } + catch (Exception ex) + { + Log.InfoFormat("Replica {0} timed out: {1}", replicaAddress, ex.Message); + } + + return null; + } + + timeoutCts.Cancel(); + using var response = await responseTask.ConfigureAwait(false); + var httpResponse = (HttpWebResponse)response; + + if (httpResponse.StatusCode != HttpStatusCode.OK) + { + Log.InfoFormat("Replica {0} returned status {1}", replicaAddress, httpResponse.StatusCode); + return null; + } + + using var reader = new StreamReader(response.GetResponseStream(), Encoding.UTF8); + var result = await reader.ReadToEndAsync().ConfigureAwait(false); + Log.InfoFormat("Replica {0} succeeded", replicaAddress); + + stopwatch.Stop(); + UpdateStatistics(replicaAddress, stopwatch.Elapsed); + + return result; + } + catch (Exception ex) + { + Log.InfoFormat("Replica {0} failed: {1}", replicaAddress, ex.Message); + return null; + } + } } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..d4d7315 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,6 +1,9 @@ using System; using System.Collections.Generic; +using System.Diagnostics; +using System.IO; using System.Linq; +using System.Net; using System.Text; using System.Threading.Tasks; using log4net; @@ -13,11 +16,113 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) + { + var orderedReplicas = GetOrderedReplicas().ToArray(); + var totalTimeout = timeout; + var stopwatch = Stopwatch.StartNew(); + var taskToAddress = new Dictionary, string>(); + var i = 0; + + while (i < orderedReplicas.Length) + { + if (!taskToAddress.Values.Contains(orderedReplicas[i])) + { + var addr = orderedReplicas[i]; + var task = RequestReplicaAsync(addr, query); + taskToAddress[task] = addr; + } + + var remainingTime = totalTimeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException("Request timed out"); + + var replicasLeft = orderedReplicas.Length - i; + var stepTimeout = remainingTime / replicasLeft; + + var completedTask = await WaitForAnyTaskOrTimeoutAsync(taskToAddress.Keys, stepTimeout) + .ConfigureAwait(false); + + if (completedTask != null) + { + var addr = taskToAddress[completedTask]; + var result = await TryGetResultAsync(completedTask, addr).ConfigureAwait(false); + if (result != null) + return result; + + taskToAddress.Remove(completedTask); + + if (taskToAddress.Count == 0) + i++; + } + else + i++; + } + + while (taskToAddress.Count > 0) + { + var remainingTime = totalTimeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException("Request timed out"); + + var completedTask = await WaitForAnyTaskOrTimeoutAsync(taskToAddress.Keys, remainingTime) + .ConfigureAwait(false); + if (completedTask == null) + throw new TimeoutException("Request timed out"); + + var addr = taskToAddress[completedTask]; + var result = await TryGetResultAsync(completedTask, addr).ConfigureAwait(false); + if (result != null) + return result; + + taskToAddress.Remove(completedTask); + } + + throw new Exception("All replicas failed to return a successful response"); + } + + private async Task RequestReplicaAsync(string replicaAddress, string query) { - throw new NotImplementedException(); + var uri = $"{replicaAddress}?query={Uri.EscapeDataString(query)}"; + var request = CreateRequest(uri); + var stopwatch = Stopwatch.StartNew(); + + using var response = await request.GetResponseAsync().ConfigureAwait(false); + var httpResponse = (HttpWebResponse)response; + if (httpResponse.StatusCode != HttpStatusCode.OK) + throw new Exception($"Replica returned status {httpResponse.StatusCode}"); + + using var reader = new StreamReader(response.GetResponseStream(), Encoding.UTF8); + var result = await reader.ReadToEndAsync().ConfigureAwait(false); + + stopwatch.Stop(); + UpdateStatistics(replicaAddress, stopwatch.Elapsed); + + return result; + } + + private async Task> WaitForAnyTaskOrTimeoutAsync(IEnumerable> tasks, TimeSpan timeout) + { + var timeoutTask = Task.Delay(timeout); + var completed = await Task.WhenAny(tasks.Append(timeoutTask)).ConfigureAwait(false); + return completed == timeoutTask ? null : (Task)completed; } - protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + private async Task TryGetResultAsync(Task task, string address) + { + try + { + var result = await task.ConfigureAwait(false); + Log.InfoFormat("Replica {0} succeeded", address); + return result; + } + catch (Exception ex) + { + Log.InfoFormat("Replica {0} failed: {1}", address, ex.Message); + return null; + } + } } -} +} \ No newline at end of file