diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..4f8af8f 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -13,9 +13,35 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var tasks = ReplicaAddresses + .Select(uri => CreateRequest(uri + "?query=" + query)) + .Select(ProcessRequestAsync) + .ToList(); + + var delayTask = Task.Delay(timeout); + while (tasks.Count != 0) + { + var processTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask); + await Task.WhenAny(processTask, delayTask); + if (delayTask.IsCompleted) + { + throw new TimeoutException(); + } + var completedTask = await (Task>)processTask; + tasks.Remove(completedTask); + + try + { + return await completedTask; + } + catch (Exception) + { + Log.Error("Task failed"); + } + } + return null; } protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..64d8494 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -12,10 +14,48 @@ public class RoundRobinClusterClient : ClusterClientBase public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } + + private static readonly ConcurrentDictionary ReplicaStats = new(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var sortedReplicas = ReplicaAddresses + .OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0)) + .ToList(); + var sw = Stopwatch.StartNew(); + + for (int i = 0; i < sortedReplicas.Count; i++) + { + var remainingReplicas = sortedReplicas.Count - i; + var timeLeft = timeout - sw.Elapsed; + var currentReplicaTimeout = TimeSpan.FromMilliseconds(timeLeft.TotalMilliseconds / remainingReplicas); + var uri = sortedReplicas[i]; + var request = CreateRequest(uri + "?query=" + query); + var requestTimer = Stopwatch.StartNew(); + var task = ProcessRequestAsync(request); + var delayTask = Task.Delay(currentReplicaTimeout); + var completedTask = await Task.WhenAny(task, delayTask); + + if (completedTask == task) + { + try + { + var result = await task; + UpdateStats(uri, requestTimer.ElapsedMilliseconds); + return result; + } + catch (Exception) + { + UpdateStats(uri, (long)timeout.TotalMilliseconds); + } + } + } + throw new TimeoutException($"Request {query} timed out"); + } + + private void UpdateStats(string uri, long time) + { + ReplicaStats.AddOrUpdate(uri, time, (key, oldVal) => (oldVal + time) / 2); } protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..a786381 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,5 +1,7 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text; using System.Threading.Tasks; @@ -12,12 +14,84 @@ public class SmartClusterClient : ClusterClientBase public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) { } + + private static readonly ConcurrentDictionary ReplicaStats = new(); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - throw new NotImplementedException(); + var sortedReplicas = ReplicaAddresses + .OrderBy(uri => ReplicaStats.GetOrAdd(uri, 0)) + .ToList(); + + var tasks = new List>(); + var perReplicaTimeout = TimeSpan.FromMilliseconds(timeout.TotalMilliseconds / ReplicaAddresses.Length); + var sw = Stopwatch.StartNew(); + + for (int i = 0; i < sortedReplicas.Count; i++) + { + var request = CreateRequest(sortedReplicas[i] + "?query=" + query); + tasks.Add(ProcessAndMeasureAsync(sortedReplicas[i], request)); + + while (true) + { + var timePassed = sw.Elapsed; + var timeToNextLaunch = perReplicaTimeout * (i + 1) - timePassed; + + if (i == ReplicaAddresses.Length - 1) + timeToNextLaunch = timeout - timePassed; + + var delayTask = Task.Delay(timeToNextLaunch); + var completedTask = await Task.WhenAny(Task.WhenAny(tasks), delayTask); + + if (completedTask != delayTask) + { + var finishedTask = await (Task>)completedTask; + try + { + var response = await finishedTask; + UpdateStats(response.Uri, response.ElapsedMs); + return response.Content; + } + catch (Exception) + { + UpdateStats(sortedReplicas[i], (long)timeout.TotalMilliseconds); + tasks.Remove(finishedTask); + if (tasks.Count == 0 || i < ReplicaAddresses.Length - 1) + { + goto next; + } + } + } + else + { + break; + } + } + + next: ; + } + + throw new TimeoutException(); + } + + private async Task ProcessAndMeasureAsync(string uri, System.Net.WebRequest request) + { + var timer = Stopwatch.StartNew(); + var content = await ProcessRequestAsync(request); + return new ResponseData { Uri = uri, Content = content, ElapsedMs = timer.ElapsedMilliseconds }; + } + private void UpdateStats(string uri, long elapsedMs) + { + ReplicaStats.AddOrUpdate(uri, elapsedMs, (key, oldVal) => (oldVal + elapsedMs) / 2); + } + + private class ResponseData + { + public string Uri { get; set; } + public string Content { get; set; } + public long ElapsedMs { get; set; } } protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); } -} +} \ No newline at end of file