Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -12,11 +15,25 @@ public abstract class ClusterClientBase
{
protected string[] ReplicaAddresses { get; set; }

private static readonly ConcurrentDictionary<string, long> _responseTimes
= new ConcurrentDictionary<string, long>();

protected ClusterClientBase(string[] replicaAddresses)
{
ReplicaAddresses = replicaAddresses;
}

protected IEnumerable<string> SortedReplicas =>
ReplicaAddresses.OrderBy(r => _responseTimes.GetOrAdd(r, _ => 1000));

protected void RecordResponse(string replica, long elapsedMs, bool success)
{
if (success)
_responseTimes.AddOrUpdate(replica, elapsedMs, (_, avg) => (avg + elapsedMs) / 2);
else
_responseTimes.AddOrUpdate(replica, 2000, (_, avg) => Math.Min(avg * 2, 30000));
}

public abstract Task<string> ProcessRequestAsync(string query, TimeSpan timeout);
protected abstract ILog Log { get; }

Expand Down
28 changes: 26 additions & 2 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using log4net;
Expand All @@ -13,9 +15,31 @@ public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var tasks = ReplicaAddresses
.Select(addr => CreateRequest(addr + "?query=" + query))
.Select(req => ProcessRequestAsync(req))
.ToList();

var overallTimeoutTask = Task.Delay(timeout);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

По-хорошему timeoutTask бы создать до вызовов ProcessRequestAsync. Иначе выходит, что запросы уже какое-то время как отправлены


while (tasks.Count > 0)
{
var completedTask = await Task.WhenAny(tasks.Concat(new[] { overallTimeoutTask }));

if (completedTask == overallTimeoutTask)
throw new TimeoutException("No replica responded within timeout");

tasks.Remove((Task<string>)completedTask);

if (completedTask.IsCompletedSuccessfully)
return await (Task<string>)completedTask;

Log.ErrorFormat("Replica failed: {0}", completedTask.Exception?.InnerException?.Message);
}

throw new Exception("All replicas failed");
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
Expand Down
47 changes: 45 additions & 2 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using log4net;
Expand All @@ -13,9 +15,50 @@ public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresse
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var timeoutPerRequest = new TimeSpan();
var stopwatch = new Stopwatch();
var remainingReplicas = ReplicaAddresses.Length;

stopwatch.Start();

foreach (var replica in SortedReplicas)
{
var remainingTime = timeout - stopwatch.Elapsed;
if (remainingTime <= TimeSpan.Zero)
throw new TimeoutException("No replica responded within overall timeout");

timeoutPerRequest = remainingTime.Divide(remainingReplicas);

remainingReplicas--;

var webRequest = CreateRequest(replica + "?query=" + query);
Log.InfoFormat($"Processing {webRequest.RequestUri}");

var requestTask = ProcessRequestAsync(webRequest);

var completedTask = await Task.WhenAny(requestTask, Task.Delay(timeoutPerRequest));

if (completedTask != requestTask)
{
RecordResponse(replica, 0, false);
continue;
}

try
{
RecordResponse(replica, stopwatch.ElapsedMilliseconds, true);
return requestTask.Result;
}
catch (Exception ex)
{
RecordResponse(replica, 0, false);
Log.ErrorFormat($"Replica {replica} failed: {ex.Message}");
}
}

throw new TimeoutException("All replicas failed");
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
Expand Down
83 changes: 81 additions & 2 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
using log4net;
Expand All @@ -13,9 +15,86 @@ public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
throw new NotImplementedException();
var stopwatch = Stopwatch.StartNew();
var tasks = new Dictionary<Task<string>, string>();
var replicasToCall = ReplicaAddresses.Length;

var ordered = SortedReplicas.ToList();
var timeoutPerReplica = timeout.Divide(ordered.Count);
var replicasLeft = ordered.Count;

foreach (var replica in ordered)
{
if (stopwatch.Elapsed >= timeout)
break;

var webRequest = CreateRequest(replica + "?query=" + query);
Log.InfoFormat($"Processing {webRequest.RequestUri}");

var requestTask = ProcessRequestAsync(webRequest);
tasks[requestTask] = replica;


var weight = ReplicaAddresses.Length - replicasToCall + 1;
var targetTimeForNextStart = timeoutPerReplica * weight;

var waitTime = targetTimeForNextStart - stopwatch.Elapsed;
var waitTask = Task.Delay(waitTime);

replicasToCall--;

if (waitTime > TimeSpan.Zero)
{
var completedTask = await Task.WhenAny(tasks.Keys.Concat(new[] { waitTask }));

if(completedTask == waitTask)
continue;

var addr = tasks[(Task<string>)completedTask];
tasks.Remove((Task<string>)completedTask);

try
{
RecordResponse(addr, stopwatch.ElapsedMilliseconds, true);
return await (Task<string>)completedTask;
}
catch (Exception ex)
{
RecordResponse(addr, 0, false);
Log.ErrorFormat($"Replica request failed: {ex.Message}");
}
}
}

while (tasks.Count > 0 && stopwatch.Elapsed < timeout)
{
var remainingTime = timeout - stopwatch.Elapsed;
var waitTask = Task.Delay(remainingTime);

var completedTask = await Task.WhenAny(tasks.Keys.Concat(new[] { waitTask }));

if (completedTask == waitTask)
break;
Comment on lines 78 to 79
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В этом случае не вызовется RecordResponse для затормозивших реплик
Аналогично, если нарушится условие цикла (если перед очередной итерацией станет stopwatch.Elapsed >= timeout)


var addr = tasks[(Task<string>)completedTask];
tasks.Remove((Task<string>)completedTask);


try
{
RecordResponse(addr, stopwatch.ElapsedMilliseconds, true);
return await (Task<string>)completedTask;
}
catch (Exception ex)
{
RecordResponse(addr, 0, false);
Log.ErrorFormat($"Replica request failed: {ex.Message}");
}
}

throw new TimeoutException("All replicas failed");
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
Expand Down