Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,51 @@ namespace ClusterClient.Clients
{
public class ParallelClusterClient : ClusterClientBase
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
public ParallelClusterClient(string[] replicaAddresses)
: base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var tasks = ReplicaAddresses
.Select(address => ProcessReplica(address, query))
.ToList();

var timeoutTask = Task.Delay(timeout);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По-хорошему timeoutTask бы создать до всех ProcessRequestAsync. Иначе выходит, что запросы уже какое-то время как отправлены


while (tasks.Any())
{
var completed = await Task.WhenAny(tasks.Cast<Task>().Append(timeoutTask));

if (completed == timeoutTask)
throw new TimeoutException();

var task = (Task<string>)completed;
tasks.Remove(task);

if (task.Result != null)
return task.Result;
}

throw new TimeoutException();
}

private async Task<string> ProcessReplica(string address, string query)
{
try
{
var request = CreateRequest(address + "?query=" + query);
Log.InfoFormat($"Processing {request.RequestUri}");
return await ProcessRequestAsync(request);
}
catch
{
return null;
}
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
protected override ILog Log =>
LogManager.GetLogger(typeof(ParallelClusterClient));
}
}
7 changes: 7 additions & 0 deletions homework 2/ClusterClient/Clients/ReplicaStats.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace ClusterClient.Clients;

public class ReplicaStats
{
public double AverageResponseTime;
public int Count;
}
99 changes: 95 additions & 4 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -9,15 +10,105 @@ namespace ClusterClient.Clients
{
public class RoundRobinClusterClient : ClusterClientBase
{
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
private readonly Dictionary<string, ReplicaStats> statistics = new();
private readonly object statsLock = new();

public RoundRobinClusterClient(string[] replicaAddresses)
: base(replicaAddresses)
{
}

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var startTime = DateTime.UtcNow;

var orderedAddresses = GetOrderedReplicas();

for (int i = 0; i < orderedAddresses.Length; i++)
{
var elapsed = DateTime.UtcNow - startTime;
var remaining = timeout - elapsed;

if (remaining <= TimeSpan.Zero)
throw new TimeoutException();

var remainingReplicas = orderedAddresses.Length - i;
var perReplicaTimeout = TimeSpan.FromMilliseconds(
remaining.TotalMilliseconds / remainingReplicas);

var address = orderedAddresses[i];

var stopwatch = Stopwatch.StartNew();
var requestTask = ProcessReplica(address, query);
var delayTask = Task.Delay(perReplicaTimeout);

var completed = await Task.WhenAny(requestTask, delayTask);

if (completed == requestTask)
{
var result = await requestTask;
if (result != null)
{
stopwatch.Stop();
UpdateStatistics(address, stopwatch.ElapsedMilliseconds);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если время вышло или вылетела ошибка, статистика не обновится

return result;
}
}
}

throw new TimeoutException();
}

private async Task<string> ProcessReplica(string address, string query)
{
try
{
var request = CreateRequest(address + "?query=" + query);
Log.InfoFormat($"Processing {request.RequestUri}");
return await ProcessRequestAsync(request);
}
catch
{
return null;
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
private string[] GetOrderedReplicas()
{
throw new NotImplementedException();
lock (statsLock)
{
return ReplicaAddresses
.OrderBy(addr =>
statistics.TryGetValue(addr, out var stat)
? stat.AverageResponseTime
: double.MaxValue)
.ToArray();
}
}

private void UpdateStatistics(string address, long elapsedMs)
{
lock (statsLock)
{
if (!statistics.TryGetValue(address, out var stat))
{
statistics[address] = new ReplicaStats
{
AverageResponseTime = elapsedMs,
Count = 1
};
}
else
{
stat.AverageResponseTime =
(stat.AverageResponseTime * stat.Count + elapsedMs) / (stat.Count + 1);

stat.Count++;
}
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
protected override ILog Log =>
LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
}
168 changes: 150 additions & 18 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,155 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public class SmartClusterClient : ClusterClientBase
 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
{
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
public class SmartClusterClient : ClusterClientBase
{
}
private readonly Dictionary<string, ReplicaStats> statistics = new();
private readonly object statsLock = new();

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
}
public SmartClusterClient(string[] replicaAddresses)
: base(replicaAddresses)
{
}

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var startTime = DateTime.UtcNow;
var tasks = new List<(string Address, Task<string> Task, Stopwatch Timer)>();

var orderedAddresses = GetOrderedReplicas();

for (int i = 0; i < orderedAddresses.Length; i++)
{
var elapsed = DateTime.UtcNow - startTime;
var remaining = timeout - elapsed;

if (remaining <= TimeSpan.Zero)
throw new TimeoutException();

var remainingReplicas = orderedAddresses.Length - i;
var perReplicaTimeout = TimeSpan.FromMilliseconds(
remaining.TotalMilliseconds / remainingReplicas);

var address = orderedAddresses[i];

var stopwatch = Stopwatch.StartNew();
var task = ProcessReplica(address, query);

tasks.Add((address, task, stopwatch));

var delayTask = Task.Delay(perReplicaTimeout);
var completed = await Task.WhenAny(
tasks.Select(t => t.Task).Cast<Task>().Append(delayTask));

if (completed != delayTask)
{
var finished = tasks.First(t => t.Task == completed);
var result = await finished.Task;

if (result != null)
{
finished.Timer.Stop();
UpdateStatistics(finished.Address, finished.Timer.ElapsedMilliseconds);
return result;
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
tasks.Remove(finished);
}
}

var remainingFinal = timeout - (DateTime.UtcNow - startTime);
if (remainingFinal <= TimeSpan.Zero)
throw new TimeoutException();

var timeoutTask = Task.Delay(remainingFinal);

while (tasks.Any())
{
var completed = await Task.WhenAny(
tasks.Select(t => t.Task).Cast<Task>().Append(timeoutTask));

if (completed == timeoutTask)
throw new TimeoutException();

var finished = tasks.First(t => t.Task == completed);
var result = await finished.Task;

if (result != null)
{
finished.Timer.Stop();
UpdateStatistics(finished.Address, finished.Timer.ElapsedMilliseconds);
return result;
}

tasks.Remove(finished);
}

throw new TimeoutException();
}

private async Task<string> ProcessReplica(string address, string query)
{
try
{
var request = CreateRequest(address + "?query=" + query);
Log.InfoFormat($"Processing {request.RequestUri}");
return await ProcessRequestAsync(request);
}
catch
{
return null;
}
}

private string[] GetOrderedReplicas()
{
lock (statsLock)
{
return ReplicaAddresses
.OrderBy(addr =>
statistics.TryGetValue(addr, out var stat)
? stat.AverageResponseTime
: double.MaxValue)
.ToArray();
}
}

private void UpdateStatistics(string address, long elapsedMs)
{
lock (statsLock)
{
if (!statistics.TryGetValue(address, out var stat))
{
statistics[address] = new ReplicaStats
{
AverageResponseTime = elapsedMs,
Count = 1
};
}
else
{
stat.AverageResponseTime =
(stat.AverageResponseTime * stat.Count + elapsedMs) / (stat.Count + 1);

stat.Count++;
Comment on lines +138 to +141
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Свежие данные можно сделать более значимыми, чем старые (удалять слишком старые или умножать их на понижающий коэффициент)

}
}
}

protected override ILog Log =>
LogManager.GetLogger(typeof(SmartClusterClient));

private class ReplicaStats
{
public double AverageResponseTime;
public int Count;
}
}
}
}