diff --git a/homework 2/ClusterClient/Clients/ClusterClientBase.cs b/homework 2/ClusterClient/Clients/ClusterClientBase.cs index 23a2ffd..bd43ba1 100644 --- a/homework 2/ClusterClient/Clients/ClusterClientBase.cs +++ b/homework 2/ClusterClient/Clients/ClusterClientBase.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Diagnostics; using System.IO; +using System.Linq; using System.Net; using System.Text; using System.Threading.Tasks; @@ -40,5 +42,74 @@ protected async Task ProcessRequestAsync(WebRequest request) return result; } } + + protected async Task> WaitForReplicaAndLogAsync( + List> requests, + TimeSpan timeout, + bool throwIfTimeout = false) + { + ArgumentNullException.ThrowIfNull(requests); + + Log.Debug($"Waiting for replica response. Pending replicas: {requests.Count}"); + var completedRequest = await WaitWithTimeout(requests, timeout); + + if (completedRequest == null) + { + Log.Debug("Global timeout reached. No replica responded in time"); + return throwIfTimeout + ? throw new TimeoutException() + : null; + } + + requests.Remove(completedRequest); + Log.Debug($"Replica task completed. Remaining replicas: {requests.Count}"); + + if (completedRequest.IsCompletedSuccessfully) + { + Log.Info("Replica responded successfully"); + return completedRequest; + } + + Log.Error("Replica failed", completedRequest.Exception?.InnerException); + return completedRequest; + } + + protected async Task HandleWhenAnyResultAsync(Task requestTask, Task completedTask, string replicaUri) + { + if (completedTask == requestTask) + { + if (!requestTask.IsCompletedSuccessfully) + { + Log.Error($"Replica {replicaUri} failed", requestTask.Exception?.InnerException); + return null; + } + + Log.Info($"Replica {replicaUri} responded successfully"); + return await requestTask; + } + + Log.Debug($"Replica {replicaUri} timed out"); + return null; + } + + protected async Task ProcessReplicaAsync(string uri, string query) + { + var request = CreateRequest($"{uri}?query={query}"); + Log.Info($"Processing {request.RequestUri}"); + return await ProcessRequestAsync(request); + } + + private async Task> WaitWithTimeout(IEnumerable> tasks, TimeSpan timeout) + { + var delay = Task.Delay(timeout); + var completed = await Task.WhenAny(tasks.Append(delay)); + + if (completed == delay) + { + return null; + } + + return (Task)completed; + } } } \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..c1dcc24 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,7 +1,5 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -13,11 +11,26 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var requests = ReplicaAddresses + .Select(uri => ProcessReplicaAsync(uri, query)) + .ToList(); + + while (requests.Count > 0) + { + var completedRequest = await WaitForReplicaAndLogAsync(requests, timeout, true); + + if (completedRequest is { IsCompletedSuccessfully: true }) + { + return await completedRequest; + } + } + + Log.Error("All replicas failed"); + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..932c002 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,7 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,13 +10,46 @@ namespace ClusterClient.Clients { public class RoundRobinClusterClient : ClusterClientBase { + private readonly ConcurrentDictionary _replicaTimings = new(); + public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var replicaAddress in replicaAddresses) + { + _replicaTimings.TryAdd(replicaAddress, 0); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var replicasLeft = ReplicaAddresses.Length; + + var sortedReplicas = ReplicaAddresses + .OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)); + + foreach (var replicaUri in sortedReplicas) + { + var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--; + + Log.Debug($"Sending request to replica {replicaUri}. Timeout per replica: " + + $"{timeoutPerReplica.TotalMilliseconds} ms"); + + var requestTask = ProcessReplicaAsync(replicaUri, query); + var delay = Task.Delay(timeoutPerReplica); + + var completedTask = await Task.WhenAny(requestTask, delay); + _replicaTimings[replicaUri] = stopwatch.ElapsedMilliseconds; + + var result = await HandleWhenAnyResultAsync(requestTask, completedTask, replicaUri); + if (result != null) + { + return result; + } + } + + Log.Debug("All replicas failed or timed out"); + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..3bcb0fa 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,7 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,15 +10,77 @@ namespace ClusterClient.Clients { public class SmartClusterClient : ClusterClientBase { + private readonly ConcurrentDictionary _replicaTimings = new(); + public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var replicaAddress in replicaAddresses) + { + _replicaTimings.TryAdd(replicaAddress, 0); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var replicasLeft = ReplicaAddresses.Length; + + var sortedReplicas = ReplicaAddresses + .OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)) + .ToList(); + + Log.Debug($"Replica order: {string.Join(", ", sortedReplicas)}"); + + var pendingRequests = new List>(); + + foreach (var replicaAddress in sortedReplicas) + { + var timeoutPerReplica = (timeout - stopwatch.Elapsed) / replicasLeft--; + + Log.Debug($"Starting request to {replicaAddress}. Timeout per replica: " + + $"{timeoutPerReplica.TotalMilliseconds} ms"); + + pendingRequests.Add(ProcessReplicaAsync(replicaAddress, query)); + var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, timeoutPerReplica); + _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; + + if (completedTask is not { IsCompletedSuccessfully: true }) + { + continue; + } + + Log.Info($"Replica responded successfully in {stopwatch.ElapsedMilliseconds} ms"); + return await completedTask; + } + + Log.Debug("Initial replica attempts finished. Waiting for remaining pending requests"); + + while (pendingRequests.Count > 0) + { + var remainingTime = timeout - stopwatch.Elapsed; + + if (remainingTime <= TimeSpan.Zero) + { + Log.Debug("Global timeout reached while waiting for pending replicas"); + break; + } + + Log.Debug($"Waiting for pending replicas. Remaining: {pendingRequests.Count}"); + var completedTask = await WaitForReplicaAndLogAsync(pendingRequests, remainingTime); + + if (completedTask is not { IsCompletedSuccessfully: true }) + { + continue; + } + + Log.Info($"Pending replica responded successfully in {stopwatch.ElapsedMilliseconds} ms"); + return await completedTask; + } + + Log.Error("All replicas failed or timed out"); + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file