Skip to content

Commit

Permalink
Document new timers and scheduling APIs (#4608)
Browse files Browse the repository at this point in the history
* Add Scheduler, IWithTimers documentation

close #4357
  • Loading branch information
Aaronontheweb authored Nov 6, 2020
1 parent 25009c0 commit 6a491ea
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 22 deletions.
51 changes: 51 additions & 0 deletions docs/articles/actors/schedulers.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
---
uid: schedulers
title: Scheduling Future and Recurring Actor Messages
---

# Scheduling Future and Recurring Actor Messages
A useful feature of Akka.NET is the ability to schedule messages to be delivered in the future or on a recurring basis. This functionality can be used for all sorts of use cases, such as:

1. Creating conditional timeouts;
2. Execute recurring tasks; or
3. Throttling or delaying work.

## Scheduling Actor Messages Using `IWithTimers` (Recommended Approach)
As of Akka.NET v1.4.4 we introduced the [`IWithTimers` interface](xref:Akka.Actor.IWithTimers), which gives Akka.NET actors a way of accessing the `ActorSystem`'s scheduler without having to remember to manually dispose of scheduled tasks afterwards. Any scheduled or recurring tasks created by the `IWithTimers` interface will be automatically cancelled once the [actor terminates](xref:supervision).

[!code-csharp[IWithTimersSample](../../../src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs?name=TimerActor)]

In this approach, all timers are created using a specific key that can also be used to stop a timer again in the future:

[!code-csharp[StartTimers](../../../src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs?name=StartTimers)]

The key that was used to create a timer can also be used to query whether that timer is still running or not:

[!code-csharp[CheckTimer](../../../src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs?name=CheckTimer)]

And that key can be used to stop those timers as well:

[!code-csharp[StopTimers](../../../src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs?name=StartTimers)]

To use the `IWithTimer` interface, simply decorate your actor class with it and call the [`Timer.StartPeriodicTimer`](xref:Akka.Actor.ITimerScheduler#Akka_Actor_ITimerScheduler_StartPeriodicTimer_System_Object_System_Object_System_TimeSpan_) and [`Timer.StartSingleTimer`](xref:Akka.Actor.ITimerScheduler#Akka_Actor_ITimerScheduler_StartSingleTimer_System_Object_System_Object_System_TimeSpan_) methods. All of those timers will automatically be cancelled when the actor terminates.

### Testing for Idle Timeouts with `ReceiveTimeout`
One specific case with actors, and this is particularly useful for areas like [Akka.Cluster.Sharding](xref:cluster-sharding), is the ability to time out "idle" actors after a specified period of inactivity.

This can be accomplished using the `ReceiveTimeout` capability.

[!code-csharp[ReceiveTimeout](../../../src/core/Akka.Docs.Tests/Actors/ReceiveTimeoutSpecs.cs?name=ReceiveTimeoutActor)]

* `ReceiveTimeout` is a sliding window timeout - the timeout gets reset every time an actor receives a message that does not implement the [`INotInfluenceReceiveTimeout` interface](xref:Akka.Actor.INotInfluenceReceiveTimeout) the timer is reset back to its original duration.
* If the timeout expires, the actor will be notified by receiving a copy of the `ReceiveTimeout` message - at this stage the actor can do things like shut itself down, flush its state to a database, or whatever else you might need the actor to do once it becomes idle.
* The `SetReceiveTimeout(TimeSpan? time = null)` value can be changed at runtime or it can be cancelled altogether by calling `Context.SetReceiveTimeout(null)`; and
* The `ReceiveTimeout` will automatically be cancelled when the actor terminates.

## Scheduling Recurring Tasks with `IScheduler`
While the `IWithTimers` interface is the recommended approach for working with actors, the `ActorSystem` itself includes the underlying [`IScheduler` interface](xref:Akka.Actor.IScheduler), which exposes timing primitives that can be used inside or outside of individual actors.

[!code-csharp[Scheduler](../../../src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs?name=Scheduler)]

The `ActorSystem.Scheduler` can be used for any number of different types of tasks, but those tasks will not be cancelled automatically. You have to call the `IScheduler.Schedule_{method}_RepeatedlyCancelable` method, store the `ICancelable` returned by that method, and then call `ICancelable.Cancel()` once you're finished with it to dispose the method.

To learn more about working with the `IScheduler`, please see [Akka.NET Bootcamp Unit 2 Lesson 3 - Using the Scheduler to Send Messages Later](https://github.com/petabridge/akka-bootcamp/blob/master/src/Unit-2/lesson3/README.md).
2 changes: 2 additions & 0 deletions docs/articles/toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
href: actors/dispatchers.md
- name: Mailboxes
href: actors/mailboxes.md
- name: Scheduling Future and Recurring Messages
href: actors/schedulers.md
- name: Inbox
href: actors/inbox.md
- name: Finite State Machines
Expand Down
24 changes: 2 additions & 22 deletions src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<PropertyGroup>
<Copyright>Copyright © 2013-2020 Akka.NET Team</Copyright>
<Authors>Akka.NET Team</Authors>
<VersionPrefix>1.4.11</VersionPrefix>
<VersionPrefix>1.4.12</VersionPrefix>
<PackageIconUrl>https://getakka.net/images/akkalogo.png</PackageIconUrl>
<PackageProjectUrl>https://github.com/akkadotnet/akka.net</PackageProjectUrl>
<PackageLicenseUrl>https://github.com/akkadotnet/akka.net/blob/master/LICENSE</PackageLicenseUrl>
Expand All @@ -28,27 +28,7 @@
<CopyLocalLockFileAssemblies>true</CopyLocalLockFileAssemblies>
</PropertyGroup>
<PropertyGroup>
<PackageReleaseNotes>Maintenance Release for Akka.NET 1.4**
Akka.NET v1.4.11 includes some significant additions to Akka.NET:
[Akka: How prevent "Scheduled sending of heartbeat was delayed" and occasionally network partitions](https://github.com/akkadotnet/akka.net/issues/4432) - all `/system` actors now run on a dedicated dispatcher. This should significantly improve reliablity for Akka.Cluster, Akka.Persistence, and other built-in Akka.NET systems inside heavily utilized environments.
[Akka: Double wildcard implementation for ActorSelection](https://github.com/akkadotnet/akka.net/pull/4375)
[Akka.Remote: Null reference exception due to RemoteActorRefProvider.RemoteInternals](https://github.com/akkadotnet/akka.net/issues/4579)
[Akka.Persistence: Fix premature reset of the 'writeInProgress' flag in case of persistence failure](https://github.com/akkadotnet/akka.net/pull/4556)
[Akka.Cluster: disseminate downing decisions faste](https://github.com/akkadotnet/akka.net/pull/4598)
[Cluster - Add app-version to the Member information](https://github.com/akkadotnet/akka.net/pull/4577) - you can now specify which version of your software is running on each node.
[Akka.Cluster.Sharding: Bring ShardedDaemonProcess up to date](https://github.com/akkadotnet/akka.net/pull/4571)
To see the [full set of fixes in Akka.NET v1.4.11, please see the milestone on Github](https://github.com/akkadotnet/akka.net/milestone/42).
| COMMITS | LOC+ | LOC- | AUTHOR |
| --- | --- | --- | --- |
| 8 | 1020 | 164 | Gregorius Soedharmo |
| 7 | 399 | 178 | Ismael Hamed |
| 4 | 5 | 5 | dependabot-preview[bot] |
| 4 | 108 | 104 | Aaron Stannard |
| 2 | 232 | 26 | to11mtm |
| 2 | 2 | 2 | Pierre Irrmann |
| 2 | 1969 | 269 | zbynek001 |
| 2 | 155 | 445 | huysentruitw |
| 1 | 1 | 1 | Guillaume Caya-Letourneau |</PackageReleaseNotes>
<PackageReleaseNotes>Placeholder for nightlies**</PackageReleaseNotes>
</PropertyGroup>
<!-- SourceLink support for all Akka.NET projects -->
<ItemGroup>
Expand Down
69 changes: 69 additions & 0 deletions src/core/Akka.Docs.Tests/Actors/ReceiveTimeoutSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//-----------------------------------------------------------------------
// <copyright file="CoordinatedShutdownSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;

namespace DocsExamples.Actors
{

public class ReceiveTimeoutSpecs : TestKit
{
// <ReceiveTimeoutActor>
/// <summary>
/// Used to query if a <see cref="ReceiveTimeout"/> has been observed.
///
/// Can't influence the <see cref="ReceiveTimeout"/> since it implements
/// <see cref="INotInfluenceReceiveTimeout"/>.
/// </summary>
public class CheckTimeout : INotInfluenceReceiveTimeout { }
public class ReceiveTimeoutActor : ReceiveActor
{
private bool _timedOut = false;

public ReceiveTimeoutActor()
{
// if we don't
Receive<ReceiveTimeout>(_ =>
{
_timedOut = true;
});

Receive<CheckTimeout>(_ =>
{
Sender.Tell(_timedOut);
});
}

protected override void PreStart()
{
Context.SetReceiveTimeout(TimeSpan.FromMilliseconds(100));
}
}
// <ReceiveTimeoutActor>

[Fact]
public async Task ShouldReceiveTimeoutActors()
{
var receiveTimeout = Sys.ActorOf(Props.Create(() => new ReceiveTimeoutActor()), "receive-timeout");
var timedout1 = await receiveTimeout.Ask<bool>(new CheckTimeout(), TimeSpan.FromMilliseconds(500));
timedout1.Should().BeFalse();

await Task.Delay(200); // wait 200 ms

var timedout2 = await receiveTimeout.Ask<bool>(new CheckTimeout(), TimeSpan.FromMilliseconds(500));
timedout2.Should().BeTrue();
}
}
}
181 changes: 181 additions & 0 deletions src/core/Akka.Docs.Tests/Actors/SchedulerSpecs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
//-----------------------------------------------------------------------
// <copyright file="CoordinatedShutdownSpecs.cs" company="Akka.NET Project">
// Copyright (C) 2009-2020 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2020 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Event;
using Akka.TestKit.Xunit2;
using FluentAssertions;
using Xunit;

namespace DocsExamples.Actors
{
public class SchedulerSpecs : TestKit
{
public sealed class StopPrinting{}
public sealed class CanPrint{}

// <TimerActor>
public sealed class Print { }
public sealed class Total { }

public sealed class TimerActor : ReceiveActor, IWithTimers
{
public ITimerScheduler Timers { get; set; }

private int _count = 0;
private ILoggingAdapter _log = Context.GetLogger();

public TimerActor()
{
Receive<int>(i =>
{
_count += i;
});

Receive<Print>(_ => _log.Info("Current count is [{0}]", _count));
Receive<Total>(_ => Sender.Tell(_count));
}

protected override void PreStart()
{
// start two recurring timers
// both timers will be automatically disposed when actor is stopped
Timers.StartPeriodicTimer("print", new Print(), TimeSpan.FromSeconds(0.1), TimeSpan.FromSeconds(5));
Timers.StartPeriodicTimer("add", 1, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(20));
}
}
// </TimerActor>

public sealed class StartStopTimerActor : ReceiveActor, IWithTimers
{
public ITimerScheduler Timers { get; set; }

private int _count = 0;
private ILoggingAdapter _log = Context.GetLogger();

public StartStopTimerActor()
{
Receive<int>(i =>
{
_count += i;
});

Receive<StopPrinting>(_ => StopPrintTimer());
Receive<CanPrint>(_ =>
{
// <CheckTimer>
var isPrintTimerActive = Timers.IsTimerActive("print");
Sender.Tell(isPrintTimerActive);
// </CheckTimer>
});
Receive<Print>(_ => _log.Info("Current count is [{0}]", _count));
Receive<Total>(_ => Sender.Tell(_count));
}

protected override void PreStart()
{
StartTimers();
}

private void StartTimers()
{
// <StartTimers>

// start single timer that fires off 5 seconds in the future
Timers.StartSingleTimer("print", new Print(), TimeSpan.FromSeconds(5));

// start recurring timer
Timers.StartPeriodicTimer("add", 1, TimeSpan.FromMilliseconds(0), TimeSpan.FromMilliseconds(20));
// </StartTimers>
}

private void StopPrintTimer()
{
// <StopTimers>
// cancels the print timer
Timers.Cancel("print");

// cancels all of the timers that belong to this actor
// (also called automatically when this actor is stopped)
Timers.CancelAll();
// </StopTimers>
}
}

// <Scheduler>
public class SchedulerActor : ReceiveActor
{
private int _count = 0;
private ILoggingAdapter _log = Context.GetLogger();

private ICancelable _printTask;
private ICancelable _addTask;

public SchedulerActor()
{
Receive<int>(i =>
{
_count += i;
});

Receive<Print>(_ => _log.Info("Current count is [{0}]", _count));
Receive<Total>(_ => Sender.Tell(_count));
}

protected override void PreStart()
{
// start two recurring timers
// both timers will be automatically disposed when actor is stopped
_printTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(0.1),
TimeSpan.FromSeconds(5), Self, new Print(), ActorRefs.NoSender);

_addTask = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromMilliseconds(0),
TimeSpan.FromMilliseconds(20), Self, 1, ActorRefs.NoSender);
}

protected override void PostStop()
{
// have to cancel all recurring scheduled tasks
_printTask?.Cancel();
_addTask?.Cancel();
}
}
// </Scheduler>

[Fact]
public async Task TimerActorShouldIncrementOverTime()
{
var timerActor = Sys.ActorOf(Props.Create(() => new TimerActor()), "timers");

timerActor.Tell(new Total());
var count1 = ExpectMsg<int>();

await Task.Delay(100); // pause for 100ms

timerActor.Tell(new Total());
var count2 = ExpectMsg<int>();

count1.Should().BeLessThan(count2);
}

[Fact]
public async Task TimerShouldStopCorrectly()
{
var timerActor = Sys.ActorOf(Props.Create(() => new StartStopTimerActor()), "timers");

var canPrint1 = await timerActor.Ask<bool>(new CanPrint(), TimeSpan.FromSeconds(1));
canPrint1.Should().BeTrue();

timerActor.Tell(new StopPrinting());

var canPrint2 = await timerActor.Ask<bool>(new CanPrint(), TimeSpan.FromSeconds(1));
canPrint2.Should().BeFalse();
}
}
}
1 change: 1 addition & 0 deletions src/core/Akka.Docs.Tests/Akka.Docs.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="$(FluentAssertionsVersion)" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="System.Net.Http" Version="4.3.4" />
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
Expand Down

0 comments on commit 6a491ea

Please sign in to comment.