diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..ec8080f 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -13,11 +13,31 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var requests = ReplicaAddresses + .Select(uri => CreateRequest(uri + "?query=" + query)) + .Select(ProcessRequestAsync) + .ToList(); + + var delay = Task.Delay(timeout); + + while (requests.Count > 0) + { + var completedTask = await Task.WhenAny(requests.Concat([delay])); + + if (completedTask == delay) + throw new TimeoutException(); + + requests.Remove((Task)completedTask); + + if (completedTask.IsCompletedSuccessfully) + return await (Task)completedTask; + } + + return null; } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..3e077fd 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,7 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; using System.Threading.Tasks; using log4net; @@ -9,15 +10,45 @@ namespace ClusterClient.Clients { public class RoundRobinClusterClient : ClusterClientBase { + private readonly ConcurrentDictionary _replicaTimings = new(); + public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var replicaAddress in replicaAddresses) + { + _replicaTimings.TryAdd(replicaAddress, 0); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var sortedReplicas = ReplicaAddresses.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)); + var remainingReplicas = ReplicaAddresses.Length; + + foreach (var replicaAddress in sortedReplicas) + { + var timeoutPerReplica = (timeout - stopwatch.Elapsed) / remainingReplicas--; + + var request = CreateRequest(replicaAddress + "?query=" + query); + var requestTask = ProcessRequestAsync(request); + var delay = Task.Delay(timeoutPerReplica); + + var completedTask = await Task.WhenAny(requestTask, delay); + _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; + + if (completedTask == delay) + continue; + + if (!completedTask.IsCompletedSuccessfully) + continue; + + return await requestTask; + } + + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); } -} +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..0b2a22e 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -9,15 +11,63 @@ namespace ClusterClient.Clients { public class SmartClusterClient : ClusterClientBase { + private readonly ConcurrentDictionary _replicaTimings = new(); + public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { + foreach (var replicaAddress in replicaAddresses) + { + _replicaTimings.TryAdd(replicaAddress, 0); + } } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var stopwatch = Stopwatch.StartNew(); + var sortedReplicas = ReplicaAddresses.OrderBy(a => _replicaTimings.GetValueOrDefault(a, long.MaxValue)); + var remainingReplicas = ReplicaAddresses.Length; + var pendingRequests = new List>(); + + foreach (var replicaAddress in sortedReplicas) + { + var timeoutPerReplica = (timeout - stopwatch.Elapsed) / remainingReplicas--; + + pendingRequests.Add(ProcessRequestAsync(CreateRequest(replicaAddress + "?query=" + query))); + + var delay = Task.Delay(timeoutPerReplica); + var completedTask = await Task.WhenAny(pendingRequests.Append(delay)) as Task; + _replicaTimings[replicaAddress] = stopwatch.ElapsedMilliseconds; + + if (completedTask == null) + continue; + + pendingRequests.Remove(completedTask); + + if (completedTask.IsCompletedSuccessfully) + return await completedTask; + } + + while (pendingRequests.Count > 0) + { + var remainingTime = timeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + break; + + var completedTask = + await Task.WhenAny(pendingRequests.Append(Task.Delay(remainingTime))) as Task; + + if (completedTask == null) + break; + + pendingRequests.Remove(completedTask); + + if (completedTask.IsCompletedSuccessfully) + return await completedTask; + } + + throw new TimeoutException(); } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file