Skip to content

Commit

Permalink
remove OrchestrationState from the persisted BatchProcessed to save s…
Browse files Browse the repository at this point in the history
…pace in commit log (#94)
  • Loading branch information
sebastianburckhardt authored Dec 23, 2021
1 parent 266c31d commit beaeeb7
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,21 @@ class BatchProcessed : PartitionUpdateEvent, IRequiresPrefetch
public List<HistoryEvent> NewEvents { get; set; }

[DataMember]
public string CustomStatus { get; set; }

[DataMember]
public bool CustomStatusUpdated { get; set; }

[DataMember]
public bool NotExecutable { get; set; }

[DataMember]
public string ExecutionId { get; set; }

[DataMember]
public OrchestrationStatus OrchestrationStatus { get; set; }

[IgnoreDataMember]
public OrchestrationState State { get; set; }

[DataMember]
Expand Down Expand Up @@ -118,11 +133,8 @@ protected override void ExtraTraceInformation(StringBuilder s)
{
base.ExtraTraceInformation(s);

if (this.State != null)
{
s.Append(' ');
s.Append(this.State.OrchestrationStatus);
}
s.Append(' ');
s.Append(this.OrchestrationStatus);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -726,20 +726,28 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(
WorkItemForReuse = cacheWorkItemForReuse ? orchestrationWorkItem : null,
PackPartitionTaskMessages = partition.Settings.PackPartitionTaskMessages,
PersistFirst = partition.Settings.PersistStepsFirst ? BatchProcessed.PersistFirstStatus.Required : BatchProcessed.PersistFirstStatus.NotRequired,
OrchestrationStatus = state.OrchestrationStatus,
ExecutionId = state.OrchestrationInstance.ExecutionId,
State = state,
ActivityMessages = (List<TaskMessage>)activityMessages,
LocalMessages = localMessages,
RemoteMessages = remoteMessages,
TimerMessages = (List<TaskMessage>)timerMessages,
Timestamp = DateTime.UtcNow,
Timestamp = state.LastUpdatedTime,
};

if (state.Status != orchestrationWorkItem.PreStatus)
{
batchProcessedEvent.CustomStatusUpdated = true;
batchProcessedEvent.CustomStatus = state.Status;
}

this.workItemTraceHelper.TraceWorkItemCompleted(
partition.PartitionId,
WorkItemTraceHelper.WorkItemType.Orchestration,
messageBatch.WorkItemId,
workItem.InstanceId,
batchProcessedEvent.State.OrchestrationStatus,
batchProcessedEvent.OrchestrationStatus,
latencyMs,
sequenceNumber);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,8 @@ public override void OnReadComplete(TrackedObject s, Partition partition)
InstanceId = this.InstanceId,
BatchStartPosition = this.BatchStartPosition,
BatchLength = this.BatchLength,
NewEvents = null,
State = null,
WorkItemForReuse = null,
ActivityMessages = null,
LocalMessages = null,
RemoteMessages = null,
TimerMessages = null,
Timestamp = DateTime.UtcNow,
NotExecutable = true,
});
}
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public enum ExecutionType { Fresh, ContinueFromHistory, ContinueFromCursor };

public double StartedAt { get; set; }

public string PreStatus { get; set; } // we store this prior to executing the work item so we can diff the status afterwards to detect changes

public OrchestrationWorkItem(Partition partition, OrchestrationMessageBatch messageBatch, List<HistoryEvent> previousHistory = null)
{
this.Partition = partition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ public void EnqueueOrchestrationWorkItem(OrchestrationWorkItem item)
item.HistorySize,
WorkItemTraceHelper.FormatMessageIdList(item.MessageBatch.TracedMessages));

item.PreStatus = item.OrchestrationRuntimeState.Status;
this.OrchestrationWorkItemQueue.Add(item);
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/DurableTask.Netherite/PartitionState/HistoryState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
// can add events to the history, or replace it with a new history

// update the stored history
if (this.History == null || evt.State.OrchestrationInstance.ExecutionId != this.ExecutionId)
if (this.History == null || evt.ExecutionId != this.ExecutionId)
{
this.History = new List<HistoryEvent>();
this.Episode = 0;
this.ExecutionId = evt.State.OrchestrationInstance.ExecutionId;
this.ExecutionId = evt.ExecutionId;
}

this.Partition.Assert(!string.IsNullOrEmpty(this.InstanceId) || string.IsNullOrEmpty(this.ExecutionId));
Expand All @@ -75,9 +75,9 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
{
this.Partition.EventTraceHelper.TraceInstanceUpdate(
evt.EventIdString,
evt.State.OrchestrationInstance.InstanceId,
evt.State.OrchestrationInstance.ExecutionId,
evt.State.OrchestrationStatus,
evt.InstanceId,
evt.ExecutionId,
evt.OrchestrationStatus,
this.History.Count,
evt.NewEvents,
this.Episode);
Expand All @@ -86,9 +86,9 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
this.CachedOrchestrationWorkItem = evt.WorkItemForReuse;

if (this.CachedOrchestrationWorkItem != null
&& this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != evt.State.OrchestrationInstance.ExecutionId)
&& this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != evt.ExecutionId)
{
effects.Partition.EventTraceHelper.TraceEventProcessingWarning($"Dropping bad workitem cache instance={this.InstanceId} expected_executionid={evt.State.OrchestrationInstance.ExecutionId} actual_executionid={this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}");
effects.Partition.EventTraceHelper.TraceEventProcessingWarning($"Dropping bad workitem cache instance={this.InstanceId} expected_executionid={evt.ExecutionId} actual_executionid={this.CachedOrchestrationWorkItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}");
this.CachedOrchestrationWorkItem = null;
}
}
Expand Down
81 changes: 78 additions & 3 deletions src/DurableTask.Netherite/PartitionState/InstanceState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public override void Process(CreationRequestReceived creationRequestReceived, Ef
Input = ee.Input,
Tags = ee.Tags,
CreatedTime = ee.Timestamp,
LastUpdatedTime = DateTime.UtcNow,
LastUpdatedTime = ee.Timestamp,
CompletedTime = Core.Common.DateTimeUtils.MinDateTime,
ScheduledStartTime = ee.ScheduledStartTime
};
Expand Down Expand Up @@ -93,9 +93,52 @@ void CullWaiters(DateTime threshold)

public override void Process(BatchProcessed evt, EffectTracker effects)
{
// update the state of an orchestration
this.OrchestrationState = evt.State;
// update the current orchestration state based on the new events
this.OrchestrationState = UpdateOrchestrationState(this.OrchestrationState, evt.NewEvents);

if (evt.CustomStatusUpdated)
{
this.OrchestrationState.Status = evt.CustomStatus;
}

this.OrchestrationState.LastUpdatedTime = evt.Timestamp;

if (evt.State != null)
{
this.Partition.Assert(
(
evt.State.Version,
evt.State.Status,
evt.State.Output,
evt.State.Name,
evt.State.Input,
evt.State.OrchestrationInstance.InstanceId,
evt.State.OrchestrationInstance.ExecutionId,
evt.State.CompletedTime,
evt.State.OrchestrationStatus,
evt.State.LastUpdatedTime,
evt.State.CreatedTime,
evt.State.ScheduledStartTime,
evt.State.OrchestrationInstance.ExecutionId
)
==
(
this.OrchestrationState.Version,
this.OrchestrationState.Status,
this.OrchestrationState.Output,
this.OrchestrationState.Name,
this.OrchestrationState.Input,
this.OrchestrationState.OrchestrationInstance.InstanceId,
this.OrchestrationState.OrchestrationInstance.ExecutionId,
this.OrchestrationState.CompletedTime,
this.OrchestrationState.OrchestrationStatus,
this.OrchestrationState.LastUpdatedTime,
this.OrchestrationState.CreatedTime,
this.OrchestrationState.ScheduledStartTime,
evt.ExecutionId
));
}

// if the orchestration is complete, notify clients that are waiting for it
if (this.Waiters != null)
{
Expand All @@ -113,6 +156,38 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
}
}

static OrchestrationState UpdateOrchestrationState(OrchestrationState orchestrationState, List<HistoryEvent> events)
{
if (orchestrationState == null)
{
orchestrationState = new OrchestrationState();
}
foreach (var evt in events)
{
if (evt is ExecutionStartedEvent executionStartedEvent)
{
orchestrationState.OrchestrationInstance = executionStartedEvent.OrchestrationInstance;
orchestrationState.CreatedTime = executionStartedEvent.Timestamp;
orchestrationState.Input = executionStartedEvent.Input;
orchestrationState.Name = executionStartedEvent.Name;
orchestrationState.Version = executionStartedEvent.Version;
orchestrationState.Tags = executionStartedEvent.Tags;
orchestrationState.ParentInstance = executionStartedEvent.ParentInstance;
orchestrationState.ScheduledStartTime = executionStartedEvent.ScheduledStartTime;
orchestrationState.CompletedTime = Core.Common.Utils.DateTimeSafeMaxValue;
orchestrationState.Output = null;
orchestrationState.OrchestrationStatus = OrchestrationStatus.Running;
}
else if (evt is ExecutionCompletedEvent executionCompletedEvent)
{
orchestrationState.CompletedTime = executionCompletedEvent.Timestamp;
orchestrationState.Output = executionCompletedEvent.Result;
orchestrationState.OrchestrationStatus = executionCompletedEvent.OrchestrationStatus;
}
}
return orchestrationState;
}

public override void Process(WaitRequestReceived evt, EffectTracker effects)
{
if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
Expand Down
39 changes: 20 additions & 19 deletions src/DurableTask.Netherite/PartitionState/SessionsState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -314,32 +314,33 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
return;
};

if (evt.ActivityMessages?.Count > 0)
if (!evt.NotExecutable)
{
effects.Add(TrackedObjectKey.Activities);
}

if (evt.TimerMessages?.Count > 0)
{
effects.Add(TrackedObjectKey.Timers);
}
if (evt.ActivityMessages?.Count > 0)
{
effects.Add(TrackedObjectKey.Activities);
}

if (evt.RemoteMessages?.Count > 0 || WaitRequestReceived.SatisfiesWaitCondition(evt.State))
{
effects.Add(TrackedObjectKey.Outbox);
}
if (evt.TimerMessages?.Count > 0)
{
effects.Add(TrackedObjectKey.Timers);
}

// deliver orchestrator messages destined for this partition directly to the relevant session(s)
if (evt.LocalMessages?.Count > 0)
{
foreach (var group in evt.LocalMessages.GroupBy(tm => tm.OrchestrationInstance.InstanceId))
if (evt.RemoteMessages?.Count > 0 || WaitRequestReceived.SatisfiesWaitCondition(evt.State))
{
this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying);
effects.Add(TrackedObjectKey.Outbox);
}

// deliver orchestrator messages destined for this partition directly to the relevant session(s)
if (evt.LocalMessages?.Count > 0)
{
foreach (var group in evt.LocalMessages.GroupBy(tm => tm.OrchestrationInstance.InstanceId))
{
this.AddMessagesToSession(group.Key, evt.WorkItemId, group, effects.IsReplaying);
}
}
}

if (evt.State != null)
{
effects.Add(TrackedObjectKey.Instance(evt.InstanceId));
effects.Add(TrackedObjectKey.History(evt.InstanceId));
}
Expand Down

0 comments on commit beaeeb7

Please sign in to comment.