Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -10,12 +12,48 @@ namespace ClusterClient.Clients
{
public abstract class ClusterClientBase
{
protected string[] ReplicaAddresses { get; set; }
protected string[] ReplicaAddresses { get; }
private readonly object statsLock = new ();
private readonly Dictionary<string, (double AvgMs, int Count)> stats = new();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Предложение на будущее: Лучше использовать контракты, создать класс или record или вообще структуру для хранения данных. Это более удобно использовать, читать и поддерживать)

private readonly Dictionary<string, int> originalIndex;


protected ClusterClientBase(string[] replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
originalIndex = replicaAddresses
.Select((a, i) => (Addr: a, Index: i))
.ToDictionary(x => x.Addr, x => x.Index);
}

protected string[] GetReplicasOrderedBySpeed()
{
lock (statsLock)
Comment on lines +29 to +31
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Лучше в асинхроном коде не использовать синхронные блокировки. Это влияет на производительность

Какие варианты более правильны:

  1. Если есть возможность сделать эти методы асинхронными (Task<string[]>), то за место обычного lock(obj) использовать SemaphoreSlim(1,1) —  его ожидание асинхронное, не сильно влияет на производительность
  2. Если нет возможности превратить эти методы в асинхронные, то лучше тогда исопльзовать LockFree коллекции, к примеру ConcurrentDictionary и использовать его возможности (в основном для записи AddOrUpdate или если данные не слишком важны, то TryUpdate

{
return ReplicaAddresses
.OrderBy(a => stats.TryGetValue(a, out var s) ? s.AvgMs : double.MaxValue)
.ThenBy(a => originalIndex[a])
.ToArray();
}
}

protected void RecordReplicaTime(string address, long elapsedMs)
{
lock (statsLock)
Comment on lines +40 to +42
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут то же замечание, что и выше

{
if (!stats.TryGetValue(address, out var s))
{
stats[address] = (elapsedMs, 1);
return;
}
var newAvg = (s.AvgMs * s.Count + elapsedMs) / (s.Count + 1);
stats[address] = (newAvg, s.Count + 1);
}
}

protected void RecordReplicaPenalty(string address, long penaltyMs)
=> RecordReplicaTime(address, penaltyMs);


public abstract Task<string> ProcessRequestAsync(string query, TimeSpan timeout);
protected abstract ILog Log { get; }
Expand Down
39 changes: 30 additions & 9 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,44 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class ParallelClusterClient : ClusterClientBase
public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
}
var stopwatch = Stopwatch.StartNew();
var tasks = ReplicaAddresses
.Select(addr =>
{
var request = CreateRequest(addr + "?query=" + query);
return ProcessRequestAsync(request);
})
.ToList();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
Exception lastError = null;
while (tasks.Count > 0)
{
var remaining = timeout - stopwatch.Elapsed;
var timeoutTask = Task.Delay(remaining);
var completed = await Task.WhenAny(tasks.Append(timeoutTask));
if (completed == timeoutTask)
throw new TimeoutException();

var finished = (Task<string>)completed;
if (finished.Status == TaskStatus.RanToCompletion)
return finished.Result;

try { await finished; }
catch (Exception ex) { lastError = ex; }
tasks.Remove(finished);
}
throw lastError ?? new Exception("Ни одна реплика не ответила");
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
}
}
2 changes: 0 additions & 2 deletions homework 2/ClusterClient/Clients/RandomClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ public override async Task<string> ProcessRequestAsync(string query, TimeSpan ti
var uri = ReplicaAddresses[random.Next(ReplicaAddresses.Length)];

var webRequest = CreateRequest(uri + "?query=" + query);

Log.InfoFormat($"Processing {webRequest.RequestUri}");

var resultTask = ProcessRequestAsync(webRequest);

Expand Down
48 changes: 38 additions & 10 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,51 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class RoundRobinClusterClient : ClusterClientBase
public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
}
var stopwatch = Stopwatch.StartNew();
Exception lastError = null;
var replicas = GetReplicasOrderedBySpeed();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
for (var i = 0; i < replicas.Length; i++)
{
var remaining = timeout - stopwatch.Elapsed;
var remainingReplicas = replicas.Length - i;
var slice = TimeSpan
.FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas));
var addr = replicas[i];
var request = CreateRequest(addr+ "?query=" + query);
var attemptSw = Stopwatch.StartNew();
var task = ProcessRequestAsync(request);
var completed = await Task.WhenAny(task, Task.Delay(slice));
attemptSw.Stop();
if (completed != task)
{
RecordReplicaPenalty(addr, (long)slice.TotalMilliseconds);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Единственное, у нас при таймауте записываться будет в статистику не реальное время выполнения, а только время текущего кусочка таймаута — в некоторых случаях такое поведение может быть не валидным; Но в данном случае не страшно

continue;
}

try
{
var result = await task;
RecordReplicaTime(addr, attemptSw.ElapsedMilliseconds);
return result;
}
catch (Exception ex)
{
lastError = ex;
RecordReplicaPenalty(addr, Math.Max(1, attemptSw.ElapsedMilliseconds * 2));
}
}
throw lastError ?? new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
}
}
68 changes: 60 additions & 8 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,75 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class SmartClusterClient : ClusterClientBase
public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
}
var stopwatch = Stopwatch.StartNew();
var runningTasks = new List<(string Addr, Task<string> Task, Stopwatch Timer)>();
Exception lastError = null;
var replicas = GetReplicasOrderedBySpeed();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
for (var i = 0; i < replicas.Length; i++)
{
var remaining = timeout - stopwatch.Elapsed;
var remainingReplicas = replicas.Length - i;
var slice = TimeSpan
.FromMilliseconds(Math.Max(1, remaining.TotalMilliseconds / remainingReplicas));
var addr = replicas[i];
var request = CreateRequest(addr + "?query=" + query);

var tmr = Stopwatch.StartNew();
var task = ProcessRequestAsync(request);
runningTasks.Add((addr, task, tmr));
var sliceEnd = stopwatch.Elapsed + slice;

while (true)
{
for (var k = runningTasks.Count - 1; k >= 0; k--)
{
var runningTask = runningTasks[k];
if (!runningTask.Task.IsCompleted) continue;

runningTasks.RemoveAt(k);
runningTask.Timer.Stop();
try
{
var result = await runningTask.Task;
RecordReplicaTime(runningTask.Addr, runningTask.Timer.ElapsedMilliseconds);
return result;
}
catch (Exception ex)
{
lastError = ex;
RecordReplicaPenalty(runningTask.Addr, Math.Max(1, runningTask.Timer.ElapsedMilliseconds * 2));
}
}

var now = stopwatch.Elapsed;
var leftTotal = timeout - now;
var leftSlice = sliceEnd - now;
if (leftSlice <= TimeSpan.Zero)
break;

if (runningTasks.Count == 0)
break;

var delay = Task.Delay(leftSlice < leftTotal ? leftSlice : leftTotal);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Есть небольшая потенциальная проблема: не учитывается, что leftTotal может стать отрицательным (если общий таймаут уже истёк). Передача отрицательного значения в Task.Delay вызовет исключение ArgumentOutOfRangeException

var any = await Task.WhenAny(runningTasks.Select(x => x.Task).Append(delay));
if (any == delay)
break;
}
}
throw lastError ?? new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}
}
}