diff --git a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs index 5531800..037396b 100644 --- a/homework 2/ClusterClient/Clients/ParallelClusterClient.cs +++ b/homework 2/ClusterClient/Clients/ParallelClusterClient.cs @@ -1,23 +1,53 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Threading; using System.Threading.Tasks; using log4net; -namespace ClusterClient.Clients +namespace ClusterClient.Clients; + +public class ParallelClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public class ParallelClusterClient : ClusterClientBase + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - public ParallelClusterClient(string[] replicaAddresses) : base(replicaAddresses) - { - } + if (ReplicaAddresses == null || ReplicaAddresses.Length == 0) + throw new InvalidOperationException("Реплика адресов не указана"); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + var firstSuccess = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var remaining = ReplicaAddresses.Length; + Exception? lastError = null; + + foreach (var uri in ReplicaAddresses) { - throw new NotImplementedException(); + var webRequest = CreateRequest(uri + "?query=" + query); + Log.InfoFormat($"Обработка {webRequest.RequestUri}"); + + var task = ProcessRequestAsync(webRequest); + + _ = task.ContinueWith(t => + { + if (t.Status == TaskStatus.RanToCompletion) + { + firstSuccess.TrySetResult(t.Result); + return; + } + + var ex = t.Exception?.GetBaseException() ?? new TaskCanceledException(t); + lastError = ex; + _ = t.Exception; + + if (Interlocked.Decrement(ref remaining) == 0) + firstSuccess.TrySetException(lastError ?? + new Exception("Не удалось получить ответ ни от одной реплики")); + }, TaskContinuationOptions.ExecuteSynchronously); } - protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); + var completed = await Task.WhenAny(firstSuccess.Task, Task.Delay(timeout)); + if (completed != firstSuccess.Task) + throw new TimeoutException(); + + return await firstSuccess.Task; } -} + + protected override ILog Log => LogManager.GetLogger(typeof(ParallelClusterClient)); +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs index 0293628..fbfdd50 100644 --- a/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs +++ b/homework 2/ClusterClient/Clients/RoundRobinClusterClient.cs @@ -1,23 +1,58 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Diagnostics; using System.Threading.Tasks; using log4net; -namespace ClusterClient.Clients +namespace ClusterClient.Clients; + +public class RoundRobinClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public class RoundRobinClusterClient : ClusterClientBase + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - public RoundRobinClusterClient(string[] replicaAddresses) : base(replicaAddresses) - { - } + if (ReplicaAddresses == null || ReplicaAddresses.Length == 0) + throw new InvalidOperationException("Реплика адресов не указана"); - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + var sw = Stopwatch.StartNew(); + Exception lastError = null; + + for (var i = 0; i < ReplicaAddresses.Length; i++) { - throw new NotImplementedException(); + var remaining = timeout - sw.Elapsed; + if (remaining <= TimeSpan.Zero) + break; + + var remainingReplicas = ReplicaAddresses.Length - i; + var slice = TimeSpan.FromTicks(remaining.Ticks / remainingReplicas); + + var uri = ReplicaAddresses[i]; + var webRequest = CreateRequest(uri + "?query=" + query); + + Log.InfoFormat($"Обработка {webRequest.RequestUri}"); + + var resultTask = ProcessRequestAsync(webRequest); + + var completed = await Task.WhenAny(resultTask, Task.Delay(slice)); + if (completed != resultTask) + { + _ = resultTask.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted); + continue; + } + + try + { + return await resultTask; + } + catch (Exception e) + { + lastError = e; + } } - protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); + if (lastError != null) + throw lastError; + + throw new TimeoutException(); } -} + + protected override ILog Log => LogManager.GetLogger(typeof(RoundRobinClusterClient)); +} \ No newline at end of file diff --git a/homework 2/ClusterClient/Clients/SmartClusterClient.cs b/homework 2/ClusterClient/Clients/SmartClusterClient.cs index eb06d8b..6ad225b 100644 --- a/homework 2/ClusterClient/Clients/SmartClusterClient.cs +++ b/homework 2/ClusterClient/Clients/SmartClusterClient.cs @@ -1,23 +1,74 @@ using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; +using System.Diagnostics; +using System.Threading; using System.Threading.Tasks; using log4net; -namespace ClusterClient.Clients +namespace ClusterClient.Clients; + +public class SmartClusterClient(string[] replicaAddresses) : ClusterClientBase(replicaAddresses) { - public class SmartClusterClient : ClusterClientBase + public override async Task ProcessRequestAsync(string query, TimeSpan timeout) { - public SmartClusterClient(string[] replicaAddresses) : base(replicaAddresses) + if (ReplicaAddresses == null || ReplicaAddresses.Length == 0) + throw new InvalidOperationException("Реплика адресов не указана"); + + var sw = Stopwatch.StartNew(); + + var perReplicaTimeout = TimeSpan.FromTicks(timeout.Ticks / ReplicaAddresses.Length); + var firstSuccess = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + var remaining = ReplicaAddresses.Length; + Exception lastError = null; + + foreach (var uri in ReplicaAddresses) { + var webRequest = CreateRequest(uri + "?query=" + query); + Log.InfoFormat($"Обработка {webRequest.RequestUri}"); + + var task = ProcessRequestAsync(webRequest); + + _ = task.ContinueWith(t => + { + if (t.Status == TaskStatus.RanToCompletion) + { + firstSuccess.TrySetResult(t.Result); + return; + } + + var ex = t.Exception?.GetBaseException() ?? new TaskCanceledException(t); + Interlocked.Exchange(ref lastError, ex); + + if (Interlocked.Decrement(ref remaining) == 0) + firstSuccess.TrySetException( + Volatile.Read(ref lastError) ?? + new Exception("Не удалось получить успешный ответ ни от одной реплики")); + }, TaskContinuationOptions.ExecuteSynchronously); + + var completed = await Task.WhenAny(firstSuccess.Task, task, Task.Delay(perReplicaTimeout)); + + if (completed == firstSuccess.Task) + return await firstSuccess.Task; + + if (completed != task) continue; + + if (task.Status == TaskStatus.RanToCompletion) + return task.Result; } - public override Task ProcessRequestAsync(string query, TimeSpan timeout) + var remainingTotal = timeout - sw.Elapsed; + if (remainingTotal > TimeSpan.Zero) { - throw new NotImplementedException(); + var completed = await Task.WhenAny(firstSuccess.Task, Task.Delay(remainingTotal)); + if (completed == firstSuccess.Task) + return await firstSuccess.Task; } - protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); + if (firstSuccess.Task.IsCompleted) + return await firstSuccess.Task; + + throw new TimeoutException(); } -} + + protected override ILog Log => LogManager.GetLogger(typeof(SmartClusterClient)); +} \ No newline at end of file