Skip to content

Commit

Permalink
Akka.Actor: IStash API and configuration enhancements (#6660)
Browse files Browse the repository at this point in the history
* close #6658

Add APIs to track content of Stash

* added basic stashing test cases

* updated API approvals

---------

Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
Aaronontheweb and Arkatufus authored Apr 24, 2023
1 parent 61fb874 commit 8b6c0ff
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1134,6 +1134,11 @@ namespace Akka.Actor
}
public interface IStash
{
int Capacity { get; }
int Count { get; }
bool IsEmpty { get; }
bool IsFull { get; }
bool NonEmpty { get; }
System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash();
void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes);
void Stash();
Expand Down Expand Up @@ -1933,6 +1938,11 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
public bool NonEmpty { get; }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,11 @@ namespace Akka.Actor
}
public interface IStash
{
int Capacity { get; }
int Count { get; }
bool IsEmpty { get; }
bool IsFull { get; }
bool NonEmpty { get; }
System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash();
void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes);
void Stash();
Expand Down Expand Up @@ -1931,6 +1936,11 @@ namespace Akka.Actor.Internal
public abstract class AbstractStash : Akka.Actor.IStash
{
protected AbstractStash(Akka.Actor.IActorContext context) { }
public int Capacity { get; }
public int Count { get; }
public bool IsEmpty { get; }
public bool IsFull { get; }
public bool NonEmpty { get; }
public System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> ClearStash() { }
public void Prepend(System.Collections.Generic.IEnumerable<Akka.Actor.Envelope> envelopes) { }
public void Stash() { }
Expand Down
6 changes: 6 additions & 0 deletions src/core/Akka.Persistence/Eventsourced.cs
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,12 @@ public void Prepend(IEnumerable<Envelope> envelopes)
{
_userStash.Prepend(envelopes);
}

public int Count => _userStash.Count;
public bool IsEmpty => _userStash.IsEmpty;
public bool NonEmpty => _userStash.NonEmpty;
public bool IsFull => _userStash.IsFull;
public int Capacity => _userStash.Capacity;
}
}
}
145 changes: 145 additions & 0 deletions src/core/Akka.Tests/Actor/Stash/StashCapacitySpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//-----------------------------------------------------------------------
// <copyright file="StashCapacitySpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2023 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2023 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using FluentAssertions;

namespace Akka.Tests.Actor.Stash;

public class StashCapacitySpecs : AkkaSpec
{
public StashCapacitySpecs(ITestOutputHelper output) : base(Config.Empty, output: output)
{

}

[Fact]
public async Task ShouldGetAccurateStashReadingForUnboundedStash()
{
var stashActor = Sys.ActorOf(Props.Create(() => new StashActor()));
stashActor.Tell(new StashActor.StashMessage("1"));
stashActor.Tell(new StashActor.StashMessage("2"));
stashActor.Tell(StashActor.GetStashReadout.Instance);
var readout1 = await ExpectMsgAsync<StashActor.StashReadout>();
readout1.Capacity.Should().Be(-1); // unbounded stash returns -1 for capacity
readout1.Size.Should().Be(2);
readout1.IsEmpty.Should().BeFalse();
readout1.IsFull.Should().BeFalse();

stashActor.Tell(StashActor.UnstashMessage.Instance);
stashActor.Tell(StashActor.GetStashReadout.Instance);
var readout2 = await ExpectMsgAsync<StashActor.StashReadout>();
readout2.Capacity.Should().Be(-1);
readout2.Size.Should().Be(1);
readout2.IsEmpty.Should().BeFalse();
readout2.IsFull.Should().BeFalse();

stashActor.Tell(StashActor.UnstashMessage.Instance);
stashActor.Tell(StashActor.GetStashReadout.Instance);
var readout3 = await ExpectMsgAsync<StashActor.StashReadout>();
readout3.Capacity.Should().Be(-1);
readout3.Size.Should().Be(0);
readout3.IsEmpty.Should().BeTrue();
readout3.IsFull.Should().BeFalse();
}

private class StashActor : UntypedActorWithStash
{
public class StashMessage
{
public StashMessage(string message)
{
Message = message;
}

public string Message { get; }
}

public class UnstashMessage
{
private UnstashMessage()
{

}
public static readonly UnstashMessage Instance = new();
}

public class GetStashReadout
{
private GetStashReadout()
{

}
public static readonly GetStashReadout Instance = new();
}

public class StashReadout
{
public StashReadout(int capacity, int size, bool isEmpty, bool isFull)
{
Capacity = capacity;
Size = size;
IsEmpty = isEmpty;
IsFull = isFull;
}

public int Capacity { get; }
public int Size { get; }

public bool IsEmpty { get; }

public bool IsFull { get; }
}

protected override void OnReceive(object message)
{
switch (message)
{
case StashMessage msg:
Stash.Stash();
break;
case UnstashMessage _:
Stash.Unstash();
Context.Become(Unstashing); // switch behaviors so we're not stuck in stash loop
break;
case GetStashReadout _:
Sender.Tell(new StashReadout(Stash.Capacity, Stash.Count, Stash.IsEmpty, Stash.IsFull));
break;
default:
Unhandled(message);
break;
}
}

private void Unstashing(object message)
{
switch (message)
{
case StashMessage msg: // do nothing
break;
case UnstashMessage when Stash.NonEmpty:
Stash.Unstash();
break;
case UnstashMessage _: // when the stash is empty, go back to stashing
Context.Become(OnReceive);
break;
case GetStashReadout _:
Sender.Tell(new StashReadout(Stash.Capacity, Stash.Count, Stash.IsEmpty, Stash.IsFull));
break;
default:
Unhandled(message);
break;
}
}
}
}
32 changes: 30 additions & 2 deletions src/core/Akka/Actor/Stash/IStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,42 @@ public interface IStash
/// Returns all messages and clears the stash.
/// The stash is guaranteed to be empty afterwards.
/// </summary>
/// <returns>TBD</returns>
/// <returns>The previous stashed messages.</returns>
IEnumerable<Envelope> ClearStash();

/// <summary>
/// TBD
/// Prepend a set of envelopes to the front of the stash.
/// </summary>
/// <param name="envelopes">TBD</param>
void Prepend(IEnumerable<Envelope> envelopes);

/// <summary>
/// The number of messages currently inside the stash.
/// </summary>
public int Count { get; }

/// <summary>
/// Returns <c>true</c> when <see cref="Count"/> is zero.
/// </summary>
public bool IsEmpty { get; }

/// <summary>
/// Returns <c>true</c> when <see cref="Count"/> is greater than zero.
/// </summary>
public bool NonEmpty { get; }

/// <summary>
/// When using a bounded stash, this returns <c>true</c> when the stash is full.
/// </summary>
/// <remarks>
/// Always returns <c>false</c> when using an unbounded stash.
/// </remarks>
public bool IsFull { get; }

/// <summary>
/// The total capacity of the stash.
/// </summary>
public int Capacity { get; }
}
}

22 changes: 15 additions & 7 deletions src/core/Akka/Actor/Stash/Internal/AbstractStash.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ public abstract class AbstractStash : IStash

private readonly ActorCell _actorCell;

/// <summary>
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
/// </summary>
private readonly int _capacity;

/// <summary>
/// The actor's deque-based message queue.
/// `mailbox.queue` is the underlying `Deque`.
Expand All @@ -61,7 +56,7 @@ protected AbstractStash(IActorContext context)
_actorCell = actorCell;

// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
_capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox);
Capacity = context.System.Mailboxes.StashCapacity(context.Props.Dispatcher, context.Props.Mailbox);
}

private int _currentEnvelopeId;
Expand All @@ -84,7 +79,7 @@ public void Stash()
}
_currentEnvelopeId = _actorCell.CurrentEnvelopeId;

if (_capacity <= 0 || _theStash.Count < _capacity)
if (Capacity <= 0 || _theStash.Count < Capacity)
_theStash.AddLast(new Envelope(currMsg, sender));
else
throw new StashOverflowException($"Couldn't enqueue message {currMsg} from ${sender} to stash of {_actorCell.Self}");
Expand Down Expand Up @@ -189,6 +184,19 @@ public void Prepend(IEnumerable<Envelope> envelopes)
}
}

public int Count => _theStash.Count;
public bool IsEmpty => Count == 0;
public bool NonEmpty => !IsEmpty;
public bool IsFull => Capacity >= 0 && _theStash.Count >= Capacity;

/// <summary>
/// The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
/// </summary>
/// <remarks>
/// If capacity is negative, then we're using an Unbounded stash.
/// </remarks>
public int Capacity { get; }

/// <summary>
/// Enqueues <paramref name="msg"/> at the first position in the mailbox. If the message contained in
/// the envelope is a <see cref="Terminated"/> message, it will be ensured that it can be re-received
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/Dispatch/ISemantics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
namespace Akka.Dispatch
{
/// <summary>
/// TBD
/// Describes the message queue semantics of a mailbox.
/// </summary>
public interface ISemantics
{
Expand Down

0 comments on commit 8b6c0ff

Please sign in to comment.