Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions homework 2/ClusterClient/Clients/ClusterClientBase.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
Expand All @@ -10,6 +12,8 @@ namespace ClusterClient.Clients
{
public abstract class ClusterClientBase
{
protected static ConcurrentDictionary<string, ResponseStatistics> ResponseStatisticsHistory { get; } = [];

protected string[] ReplicaAddresses { get; set; }

protected ClusterClientBase(string[] replicaAddresses)
Expand Down Expand Up @@ -40,5 +44,27 @@ protected async Task<string> ProcessRequestAsync(WebRequest request)
return result;
}
}

protected Task<string> CreateAndRunRequestTask(string replicaAddress, string query)
{
var uri = new UriBuilder(replicaAddress)
{
Query = $"query={query}"
};
var request = CreateRequest(uri.ToString());
Log.InfoFormat($"Processing {request.RequestUri}");
return ProcessRequestAsync(request);
}

protected string[] GetFilteredAndSortedAddresses()
{
return ReplicaAddresses
.Where(t => !ResponseStatisticsHistory.TryGetValue(t, out var statistics)
|| statistics.IsSuccessful)
Comment on lines +62 to +63
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

В доп задании требовалось сортировать их по времени, а не совсем исключать в случае ошибок)
Вне лабораторных условий может произойти ситуация, когда такой подход вычеркнет все реплики из-за к примеру кратковременной сетевой недоступности самого приложения. В итоге отправка запросов встанет до перезапуска.

Лучше либо делать скользящее окно — сохранять и использовать время последней ошибки и исключать из списка реплик только когда ошибка "свежая", либо просто делать их ResponseTime максимально плохим, чтобы они становились последними в списке.

Совсем исключать их навсегда — плохой подход

.OrderBy(t => ResponseStatisticsHistory.TryGetValue(t, out var statistics)
? statistics.ResponseTime
: TimeSpan.Zero)
.ToArray();
}
}
}
37 changes: 26 additions & 11 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,38 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class ParallelClusterClient : ClusterClientBase
{
public class ParallelClusterClient : ClusterClientBase
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var requestTasks = ReplicaAddresses
.Select(t => CreateAndRunRequestTask(t, query))
.ToHashSet();
var timeoutTask = Task.Delay(timeout);

while (requestTasks.Count > 0 && !timeoutTask.IsCompleted)
{
throw new NotImplementedException();
var completedTask = await Task.WhenAny(requestTasks.Append(timeoutTask));

if (completedTask == timeoutTask)
throw new TimeoutException();

if (completedTask.IsCompletedSuccessfully)
return await (Task<string>)completedTask;

requestTasks.Remove((Task<string>)completedTask);
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
throw new TimeoutException();
}
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
5 changes: 5 additions & 0 deletions homework 2/ClusterClient/Clients/ResponseStatistics.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
using System;

namespace ClusterClient.Clients;

public record ResponseStatistics(bool IsSuccessful, TimeSpan ResponseTime);
46 changes: 34 additions & 12 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,45 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class RoundRobinClusterClient : ClusterClientBase
{
public class RoundRobinClusterClient : ClusterClientBase
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var preparedAddresses = GetFilteredAndSortedAddresses();
var goodAddressesCount = preparedAddresses.Length;
var sw = new Stopwatch();
foreach (var replicaAddress in preparedAddresses)
{
throw new NotImplementedException();
if (ResponseStatisticsHistory.TryGetValue(replicaAddress, out var statistics) && !statistics.IsSuccessful)
{
goodAddressesCount--;
continue;
}

sw.Restart();
var requestTask = CreateAndRunRequestTask(replicaAddress, query);
var completedTask = await Task.WhenAny(requestTask, Task.Delay(timeout / goodAddressesCount));

if (!completedTask.IsCompletedSuccessfully)
goodAddressesCount--;
Comment on lines +31 to +32
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Если запрос не успел выполнится до истечения таймаута, в Task.WhenAny вернется Task.Delay и его результат, а он собственно всегда будет IsCompletedSuccessfully

В итоге не всегда будет уменьшаться goodAddressCount


ResponseStatisticsHistory[replicaAddress] =
new ResponseStatistics(completedTask.IsCompletedSuccessfully, sw.Elapsed);
Comment on lines +34 to +35
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут по причине выше мы записываем неверную статистику, мы для реплики записываем статистику "удачности" и "времени выполнения", вот только в случае таймаута мы в статистику будет записывать true если completedTask != requestTask.

В итоге у нас почти никогда не будет неработающих реплик, только если ошибка запроса вывалится раньше, чем отработает таймаут Task.Delay —  не совсем корректное поведение получается


if (requestTask.IsCompletedSuccessfully)
return requestTask.Result;
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
throw new TimeoutException();
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
51 changes: 41 additions & 10 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,54 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class SmartClusterClient : ClusterClientBase
{
public class SmartClusterClient : ClusterClientBase
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
var preparedAddresses = GetFilteredAndSortedAddresses();
var goodAddressesCount = preparedAddresses.Length;
var requestTasks = new HashSet<Task<string>>();
foreach (var replicaAddress in preparedAddresses)
{
throw new NotImplementedException();
if (ResponseStatisticsHistory.TryGetValue(replicaAddress, out var statistics) && !statistics.IsSuccessful)
{
goodAddressesCount--;
continue;
}

var sw = Stopwatch.StartNew();
var requestTask = CreateAndRunRequestTask(replicaAddress, query);
_ = requestTask.ContinueWith(t => ResponseStatisticsHistory[replicaAddress]
= new ResponseStatistics(t.IsCompletedSuccessfully, sw.Elapsed));
Comment on lines +31 to +32
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Тут происходит замыкание и переменная sw захватывается делегатом в ContinueWith.
Переменная sw переопределяется на каждой итерации. Все continuation будут захватывать одну и ту же переменную, а не её значение на момент запуска задачи. Когда continuation выполнится (возможно, спустя несколько итераций), sw.Elapsed вернёт время, прошедшее с последнего перезапуска секундомера, а не с момента отправки конкретного запроса.

Здесь надо аккуратнее работать с замыканиями.

Под капотом Stopwatch.StartNew() создается новый экземпляр и гипотетически не должно быть проблем .net может это разрулить, но не во всех ситуациях; Лучше избегать замыканий переменных

requestTasks.Add(requestTask);
var timeoutTask = Task.Delay(timeout / goodAddressesCount);
var completedTask = await Task.WhenAny(requestTasks.Append(timeoutTask));

if (completedTask == timeoutTask)
{
ResponseStatisticsHistory[replicaAddress] = new ResponseStatistics(true, sw.Elapsed);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Таумаут тоже считаем "успешным" завершением? Немного странновато тогда выглядит.

Плюс у нас ContinueWith перезапишет статистику реплики "реальным" временем после завершения запроса, запись этой статистики выглядит избыточной и скорее даже вредной т.к. может привести к состоянию гонки

continue;
}

if (completedTask.IsCompletedSuccessfully)
return await (Task<string>)completedTask;

goodAddressesCount--;
requestTasks.Remove((Task<string>)completedTask);
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
throw new TimeoutException();
}
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}