Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -12,6 +14,11 @@ public abstract class ClusterClientBase
{
protected string[] ReplicaAddresses { get; set; }

private readonly Dictionary<string, List<TimeSpan>> _responseHistory =
new Dictionary<string, List<TimeSpan>>();

private readonly object _historyLock = new object();

protected ClusterClientBase(string[] replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
Expand Down Expand Up @@ -40,5 +47,34 @@ protected async Task<string> ProcessRequestAsync(WebRequest request)
return result;
}
}


protected void UpdateStatistics(string replicaAddress,
TimeSpan responseTime)
{
lock (_historyLock)
{
if (!_responseHistory.ContainsKey(replicaAddress))
_responseHistory[replicaAddress] = [];

var list = _responseHistory[replicaAddress];
list.Add(responseTime);
if (list.Count > 10)
list.RemoveAt(0);
}
}

protected IEnumerable<string> GetOrderedReplicas()
{
lock (_historyLock)
{
return ReplicaAddresses.OrderBy(addr =>
{
if (_responseHistory.TryGetValue(addr, out var times) && times.Count > 0)
return times.Average(t => t.TotalMilliseconds);
return double.MaxValue;
});
}
Comment on lines +69 to +77
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Сам по себе вызов OrderBy ничего не делает. Лишь когда после OrderBy применишь foreach, ToList() или подобное -- лишь тогда действительно произойдёт сортировка, и, соответственно, лишь тогда будет вызван делегат переданный аргументом в OrderBy -- т.е. обращения к словарю будут вне lock'а

}
}
}
73 changes: 68 additions & 5 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using log4net;

Expand All @@ -12,12 +12,75 @@ public class ParallelClusterClient : ClusterClientBase
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
using var cts = new CancellationTokenSource();
var tasks = CreateRequestTasks(query, cts.Token);
return await AwaitFirstSuccessAsync(tasks, timeout, cts);
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
private List<(Task<string> Task, string Address)> CreateRequestTasks(string query, CancellationToken token)
{
var tasks = new List<(Task<string>, string)>();
foreach (var addr in ReplicaAddresses)
{
var task = ProcessSingleRequestAsync(addr, query, token);
tasks.Add((task, addr));
}
return tasks;
}

private async Task<string> AwaitFirstSuccessAsync(List<(Task<string> Task, string Address)> tasks, TimeSpan timeout, CancellationTokenSource cts)
{
var timeoutTask = Task.Delay(timeout, cts.Token);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По-хорошему timeoutTask бы создать до цикла c ProcessSingleRequestAsync. Иначе выходит, что запросы уже какое-то время как отправлены


while (tasks.Count > 0)
{
var completed = await Task.WhenAny(tasks.Select(t => t.Task).Append(timeoutTask));
if (completed == timeoutTask)
{
cts.Cancel();
throw new TimeoutException("Request timed out");
}

var index = tasks.FindIndex(t => t.Task == completed);
var (_, address) = tasks[index];
tasks.RemoveAt(index);

try
{
var result = await (Task<string>)completed;
cts.Cancel();
return result;
}
catch (Exception ex)
{
Log.InfoFormat("Request to {0} failed: {1}", address, ex.Message);
}
}

throw new Exception("All replicas failed to return a successful response");
}

private async Task<string> ProcessSingleRequestAsync(string replicaAddress, string query, CancellationToken token)
{
var uri = $"{replicaAddress}?query={Uri.EscapeDataString(query)}";
var request = CreateRequest(uri);

await using (token.Register(() => request.Abort()))
{
try
{
return await ProcessRequestAsync(request).ConfigureAwait(false);
}
catch (Exception ex) when (token.IsCancellationRequested)
{
throw new OperationCanceledException("Request was cancelled", ex, token);
}
}
}
}
}
}
87 changes: 82 additions & 5 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using log4net;

Expand All @@ -13,11 +16,85 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var orderedReplicas = GetOrderedReplicas().ToArray();
var totalTimeout = timeout;
var remainingTime = totalTimeout;
var stopwatch = Stopwatch.StartNew();

for (var i = 0; i < orderedReplicas.Length; i++)
{
var replicasLeft = orderedReplicas.Length - i;
var perReplicaTimeout = remainingTime / replicasLeft;

var result = await TryRequestReplicaAsync(orderedReplicas[i], query, perReplicaTimeout).ConfigureAwait(false);
if (result != null)
return result;

remainingTime = totalTimeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
throw new TimeoutException("Request timed out");
}

throw new Exception("All replicas failed to return a successful response");
}

private async Task<string?> TryRequestReplicaAsync(string replicaAddress, string query, TimeSpan timeout)
{
var uri = $"{replicaAddress}?query={Uri.EscapeDataString(query)}";
var request = CreateRequest(uri);
using var timeoutCts = new CancellationTokenSource();
var delayTask = Task.Delay(timeout, timeoutCts.Token);
var stopwatch = Stopwatch.StartNew();

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
try
{
var responseTask = request.GetResponseAsync();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Почему не используешь ProcessRequestAsync(WebRequest)?

var completedTask = await Task.WhenAny(responseTask, delayTask).ConfigureAwait(false);

if (completedTask == delayTask)
{
timeoutCts.Cancel();
request.Abort();
try
{
await responseTask;
}
catch (Exception ex)
{
Log.InfoFormat("Replica {0} timed out: {1}", replicaAddress, ex.Message);
}

return null;
}

timeoutCts.Cancel();
using var response = await responseTask.ConfigureAwait(false);
var httpResponse = (HttpWebResponse)response;

if (httpResponse.StatusCode != HttpStatusCode.OK)
{
Log.InfoFormat("Replica {0} returned status {1}", replicaAddress, httpResponse.StatusCode);
return null;
}

using var reader = new StreamReader(response.GetResponseStream(), Encoding.UTF8);
var result = await reader.ReadToEndAsync().ConfigureAwait(false);
Log.InfoFormat("Replica {0} succeeded", replicaAddress);

stopwatch.Stop();
UpdateStatistics(replicaAddress, stopwatch.Elapsed);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Обновляешь статистику только для успешных и вовремя ответивших реплик. Может случиться так, что реплика всё время 500-тит или отвечает дольше чем за perReplicaTimeout, а при этом опрашивается всегда первой


return result;
}
catch (Exception ex)
{
Log.InfoFormat("Replica {0} failed: {1}", replicaAddress, ex.Message);
return null;
}
}
}
}
}
113 changes: 109 additions & 4 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using log4net;
Expand All @@ -13,11 +16,113 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));

public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var orderedReplicas = GetOrderedReplicas().ToArray();
var totalTimeout = timeout;
var stopwatch = Stopwatch.StartNew();
var taskToAddress = new Dictionary<Task<string>, string>();
var i = 0;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В чём смысл этой переменной? По использованию не очень понятно, что именно она считает


while (i < orderedReplicas.Length)
{
if (!taskToAddress.Values.Contains(orderedReplicas[i]))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Разве есть случаи, когда условие может оказаться ложным?

{
var addr = orderedReplicas[i];
var task = RequestReplicaAsync(addr, query);
taskToAddress[task] = addr;
}

var remainingTime = totalTimeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
throw new TimeoutException("Request timed out");

var replicasLeft = orderedReplicas.Length - i;
var stepTimeout = remainingTime / replicasLeft;

var completedTask = await WaitForAnyTaskOrTimeoutAsync(taskToAddress.Keys, stepTimeout)
.ConfigureAwait(false);

if (completedTask != null)
{
var addr = taskToAddress[completedTask];
var result = await TryGetResultAsync(completedTask, addr).ConfigureAwait(false);
if (result != null)
return result;

taskToAddress.Remove(completedTask);

if (taskToAddress.Count == 0)
i++;
}
else
i++;
}

while (taskToAddress.Count > 0)
{
var remainingTime = totalTimeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
throw new TimeoutException("Request timed out");

var completedTask = await WaitForAnyTaskOrTimeoutAsync(taskToAddress.Keys, remainingTime)
.ConfigureAwait(false);
if (completedTask == null)
throw new TimeoutException("Request timed out");

var addr = taskToAddress[completedTask];
var result = await TryGetResultAsync(completedTask, addr).ConfigureAwait(false);
if (result != null)
return result;

taskToAddress.Remove(completedTask);
}

throw new Exception("All replicas failed to return a successful response");
}

private async Task<string> RequestReplicaAsync(string replicaAddress, string query)
{
throw new NotImplementedException();
var uri = $"{replicaAddress}?query={Uri.EscapeDataString(query)}";
var request = CreateRequest(uri);
var stopwatch = Stopwatch.StartNew();

using var response = await request.GetResponseAsync().ConfigureAwait(false);
var httpResponse = (HttpWebResponse)response;
if (httpResponse.StatusCode != HttpStatusCode.OK)
throw new Exception($"Replica returned status {httpResponse.StatusCode}");

using var reader = new StreamReader(response.GetResponseStream(), Encoding.UTF8);
var result = await reader.ReadToEndAsync().ConfigureAwait(false);

stopwatch.Stop();
UpdateStatistics(replicaAddress, stopwatch.Elapsed);

return result;
}

private async Task<Task<string>> WaitForAnyTaskOrTimeoutAsync(IEnumerable<Task<string>> tasks, TimeSpan timeout)
{
var timeoutTask = Task.Delay(timeout);
var completed = await Task.WhenAny(tasks.Append(timeoutTask)).ConfigureAwait(false);
return completed == timeoutTask ? null : (Task<string>)completed;
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
private async Task<string> TryGetResultAsync(Task<string> task, string address)
{
try
{
var result = await task.ConfigureAwait(false);
Log.InfoFormat("Replica {0} succeeded", address);
return result;
}
catch (Exception ex)
{
Log.InfoFormat("Replica {0} failed: {1}", address, ex.Message);
return null;
}
}
}
}
}