diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..572a054 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,23 +1,48 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; namespace ClusterClient.Clients { - public class ParallelClusterClient : ClusterClientBase + public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - } + var requestTasks = ReplicaAddresses + .Select(baseUri => ProcessRequestAsync(CreateRequest(baseUri + "?query=" + query))) + .ToList(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) - { - throw new NotImplementedException(); + var timeoutTask = Task.Delay(timeout); + + while (requestTasks.Count > 0) + { + var completedTask = await Task.WhenAny(requestTasks.Concat([timeoutTask]).ToArray()); + + if (timeoutTask.IsCompleted) + { + Log.Error("Request timed out"); + throw new TimeoutException("Request timed out"); + } + + requestTasks.Remove((Task)completedTask); + + try + { + var result = await (Task)completedTask; + if (string.IsNullOrEmpty(result)) continue; + Log.Info("Request completed"); + return result; + } + catch (Exception e) + { + Log.Debug($"Request failed: {e.Message}"); + } + } + + throw new TimeoutException("All replicas were timed out"); } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..6481a47 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,15 +9,60 @@ namespace ClusterClient.Clients { public class RoundRobinClusterClient : ClusterClientBase { + private readonly Dictionary _replicaStatus = new(); + private const double PenaltyTimeInMs = 100; + public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var address in replicaAddresses) + { + _replicaStatus[address] = double.MaxValue; + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var timer = Stopwatch.StartNew(); + var replicasLeft = ReplicaAddresses.Length; + + var sortedReplicas = ReplicaAddresses + .OrderBy(address => _replicaStatus.GetValueOrDefault(address, double.MaxValue)) + .ToList(); + + foreach (var replicaAddress in sortedReplicas) + { + var timeLeft = timeout - timer.Elapsed; + + if (timeLeft <= TimeSpan.Zero) + throw new TimeoutException(); + + var replicaTimeout = TimeSpan.FromTicks(timeLeft.Ticks / replicasLeft); + replicasLeft--; + + var request = CreateRequest($"{replicaAddress}?query={Uri.EscapeDataString(query)}"); + var requestTask = ProcessRequestAsync(request); + var timeoutTask = Task.Delay(replicaTimeout); + + var completedTask = await Task.WhenAny(requestTask, timeoutTask); + + if (completedTask == timeoutTask) + { + _replicaStatus[replicaAddress] = replicaTimeout.TotalMilliseconds; + continue; + } + + if (requestTask.Status == TaskStatus.RanToCompletion) + { + _replicaStatus[replicaAddress] = replicaTimeout.TotalMilliseconds; + return await requestTask; + } + + _replicaStatus[replicaAddress] = PenaltyTimeInMs; + } + + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..9ce2456 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,7 +1,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,15 +9,103 @@ namespace ClusterClient.Clients { public class SmartClusterClient : ClusterClientBase { + private readonly Dictionary _replicaStatus = new(); + private const double PenaltyTimeInMs = 100; + private const int TaskCheckIntervalInMs = 10; + public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var address in replicaAddresses) + { + _replicaStatus[address] = double.MaxValue; + } + } + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) + { + var timer = Stopwatch.StartNew(); + var replicasLeft = ReplicaAddresses.Length; + var pendingTasks = new List<(Task Task, string Address)>(); + + var sortedReplicas = ReplicaAddresses + .OrderBy(address => _replicaStatus.GetValueOrDefault(address, double.MaxValue)) + .ToList(); + + foreach (var replicaAddress in sortedReplicas) + { + var timeLeft = timeout - timer.Elapsed; + if (timeLeft <= TimeSpan.Zero) + throw new TimeoutException(); + + var replicaTimeout = TimeSpan.FromTicks(timeLeft.Ticks / replicasLeft); + replicasLeft--; + + var request = CreateRequest($"{replicaAddress}?query={Uri.EscapeDataString(query)}"); + var requestTask = ProcessRequestAsync(request); + pendingTasks.Add((requestTask, replicaAddress)); + + var timeoutTask = Task.Delay(replicaTimeout); + var completedTask = await Task.WhenAny( + pendingTasks.Select(pt => pt.Task) + .Append(timeoutTask).ToArray()); + + if (completedTask == timeoutTask) + continue; + + + var finishedItem = ExtractCompletedTask(pendingTasks, completedTask); + + + if (finishedItem.Task.Status == TaskStatus.RanToCompletion) + { + _replicaStatus[finishedItem.Address] = timer.ElapsedMilliseconds; + return finishedItem.Task.Result; + } + + _replicaStatus[finishedItem.Address] = PenaltyTimeInMs; + } + + while (pendingTasks.Count != 0) + { + var timeLeft = timeout - timer.Elapsed; + if (timeLeft <= TimeSpan.Zero) + throw new TimeoutException(); + + var checkTask = Task.Delay(TaskCheckIntervalInMs); + var doneTask = await Task.WhenAny( + pendingTasks + .Select(pt => pt.Task) + .Append(checkTask) + .ToList()); + + + if (doneTask == checkTask) + continue; + + var finishedItem = ExtractCompletedTask(pendingTasks, doneTask); + + if (finishedItem.Task.Status == TaskStatus.RanToCompletion) + { + _replicaStatus[finishedItem.Address] = timer.ElapsedMilliseconds; + return finishedItem.Task.Result; + } + + _replicaStatus[finishedItem.Address] = PenaltyTimeInMs; + } + + throw new TimeoutException(); } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + private static (Task Task, string Address) ExtractCompletedTask( + List<(Task Task, string Address)> pendingTasks, Task completedTask) { - throw new NotImplementedException(); + var finishedIndex = pendingTasks.FindIndex(pt => pt.Task == completedTask); + if (finishedIndex == -1) throw new InvalidOperationException(); + var finishedItem = pendingTasks[finishedIndex]; + pendingTasks.RemoveAt(finishedIndex); + return finishedItem; } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file