Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 42 additions & 12 deletions homework 2/ClusterClient/Clients/ParallelClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,53 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public class ParallelClusterClient : ClusterClientBase
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
if (ReplicaAddresses == null || ReplicaAddresses.Length == 0)
throw new InvalidOperationException("Реплика адресов не указана");

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
var firstSuccess = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

var remaining = ReplicaAddresses.Length;
Exception? lastError = null;

foreach (var uri in ReplicaAddresses)
{
throw new NotImplementedException();
var webRequest = CreateRequest(uri + "?query=" + query);
Log.InfoFormat($"Обработка {webRequest.RequestUri}");

var task = ProcessRequestAsync(webRequest);

_ = task.ContinueWith(t =>
{
if (t.Status == TaskStatus.RanToCompletion)
{
firstSuccess.TrySetResult(t.Result);
return;
}

var ex = t.Exception?.GetBaseException() ?? new TaskCanceledException(t);
lastError = ex;
_ = t.Exception;

if (Interlocked.Decrement(ref remaining) == 0)
firstSuccess.TrySetException(lastError ??
new Exception("Не удалось получить ответ ни от одной реплики"));
}, TaskContinuationOptions.ExecuteSynchronously);
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
var completed = await Task.WhenAny(firstSuccess.Task, Task.Delay(timeout));
if (completed != firstSuccess.Task)
throw new TimeoutException();

return await firstSuccess.Task;
}
}

protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient));
}
59 changes: 47 additions & 12 deletions homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,58 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public class RoundRobinClusterClient : ClusterClientBase
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses)
{
}
if (ReplicaAddresses == null || ReplicaAddresses.Length == 0)
throw new InvalidOperationException("Реплика адресов не указана");

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
var sw = Stopwatch.StartNew();
Exception lastError = null;

for (var i = 0; i < ReplicaAddresses.Length; i++)
{
throw new NotImplementedException();
var remaining = timeout - sw.Elapsed;
if (remaining <= TimeSpan.Zero)
break;

var remainingReplicas = ReplicaAddresses.Length - i;
var slice = TimeSpan.FromTicks(remaining.Ticks / remainingReplicas);

var uri = ReplicaAddresses[i];
var webRequest = CreateRequest(uri + "?query=" + query);

Log.InfoFormat($"Обработка {webRequest.RequestUri}");

var resultTask = ProcessRequestAsync(webRequest);

var completed = await Task.WhenAny(resultTask, Task.Delay(slice));
if (completed != resultTask)
{
_ = resultTask.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что делает эта строка?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Что делает эта строка?

В этой строке ловится исключение у запроса, который мы перестали ждать из-за того, что перешли к следующей реплике. Я так сделал для избежания вызова исключения UnobservedTaskException

continue;
}

try
{
return await resultTask;
}
catch (Exception e)
{
lastError = e;
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
if (lastError != null)
throw lastError;

throw new TimeoutException();
}
}

protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient));
}
71 changes: 61 additions & 10 deletions homework 2/ClusterClient/Clients/SmartClusterClient.cs
Original file line number Diff line number Diff line change
@@ -1,23 +1,74 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using log4net;

namespace ClusterClient.Clients
namespace ClusterClient.Clients;

public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses)
{
public class SmartClusterClient : ClusterClientBase
public override async Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
{
public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses)
if (ReplicaAddresses == null || ReplicaAddresses.Length == 0)
throw new InvalidOperationException("Реплика адресов не указана");

var sw = Stopwatch.StartNew();

var perReplicaTimeout = TimeSpan.FromTicks(timeout.Ticks / ReplicaAddresses.Length);
var firstSuccess = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

var remaining = ReplicaAddresses.Length;
Exception lastError = null;

foreach (var uri in ReplicaAddresses)
{
var webRequest = CreateRequest(uri + "?query=" + query);
Log.InfoFormat($"Обработка {webRequest.RequestUri}");

var task = ProcessRequestAsync(webRequest);

_ = task.ContinueWith(t =>
{
if (t.Status == TaskStatus.RanToCompletion)
{
firstSuccess.TrySetResult(t.Result);
return;
}

var ex = t.Exception?.GetBaseException() ?? new TaskCanceledException(t);
Interlocked.Exchange(ref lastError, ex);

if (Interlocked.Decrement(ref remaining) == 0)
firstSuccess.TrySetException(
Volatile.Read(ref lastError) ??
new Exception("Не удалось получить успешный ответ ни от одной реплики"));
}, TaskContinuationOptions.ExecuteSynchronously);

var completed = await Task.WhenAny(firstSuccess.Task, task, Task.Delay(perReplicaTimeout));

if (completed == firstSuccess.Task)
return await firstSuccess.Task;

if (completed != task) continue;

if (task.Status == TaskStatus.RanToCompletion)
return task.Result;
}

public override Task<string> ProcessRequestAsync(string query, TimeSpan timeout)
var remainingTotal = timeout - sw.Elapsed;
if (remainingTotal > TimeSpan.Zero)
{
throw new NotImplementedException();
var completed = await Task.WhenAny(firstSuccess.Task, Task.Delay(remainingTotal));
if (completed == firstSuccess.Task)
return await firstSuccess.Task;
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
if (firstSuccess.Task.IsCompleted)
return await firstSuccess.Task;

throw new TimeoutException();
}
}

protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient));
}