Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,15 @@ protected async Task<string> ProcessRequestAsync(WebRequest request)
return result;
}
}

protected Task<string> CallRequest(string address, string query)
{
var uri = new UriBuilder(address)
{
Query = $"query={query}"
}.Uri.AbsoluteUri;

return ProcessRequestAsync(CreateRequest(uri));
}
}
}
38 changes: 36 additions & 2 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using log4net;

Expand All @@ -11,11 +12,44 @@ public class ParallelClusterClient : ClusterClientBase
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var tasks = ReplicaAddresses
.Select(address => CallRequest(address, query))
.ToList();

var timeoutTask = Task.Delay(timeout).ContinueWith<string>(_ => throw new TimeoutException());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По-хорошему timeoutTask бы создать до всех CallRequest. Иначе выходит, что запросы уже какое-то время как отправлены

tasks.Add(timeoutTask);

while (tasks.Count > 1)
{
var completedTask = await Task.WhenAny(tasks);
if (completedTask == timeoutTask)
await timeoutTask;
try
{
return await completedTask;
}
catch
{
tasks.Remove(completedTask);
if (tasks.Count == 1) throw new Exception();
}
}
throw new TimeoutException();
}

private Task<string> CallRequest(string address, string query)
{
var uri = new UriBuilder(address)
{
Query = $"query={query}"
}.Uri.AbsoluteUri;

return ProcessRequestAsync(CreateRequest(uri));
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
Expand Down
45 changes: 42 additions & 3 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -9,15 +11,52 @@ namespace ClusterClient.Clients
{
public class RoundRobinClusterClient : ClusterClientBase
{
private ConcurrentDictionary<string, TimeSpan> _stats;

public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
_stats = new ();
foreach (var addr in replicaAddresses)
{
_stats.TryAdd(addr, TimeSpan.MaxValue);
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
}
var orderedReplicas = ReplicaAddresses
.OrderBy(addr => _stats.GetValueOrDefault(addr, TimeSpan.MaxValue))
.ToArray();

for (int i = 0; i < ReplicaAddresses.Length; i++)
{
var replicaTimeout = timeout / (ReplicaAddresses.Length - i);
var address = orderedReplicas[i];
var task = CallRequest(address, query);
var timeoutTask = Task.Delay(replicaTimeout);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout и replicaTimeout могут оказаться отрицательными. Task.Delay может или выбросить исключение, или, с маленьким шансом, превратиться в никогда не заканчивающуюся таску

var sw = Stopwatch.StartNew();
var completedTask = await Task.WhenAny(task, timeoutTask);

sw.Stop();
_stats[address] = sw.Elapsed;
timeout -= sw.Elapsed;
if (completedTask == timeoutTask)
{
continue;
}

try
{
return await task;
}
catch
{
_stats[address] = TimeSpan.MaxValue;
}
}
throw new TimeoutException();
}
protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
}
51 changes: 49 additions & 2 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -9,13 +11,58 @@ namespace ClusterClient.Clients
{
public class SmartClusterClient : ClusterClientBase
{
private ConcurrentDictionary<string, TimeSpan> _stats;

public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
_stats = new ();
foreach (var addr in replicaAddresses)
{
_stats.TryAdd(addr, TimeSpan.MaxValue);
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var runningTasks = new Dictionary<Task<string>, string>();

var orderedReplicas = ReplicaAddresses
.OrderBy(addr => _stats.GetValueOrDefault(addr, TimeSpan.MaxValue))
.ToArray();

for (int i = 0; i < ReplicaAddresses.Length; i++)
{
var replicaTimeout = timeout / (ReplicaAddresses.Length - i);
var address = orderedReplicas[i];
var task = CallRequest(address, query);
var timeoutTask = Task.Delay(replicaTimeout).ContinueWith<string>(_ => throw new TimeoutException());
runningTasks.Add(task, address);
runningTasks.Add(timeoutTask, "timeout");
var sw = Stopwatch.StartNew();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Здесь timeoutTask и Stopwatch тоже лучше перед CallRequest делать

var completedTask = await Task.WhenAny(runningTasks.Keys);
sw.Stop();

runningTasks.Remove(timeoutTask);

timeout -= sw.Elapsed;
if (completedTask == timeoutTask)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если реплики будут тормозить, то соответствующие таски так и останутся в runningTasks, и статистика для них не обновится

{
continue;
}

try
{
_stats[runningTasks[completedTask]] = sw.Elapsed;
return await completedTask;
}
catch
{
_stats[runningTasks[completedTask]] = TimeSpan.MaxValue;
}
runningTasks.Remove(completedTask);
}
throw new TimeoutException();
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
Expand Down