Skip to content

Commit

Permalink
change tracing of task messages to provide more consistent results (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastianburckhardt authored Jul 28, 2021
1 parent 71993f2 commit 0799410
Show file tree
Hide file tree
Showing 19 changed files with 174 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,6 @@ class ClientTaskMessagesReceived : ClientRequestEvent
[DataMember]
public TaskMessage[] TaskMessages { get; set; }

[IgnoreDataMember]
public override IEnumerable<(TaskMessage, string)> TracedTaskMessages
{
get
{
foreach (var message in this.TaskMessages)
{
yield return (message, this.WorkItemId);
}
}
}


public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Sessions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public string InstanceId => this.ExecutionStartedEvent.OrchestrationInstance.InstanceId;

[IgnoreDataMember]
public override IEnumerable<(TaskMessage,string)> TracedTaskMessages
{
get
{
yield return (this.TaskMessage, this.WorkItemId);
}
}


[IgnoreDataMember]
public override TrackedObjectKey Target => TrackedObjectKey.Instance(this.InstanceId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace DurableTask.Netherite
class ActivityOffloadReceived : PartitionMessageEvent
{
[DataMember]
public List<(TaskMessage,string)> OffloadedActivities { get; set; }
public List<(TaskMessage message, string workItemId)> OffloadedActivities { get; set; }

[DataMember]
public DateTime Timestamp { get; set; }
Expand All @@ -21,6 +21,6 @@ class ActivityOffloadReceived : PartitionMessageEvent
public override EventId EventId => EventId.MakePartitionToPartitionEventId(OffloadDecision.GetWorkItemId(this.OriginPartition, this.Timestamp), this.PartitionId);

[IgnoreDataMember]
public override IEnumerable<(TaskMessage,string)> TracedTaskMessages => this.OffloadedActivities;
public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages => this.OffloadedActivities;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

namespace DurableTask.Netherite
{
using System.Collections.Generic;
using System.Runtime.Serialization;
using DurableTask.Core;

[DataContract]
abstract class PartitionMessageEvent : PartitionUpdateEvent
Expand All @@ -17,6 +19,9 @@ abstract class PartitionMessageEvent : PartitionUpdateEvent
[IgnoreDataMember]
public virtual (long, int) DedupPosition => (this.OriginPosition, 0); // overridden if a subposition is needed

[IgnoreDataMember]
public abstract IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages { get; }

public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Dedup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class RemoteActivityResultReceived : PartitionMessageEvent
public string WorkItemId => ActivitiesState.GetWorkItemId(this.OriginPartition, this.ActivityId);

[IgnoreDataMember]
public override IEnumerable<(TaskMessage,string)> TracedTaskMessages
public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages
{
get
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@ class TaskMessagesReceived : PartitionMessageEvent
[IgnoreDataMember]
public int NumberMessages => (this.TaskMessages?.Count ?? 0) + (this.DelayedTaskMessages?.Count ?? 0);

[IgnoreDataMember]
public override IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages
{
get
{
if (this.TaskMessages?.Count > 0)
{
foreach (var taskMessage in this.TaskMessages)
{
yield return (taskMessage, this.WorkItemId);
}
}
if (this.DelayedTaskMessages?.Count > 0)
{
foreach (var taskMessage in this.DelayedTaskMessages)
{
yield return (taskMessage, this.WorkItemId);
}
}
}
}

protected override void ExtraTraceInformation(StringBuilder s)
{
var tCount = this.TaskMessages?.Count ?? 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,6 @@ class ActivityCompleted : PartitionUpdateEvent
[IgnoreDataMember]
public string WorkItemId => ActivitiesState.GetWorkItemId(this.PartitionId, this.ActivityId);

[IgnoreDataMember]
public override IEnumerable<(TaskMessage,string)> TracedTaskMessages
{
get
{
yield return (this.Response, this.WorkItemId);
}
}

public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Activities);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,43 +63,6 @@ public enum PersistFirstStatus { NotRequired, Required, Done };
[IgnoreDataMember]
public override EventId EventId => EventId.MakePartitionInternalEventId(this.PersistFirst == PersistFirstStatus.Done ? this.WorkItemId + "P" : this.WorkItemId);

[IgnoreDataMember]
public override IEnumerable<(TaskMessage, string)> TracedTaskMessages
{
get
{
string workItemId = SessionsState.GetWorkItemId(this.PartitionId, this.SessionId, this.BatchStartPosition);
if (this.ActivityMessages != null)
{
foreach (TaskMessage a in this.ActivityMessages)
{
yield return (a, workItemId);
}
}
if (this.TimerMessages != null)
{
foreach (TaskMessage t in this.TimerMessages)
{
yield return (t, workItemId);
}
}
if (this.LocalMessages != null)
{
foreach (TaskMessage l in this.LocalMessages)
{
yield return (l, workItemId);
}
}
if (this.RemoteMessages != null)
{
foreach (TaskMessage r in this.RemoteMessages)
{
yield return (r, workItemId);
}
}
}
}

IEnumerable<TrackedObjectKey> IRequiresPrefetch.KeysToPrefetch
{
get
Expand All @@ -115,6 +78,31 @@ public override void DetermineEffects(EffectTracker effects)
effects.Add(TrackedObjectKey.Sessions);
}

public IEnumerable<TaskMessage> LoopBackMessages()
{
if (this.ActivityMessages != null)
{
foreach (var message in this.ActivityMessages)
{
yield return message;
}
}
if (this.LocalMessages != null)
{
foreach (var message in this.LocalMessages)
{
yield return message;
}
}
if (this.TimerMessages != null)
{
foreach (var message in this.TimerMessages)
{
yield return message;
}
}
}

protected override void ExtraTraceInformation(StringBuilder s)
{
base.ExtraTraceInformation(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,6 @@ class TimerFired : PartitionUpdateEvent
[IgnoreDataMember]
public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId);

[IgnoreDataMember]
public override IEnumerable<(TaskMessage, string)> TracedTaskMessages
{
get
{
yield return (this.TaskMessage, this.OriginWorkItemId);
}
}

public override void DetermineEffects(EffectTracker effects)
{
effects.Add(TrackedObjectKey.Sessions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,6 @@ abstract class PartitionEvent : Event
[IgnoreDataMember]
public double ReceivedTimestamp { get; set; }

[IgnoreDataMember]
public double ReadyToSendTimestamp { get; set; }

[IgnoreDataMember]
public double SentTimestamp { get; set; }

[IgnoreDataMember]
public double IssuedTimestamp { get; set; }

Expand All @@ -46,9 +40,7 @@ public PartitionEvent Clone()
evt.Serialized = default;
evt.NextInputQueuePosition = 0;

// clear the timestamps that will be overwritten
evt.ReadyToSendTimestamp = 0;
evt.SentTimestamp = 0;
// clear the timestamp
evt.IssuedTimestamp = 0;

return evt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,6 @@ abstract class PartitionUpdateEvent : PartitionEvent
[IgnoreDataMember]
public OutboxState.Batch OutboxBatch { get; set; }

[IgnoreDataMember]
public virtual IEnumerable<(TaskMessage message, string workItemId)> TracedTaskMessages => PartitionUpdateEvent.noTaskMessages;

static readonly IEnumerable<(TaskMessage, string)> noTaskMessages = Enumerable.Empty<(TaskMessage, string)>();

public abstract void DetermineEffects(EffectTracker effects);
}
}
25 changes: 19 additions & 6 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,22 @@ void TransportAbstraction.IDurabilityListener.ConfirmDurable(Event evt)
// we create a separate trace message for each destination partition
foreach (var group in request.TaskMessages.GroupBy((message) => message.OrchestrationInstance.InstanceId))
{
double delay = this.client.workItemStopwatch.Elapsed.TotalMilliseconds - this.startTime;
string workItemId = request.WorkItemId;

this.client.workItemTraceHelper.TraceWorkItemCompleted(
request.PartitionId,
WorkItemTraceHelper.WorkItemType.Client,
request.WorkItemId,
workItemId,
group.Key,
WorkItemTraceHelper.ClientStatus.Send,
this.client.workItemStopwatch.Elapsed.TotalMilliseconds - this.startTime,
WorkItemTraceHelper.FormatMessageIdList(group.Select((message) => (message, request.WorkItemId))));
delay,
request.TaskMessages.Length);

foreach (var message in request.TaskMessages)
{
this.client.workItemTraceHelper.TraceTaskMessageSent(request.PartitionId, message, workItemId, null, delay);
}
}

// this request is considered completed at the time of durability
Expand All @@ -202,14 +210,19 @@ void TransportAbstraction.IDurabilityListener.ConfirmDurable(Event evt)
}
else if (evt is CreationRequestReceived creationRequestReceived)
{
double delay = this.client.workItemStopwatch.Elapsed.TotalMilliseconds - this.startTime;
string workItemId = creationRequestReceived.WorkItemId;

this.client.workItemTraceHelper.TraceWorkItemCompleted(
creationRequestReceived.PartitionId,
WorkItemTraceHelper.WorkItemType.Client,
creationRequestReceived.WorkItemId,
workItemId,
creationRequestReceived.InstanceId,
WorkItemTraceHelper.ClientStatus.Create,
this.client.workItemStopwatch.Elapsed.TotalMilliseconds - this.startTime,
WorkItemTraceHelper.FormatMessageIdList(creationRequestReceived.TracedTaskMessages));
delay,
1);

this.client.workItemTraceHelper.TraceTaskMessageSent(creationRequestReceived.PartitionId, creationRequestReceived.TaskMessage, workItemId, null, delay);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ async Task<TaskOrchestrationWorkItem> IOrchestrationService.LockNextTaskOrchestr
Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(
TaskOrchestrationWorkItem workItem,
OrchestrationRuntimeState newOrchestrationRuntimeState,
IList<TaskMessage> outboundMessages,
IList<TaskMessage> activityMessages,
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
Expand All @@ -625,9 +625,9 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(
// we assign sequence numbers to all outgoing messages, to help us track them using unique message ids
long sequenceNumber = 0;

if (outboundMessages != null)
if (activityMessages != null)
{
foreach(TaskMessage taskMessage in outboundMessages)
foreach(TaskMessage taskMessage in activityMessages)
{
taskMessage.SequenceNumber = sequenceNumber++;
}
Expand Down Expand Up @@ -698,7 +698,7 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(
PackPartitionTaskMessages = partition.Settings.PackPartitionTaskMessages,
PersistFirst = partition.Settings.PersistStepsFirst ? BatchProcessed.PersistFirstStatus.Required : BatchProcessed.PersistFirstStatus.NotRequired,
State = state,
ActivityMessages = (List<TaskMessage>)outboundMessages,
ActivityMessages = (List<TaskMessage>)activityMessages,
LocalMessages = localMessages,
RemoteMessages = remoteMessages,
TimerMessages = (List<TaskMessage>)timerMessages,
Expand All @@ -712,10 +712,18 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(
workItem.InstanceId,
batchProcessedEvent.State.OrchestrationStatus,
latencyMs,
WorkItemTraceHelper.FormatMessageIdList(batchProcessedEvent.TracedTaskMessages));

partition.SubmitInternalEvent(batchProcessedEvent);

sequenceNumber);

partition.SubmitInternalEvent(batchProcessedEvent);

if (this.workItemTraceHelper.TraceTaskMessages)
{
foreach (var taskMessage in batchProcessedEvent.LoopBackMessages())
{
this.workItemTraceHelper.TraceTaskMessageSent(partition.PartitionId, taskMessage, messageBatch.WorkItemId, null, null);
}
}

return Task.CompletedTask;
}

Expand Down Expand Up @@ -838,11 +846,12 @@ Task IOrchestrationService.CompleteTaskActivityWorkItemAsync(TaskActivityWorkIte
activityWorkItem.TaskMessage.OrchestrationInstance.InstanceId,
WorkItemTraceHelper.ActivityStatus.Completed,
latencyMs,
WorkItemTraceHelper.FormatMessageId(responseMessage, activityWorkItem.WorkItemId));
1);

try
{
partition.SubmitInternalEvent(activityCompletedEvent);
this.workItemTraceHelper.TraceTaskMessageSent(partition.PartitionId, activityCompletedEvent.Response, activityWorkItem.WorkItemId, null, null);
}
catch (OperationCanceledException e)
{
Expand Down
Loading

0 comments on commit 0799410

Please sign in to comment.