Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Zbynek001 sharding update2 #5857

Merged
merged 77 commits into from
Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
77 commits
Select commit Hold shift + click to select a range
9919ddb
Add Dropped to Akka.Actor (migrated partially from https://github.com…
zbynek001 Nov 3, 2020
4762240
Logging of UnhandledMessage (migrated from https://github.com/akka/ak…
zbynek001 Nov 14, 2020
220dd0b
MessageBuffer implementations
zbynek001 Nov 11, 2020
c2d789f
TestKit logger with prefix
zbynek001 Nov 11, 2020
cc80e4b
sharding update
zbynek001 Nov 12, 2020
55b4537
sharding tests
zbynek001 Nov 12, 2020
ba28c66
sharding multinode tests
zbynek001 Nov 12, 2020
5cffe62
api approval
zbynek001 Nov 12, 2020
df19783
replace sqlite with MemoryJournalShared and local snapshot store
zbynek001 Nov 12, 2020
fb6ecc8
tests
zbynek001 Nov 13, 2020
5cb90af
snapshot inmem
zbynek001 Nov 13, 2020
ca3a3fa
backwards compatible PersistenceId for PersistentShardCoordinator
zbynek001 Nov 14, 2020
4d67fc2
test fix
zbynek001 Nov 14, 2020
2547e32
SnapshotStoreProxy & MemorySnapshotStoreShared
zbynek001 Nov 14, 2020
a0defd1
test snapshot store switched to shared inmem
zbynek001 Nov 14, 2020
02e2994
ExternalShardAllocationStrategy & tests
zbynek001 Nov 16, 2020
2bb32cb
ExternalShardAllocationStrategy API approval
zbynek001 Nov 16, 2020
aee94d3
test timing fix
zbynek001 Nov 16, 2020
f62f635
Merge branch 'dev' into sharding-update2
Aaronontheweb Dec 23, 2020
fc8d701
Merge branch 'dev' into sharding-update2
Arkatufus Dec 30, 2020
0ba18a7
review comments addressed
zbynek001 Jan 2, 2021
84a3330
IEquatable removed for singleton messages
zbynek001 Jan 3, 2021
0dca2a4
test fixes
zbynek001 Jan 3, 2021
5b4da3c
cleanup
zbynek001 Jan 3, 2021
1ab2252
test cleanup
zbynek001 Jan 4, 2021
6851a44
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Mar 8, 2021
cf60868
protobuf generated
zbynek001 Mar 8, 2021
3b30087
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Mar 10, 2021
20cc2c8
cleanup
zbynek001 Mar 10, 2021
704bfbf
Merge branch 'dev' of https://github.com/akkadotnet/akka.net into sha…
zbynek001 Apr 6, 2021
590f1e7
cleanup
zbynek001 Apr 6, 2021
5bdd442
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Jul 14, 2021
f799689
Race condition in DeprecatedSupervisionSpec fixed (migrated from http…
zbynek001 Jul 14, 2021
92e2a29
cleanup
zbynek001 Jul 14, 2021
2484e93
Small clarification of recovery strategy in config (migrated from htt…
zbynek001 Jul 14, 2021
759f8a9
Resolve snapshot check skipped for some events (migrated from https:/…
zbynek001 Jul 14, 2021
4d19943
additional sharding messages serialization, tests
zbynek001 Jul 14, 2021
cb35f11
api approval update
zbynek001 Jul 14, 2021
f84978b
disable durable storage on ShardRegionSpec
zbynek001 Jul 14, 2021
0a2f7bb
extend timeout for ExternalShardAllocationSpec
zbynek001 Jul 14, 2021
a610c06
naming conventions
zbynek001 Jul 15, 2021
3082d03
missing readonly added, updated syntax
zbynek001 Jul 15, 2021
c8575dd
renaming conventions
zbynek001 Jul 15, 2021
d01d619
Defer coordinator stop until region graceful stop has completed (migr…
zbynek001 Jul 28, 2021
e2efd10
sharding: actively signal 'region stopped' to the coordinator (migrat…
zbynek001 Jul 28, 2021
c68adab
racy test fix
zbynek001 Jul 28, 2021
fc58a9f
racy test verbose logging
zbynek001 Jul 28, 2021
e6fb13b
test update
zbynek001 Jul 28, 2021
cb4cac3
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Aug 3, 2021
be552bc
merge fix
zbynek001 Aug 3, 2021
c38268d
sharding ddata coordinator switch to ReadMajorityPlus/WriteMajorityPlus
zbynek001 Aug 3, 2021
44ccab1
more logs to debug tests
zbynek001 Aug 3, 2021
7c39c9f
more logs
zbynek001 Aug 3, 2021
9d0f515
Merge branch 'dev' into sharding-update2
Arkatufus Aug 3, 2021
c22b342
fix MultiNodeClusterSpec default timeouts
zbynek001 Aug 4, 2021
8916366
revert additional logs
zbynek001 Aug 4, 2021
b9631d6
override single-expect-default only for sharding tests
zbynek001 Aug 4, 2021
7b0e1b6
revert unrelated protobuf serializers
zbynek001 Aug 4, 2021
5b5a9a5
Merge branch 'dev' into sharding-update2
Arkatufus Aug 16, 2021
e11071e
Merge branch 'dev' into sharding-update2
Aaronontheweb Aug 18, 2021
05fd2ad
Fix StartEntitySpec instability (migrated from https://github.com/akk…
zbynek001 Aug 26, 2021
9335fb9
Quieter logging for ShardCoordinator initialization (migrated from ht…
zbynek001 Aug 26, 2021
ef73094
reduce default write-majority-plus for sharding (migrated from https:…
zbynek001 Aug 26, 2021
a6c1b8c
Merge remote-tracking branch 'remotes/akkadotnet/dev' into sharding-u…
zbynek001 Aug 26, 2021
95cd398
merge fix
zbynek001 Aug 26, 2021
fe8fc83
rebalance log fix
zbynek001 Sep 1, 2021
647346d
Merge branch 'dev' into sharding-update2
Aaronontheweb Feb 10, 2022
03abd3d
fixed compilation error from rebase
Aaronontheweb Feb 10, 2022
37af7c2
switch RememberEntitiesShardIdExtractorChangeSpec from ddata to persi…
zbynek001 Feb 10, 2022
645e3f6
disable durable storage on PersistentShardingMigrationSpec
zbynek001 Feb 10, 2022
2bfaf8d
clean up leveldb configuration
zbynek001 Feb 10, 2022
56304f9
Merge branch 'dev' into sharding-update2
Aaronontheweb Mar 4, 2022
ae4750b
Merge branch 'dev' into sharding-update2
Arkatufus Apr 5, 2022
c416295
Merge branch 'dev' into sharding-update2
Arkatufus Apr 12, 2022
4cdde2e
Merge branch 'sharding-update2' of https://github.com/zbynek001/akka.…
Aaronontheweb Apr 20, 2022
8be634f
Merge branch 'dev' into zbynek001-sharding-update2
Aaronontheweb Apr 20, 2022
6bedc7c
fix XML-DOC warnings
Aaronontheweb Apr 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add Dropped to Akka.Actor (migrated partially from akka/akka#27160)
Log Dropped from DeadLetterListener
  • Loading branch information
zbynek001 committed Nov 25, 2020
commit 9919ddb6da229282f2b78ec5399956ca099163ee
27 changes: 26 additions & 1 deletion src/core/Akka/Event/DeadLetter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ public interface IDeadLetterSuppression
}

/// <summary>
/// Represents a message that could not be delivered to it's recipient.
/// Represents a message that could not be delivered to it's recipient.
/// This message wraps the original message, the sender and the intended recipient of the message.
///
/// Subscribe to this class to be notified about all <see cref="DeadLetter"/> (also the suppressed ones)
/// and <see cref="Dropped"/>.
/// </summary>
public abstract class AllDeadLetters
{
Expand Down Expand Up @@ -106,4 +109,26 @@ public SuppressedDeadLetter(IDeadLetterSuppression message, IActorRef sender, IA
if (recipient == null) throw new ArgumentNullException(nameof(recipient), "SuppressedDeadLetter recipient may not be null");
}
}

/// <summary>
/// Envelope that is published on the eventStream wrapped in <see cref="DeadLetter"/> for every message that is
/// dropped due to overfull queues or routers with no routees.
///
/// When this message was sent without a sender <see cref="IActorRef"/>, `sender` will be <see cref="ActorRefs.NoSender"/> , i.e. `null`.
/// </summary>
public sealed class Dropped : AllDeadLetters
{
public Dropped(object message, string reason, IActorRef sender, IActorRef recipient)
: base(message, sender, recipient)
{
Reason = reason;
}

public Dropped(object message, string reason, IActorRef recipient)
: this(message, reason, ActorRefs.NoSender, recipient)
{
}

public string Reason { get; }
}
}
56 changes: 38 additions & 18 deletions src/core/Akka/Event/DeadLetterListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ protected override void PreRestart(Exception reason, object message)
protected override void PreStart()
{
_eventStream.Subscribe(Self, typeof(DeadLetter));
_eventStream.Subscribe(Self, typeof(Dropped));
}

/// <summary>
Expand Down Expand Up @@ -85,10 +86,10 @@ private Receive ReceiveWithAlwaysLogging()
{
return message =>
{
if (message is DeadLetter deadLetter)
if (message is AllDeadLetters d)
{
IncrementCount();
LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, "");
LogDeadLetter(d, "");
return true;
}
return false;
Expand All @@ -99,17 +100,17 @@ private Receive ReceiveWithMaxCountLogging()
{
return message =>
{
if (message is DeadLetter deadLetter)
if (message is AllDeadLetters d)
{
IncrementCount();
if (_count == _maxCount)
{
LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, ", no more dead letters will be logged");
LogDeadLetter(d, ", no more dead letters will be logged");
Context.Stop(Self);
}
else
{
LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, "");
LogDeadLetter(d, "");
}
return true;
}
Expand All @@ -121,18 +122,18 @@ private Receive ReceiveWithSuspendLogging(TimeSpan suspendDuration)
{
return message =>
{
if (message is DeadLetter deadLetter)
if (message is AllDeadLetters d)
{
IncrementCount();
if (_count == _maxCount)
{
var doneMsg = $", no more dead letters will be logged in next [{suspendDuration}]";
LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, doneMsg);
LogDeadLetter(d, doneMsg);
Context.Become(ReceiveWhenSuspended(suspendDuration, Deadline.Now + suspendDuration));
}
else
{
LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, "");
LogDeadLetter(d, "");
}
return true;
}
Expand All @@ -144,13 +145,13 @@ private Receive ReceiveWhenSuspended(TimeSpan suspendDuration, Deadline suspendD
{
return message =>
{
if (message is DeadLetter deadLetter)
if (message is AllDeadLetters d)
{
IncrementCount();
if (suspendDeadline.IsOverdue)
{
var doneMsg = $", of which {(_count - _maxCount - 1).ToString()} were not logged. The counter will be reset now";
LogDeadLetter(deadLetter.Message, deadLetter.Sender, deadLetter.Recipient, doneMsg);
LogDeadLetter(d, doneMsg);
_count = 0;
Context.Become(ReceiveWithSuspendLogging(suspendDuration));
}
Expand All @@ -160,18 +161,37 @@ private Receive ReceiveWhenSuspended(TimeSpan suspendDuration, Deadline suspendD
};
}

private void LogDeadLetter(object message, IActorRef snd, IActorRef recipient, string doneMsg)
private void LogDeadLetter(AllDeadLetters d, string doneMsg)
{
var origin = ReferenceEquals(snd, Context.System.DeadLetters) ? "without sender" : $"from {snd.Path}";
var origin = IsReal(d.Sender) ? $" from {d.Sender}" : "";

string logMessage;
switch (d)
{
case Dropped dropped:
var destination = IsReal(d.Recipient) ? $" to {d.Recipient}" : "";
logMessage = $"Message [{d.Message.GetType().Name}]{origin}{destination} was dropped. {dropped.Reason}. " +
$"[{_count}] dead letters encountered{doneMsg}. ";
break;
default:
logMessage = $"Message [{d.Message.GetType().Name}]{origin} to {d.Recipient} was not delivered. " +
$"[{_count}] dead letters encountered{doneMsg}. " +
$"If this is not an expected behavior then {d.Recipient} may have terminated unexpectedly. ";
break;
}
_eventStream.Publish(new Info(
recipient.Path.ToString(),
recipient.GetType(),
$"Message [{message.GetType().Name}] {origin} to {recipient.Path} was not delivered. [{_count.ToString()}] dead letters encountered{doneMsg}. " +
$"If this is not an expected behavior then {recipient.Path} may have terminated unexpectedly. " +
d.Recipient.Path.ToString(),
d.Recipient.GetType(),
logMessage +
"This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' " +
"and 'akka.log-dead-letters-during-shutdown'."));
}

private bool IsReal(IActorRef snd)
{
return !ReferenceEquals(snd, ActorRefs.NoSender) && !ReferenceEquals(snd, Context.System.DeadLetters) && !(snd is DeadLetterActorRef);
}

/// <summary>
/// This class represents the latest date or time by which an operation should be completed.
/// </summary>
Expand Down Expand Up @@ -209,9 +229,9 @@ private void LogDeadLetter(object message, IActorRef snd, IActorRef recipient, s
public TimeSpan TimeLeft { get { return When - DateTime.UtcNow; } }

#region Overrides

/// <inheritdoc/>
public override bool Equals(object obj) =>
public override bool Equals(object obj) =>
obj is Deadline deadline && Equals(deadline);

/// <inheritdoc/>
Expand Down