Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -10,6 +13,12 @@ namespace ClusterClient.Clients
{
public abstract class ClusterClientBase
{
private class ReplicaStats
{
public double AverageMs;
public int Count;
}

protected string[] ReplicaAddresses { get; set; }

protected ClusterClientBase(string[] replicaAddresses)
Expand All @@ -19,7 +28,9 @@ protected ClusterClientBase(string[] replicaAddresses)

public abstract Task<string> ProcessRequestAsync(string query, TimeSpan timeout);
protected abstract ILog Log { get; }


private readonly ConcurrentDictionary<string, ReplicaStats> _stats = [];

protected static HttpWebRequest CreateRequest(string uriStr)
{
var request = WebRequest.CreateHttp(Uri.EscapeUriString(uriStr));
Expand All @@ -40,5 +51,42 @@ protected async Task<string> ProcessRequestAsync(WebRequest request)
return result;
}
}

protected List<string> OrderReplicas()
{
return ReplicaAddresses
.OrderBy(r => _stats.TryGetValue(r, out var s) ? s.AverageMs : double.MaxValue).ToList();
}

protected void UpdateStats(string replica, long elapsedMs)
{
_stats.AddOrUpdate(
replica,
_ => new ReplicaStats { AverageMs = elapsedMs, Count = 1 },
(_, old) =>
{
var newCount = old.Count + 1;
var newAvg = (old.AverageMs * old.Count + elapsedMs) / newCount;

return new ReplicaStats
{
AverageMs = newAvg,
Count = newCount
};
});
}

protected async Task<string> ProcessWithStatsAsync(string replica, string query)
{
var request = CreateRequest($"{replica}?query={query}");
var stopwatch = Stopwatch.StartNew();

var result = await ProcessRequestAsync(request);

stopwatch.Stop();
UpdateStats(replica, stopwatch.ElapsedMilliseconds);
Comment on lines +81 to +87
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут статистика обновляется только если запрос успешный, если он не успешный, то статистика никогда, не критично, но не будет отображать реальную картину


return result;
}
}
}
28 changes: 26 additions & 2 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,33 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var tasks = ReplicaAddresses
.Select(address =>
{
var request = CreateRequest($"{address}?query={query}");
return ProcessRequestAsync(request);
})
.ToList();

var timeoutTask = Task.Delay(timeout);

while (tasks.Count != 0)
{
var completed = await Task.WhenAny(tasks.Append(timeoutTask));

if (completed == timeoutTask)
throw new TimeoutException();

var finishedTask = (Task<string>)completed;
tasks.Remove(finishedTask);

if (finishedTask.IsCompletedSuccessfully)
return finishedTask.Result;
}

throw new Exception("All replicas failed");
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
Expand Down
15 changes: 13 additions & 2 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var replicas = OrderReplicas().ToList();
foreach (var replica in replicas)
{
var task = ProcessWithStatsAsync(replica, query);

await Task.WhenAny(task, Task.Delay(timeout / ReplicaAddresses.Length));

if (task.IsCompletedSuccessfully)
return task.Result;
}

throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
Expand Down
24 changes: 22 additions & 2 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,29 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var replicas = OrderReplicas().ToArray();
var tasks = new List<Task<string>>();
var timeoutTask = Task.Delay(timeout);

foreach (var replica in replicas)
{
var task = ProcessWithStatsAsync(replica, query);
tasks.Add(task);

await Task.WhenAny(task, Task.Delay(timeout / replicas.Length), timeoutTask);
Copy link
Copy Markdown
Contributor

@AMEST AMEST Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нужно было учесть, что запрос может упасть и он занял меньше времени чем выделенный ему таймаут и потратить оставшееся время на других запросы, чтобы они прошли

В целом логика должна была быть такая:

  • Сохраняем в отдельной переменной время которое у нас есть на выполнение (инициализируем временем = timeout)
  • Сохраняем количество реплик (replicas.Lenght)
  • В цикле высчитываем perReplicaTimeout = время которое есть на выполнение / количество реплик
  • В цикле запускаем таймер перед await Task.WhenAny и останавливаем его после
  • Проверяем, можем ли мы отдать ответ сейчас, не общий ли таймаут, не упавший ли запрос
  • В конце перед переходом к следующей итерации, чтобы правильно высчитать perReplicaTimeout для следующей реплики:
    • уменьшать количество реплик на 1
    • уменьшаем время на выполнение на основе данных таймера (remainingTime - sw.Elapsed)

В итоге мы для каждого последующего запроса будем делать индивидуальный perReplicaTimeout учитывая сколько времени (реального) потратили на предыдущую попытку

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Так же для эффективности, здесь можно ждать не только текущую таску и таймауты, но и все уже запущенные.

Тогда мы можем быстрее среагировать, если кто-то какой-то предыдущий запрос успел выполнится.

НО нужно выкидывать из списка выполняющихся таск упавшие, чтобы они не мешались в WhenAny


if (task.IsCompletedSuccessfully)
return task.Result;
}

var final = await Task.WhenAny(tasks.Append(timeoutTask));

if (final == timeoutTask)
throw new TimeoutException();

return await (Task<string>)final;
Comment on lines +33 to +38
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь в финале нам может вернуться не успешно выполненная, а упавшая таска и тогда мы в return получим исключение. Тут бы хорошо в цикле проходится по всем таскам и возвращать только если она успешно выполнена, попутно проверяя не закончилось ли время у timeoutTask

}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
Expand Down