Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix client responses to wait for persistence #98

Merged
merged 3 commits into from
Dec 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ class CreationRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public override TrackedObjectKey Target => TrackedObjectKey.Instance(this.InstanceId);

[IgnoreDataMember]
public CreationResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState

public override bool OnReadComplete(TrackedObject target, Partition partition)
{
// Use this moment of time as the creation timestamp, replacing the original timestamp taken on the client.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class DeletionRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;

[IgnoreDataMember]
public DeletionResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public override void OnReadComplete(TrackedObject target, Partition partition)
History = historyState?.History?.ToList(),
};

partition.Send(response);
partition.Send(response); //TODO wait for persistence
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ PurgeBatchIssued makeNewBatchObject()

PurgeBatchIssued batch = makeNewBatchObject();

// TODO : while the request itself is reliable, the client response is not.
// We should probably fix that by using the ClientState to track progress.

async Task ExecuteBatch()
{
await partition.State.Prefetch(batch.KeysToPrefetch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public override void OnReadComplete(TrackedObject target, Partition partition)
OrchestrationState = editedState,
};

partition.Send(response);
partition.Send(response); //TODO wait for persistence
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ class WaitRequestReceived : ClientRequestEventWithPrefetch
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;

[IgnoreDataMember]
public WaitResponseReceived ResponseToSend { get; set; } // used to communicate response to ClientState

public override void ApplyTo(TrackedObject trackedObject, EffectTracker effects)
{
trackedObject.Process(this, effects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public enum PersistFirstStatus { NotRequired, Required, Done };
[IgnoreDataMember]
public override string TracedInstanceId => this.InstanceId;

[IgnoreDataMember]
public List<WaitResponseReceived> ResponsesToSend { get; set; } // used to communicate responses to ClientState

IEnumerable<TrackedObjectKey> IRequiresPrefetch.KeysToPrefetch
{
get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace DurableTask.Netherite
using System.Threading.Tasks;

[DataContract]
class PurgeBatchIssued : PartitionUpdateEvent, IRequiresPrefetch
class PurgeBatchIssued : PartitionUpdateEvent, IRequiresPrefetch, TransportAbstraction.IDurabilityListener
{
[DataMember]
public string QueryEventId { get; set; }
Expand Down Expand Up @@ -71,5 +71,11 @@ public override void DetermineEffects(EffectTracker effects)
effects.Add(TrackedObjectKey.Instance(instanceId));
}
}

public void ConfirmDurable(Event evt)
{
// lets the client know this batch has been persisted
this.WhenProcessed.TrySetResult(null);
}
}
}
48 changes: 18 additions & 30 deletions src/DurableTask.Netherite/PartitionState/InstanceState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,14 @@ public override void Process(CreationRequestReceived creationRequestReceived, Ef
}
}

if (!effects.IsReplaying)
effects.Add(TrackedObjectKey.Outbox);
creationRequestReceived.ResponseToSend = new CreationResponseReceived()
{
// send response to client
effects.Partition.Send(new CreationResponseReceived()
{
ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
});
}
ClientId = creationRequestReceived.ClientId,
RequestId = creationRequestReceived.RequestId,
Succeeded = !filterDuplicate,
ExistingInstanceOrchestrationStatus = this.OrchestrationState?.OrchestrationStatus,
};
}


Expand All @@ -92,13 +89,8 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
// if the orchestration is complete, notify clients that are waiting for it
if (this.Waiters != null && WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
{
if (!effects.IsReplaying)
{
foreach (var request in this.Waiters)
{
this.Partition.Send(request.CreateResponse(this.OrchestrationState));
}
}
// we do not need effects.Add(TrackedObjectKey.Outbox) because it has already been added by SessionsState
evt.ResponsesToSend = this.Waiters.Select(request => request.CreateResponse(this.OrchestrationState)).ToList();

this.Waiters = null;
}
Expand All @@ -108,10 +100,8 @@ public override void Process(WaitRequestReceived evt, EffectTracker effects)
{
if (WaitRequestReceived.SatisfiesWaitCondition(this.OrchestrationState))
{
if (!effects.IsReplaying)
{
this.Partition.Send(evt.CreateResponse(this.OrchestrationState));
}
effects.Add(TrackedObjectKey.Outbox);
evt.ResponseToSend = evt.CreateResponse(this.OrchestrationState);
}
else
{
Expand All @@ -135,7 +125,7 @@ public override void Process(DeletionRequestReceived deletionRequest, EffectTrac
{
int numberInstancesDeleted = 0;

if (this.OrchestrationState != null
if (this.OrchestrationState != null
&& (!deletionRequest.CreatedTime.HasValue || deletionRequest.CreatedTime.Value == this.OrchestrationState.CreatedTime))
{
numberInstancesDeleted++;
Expand All @@ -148,15 +138,13 @@ public override void Process(DeletionRequestReceived deletionRequest, EffectTrac
effects.Add(TrackedObjectKey.Sessions);
}

if (!effects.IsReplaying)
effects.Add(TrackedObjectKey.Outbox);
deletionRequest.ResponseToSend = new DeletionResponseReceived()
{
this.Partition.Send(new DeletionResponseReceived()
{
ClientId = deletionRequest.ClientId,
RequestId = deletionRequest.RequestId,
NumberInstancesDeleted = numberInstancesDeleted,
});
}
ClientId = deletionRequest.ClientId,
RequestId = deletionRequest.RequestId,
NumberInstancesDeleted = numberInstancesDeleted,
};
}

public override void Process(PurgeBatchIssued purgeBatchIssued, EffectTracker effects)
Expand Down
163 changes: 108 additions & 55 deletions src/DurableTask.Netherite/PartitionState/OutboxState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ void Send(Batch batch)
outmessage.OriginPosition = batch.Position;
this.Partition.Send(outmessage);
}
foreach (var outresponse in batch.OutgoingResponses)
{
DurabilityListeners.Register(outresponse, batch);
this.Partition.Send(outresponse);
}
}

[DataContract]
Expand All @@ -102,6 +107,9 @@ public class Batch : TransportAbstraction.IDurabilityListener
[DataMember]
public List<PartitionMessageEvent> OutgoingMessages { get; set; } = new List<PartitionMessageEvent>();

[DataMember]
public List<ClientEvent> OutgoingResponses { get; set; } = new List<ClientEvent>();

[IgnoreDataMember]
public long Position { get; set; }

Expand All @@ -119,21 +127,22 @@ public class Batch : TransportAbstraction.IDurabilityListener

public void ConfirmDurable(Event evt)
{
var partitionMessageEvent = (PartitionMessageEvent)evt;

var workItemTraceHelper = this.Partition.WorkItemTraceHelper;
if (workItemTraceHelper.TraceTaskMessages)
if (evt is PartitionMessageEvent partitionMessageEvent)
{
double? persistenceDelayMs = this.ProcessedTimestamp.HasValue ? (this.ReadyToSendTimestamp - this.ProcessedTimestamp.Value) : null;
double sendDelayMs = this.Partition.CurrentTimeMs - this.ReadyToSendTimestamp;

foreach (var entry in partitionMessageEvent.TracedTaskMessages)
var workItemTraceHelper = this.Partition.WorkItemTraceHelper;
if (workItemTraceHelper.TraceTaskMessages)
{
workItemTraceHelper.TraceTaskMessageSent(this.Partition.PartitionId, entry.message, entry.workItemId, persistenceDelayMs, sendDelayMs);
double? persistenceDelayMs = this.ProcessedTimestamp.HasValue ? (this.ReadyToSendTimestamp - this.ProcessedTimestamp.Value) : null;
double sendDelayMs = this.Partition.CurrentTimeMs - this.ReadyToSendTimestamp;

foreach (var entry in partitionMessageEvent.TracedTaskMessages)
{
workItemTraceHelper.TraceTaskMessageSent(this.Partition.PartitionId, entry.message, entry.workItemId, persistenceDelayMs, sendDelayMs);
}
}
}

if (++this.numAcks == this.OutgoingMessages.Count)
if (++this.numAcks == this.OutgoingMessages.Count + this.OutgoingResponses.Count)
{
this.Partition.SubmitEvent(new SendConfirmed()
{
Expand Down Expand Up @@ -171,73 +180,93 @@ public override void Process(BatchProcessed evt, EffectTracker effects)
var batch = new Batch();
int subPosition = 0;

IEnumerable<(uint,TaskMessage)> Messages()
bool sendResponses = evt.ResponsesToSend != null;
bool sendMessages = evt.RemoteMessages?.Count > 0;

if (! (sendResponses || sendMessages))
{
foreach (var message in evt.RemoteMessages)
{
var instanceId = message.OrchestrationInstance.InstanceId;
var destination = this.Partition.PartitionFunction(instanceId);
yield return (destination, message);
}
return;
}

void AddMessage(TaskMessagesReceived outmessage, TaskMessage message)
if (sendResponses)
{
if (Entities.IsDelayedEntityMessage(message, out _))
foreach(var r in evt.ResponsesToSend)
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
batch.OutgoingResponses.Add(r);
}
else if (message.Event is ExecutionStartedEvent executionStartedEvent && executionStartedEvent.ScheduledStartTime.HasValue)
}

if (sendMessages)
{
IEnumerable<(uint, TaskMessage)> Messages()
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
foreach (var message in evt.RemoteMessages)
{
var instanceId = message.OrchestrationInstance.InstanceId;
var destination = this.Partition.PartitionFunction(instanceId);
yield return (destination, message);
}
}
else

void AddMessage(TaskMessagesReceived outmessage, TaskMessage message)
{
(outmessage.TaskMessages ??= new List<TaskMessage>()).Add(message);
if (Entities.IsDelayedEntityMessage(message, out _))
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
}
else if (message.Event is ExecutionStartedEvent executionStartedEvent && executionStartedEvent.ScheduledStartTime.HasValue)
{
(outmessage.DelayedTaskMessages ??= new List<TaskMessage>()).Add(message);
}
else
{
(outmessage.TaskMessages ??= new List<TaskMessage>()).Add(message);
}
outmessage.SubPosition = ++subPosition;
}
outmessage.SubPosition = ++subPosition;
}

if (evt.PackPartitionTaskMessages > 1)
{
// pack multiple TaskMessages for the same destination into a single TaskMessagesReceived event
var sorted = new Dictionary<uint, TaskMessagesReceived>();
foreach ((uint destination, TaskMessage message) in Messages())
if (evt.PackPartitionTaskMessages > 1)
{
if (!sorted.TryGetValue(destination, out var outmessage))
// pack multiple TaskMessages for the same destination into a single TaskMessagesReceived event
var sorted = new Dictionary<uint, TaskMessagesReceived>();
foreach ((uint destination, TaskMessage message) in Messages())
{
sorted[destination] = outmessage = new TaskMessagesReceived()
if (!sorted.TryGetValue(destination, out var outmessage))
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
}
sorted[destination] = outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
}

AddMessage(outmessage, message);
AddMessage(outmessage, message);

// send the message if we have reached the pack limit
if (outmessage.NumberMessages >= evt.PackPartitionTaskMessages)
{
batch.OutgoingMessages.Add(outmessage);
sorted.Remove(destination);
// send the message if we have reached the pack limit
if (outmessage.NumberMessages >= evt.PackPartitionTaskMessages)
{
batch.OutgoingMessages.Add(outmessage);
sorted.Remove(destination);
}
}
batch.OutgoingMessages.AddRange(sorted.Values);
}
batch.OutgoingMessages.AddRange(sorted.Values);
}
else
{
// send each TaskMessage as a separate TaskMessagesReceived event
foreach ((uint destination, TaskMessage message) in Messages())
else
{
var outmessage = new TaskMessagesReceived()
// send each TaskMessage as a separate TaskMessagesReceived event
foreach ((uint destination, TaskMessage message) in Messages())
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
AddMessage(outmessage, message);
batch.OutgoingMessages.Add(outmessage);
var outmessage = new TaskMessagesReceived()
{
PartitionId = destination,
WorkItemId = evt.WorkItemId,
};
AddMessage(outmessage, message);
batch.OutgoingMessages.Add(outmessage);
}
}
}

this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}

Expand Down Expand Up @@ -270,5 +299,29 @@ public override void Process(TransferCommandReceived evt, EffectTracker effects)

this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}

public override void Process(WaitRequestReceived evt, EffectTracker effects)
{
this.Partition.Assert(evt.ResponseToSend != null);
var batch = new Batch();
batch.OutgoingResponses.Add(evt.ResponseToSend);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}

public override void Process(CreationRequestReceived evt, EffectTracker effects)
{
this.Partition.Assert(evt.ResponseToSend != null);
var batch = new Batch();
batch.OutgoingResponses.Add(evt.ResponseToSend);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}

public override void Process(DeletionRequestReceived evt, EffectTracker effects)
{
this.Partition.Assert(evt.ResponseToSend != null);
var batch = new Batch();
batch.OutgoingResponses.Add(evt.ResponseToSend);
this.SendBatchOnceEventIsPersisted(evt, effects, batch);
}
}
}
Loading