Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions Source/Core/Actors/ActorExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,19 +1027,13 @@ private ActorOperation GetOrCreateActorOperation(ActorId id, Actor actor)
}

/// <inheritdoc/>
public override void SendEvent(ActorId targetId, Event initialEvent, EventGroup eventGroup = default, SendOptions options = null)
{
var senderOp = this.Runtime.GetExecutingOperation<ActorOperation>();
this.SendEvent(targetId, initialEvent, senderOp?.Actor, eventGroup, options);
}
public override void SendEvent(ActorId targetId, Event initialEvent, EventGroup eventGroup = default,
SendOptions options = null) => this.SendEvent(targetId, initialEvent, null, eventGroup, options);

/// <inheritdoc/>
public override Task<bool> SendEventAndExecuteAsync(ActorId targetId, Event initialEvent,
EventGroup eventGroup = null, SendOptions options = null)
{
var senderOp = this.Runtime.GetExecutingOperation<ActorOperation>();
return this.SendEventAndExecuteAsync(targetId, initialEvent, senderOp?.Actor, eventGroup, options);
}
EventGroup eventGroup = null, SendOptions options = null) =>
this.SendEventAndExecuteAsync(targetId, initialEvent, null, eventGroup, options);

/// <summary>
/// Sends an asynchronous <see cref="Event"/> to an actor.
Expand Down
20 changes: 10 additions & 10 deletions Source/Core/Actors/EventQueues/EventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ public EnqueueStatus Enqueue(Event e, EventGroup eventGroup, EventInfo info)
{
this.OnReceiveEvent(e, eventGroup, info);
this.ReceiveCompletionSource.SetResult(e);
return enqueueStatus;
}
else
{
Expand Down Expand Up @@ -147,34 +146,35 @@ public EnqueueStatus Enqueue(Event e, EventGroup eventGroup, EventInfo info)
while (node != null)
{
// Iterates through the events in the inbox.
if (this.IsEventIgnored(node.Value.e))
var currentEvent = node.Value;
if (this.IsEventIgnored(currentEvent.e))
{
// Removes an ignored event.
var nextNode = node.Next;
this.Queue.Remove(node);
this.OnIgnoreEvent(node.Value.e, node.Value.eventGroup, null);
this.OnIgnoreEvent(currentEvent.e, currentEvent.eventGroup, null);
node = nextNode;
continue;
}
else if (this.IsEventDeferred(node.Value.e))
else if (this.IsEventDeferred(currentEvent.e))
{
// Skips a deferred event.
this.OnDeferEvent(node.Value.e, node.Value.eventGroup, null);
this.OnDeferEvent(currentEvent.e, currentEvent.eventGroup, null);
node = node.Next;
continue;
}

// Found next event that can be dequeued.
this.Queue.Remove(node);
return (DequeueStatus.Success, node.Value.e, node.Value.eventGroup, null);
return (DequeueStatus.Success, currentEvent.e, currentEvent.eventGroup, null);
}

// No event can be dequeued, so check if there is a default event handler.
if (!this.IsDefaultHandlerAvailable())
{
// There is no default event handler installed, so do not return an event.
// Setting IsEventHandlerRunning must happen inside the lock as it needs
// to be synchronized with the enqueue and starting a new event handler.
// There is no default event handler installed, so do not return an event. Setting the
// IsEventHandlerRunning field must happen inside the lock as it needs to be synchronized
// with the enqueue and starting a new event handler.
this.IsEventHandlerRunning = false;
return (DequeueStatus.Unavailable, null, null, null);
}
Expand Down Expand Up @@ -259,7 +259,7 @@ private Task<Event> ReceiveEventAsync(Dictionary<Type, Func<Event, bool>> eventW

if (receivedEvent == default)
{
// Note that EventWaitTypes is racy, so should not be accessed outside
// Note that the EventWaitTypes field is racy, so should not be accessed outside
// the lock, this is why we access eventWaitTypes instead.
this.OnWaitEvent(eventWaitTypes.Keys);
return this.ReceiveCompletionSource.Task;
Expand Down
175 changes: 116 additions & 59 deletions Source/Core/Actors/EventQueues/MockEventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Coyote.Actors.Mocks
Expand Down Expand Up @@ -45,6 +46,11 @@ internal class MockEventQueue : IEventQueue
/// </summary>
private TaskCompletionSource<Event> ReceiveCompletionSource;

/// <summary>
/// A lock that synchronizes accesses to the queue.
/// </summary>
private readonly SemaphoreSlim Lock;

/// <summary>
/// Checks if the queue is accepting new events.
/// </summary>
Expand Down Expand Up @@ -76,52 +82,71 @@ internal MockEventQueue(Actor owner)
{
this.Owner = owner;
this.Queue = new LinkedList<(Event, EventGroup, EventInfo)>();
this.EventWaitTypes = new Dictionary<Type, Func<Event, bool>>();
this.Lock = new SemaphoreSlim(1, 1);
this.IsClosed = false;
}

/// <inheritdoc/>
public EnqueueStatus Enqueue(Event e, EventGroup eventGroup, EventInfo info)
{
if (this.IsClosed)
EnqueueStatus enqueueStatus = EnqueueStatus.EventHandlerRunning;
this.Lock.Wait();
try
{
return EnqueueStatus.Dropped;
if (this.IsClosed)
{
return EnqueueStatus.Dropped;
}

if (this.EventWaitTypes != null &&
this.EventWaitTypes.TryGetValue(e.GetType(), out Func<Event, bool> predicate) &&
(predicate is null || predicate(e)))
{
this.EventWaitTypes = null;
enqueueStatus = EnqueueStatus.Received;
}
else
{
this.Queue.AddLast((e, eventGroup, info));
if (info.Assert >= 0)
{
var eventCount = this.Queue.Count(val => val.e.GetType().Equals(e.GetType()));
this.Assert(eventCount <= info.Assert,
"There are more than {0} instances of '{1}' in the input queue of {2}.",
info.Assert, info.EventName, this.Owner.Id);
}

if (!this.IsEventHandlerRunning)
{
if (this.TryDequeueEvent(true).e is null)
{
enqueueStatus = EnqueueStatus.NextEventUnavailable;
}
else
{
this.IsEventHandlerRunning = true;
enqueueStatus = EnqueueStatus.EventHandlerNotRunning;
}
}
}
}
finally
{
this.Lock.Release();
}

if (this.EventWaitTypes.TryGetValue(e.GetType(), out Func<Event, bool> predicate) &&
(predicate is null || predicate(e)))
if (enqueueStatus is EnqueueStatus.Received)
{
this.EventWaitTypes.Clear();
this.OnReceiveEvent(e, eventGroup, info);
this.ReceiveCompletionSource.SetResult(e);
return EnqueueStatus.EventHandlerRunning;
enqueueStatus = EnqueueStatus.EventHandlerRunning;
}

this.OnEnqueueEvent(e, eventGroup, info);
this.Queue.AddLast((e, eventGroup, info));

if (info.Assert >= 0)
else
{
var eventCount = this.Queue.Count(val => val.e.GetType().Equals(e.GetType()));
this.Assert(eventCount <= info.Assert,
"There are more than {0} instances of '{1}' in the input queue of {2}.",
info.Assert, info.EventName, this.Owner.Id);
this.OnEnqueueEvent(e, eventGroup, info);
}

if (!this.IsEventHandlerRunning)
{
if (this.TryDequeueEvent(true).e is null)
{
return EnqueueStatus.NextEventUnavailable;
}
else
{
this.IsEventHandlerRunning = true;
return EnqueueStatus.EventHandlerNotRunning;
}
}

return EnqueueStatus.EventHandlerRunning;
return enqueueStatus;
}

/// <inheritdoc/>
Expand All @@ -148,21 +173,31 @@ public EnqueueStatus Enqueue(Event e, EventGroup eventGroup, EventInfo info)

// Make sure this happens before a potential dequeue.
var hasDefaultHandler = this.IsDefaultHandlerAvailable();

// Try to dequeue the next event, if there is one.
var (e, eventGroup, info) = this.TryDequeueEvent();
if (e != null)
Console.WriteLine($"Try Dequeue");
this.Lock.Wait();
try
{
// Found next event that can be dequeued.
return (DequeueStatus.Success, e, eventGroup, info);
}
// Try to dequeue the next event, if there is one.
var (e, eventGroup, info) = this.TryDequeueEvent();
if (e != null)
{
// Found next event that can be dequeued.
return (DequeueStatus.Success, e, eventGroup, info);
}

// No event can be dequeued, so check if there is a default event handler.
if (!hasDefaultHandler)
// No event can be dequeued, so check if there is a default event handler.
if (!hasDefaultHandler)
{
// There is no default event handler installed, so do not return an event. Setting the
// IsEventHandlerRunning field must happen inside the lock as it needs to be synchronized
// with the enqueue and starting a new event handler.
this.IsEventHandlerRunning = false;
return (DequeueStatus.Unavailable, null, null, null);
}
}
finally
{
// There is no default event handler installed, so do not return an event.
this.IsEventHandlerRunning = false;
return (DequeueStatus.Unavailable, null, null, null);
this.Lock.Release();
}

// TODO: check op-id of default event.
Expand All @@ -183,11 +218,10 @@ public EnqueueStatus Enqueue(Event e, EventGroup eventGroup, EventInfo info)
while (node != null)
{
// Iterates through the events and metadata in the inbox.
var nextNode = node.Next;
var currentEvent = node.Value;

if (this.IsEventIgnored(currentEvent.e))
{
var nextNode = node.Next;
if (!checkOnly)
{
// Removes an ignored event.
Expand All @@ -202,12 +236,13 @@ public EnqueueStatus Enqueue(Event e, EventGroup eventGroup, EventInfo info)
{
// Skips a deferred event.
this.OnDeferEvent(currentEvent.e, currentEvent.eventGroup, currentEvent.info);
node = nextNode;
node = node.Next;
continue;
}

if (!checkOnly)
{
// Found next event that can be dequeued.
this.Queue.Remove(node);
}

Expand Down Expand Up @@ -264,33 +299,47 @@ public Task<Event> ReceiveEventAsync(params Tuple<Type, Func<Event, bool>>[] eve
}

/// <summary>
/// Waits for an event to be enqueued.
/// Waits for an event to be enqueued based on the conditions defined in the event wait types.
/// </summary>
private Task<Event> ReceiveEventAsync(Dictionary<Type, Func<Event, bool>> eventWaitTypes)
{
this.OnReceiveInvoked();

(Event e, EventGroup eventGroup, EventInfo info) receivedEvent = default;
var node = this.Queue.First;
while (node != null)
this.Lock.Wait();
try
{
// Dequeue the first event that the caller waits to receive, if there is one in the queue.
if (eventWaitTypes.TryGetValue(node.Value.e.GetType(), out Func<Event, bool> predicate) &&
(predicate is null || predicate(node.Value.e)))
var node = this.Queue.First;
while (node != null)
{
receivedEvent = node.Value;
this.Queue.Remove(node);
break;
// Dequeue the first event that the caller waits to receive, if there is one in the queue.
if (eventWaitTypes.TryGetValue(node.Value.e.GetType(), out Func<Event, bool> predicate) &&
(predicate is null || predicate(node.Value.e)))
{
receivedEvent = node.Value;
this.Queue.Remove(node);
break;
}

node = node.Next;
}

node = node.Next;
if (receivedEvent == default)
{
this.ReceiveCompletionSource = new TaskCompletionSource<Event>();
this.EventWaitTypes = eventWaitTypes;
}
}
finally
{
this.Lock.Release();
}

if (receivedEvent == default)
{
this.ReceiveCompletionSource = new TaskCompletionSource<Event>();
this.EventWaitTypes = eventWaitTypes;
this.OnWaitEvent(this.EventWaitTypes.Keys);
// Note that the EventWaitTypes field is racy, so should not be accessed outside
// the lock, this is why we access eventWaitTypes instead.
this.OnWaitEvent(eventWaitTypes.Keys);
return this.ReceiveCompletionSource.Task;
}

Expand Down Expand Up @@ -415,7 +464,15 @@ public int GetCachedState()
/// <inheritdoc/>
public void Close()
{
this.IsClosed = true;
this.Lock.Wait();
try
{
this.IsClosed = true;
}
finally
{
this.Lock.Release();
}
}

/// <summary>
Expand Down
Loading