Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace double-dynamic invocation #95

Merged
merged 3 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,22 @@ public EffectTracker(Partition partition, Func<TrackedObjectKey, EffectTracker,

void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent)
{
this.effect = this.currentUpdate = updateEvent;
this.currentUpdate = updateEvent;
}

PartitionUpdateEvent currentUpdate;
dynamic effect;

/// <summary>
/// Applies the event to the given tracked object, using dynamic dispatch to
/// Applies the event to the given tracked object, using visitor pattern to
/// select the correct Process method overload for the event.
/// </summary>
/// <param name="trackedObject">The tracked object on which the event should be applied.</param>
/// <remarks>Called by the storage layer when this object calls applyToStore.</remarks>
public void ProcessEffectOn(dynamic trackedObject)
public void ProcessEffectOn(TrackedObject trackedObject)
{
this.Partition.Assert(this.currentUpdate != null);
try
{
trackedObject.Process(this.effect, this);
this.currentUpdate.ApplyTo(trackedObject, this);
}
catch (Exception exception) when (!Utils.IsFatal(exception))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,25 @@ public virtual void UpdateLoadInfo(PartitionLoadInfo info)
public virtual void Process(PartitionEventFragment e, EffectTracker effects)
{
// processing a reassembled event just applies the original event
dynamic dynamicThis = this;
dynamic dynamicPartitionEvent = e.ReassembledEvent;
dynamicThis.Process(dynamicPartitionEvent, effects);
((PartitionUpdateEvent) e.ReassembledEvent).ApplyTo(this, effects);
}

public virtual void Process(BatchProcessed evt, EffectTracker tracker) { }
public virtual void Process(CreationRequestReceived evt, EffectTracker tracker) { }
public virtual void Process(DeletionRequestReceived evt, EffectTracker tracker) { }
public virtual void Process(InstanceQueryReceived evt, EffectTracker tracker) { }
public virtual void Process(PurgeRequestReceived evt, EffectTracker tracker) { }
public virtual void Process(WaitRequestReceived evt, EffectTracker tracker) { }
public virtual void Process(PurgeBatchIssued evt, EffectTracker tracker) { }
public virtual void Process(ClientTaskMessagesReceived evt, EffectTracker tracker) { }
public virtual void Process(SolicitationReceived evt, EffectTracker tracker) { }
public virtual void Process(TransferCommandReceived evt, EffectTracker tracker) { }
public virtual void Process(ActivityTransferReceived evt, EffectTracker tracker) { }
public virtual void Process(RemoteActivityResultReceived evt, EffectTracker tracker) { }
public virtual void Process(TaskMessagesReceived evt, EffectTracker tracker) { }
public virtual void Process(ActivityCompleted evt, EffectTracker tracker) { }
public virtual void Process(OffloadDecision evt, EffectTracker tracker) { }
public virtual void Process(SendConfirmed evt, EffectTracker tracker) { }
public virtual void Process(TimerFired evt, EffectTracker tracker) { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,15 @@ protected override void ExtraTraceInformation(StringBuilder s)
}
}


public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Reassembly);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,10 @@ public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Sessions);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,10 @@ public override bool OnReadComplete(TrackedObject target, Partition partition)

return true;
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@ class DeletionRequestReceived : ClientRequestEventWithPrefetch

[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,10 @@ public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationSt
response.SerializeOrchestrationStates(memoryStream, this.InstanceQuery.FetchInput);
partition.Send(response);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ class PurgeRequestReceived : ClientRequestEventWithQuery

const int MaxBatchSize = 1000;

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

public async override Task OnQueryCompleteAsync(IAsyncEnumerable<OrchestrationState> instances, Partition partition)
{
int batchCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class WaitRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

protected override void ExtraTraceInformation(StringBuilder s)
{
s.Append(' ');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,10 @@ public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Activities);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public override void DetermineEffects(EffectTracker effects)
effects.Add(TrackedObjectKey.Activities);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

protected override void ExtraTraceInformation(StringBuilder s)
{
base.ExtraTraceInformation(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,10 @@ class ActivityTransferReceived : PartitionMessageEvent
[IgnoreDataMember]
public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages =>
this.TransferredActivities;

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class RemoteActivityResultReceived : PartitionMessageEvent
[IgnoreDataMember]
public string WorkItemId => ActivitiesState.GetWorkItemId(this.OriginPartition, this.ActivityId);

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

[IgnoreDataMember]
public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ class TaskMessagesReceived : PartitionMessageEvent
[IgnoreDataMember]
public int NumberMessages => (this.TaskMessages?.Count ?? 0) + (this.DelayedTaskMessages?.Count ?? 0);

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

[IgnoreDataMember]
public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,10 @@ public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Activities);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public override void DetermineEffects(EffectTracker effects)
effects.Add(TrackedObjectKey.Sessions);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

public IEnumerable<TaskMessage> LoopBackMessages()
{
if (this.ActivityMessages != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,10 @@ public override void DetermineEffects(EffectTracker effects)
// and if offloading, fills in the fields, and adds the outbox to the effects
effects.Add(TrackedObjectKey.Activities);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public IEnumerable<TrackedObjectKey> KeysToPrefetch
}
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}

public override void DetermineEffects(EffectTracker effects)
{
// the last-added effects are processed first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,10 @@ public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Outbox);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,10 @@ public override void DetermineEffects(EffectTracker effects)
effects.Add(TrackedObjectKey.Sessions);
effects.Add(TrackedObjectKey.Timers);
}

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ abstract class PartitionUpdateEvent : PartitionEvent
public OutboxState.Batch OutboxBatch { get; set; }

public abstract void DetermineEffects(EffectTracker effects);

public abstract void ApplyTo(TrackedObject trackedObject, EffectTracker effectTracker);
}
}
8 changes: 6 additions & 2 deletions src/DurableTask.Netherite/LoadMonitor/LoadMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,12 @@ public Task StopAsync()

void TransportAbstraction.ILoadMonitor.Process(LoadMonitorEvent loadMonitorEvent)
{
// dispatch call to matching method
this.Process((dynamic)loadMonitorEvent);
switch (loadMonitorEvent)
{
case LoadInformationReceived loadInformationReceived:
this.Process(loadInformationReceived);
break;
}
}

void SendTransferCommand(uint from, uint to, int num)
Expand Down
12 changes: 6 additions & 6 deletions src/DurableTask.Netherite/PartitionState/ActivitiesState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public bool TryGetNextActivity(out ActivityInfo activityInfo)
}
}

public void Process(BatchProcessed evt, EffectTracker effects)
public override void Process(BatchProcessed evt, EffectTracker effects)
{
// the completed orchestration work item can launch activities
foreach (var msg in evt.ActivityMessages)
Expand Down Expand Up @@ -198,7 +198,7 @@ public void Process(BatchProcessed evt, EffectTracker effects)
}
}

public void Process(ActivityTransferReceived evt, EffectTracker effects)
public override void Process(ActivityTransferReceived evt, EffectTracker effects)
{
// may bring in offloaded activities from other partitions
foreach (var msg in evt.TransferredActivities)
Expand Down Expand Up @@ -239,7 +239,7 @@ public void Process(ActivityTransferReceived evt, EffectTracker effects)
}
}

public void Process(ActivityCompleted evt, EffectTracker effects)
public override void Process(ActivityCompleted evt, EffectTracker effects)
{
// records the result of a finished activity

Expand Down Expand Up @@ -284,7 +284,7 @@ public void Process(ActivityCompleted evt, EffectTracker effects)
}
}

public void Process(TransferCommandReceived evt, EffectTracker effects)
public override void Process(TransferCommandReceived evt, EffectTracker effects)
{
evt.TransferredActivities = new List<(TaskMessage, string)>();

Expand All @@ -310,12 +310,12 @@ public void Process(TransferCommandReceived evt, EffectTracker effects)
effects.Add(TrackedObjectKey.Outbox);
}

public void Process(SolicitationReceived evt, EffectTracker effects)
public override void Process(SolicitationReceived evt, EffectTracker effects)
{
this.LastSolicitation = evt.Timestamp;
}

public void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects)
public override void Process(OffloadDecision offloadDecisionEvent, EffectTracker effects)
{
// check for offload conditions and if satisfied, send batch to remote
if (this.LocalBacklog.Count == 0)
Expand Down
6 changes: 3 additions & 3 deletions src/DurableTask.Netherite/PartitionState/DedupState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ bool IsNotDuplicate(PartitionMessageEvent evt)
}
}

public void Process(ActivityTransferReceived evt, EffectTracker effects)
public override void Process(ActivityTransferReceived evt, EffectTracker effects)
{
// queues activities originating from a remote partition to execute on this partition
if (this.IsNotDuplicate(evt))
Expand All @@ -49,7 +49,7 @@ public void Process(ActivityTransferReceived evt, EffectTracker effects)
}
}

public void Process(RemoteActivityResultReceived evt, EffectTracker effects)
public override void Process(RemoteActivityResultReceived evt, EffectTracker effects)
{
// returns a response to an ongoing orchestration, and reports load data to the offload logic
if (this.IsNotDuplicate(evt))
Expand All @@ -58,7 +58,7 @@ public void Process(RemoteActivityResultReceived evt, EffectTracker effects)
}
}

public void Process(TaskMessagesReceived evt, EffectTracker effects)
public override void Process(TaskMessagesReceived evt, EffectTracker effects)
{
// contains messages to be processed by sessions and/or to be scheduled by timer
if (this.IsNotDuplicate(evt))
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.Netherite/PartitionState/HistoryState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public override string ToString()
return $"History InstanceId={this.InstanceId} ExecutionId={this.ExecutionId} Events={this.History.Count}";
}

public void Process(BatchProcessed evt, EffectTracker effects)
public override void Process(BatchProcessed evt, EffectTracker effects)
{
// can add events to the history, or replace it with a new history

Expand Down
Loading