From 7fc86515aac39fe7267740e44b484912f2bf5761 Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Wed, 15 Dec 2021 16:30:50 -0800 Subject: [PATCH 1/2] implement visitor pattern instead of using C# double-dynamic --- .../PartitionState/TrackedObject.cs | 18 ++++++++++++++++++ .../Events/Fragments/PartitionEventFragment.cs | 7 ++++++- .../FromClients/ClientTaskMessagesReceived.cs | 5 +++++ .../FromClients/CreationRequestReceived.cs | 5 +++++ .../FromClients/DeletionRequestReceived.cs | 5 +++++ .../FromClients/InstanceQueryReceived.cs | 5 +++++ .../FromClients/PurgeRequestReceived.cs | 5 +++++ .../FromClients/WaitRequestReceived.cs | 5 +++++ .../FromLoadMonitor/SolicitationReceived.cs | 5 +++++ .../FromLoadMonitor/TransferCommandReceived.cs | 5 +++++ .../FromPartitions/ActivityTransferReceived.cs | 5 +++++ .../RemoteActivityResultReceived.cs | 5 +++++ .../FromPartitions/TaskMessagesReceived.cs | 5 +++++ .../Internal/ActivityCompleted.cs | 5 +++++ .../PartitionEvents/Internal/BatchProcessed.cs | 5 +++++ .../Internal/OffloadDecision.cs | 5 +++++ .../Internal/PurgeBatchIssued.cs | 5 +++++ .../PartitionEvents/Internal/SendConfirmed.cs | 5 +++++ .../PartitionEvents/Internal/TimerFired.cs | 5 +++++ .../PartitionEvents/PartitionUpdateEvent.cs | 2 ++ .../PartitionState/ActivitiesState.cs | 12 ++++++------ .../PartitionState/HistoryState.cs | 2 +- .../PartitionState/InstanceState.cs | 10 +++++----- .../PartitionState/OutboxState.cs | 10 +++++----- .../PartitionState/QueriesState.cs | 14 ++++++++++++-- .../PartitionState/SessionsState.cs | 18 +++++++++--------- 26 files changed, 149 insertions(+), 29 deletions(-) diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs index e4615dee..82521f4d 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs @@ -84,5 +84,23 @@ public virtual void Process(PartitionEventFragment e, EffectTracker effects) dynamic dynamicPartitionEvent = e.ReassembledEvent; dynamicThis.Process(dynamicPartitionEvent, effects); } + + public virtual void Process(BatchProcessed evt, EffectTracker tracker) { } + public virtual void Process(CreationRequestReceived evt, EffectTracker tracker) { } + public virtual void Process(DeletionRequestReceived evt, EffectTracker tracker) { } + public virtual void Process(InstanceQueryReceived evt, EffectTracker tracker) { } + public virtual void Process(PurgeRequestReceived evt, EffectTracker tracker) { } + public virtual void Process(WaitRequestReceived evt, EffectTracker tracker) { } + public virtual void Process(PurgeBatchIssued evt, EffectTracker tracker) { } + public virtual void Process(ClientTaskMessagesReceived evt, EffectTracker tracker) { } + public virtual void Process(SolicitationReceived evt, EffectTracker tracker) { } + public virtual void Process(TransferCommandReceived evt, EffectTracker tracker) { } + public virtual void Process(ActivityTransferReceived evt, EffectTracker tracker) { } + public virtual void Process(RemoteActivityResultReceived evt, EffectTracker tracker) { } + public virtual void Process(TaskMessagesReceived evt, EffectTracker tracker) { } + public virtual void Process(ActivityCompleted evt, EffectTracker tracker) { } + public virtual void Process(OffloadDecision evt, EffectTracker tracker) { } + public virtual void Process(SendConfirmed evt, EffectTracker tracker) { } + public virtual void Process(TimerFired evt, EffectTracker tracker) { } } } diff --git a/src/DurableTask.Netherite/Events/Fragments/PartitionEventFragment.cs b/src/DurableTask.Netherite/Events/Fragments/PartitionEventFragment.cs index c55b1c72..c37d7354 100644 --- a/src/DurableTask.Netherite/Events/Fragments/PartitionEventFragment.cs +++ b/src/DurableTask.Netherite/Events/Fragments/PartitionEventFragment.cs @@ -38,10 +38,15 @@ protected override void ExtraTraceInformation(StringBuilder s) } } - public override void DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Reassembly); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs index 286d9e91..c383638b 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/ClientTaskMessagesReceived.cs @@ -20,5 +20,10 @@ public override void DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Sessions); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs index 91016047..4c0b8388 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/CreationRequestReceived.cs @@ -50,5 +50,10 @@ public override bool OnReadComplete(TrackedObject target, Partition partition) return true; } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs index 3beb267b..4224f4ee 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/DeletionRequestReceived.cs @@ -24,5 +24,10 @@ class DeletionRequestReceived : ClientRequestEventWithPrefetch [IgnoreDataMember] public override string TracedInstanceId => this.InstanceId; + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/InstanceQueryReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/InstanceQueryReceived.cs index 13da185c..7ac98a7a 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/InstanceQueryReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/InstanceQueryReceived.cs @@ -49,5 +49,10 @@ public async override Task OnQueryCompleteAsync(IAsyncEnumerable instances, Partition partition) { int batchCount = 0; diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs index da0c7926..aa00e58e 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromClients/WaitRequestReceived.cs @@ -22,6 +22,11 @@ class WaitRequestReceived : ClientRequestEventWithPrefetch [IgnoreDataMember] public override string TracedInstanceId => this.InstanceId; + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + protected override void ExtraTraceInformation(StringBuilder s) { s.Append(' '); diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/SolicitationReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/SolicitationReceived.cs index ade8398e..253143ea 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/SolicitationReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/SolicitationReceived.cs @@ -25,5 +25,10 @@ public override void DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Activities); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/TransferCommandReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/TransferCommandReceived.cs index 2833084c..6c833e6c 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/TransferCommandReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromLoadMonitor/TransferCommandReceived.cs @@ -36,6 +36,11 @@ public override void DetermineEffects(EffectTracker effects) effects.Add(TrackedObjectKey.Activities); } + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + protected override void ExtraTraceInformation(StringBuilder s) { base.ExtraTraceInformation(s); diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/ActivityTransferReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/ActivityTransferReceived.cs index aff080ae..41c0892b 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/ActivityTransferReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/ActivityTransferReceived.cs @@ -24,5 +24,10 @@ class ActivityTransferReceived : PartitionMessageEvent [IgnoreDataMember] public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages => this.TransferredActivities; + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/RemoteActivityResultReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/RemoteActivityResultReceived.cs index 44320ee9..4b2fb8df 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/RemoteActivityResultReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/RemoteActivityResultReceived.cs @@ -32,6 +32,11 @@ class RemoteActivityResultReceived : PartitionMessageEvent [IgnoreDataMember] public string WorkItemId => ActivitiesState.GetWorkItemId(this.OriginPartition, this.ActivityId); + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + [IgnoreDataMember] public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages { diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/TaskMessagesReceived.cs b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/TaskMessagesReceived.cs index 2be375e3..01ef2a9d 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/TaskMessagesReceived.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/External/FromPartitions/TaskMessagesReceived.cs @@ -32,6 +32,11 @@ class TaskMessagesReceived : PartitionMessageEvent [IgnoreDataMember] public int NumberMessages => (this.TaskMessages?.Count ?? 0) + (this.DelayedTaskMessages?.Count ?? 0); + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + [IgnoreDataMember] public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages { diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/ActivityCompleted.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/ActivityCompleted.cs index 92b30e6c..6e73c473 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/ActivityCompleted.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/ActivityCompleted.cs @@ -39,5 +39,10 @@ public override void DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Activities); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs index 7603b085..c1c6ec5e 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/BatchProcessed.cs @@ -81,6 +81,11 @@ public override void DetermineEffects(EffectTracker effects) effects.Add(TrackedObjectKey.Sessions); } + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + public IEnumerable LoopBackMessages() { if (this.ActivityMessages != null) diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/OffloadDecision.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/OffloadDecision.cs index 06197d79..21c389b1 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/OffloadDecision.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/OffloadDecision.cs @@ -27,5 +27,10 @@ public override void DetermineEffects(EffectTracker effects) // and if offloading, fills in the fields, and adds the outbox to the effects effects.Add(TrackedObjectKey.Activities); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/PurgeBatchIssued.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/PurgeBatchIssued.cs index 1a569481..0c293a2a 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/PurgeBatchIssued.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/PurgeBatchIssued.cs @@ -52,6 +52,11 @@ public IEnumerable KeysToPrefetch } } + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } + public override void DetermineEffects(EffectTracker effects) { // the last-added effects are processed first diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/SendConfirmed.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/SendConfirmed.cs index 56c68353..f944e7ad 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/SendConfirmed.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/SendConfirmed.cs @@ -21,5 +21,10 @@ public override void DetermineEffects(EffectTracker effects) { effects.Add(TrackedObjectKey.Outbox); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs index 08b678d1..ad1bbaf7 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/Internal/TimerFired.cs @@ -39,5 +39,10 @@ public override void DetermineEffects(EffectTracker effects) effects.Add(TrackedObjectKey.Sessions); effects.Add(TrackedObjectKey.Timers); } + + public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects) + { + trackedObject.Process(this, effects); + } } } diff --git a/src/DurableTask.Netherite/Events/PartitionEvents/PartitionUpdateEvent.cs b/src/DurableTask.Netherite/Events/PartitionEvents/PartitionUpdateEvent.cs index 2289b6d6..a7689d30 100644 --- a/src/DurableTask.Netherite/Events/PartitionEvents/PartitionUpdateEvent.cs +++ b/src/DurableTask.Netherite/Events/PartitionEvents/PartitionUpdateEvent.cs @@ -24,5 +24,7 @@ abstract class PartitionUpdateEvent : PartitionEvent public OutboxState.Batch OutboxBatch { get; set; } public abstract void DetermineEffects(EffectTracker effects); + + public abstract void ApplyTo(TrackedObject trackedObject, EffectTracker effectTracker); } } \ No newline at end of file diff --git a/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs b/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs index c64928a5..269114f9 100644 --- a/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs +++ b/src/DurableTask.Netherite/PartitionState/ActivitiesState.cs @@ -158,7 +158,7 @@ public bool TryGetNextActivity(out ActivityInfo activityInfo) } } - public void Process(BatchProcessed evt, EffectTracker effects) + public override void Process(BatchProcessed evt, EffectTracker effects) { // the completed orchestration work item can launch activities foreach (var msg in evt.ActivityMessages) @@ -198,7 +198,7 @@ public void Process(BatchProcessed evt, EffectTracker effects) } } - public void Process(ActivityTransferReceived evt, EffectTracker effects) + public override void Process(ActivityTransferReceived evt, EffectTracker effects) { // may bring in offloaded activities from other partitions foreach (var msg in evt.TransferredActivities) @@ -239,7 +239,7 @@ public void Process(ActivityTransferReceived evt, EffectTracker effects) } } - public void Process(ActivityCompleted evt, EffectTracker effects) + public override void Process(ActivityCompleted evt, EffectTracker effects) { // records the result of a finished activity @@ -284,7 +284,7 @@ public void Process(ActivityCompleted evt, EffectTracker effects) } } - public void Process(TransferCommandReceived evt, EffectTracker effects) + public override void Process(TransferCommandReceived evt, EffectTracker effects) { evt.TransferredActivities = new List<(TaskMessage, string)>(); @@ -310,12 +310,12 @@ public void Process(TransferCommandReceived evt, EffectTracker effects) effects.Add(TrackedObjectKey.Outbox); } - public void Process(SolicitationReceived evt, EffectTracker effects) + public override void Process(SolicitationReceived evt, EffectTracker effects) { this.LastSolicitation = evt.Timestamp; } - public void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects) + public override void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects) { // check for offload conditions and if satisfied, send batch to remote if (this.LocalBacklog.Count == 0) diff --git a/src/DurableTask.Netherite/PartitionState/HistoryState.cs b/src/DurableTask.Netherite/PartitionState/HistoryState.cs index b6c67701..09342b44 100644 --- a/src/DurableTask.Netherite/PartitionState/HistoryState.cs +++ b/src/DurableTask.Netherite/PartitionState/HistoryState.cs @@ -43,7 +43,7 @@ public override string ToString() return $"History InstanceId={this.InstanceId} ExecutionId={this.ExecutionId} Events={this.History.Count}"; } - public void Process(BatchProcessed evt, EffectTracker effects) + public override void Process(BatchProcessed evt, EffectTracker effects) { // can add events to the history, or replace it with a new history diff --git a/src/DurableTask.Netherite/PartitionState/InstanceState.cs b/src/DurableTask.Netherite/PartitionState/InstanceState.cs index 84ee3fdd..a7b813fc 100644 --- a/src/DurableTask.Netherite/PartitionState/InstanceState.cs +++ b/src/DurableTask.Netherite/PartitionState/InstanceState.cs @@ -32,7 +32,7 @@ public override string ToString() return $"History InstanceId={this.InstanceId} Status={this.OrchestrationState?.OrchestrationStatus}"; } - public void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) + public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) { bool filterDuplicate = this.OrchestrationState != null && creationRequestReceived.DedupeStatuses != null @@ -84,7 +84,7 @@ public void Process(CreationRequestReceived creationRequestReceived, EffectTrack } - public void Process(BatchProcessed evt, EffectTracker effects) + public override void Process(BatchProcessed evt, EffectTracker effects) { // update the state of an orchestration this.OrchestrationState = evt.State; @@ -104,7 +104,7 @@ public void Process(BatchProcessed evt, EffectTracker effects) } } - public void Process(WaitRequestReceived evt, EffectTracker effects) + public override void Process(WaitRequestReceived evt, EffectTracker effects) { if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState)) { @@ -131,7 +131,7 @@ public void Process(WaitRequestReceived evt, EffectTracker effects) } } - public void Process(DeletionRequestReceived deletionRequest, EffectTracker effects) + public override void Process(DeletionRequestReceived deletionRequest, EffectTracker effects) { int numberInstancesDeleted = 0; @@ -159,7 +159,7 @@ public void Process(DeletionRequestReceived deletionRequest, EffectTracker effec } } - public void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects) + public override void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects) { OrchestrationState state = this.OrchestrationState; if (this.OrchestrationState != null diff --git a/src/DurableTask.Netherite/PartitionState/OutboxState.cs b/src/DurableTask.Netherite/PartitionState/OutboxState.cs index 65cb454f..ba4bab8a 100644 --- a/src/DurableTask.Netherite/PartitionState/OutboxState.cs +++ b/src/DurableTask.Netherite/PartitionState/OutboxState.cs @@ -144,7 +144,7 @@ public void ConfirmDurable(Event evt) } } - public void Process(SendConfirmed evt, EffectTracker _) + public override void Process(SendConfirmed evt, EffectTracker _) { this.Partition.EventDetailTracer?.TraceEventProcessingDetail($"Store has sent all outbound messages by event {evt} id={evt.EventIdString}"); @@ -152,7 +152,7 @@ public void Process(SendConfirmed evt, EffectTracker _) this.Outbox.Remove(evt.Position); } - public void Process(ActivityCompleted evt, EffectTracker effects) + public override void Process(ActivityCompleted evt, EffectTracker effects) { var batch = new Batch(); batch.OutgoingMessages.Add(new RemoteActivityResultReceived() @@ -166,7 +166,7 @@ public void Process(ActivityCompleted evt, EffectTracker effects) this.SendBatchOnceEventIsPersisted(evt, effects, batch); } - public void Process(BatchProcessed evt, EffectTracker effects) + public override void Process(BatchProcessed evt, EffectTracker effects) { var batch = new Batch(); int subPosition = 0; @@ -241,7 +241,7 @@ void AddMessage(TaskMessagesReceived outmessage, TaskMessage message) this.SendBatchOnceEventIsPersisted(evt, effects, batch); } - public void Process(OffloadDecision evt, EffectTracker effects) + public override void Process(OffloadDecision evt, EffectTracker effects) { var batch = new Batch(); @@ -258,7 +258,7 @@ public void Process(OffloadDecision evt, EffectTracker effects) this.SendBatchOnceEventIsPersisted(evt, effects, batch); } - public void Process(TransferCommandReceived evt, EffectTracker effects) + public override void Process(TransferCommandReceived evt, EffectTracker effects) { var batch = new Batch(); batch.OutgoingMessages.Add(new ActivityTransferReceived() diff --git a/src/DurableTask.Netherite/PartitionState/QueriesState.cs b/src/DurableTask.Netherite/PartitionState/QueriesState.cs index 0af5c980..eb0147a7 100644 --- a/src/DurableTask.Netherite/PartitionState/QueriesState.cs +++ b/src/DurableTask.Netherite/PartitionState/QueriesState.cs @@ -41,7 +41,17 @@ public override string ToString() return $"Queries ({this.PendingQueries.Count} pending)"; } - public void Process(ClientRequestEventWithQuery clientRequestEvent, EffectTracker effects) + public override void Process(InstanceQueryReceived clientRequestEvent, EffectTracker effects) + { + this.Process(clientRequestEvent, effects); + } + + public override void Process(PurgeRequestReceived clientRequestEvent, EffectTracker effects) + { + this.Process(clientRequestEvent, effects); + } + + void Process(ClientRequestEventWithQuery clientRequestEvent, EffectTracker effects) { if (clientRequestEvent.Phase == ClientRequestEventWithQuery.ProcessingPhase.Query) { @@ -56,7 +66,7 @@ public void Process(ClientRequestEventWithQuery clientRequestEvent, EffectTracke } } - public void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects) + public override void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects) { var purgeRequest = (PurgeRequestReceived)this.PendingQueries[purgeBatchIssued.QueryEventId]; purgeRequest.NumberInstancesPurged += purgeBatchIssued.Purged.Count; diff --git a/src/DurableTask.Netherite/PartitionState/SessionsState.cs b/src/DurableTask.Netherite/PartitionState/SessionsState.cs index 99314778..44f58933 100644 --- a/src/DurableTask.Netherite/PartitionState/SessionsState.cs +++ b/src/DurableTask.Netherite/PartitionState/SessionsState.cs @@ -215,7 +215,7 @@ void AddMessagesToSession(string instanceId, string originWorkItemId, IEnumerabl return lastOccurence; } - public void Process(TaskMessagesReceived evt, EffectTracker effects) + public override void Process(TaskMessagesReceived evt, EffectTracker effects) { // queues task message (from another partition) in a new or existing session foreach (var group in evt.TaskMessages @@ -225,44 +225,44 @@ public void Process(TaskMessagesReceived evt, EffectTracker effects) } } - public void Process(RemoteActivityResultReceived evt, EffectTracker effects) + public override void Process(RemoteActivityResultReceived evt, EffectTracker effects) { // queues task message (from another partition) in a new or existing session this.AddMessageToSession(evt.Result, evt.WorkItemId, effects.IsReplaying); } - public void Process(ClientTaskMessagesReceived evt, EffectTracker effects) + public override void Process(ClientTaskMessagesReceived evt, EffectTracker effects) { // queues task message (from a client) in a new or existing session var instanceId = evt.TaskMessages[0].OrchestrationInstance.InstanceId; this.AddMessagesToSession(instanceId, evt.WorkItemId, evt.TaskMessages, effects.IsReplaying); } - public void Process(TimerFired timerFired, EffectTracker effects) + public override void Process(TimerFired timerFired, EffectTracker effects) { // queues a timer fired message in a session this.AddMessageToSession(timerFired.TaskMessage, timerFired.OriginWorkItemId, effects.IsReplaying); } - public void Process(ActivityCompleted activityCompleted, EffectTracker effects) + public override void Process(ActivityCompleted activityCompleted, EffectTracker effects) { // queues an activity-completed message in a session this.AddMessageToSession(activityCompleted.Response, activityCompleted.WorkItemId, effects.IsReplaying); } - public void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) + public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) { // queues the execution started message this.AddMessageToSession(creationRequestReceived.TaskMessage, creationRequestReceived.WorkItemId, effects.IsReplaying); } - public void Process(DeletionRequestReceived deletionRequestReceived, EffectTracker effects) + public override void Process(DeletionRequestReceived deletionRequestReceived, EffectTracker effects) { // removing the session means that all pending messages will be deleted also. this.Sessions.Remove(deletionRequestReceived.InstanceId); } - public void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects) + public override void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects) { foreach (string instanceId in purgeBatchIssued.Purged) { @@ -278,7 +278,7 @@ public void ConfirmDurable(Event evt) this.Partition.SubmitEvent(evtCopy); } - public void Process(BatchProcessed evt, EffectTracker effects) + public override void Process(BatchProcessed evt, EffectTracker effects) { // if speculation is disabled, if (evt.PersistFirst != BatchProcessed.PersistFirstStatus.NotRequired) From bfba742b1e4b9b715ed658be340aee21dea1705d Mon Sep 17 00:00:00 2001 From: Sebastian Burckhardt Date: Wed, 22 Dec 2021 10:16:49 -0800 Subject: [PATCH 2/2] fix missing cases and remove all occurrences of dynamic. --- .../PartitionState/EffectTracker.cs | 10 ++++------ .../PartitionState/TrackedObject.cs | 4 +--- .../LoadMonitor/LoadMonitor.cs | 8 ++++++-- .../PartitionState/DedupState.cs | 6 +++--- .../PartitionState/PrefetchState.cs | 17 ++++++++++++++++- .../PartitionState/QueriesState.cs | 6 +++--- .../PartitionState/TimersState.cs | 8 ++++---- .../Tracing/EventTraceHelper.cs | 1 - 8 files changed, 37 insertions(+), 23 deletions(-) diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs index 3709529d..826253b6 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/EffectTracker.cs @@ -52,24 +52,22 @@ public EffectTracker(Partition partition, Func - /// Applies the event to the given tracked object, using dynamic dispatch to + /// Applies the event to the given tracked object, using visitor pattern to /// select the correct Process method overload for the event. /// /// The tracked object on which the event should be applied. /// Called by the storage layer when this object calls applyToStore. - public void ProcessEffectOn(dynamic trackedObject) + public void ProcessEffectOn(TrackedObject trackedObject) { - this.Partition.Assert(this.currentUpdate != null); try { - trackedObject.Process(this.effect, this); + this.currentUpdate.ApplyTo(trackedObject, this); } catch (Exception exception) when (!Utils.IsFatal(exception)) { diff --git a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs index 81b789b1..2f9c3b6b 100644 --- a/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs +++ b/src/DurableTask.Netherite/Abstractions/PartitionState/TrackedObject.cs @@ -83,9 +83,7 @@ public virtual void UpdateLoadInfo(PartitionLoadInfo info) public virtual void Process(PartitionEventFragment e, EffectTracker effects) { // processing a reassembled event just applies the original event - dynamic dynamicThis = this; - dynamic dynamicPartitionEvent = e.ReassembledEvent; - dynamicThis.Process(dynamicPartitionEvent, effects); + ((PartitionUpdateEvent) e.ReassembledEvent).ApplyTo(this, effects); } public virtual void Process(BatchProcessed evt, EffectTracker tracker) { } diff --git a/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs b/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs index b7b7d4ec..76f60bb5 100644 --- a/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs +++ b/src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs @@ -94,8 +94,12 @@ public Task StopAsync() void TransportAbstraction.ILoadMonitor.Process(LoadMonitorEvent loadMonitorEvent) { - // dispatch call to matching method - this.Process((dynamic)loadMonitorEvent); + switch (loadMonitorEvent) + { + case LoadInformationReceived loadInformationReceived: + this.Process(loadInformationReceived); + break; + } } void SendTransferCommand(uint from, uint to, int num) diff --git a/src/DurableTask.Netherite/PartitionState/DedupState.cs b/src/DurableTask.Netherite/PartitionState/DedupState.cs index 99ba515b..c6b1b064 100644 --- a/src/DurableTask.Netherite/PartitionState/DedupState.cs +++ b/src/DurableTask.Netherite/PartitionState/DedupState.cs @@ -40,7 +40,7 @@ bool IsNotDuplicate(PartitionMessageEvent evt) } } - public void Process(ActivityTransferReceived evt, EffectTracker effects) + public override void Process(ActivityTransferReceived evt, EffectTracker effects) { // queues activities originating from a remote partition to execute on this partition if (this.IsNotDuplicate(evt)) @@ -49,7 +49,7 @@ public void Process(ActivityTransferReceived evt, EffectTracker effects) } } - public void Process(RemoteActivityResultReceived evt, EffectTracker effects) + public override void Process(RemoteActivityResultReceived evt, EffectTracker effects) { // returns a response to an ongoing orchestration, and reports load data to the offload logic if (this.IsNotDuplicate(evt)) @@ -58,7 +58,7 @@ public void Process(RemoteActivityResultReceived evt, EffectTracker effects) } } - public void Process(TaskMessagesReceived evt, EffectTracker effects) + public override void Process(TaskMessagesReceived evt, EffectTracker effects) { // contains messages to be processed by sessions and/or to be scheduled by timer if (this.IsNotDuplicate(evt)) diff --git a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs index 057cc108..22441d8b 100644 --- a/src/DurableTask.Netherite/PartitionState/PrefetchState.cs +++ b/src/DurableTask.Netherite/PartitionState/PrefetchState.cs @@ -41,7 +41,22 @@ public override string ToString() return $"Prefetch ({this.PendingPrefetches.Count} pending)"; } - public void Process(ClientRequestEventWithPrefetch clientRequestEvent, EffectTracker effects) + public override void Process(CreationRequestReceived creationRequestEvent, EffectTracker effects) + { + this.ProcessClientRequestEventWithPrefetch(creationRequestEvent, effects); + } + + public override void Process(DeletionRequestReceived deletionRequestEvent, EffectTracker effects) + { + this.ProcessClientRequestEventWithPrefetch(deletionRequestEvent, effects); + } + + public override void Process(WaitRequestReceived waitRequestEvent, EffectTracker effects) + { + this.ProcessClientRequestEventWithPrefetch(waitRequestEvent, effects); + } + + void ProcessClientRequestEventWithPrefetch(ClientRequestEventWithPrefetch clientRequestEvent, EffectTracker effects) { if (clientRequestEvent.Phase == ClientRequestEventWithPrefetch.ProcessingPhase.Read) { diff --git a/src/DurableTask.Netherite/PartitionState/QueriesState.cs b/src/DurableTask.Netherite/PartitionState/QueriesState.cs index eb0147a7..7f0ef10e 100644 --- a/src/DurableTask.Netherite/PartitionState/QueriesState.cs +++ b/src/DurableTask.Netherite/PartitionState/QueriesState.cs @@ -43,15 +43,15 @@ public override string ToString() public override void Process(InstanceQueryReceived clientRequestEvent, EffectTracker effects) { - this.Process(clientRequestEvent, effects); + this.ProcessClientRequestEventWithQuery(clientRequestEvent, effects); } public override void Process(PurgeRequestReceived clientRequestEvent, EffectTracker effects) { - this.Process(clientRequestEvent, effects); + this.ProcessClientRequestEventWithQuery(clientRequestEvent, effects); } - void Process(ClientRequestEventWithQuery clientRequestEvent, EffectTracker effects) + void ProcessClientRequestEventWithQuery(ClientRequestEventWithQuery clientRequestEvent, EffectTracker effects) { if (clientRequestEvent.Phase == ClientRequestEventWithQuery.ProcessingPhase.Query) { diff --git a/src/DurableTask.Netherite/PartitionState/TimersState.cs b/src/DurableTask.Netherite/PartitionState/TimersState.cs index 6b4eb8f9..c1fb3a97 100644 --- a/src/DurableTask.Netherite/PartitionState/TimersState.cs +++ b/src/DurableTask.Netherite/PartitionState/TimersState.cs @@ -101,13 +101,13 @@ static DateTime GetDueTime(TaskMessage message) } } - public void Process(TimerFired evt, EffectTracker effects) + public override void Process(TimerFired evt, EffectTracker effects) { // removes the entry for the pending timer, and then adds it to the sessions queue this.PendingTimers.Remove(evt.TimerId); } - public void Process(BatchProcessed evt, EffectTracker effects) + public override void Process(BatchProcessed evt, EffectTracker effects) { // starts new timers as specified by the batch foreach (var taskMessage in evt.TimerMessages) @@ -116,7 +116,7 @@ public void Process(BatchProcessed evt, EffectTracker effects) } } - public void Process(TaskMessagesReceived evt, EffectTracker effects) + public override void Process(TaskMessagesReceived evt, EffectTracker effects) { // starts new timers as specified by the batch foreach (var taskMessage in evt.DelayedTaskMessages) @@ -125,7 +125,7 @@ public void Process(TaskMessagesReceived evt, EffectTracker effects) } } - public void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) + public override void Process(CreationRequestReceived creationRequestReceived, EffectTracker effects) { this.AddTimer(creationRequestReceived.TaskMessage, creationRequestReceived.EventIdString, effects.IsReplaying); } diff --git a/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs b/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs index c97eff8a..bce8b793 100644 --- a/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs +++ b/src/DurableTask.Netherite/Tracing/EventTraceHelper.cs @@ -6,7 +6,6 @@ namespace DurableTask.Netherite using DurableTask.Core; using DurableTask.Core.History; using Dynamitey; - using Dynamitey.DynamicObjects; using FASTER.core; using Microsoft.Extensions.Logging; using System;