From 451e9bbedebc32b527947a227646cbe9e78a88a2 Mon Sep 17 00:00:00 2001 From: Matvey Date: Tue, 3 Mar 2026 20:21:17 +0500 Subject: [PATCH 1/5] implemented parralel client --- .../Clients/ParallelClusterClient.cs | 44 ++++++++++++++----- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..0efdccd 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,23 +1,47 @@ using System; -using System.Collections.Generic; using System.Linq; -using System.Text; +using System.Threading; using System.Threading.Tasks; using log4net; -namespace ClusterClient.Clients +namespace ClusterClient.Clients; + +public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public class ParallelClusterClient : ClusterClientBase + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) + var cts = new CancellationTokenSource(timeout); + + var tasks = ReplicaAddresses + .Select(address => CallReplicaAsync(address, query, cts.Token)) + .ToList(); + + while (tasks.Count > 0) { + var completed = await Task.WhenAny(tasks); + + if (completed.IsCompletedSuccessfully) + { + await cts.CancelAsync(); + return completed.Result; + } + + tasks.Remove(completed); } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + throw new TimeoutException("No replica responded within the timeout"); + } + + private async Task CallReplicaAsync(string address, string query, CancellationToken token) + { + var uri = $"{address}?query={query}"; + var request = CreateRequest(uri); + + await using (token.Register(() => request.Abort())) { - throw new NotImplementedException(); + return await ProcessRequestAsync(request); } - - protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); } -} + + protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); +} \ No newline at end of file From 2e6f5f174b2e300e43ea02097e18021da378aa2b Mon Sep 17 00:00:00 2001 From: Matvey Date: Tue, 3 Mar 2026 20:25:58 +0500 Subject: [PATCH 2/5] implemented performance tracker for round robin client and smart client --- .../Clients/ReplicaPerformanceTracker.cs | 59 +++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 homework 2/ClusterClient/Clients/ReplicaPerformanceTracker.cs diff --git a/homework 2/ClusterClient/Clients/ReplicaPerformanceTracker.cs b/homework 2/ClusterClient/Clients/ReplicaPerformanceTracker.cs new file mode 100644 index 0000000..1694263 --- /dev/null +++ b/homework 2/ClusterClient/Clients/ReplicaPerformanceTracker.cs @@ -0,0 +1,59 @@ +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; + +namespace ClusterClient.Clients +{ + public class ReplicaPerformanceTracker + { + private readonly ConcurrentDictionary stats = new(); + + public string[] OrderByFastest(string[] replicas) + { + return replicas + .Select((r, i) => new { Replica = r, Index = i }) + .OrderBy(x => + stats.TryGetValue(x.Replica, out var s) && s.HasSamples + ? s.AverageMs + : long.MaxValue) + .ThenBy(x => x.Index) + .Select(x => x.Replica) + .ToArray(); + } + + public void ReportResult(string replica, long elapsedMs) + { + var stat = stats.GetOrAdd(replica, _ => new ReplicaStats()); + stat.AddSample(elapsedMs); + } + + private class ReplicaStats + { + private long avgMs; + private long count; + + public bool HasSamples => Volatile.Read(ref count) > 0; + + public long AverageMs + { + get + { + var avg = Volatile.Read(ref avgMs); + return avg == 0 ? long.MaxValue : avg; + } + } + + public void AddSample(long sampleMs) + { + var newCount = Interlocked.Increment(ref count); + + long oldAvg, newAvg; + do + { + oldAvg = Volatile.Read(ref avgMs); + newAvg = (oldAvg * (newCount - 1) + sampleMs) / newCount; + } while (Interlocked.CompareExchange(ref avgMs, newAvg, oldAvg) != oldAvg); + } + } + } +} \ No newline at end of file From 45cf1947d27c800c1491d60695fc8fb0746e33a2 Mon Sep 17 00:00:00 2001 From: Matvey Date: Tue, 3 Mar 2026 20:26:54 +0500 Subject: [PATCH 3/5] implemented round robin client with performance tracker --- .../Clients/RoundRobinClusterClient.cs | 63 +++++++++++++++---- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..55c01e2 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,23 +1,62 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Diagnostics; +using System.Net; using System.Threading.Tasks; using log4net; -namespace ClusterClient.Clients +namespace ClusterClient.Clients; + +public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public class RoundRobinClusterClient : ClusterClientBase + private readonly ReplicaPerformanceTracker tracker = new(); + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) - { - } + var stopwatch = Stopwatch.StartNew(); + + var orderedReplicas = tracker.OrderByFastest(ReplicaAddresses); + var remainingReplicas = orderedReplicas.Length; - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + foreach (var replica in orderedReplicas) { - throw new NotImplementedException(); + var remainingTime = timeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + throw new TimeoutException("No replica responded within overall timeout"); + + var timeoutPerRequest = remainingTime.Divide(remainingReplicas); + remainingReplicas--; + + var webRequest = CreateRequest(replica + "?query=" + query); + Log.InfoFormat($"Processing {webRequest.RequestUri}"); + + var sw = Stopwatch.StartNew(); + var requestTask = ProcessRequestAsync(webRequest); + + var completed = await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest)); + + if (completed != requestTask) + { + sw.Stop(); + tracker.ReportResult(replica, sw.ElapsedMilliseconds); + continue; + } + + try + { + var result = await requestTask; + sw.Stop(); + tracker.ReportResult(replica, sw.ElapsedMilliseconds); + return result; + } + catch (WebException) + { + sw.Stop(); + tracker.ReportResult(replica, sw.ElapsedMilliseconds); + } } - protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + throw new TimeoutException("No replica responded"); } -} + + protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); +} \ No newline at end of file From bcd4a57f5297f29277b77dd11f29a01206864b39 Mon Sep 17 00:00:00 2001 From: Matvey Date: Tue, 3 Mar 2026 20:27:12 +0500 Subject: [PATCH 4/5] implemented smart client with performance tracker --- .../Clients/SmartClusterClient.cs | 98 +++++++++++++++++-- 1 file changed, 89 insertions(+), 9 deletions(-) diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..7a08bad 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,23 +1,103 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; -using System.Text; +using System.Net; using System.Threading.Tasks; using log4net; -namespace ClusterClient.Clients +namespace ClusterClient.Clients; + +public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public class SmartClusterClient : ClusterClientBase + private readonly ReplicaPerformanceTracker tracker = new(); + + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) + var stopwatch = Stopwatch.StartNew(); + + var orderedReplicas = tracker.OrderByFastest(ReplicaAddresses); + var remainingReplicas = orderedReplicas.Length; + + var inFlight = new List<(string replica, Task task, Stopwatch sw)>(); + + foreach (var replica in orderedReplicas) { - } + var remainingTime = timeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + break; + + var timeoutPerRequest = remainingTime.Divide(remainingReplicas); + remainingReplicas--; + + var request = CreateRequest(replica + "?query=" + query); + Log.InfoFormat($"Processing {request.RequestUri}"); + + var sw = Stopwatch.StartNew(); + var task = ProcessRequestAsync(request); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + inFlight.Add((replica, task, sw)); + + var delayTask = Task.Delay(timeoutPerRequest); + var anyResponseTask = Task.WhenAny(inFlight.Select(x => x.task)); + + var completed = await Task.WhenAny(anyResponseTask, delayTask); + + if (completed != anyResponseTask) continue; + { + await anyResponseTask; + + var winner = inFlight.First(x => x.task.IsCompleted); + + try + { + var result = await winner.task; + winner.sw.Stop(); + tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); + return result; + } + catch (WebException) + { + winner.sw.Stop(); + tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); + inFlight.Remove(winner); + } + } + } + + while (inFlight.Count > 0) { - throw new NotImplementedException(); + var remainingTime = timeout - stopwatch.Elapsed; + if (remainingTime <= TimeSpan.Zero) + break; + + var completed = await Task.WhenAny( + Task.WhenAny(inFlight.Select(x => x.task)), + Task.Delay(remainingTime) + ); + + if (completed is Task) + { + var winner = inFlight.First(x => x.task.IsCompleted); + + try + { + var result = await winner.task; + winner.sw.Stop(); + tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); + return result; + } + catch (WebException) + { + winner.sw.Stop(); + tracker.ReportResult(winner.replica, winner.sw.ElapsedMilliseconds); + inFlight.Remove(winner); + } + } } - protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + throw new TimeoutException("No replica responded within timeout"); } -} + + protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); +} \ No newline at end of file From d73e326b88d7f49fe5e04749e07a7dd23931d4b3 Mon Sep 17 00:00:00 2001 From: Matvey Date: Tue, 3 Mar 2026 20:28:47 +0500 Subject: [PATCH 5/5] added tests for smart and round robin clients to check if performance tracker is working correctly --- .../RoundRobinClusterClientTest.cs | 163 +++++++++++------- .../ClusterTests/SmartClusterClientTest.cs | 28 +++ 2 files changed, 124 insertions(+), 67 deletions(-) diff --git a/homework 2/ClusterTests/RoundRobinClusterClientTest.cs b/homework 2/ClusterTests/RoundRobinClusterClientTest.cs index 7c4066f..69b5254 100644 --- a/homework 2/ClusterTests/RoundRobinClusterClientTest.cs +++ b/homework 2/ClusterTests/RoundRobinClusterClientTest.cs @@ -1,79 +1,108 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading.Tasks; using ClusterClient.Clients; using FluentAssertions; using NUnit.Framework; namespace ClusterTests { - public class RoundRobinClusterClientTest : ClusterTest - { - protected override ClusterClientBase CreateClient(string[] replicaAddresses) - => new RoundRobinClusterClient(replicaAddresses); - - [Test] - public override void Client_should_return_success_when_timeout_is_close() - { - for(int i = 0; i < 3; i++) - CreateServer(Timeout / 3); - - ProcessRequests(Timeout + 100); - } - - [Test] - public void ShouldReturnSuccessWhenLastReplicaIsGoodAndOthersAreSlow() - { - for(int i = 0; i < 3; i++) - CreateServer(Slow); - CreateServer(Fast); - - ProcessRequests(Timeout).Last().Should().BeCloseTo(TimeSpan.FromMilliseconds(3 * Timeout / 4 + Fast), Epsilon); - } - - [Test] - public void ShouldReturnSuccessWhenLastReplicaIsGoodAndOthersAreBad() - { - for(int i = 0; i < 1; i++) - CreateServer(1, status: 500); - CreateServer(Fast); - - ProcessRequests(Timeout).Last().Should().BeCloseTo(TimeSpan.FromMilliseconds(Fast), Epsilon); - } - - [Test] - public void ShouldThrowAfterTimeout() - { - for(var i = 0; i < 10; i++) - CreateServer(Slow); - - var sw = Stopwatch.StartNew(); - Assert.Throws(() => ProcessRequests(Timeout)); - sw.Elapsed.Should().BeCloseTo(TimeSpan.FromMilliseconds(Timeout), Epsilon); - } - - [Test] - public void ShouldForgetPreviousAttemptWhenStartNew() - { - CreateServer(4500); - CreateServer(3000); - CreateServer(10000); - - var sw = Stopwatch.StartNew(); - Assert.Throws(() => ProcessRequests(6000)); - sw.Elapsed.Should().BeCloseTo(TimeSpan.FromMilliseconds(Timeout), Epsilon); - } - - [Test] - public void ShouldNotSpendTimeOnBad() - { - CreateServer(1, status: 500); - CreateServer(1, status: 500); - CreateServer(10000); + public class RoundRobinClusterClientTest : ClusterTest + { + protected override ClusterClientBase CreateClient(string[] replicaAddresses) + => new RoundRobinClusterClient(replicaAddresses); + + [Test] + public override void Client_should_return_success_when_timeout_is_close() + { + for (int i = 0; i < 3; i++) + CreateServer(Timeout / 3); + + ProcessRequests(Timeout + 100); + } + + [Test] + public void ShouldReturnSuccessWhenLastReplicaIsGoodAndOthersAreSlow() + { + for (int i = 0; i < 3; i++) + CreateServer(Slow); + CreateServer(Fast); + + ProcessRequests(Timeout).Last().Should() + .BeCloseTo(TimeSpan.FromMilliseconds(3 * Timeout / 4 + Fast), Epsilon); + } + + [Test] + public void ShouldReturnSuccessWhenLastReplicaIsGoodAndOthersAreBad() + { + for (int i = 0; i < 1; i++) + CreateServer(1, status: 500); + CreateServer(Fast); + + ProcessRequests(Timeout).Last().Should().BeCloseTo(TimeSpan.FromMilliseconds(Fast), Epsilon); + } + + [Test] + public void ShouldThrowAfterTimeout() + { + for (var i = 0; i < 10; i++) + CreateServer(Slow); + + var sw = Stopwatch.StartNew(); + Assert.Throws(() => ProcessRequests(Timeout)); + sw.Elapsed.Should().BeCloseTo(TimeSpan.FromMilliseconds(Timeout), Epsilon); + } + + [Test] + public void ShouldForgetPreviousAttemptWhenStartNew() + { + CreateServer(4500); + CreateServer(3000); + CreateServer(10000); + + var sw = Stopwatch.StartNew(); + Assert.Throws(() => ProcessRequests(6000)); + sw.Elapsed.Should().BeCloseTo(TimeSpan.FromMilliseconds(Timeout), Epsilon); + } + + [Test] + public void ShouldNotSpendTimeOnBad() + { + CreateServer(1, status: 500); + CreateServer(1, status: 500); + CreateServer(10000); CreateServer(2500); - foreach(var time in ProcessRequests(6000)) - time.Should().BeCloseTo(TimeSpan.FromMilliseconds(5500), Epsilon); - } - } + foreach (var time in ProcessRequests(6000)) + time.Should().BeCloseTo(TimeSpan.FromMilliseconds(5500), Epsilon); + } + + [Test] + public async Task ShouldLearnBetweenSequentialRequests() + { + var slow = CreateServer(3000); + var fast = CreateServer(200); + + var addresses = new[] + { + $"http://127.0.0.1:{slow.ServerOptions.Port}/{slow.ServerOptions.MethodName}/", + $"http://127.0.0.1:{fast.ServerOptions.Port}/{fast.ServerOptions.MethodName}/" + }; + + var client = CreateClient(addresses); + + var sw1 = Stopwatch.StartNew(); + await client.ProcessRequestAsync("00000001", TimeSpan.FromMilliseconds(5000)); + sw1.Stop(); + + var sw2 = Stopwatch.StartNew(); + await client.ProcessRequestAsync("00000002", TimeSpan.FromMilliseconds(5000)); + sw2.Stop(); + + sw2.Elapsed.Should().BeLessThan(sw1.Elapsed); + + sw2.Elapsed.Should().BeCloseTo(TimeSpan.FromMilliseconds(200), Epsilon); + } + } } \ No newline at end of file diff --git a/homework 2/ClusterTests/SmartClusterClientTest.cs b/homework 2/ClusterTests/SmartClusterClientTest.cs index 4a9e4e6..b923c2c 100644 --- a/homework 2/ClusterTests/SmartClusterClientTest.cs +++ b/homework 2/ClusterTests/SmartClusterClientTest.cs @@ -1,6 +1,7 @@ using System; using System.Diagnostics; using System.Linq; +using System.Threading.Tasks; using ClusterClient.Clients; using FluentAssertions; using NUnit.Framework; @@ -65,5 +66,32 @@ public void ShouldNotSpendTimeOnBad() foreach(var time in ProcessRequests(6000)) time.Should().BeCloseTo(TimeSpan.FromMilliseconds(4000), Epsilon); } + + [Test] + public async Task ShouldLearnBetweenSequentialRequests() + { + var slow = CreateServer(3000); + var fast = CreateServer(200); + + var addresses = new[] + { + $"http://127.0.0.1:{slow.ServerOptions.Port}/{slow.ServerOptions.MethodName}/", + $"http://127.0.0.1:{fast.ServerOptions.Port}/{fast.ServerOptions.MethodName}/" + }; + + var client = CreateClient(addresses); + + var sw1 = Stopwatch.StartNew(); + await client.ProcessRequestAsync("00000001", TimeSpan.FromMilliseconds(5000)); + sw1.Stop(); + + var sw2 = Stopwatch.StartNew(); + await client.ProcessRequestAsync("00000002", TimeSpan.FromMilliseconds(5000)); + sw2.Stop(); + + sw2.Elapsed.Should().BeLessThan(sw1.Elapsed); + + sw2.Elapsed.Should().BeCloseTo(TimeSpan.FromMilliseconds(200), Epsilon); + } } } \ No newline at end of file