Skip to content

Commit

Permalink
Add fault injection test, fix bugs (#101)
Browse files Browse the repository at this point in the history
* implement fault injection tests

* fix tests

* update fault injection tests

* revise MemoryTransport failure handling

* fix serialization of RecoveryCompleted

* refactor concurrent tests into separate file

* revise tracing and timeouts in tests

* fix fault injection tests

* change timeout behavior of wait for orchestration to bring it in line with other backends

* fix bug that causes test to hang when recovering

* revise the recovery mechanism in MemoryTransport

* connect Partition.Assert to TestHooks so it can fail unit tests

* fix missing check for replaying when processing RecoveryCompleted event

* revise fault injector: add timeout to startup and do not inject lease renewals

* annotate tests with whether they support any transport

* adjust timeouts and update pipeline

* add labels to all the assertions and hook them up so they trip unit tests

* remove non-fixture query test from "AnyTransport" test category
  • Loading branch information
sebastianburckhardt authored Jan 6, 2022
1 parent e8f4ab7 commit 0499ba5
Show file tree
Hide file tree
Showing 43 changed files with 1,158 additions and 215 deletions.
4 changes: 2 additions & 2 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pool:
variables:
solution: 'DurableTask.Netherite.sln'
buildPlatform: 'Any CPU'
buildConfiguration: 'Release'
buildConfiguration: 'Debug'

steps:
- task: NuGetToolInstaller@1
Expand Down Expand Up @@ -51,6 +51,6 @@ steps:
inputs:
command: 'test'
projects: '**/DurableTask.Netherite*Tests.csproj'
arguments: '--configuration $(buildConfiguration) --verbosity normal'
arguments: '--configuration $(buildConfiguration) --verbosity normal --filter "AnyTransport=true"'
testRunTitle: 'Netherite Unit Tests on storageAccount/EventHubs'

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent)

protected abstract void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning);

public abstract void Assert(bool condition);
public abstract void Assert(bool condition, string message);

public abstract uint PartitionId { get; }

Expand Down Expand Up @@ -103,7 +103,7 @@ public async Task ProcessUpdate(PartitionUpdateEvent updateEvent)
{
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying);

this.Assert(updateEvent != null);
this.Assert(updateEvent != null, "null event in ProcessUpdate");

this.SetCurrentUpdateEvent(updateEvent);

Expand Down Expand Up @@ -165,7 +165,7 @@ async ValueTask ProcessRecursively()
public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // read events are never part of the replay
this.Assert(!this.IsReplaying, "read events are not part of the replay");
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, readEvent.EventIdString))
Expand Down Expand Up @@ -223,7 +223,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // query events are never part of the replay
this.Assert(!this.IsReplaying, "query events are never part of the replay");
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, queryEvent.EventIdString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public interface IPartitionErrorHandler
/// </summary>
bool IsTerminated { get; }

/// <summary>
/// A boolean indicating that normal termination has been initiated as part of a shutdown.
/// </summary>
bool NormalTermination { get; }

/// <summary>
/// Error handling for the partition.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public override EventTraceHelper EventDetailTracer
protected override void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning)
=> this.Partition.ErrorHandler.HandleError(where, message, e, terminatePartition, reportAsWarning);

public override void Assert(bool condition)
=> this.Partition.Assert(condition);
public override void Assert(bool condition, string message)
=> this.Partition.Assert(condition, message);

public override uint PartitionId
=> this.Partition.PartitionId;
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.Netherite/Events/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public static IEnumerable<Type> KnownTypes()
yield return typeof(BatchProcessed);
yield return typeof(OffloadDecision);
yield return typeof(PurgeBatchIssued);
yield return typeof(RecoveryCompleted);
yield return typeof(SendConfirmed);
yield return typeof(TimerFired);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ protected override void ExtraTraceInformation(StringBuilder s)
s.Append(this.MaxSessionDequeueCount);
}

public override void OnSubmit(Partition partition)
{
partition.EventTraceHelper.TraceEventProcessingDetail($"Submitted {this}");
}

[IgnoreDataMember]
public override EventId EventId => EventId.MakePartitionInternalEventId($"Recovered-{this.WorkerId}-{this.Timestamp:o}");

Expand Down
11 changes: 9 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,15 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
TimeoutUtc = this.GetTimeoutBucket(timeout),
};

var response = await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
return ((WaitResponseReceived)response)?.OrchestrationState;
try
{
var response = await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
return ((WaitResponseReceived)response)?.OrchestrationState;
}
catch(TimeoutException)
{
return null; // to match semantics of other backends, wait returns null when timing out
}
}

public async Task<OrchestrationState> GetOrchestrationStateAsync(uint partitionId, string instanceId, bool fetchInput = true, bool fetchOutput = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,6 @@ async Task<TaskOrchestrationWorkItem> IOrchestrationService.LockNextTaskOrchestr

if (nextOrchestrationWorkItem != null)
{
if (nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence != null)
{
await nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence.Task;
}

nextOrchestrationWorkItem.MessageBatch.WaitingSince = null;

this.workItemTraceHelper.TraceWorkItemStarted(
Expand Down Expand Up @@ -653,8 +648,8 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(

// all continue as new requests are processed immediately (DurableTask.Core always uses "fast" continue-as-new)
// so by the time we get here, it is not a continue as new
partition.Assert(continuedAsNewMessage == null);
partition.Assert(workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew);
partition.Assert(continuedAsNewMessage == null, "unexpected continueAsNew message");
partition.Assert(workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew, "unexpected continueAsNew status");

// we assign sequence numbers to all outgoing messages, to help us track them using unique message ids
long sequenceNumber = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurab
public string WorkItemId;
public double? WaitingSince; // measures time waiting to execute

// enforces that the activity cannot start executing before the issuing event is persisted
public readonly TaskCompletionSource<object> WaitForDequeueCountPersistence;
readonly PartitionUpdateEvent waitForDequeueCountPersistence;
OrchestrationWorkItem workItem;

public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId);

Expand All @@ -46,23 +46,17 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio

session.CurrentBatch = this;

if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem)
if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem || filingEvent is RecoveryCompleted)
{
this.WaitForDequeueCountPersistence = new TaskCompletionSource<object>();
DurabilityListeners.Register(filingEvent, this);
this.waitForDequeueCountPersistence = filingEvent;
}

partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}");

// continue when we have the history state loaded, which gives us the latest state and/or cursor
partition.SubmitEvent(this);
}

public void ConfirmDurable(Event evt)
{
this.WaitForDequeueCountPersistence.TrySetResult(null);
}

public IEnumerable<(TaskMessage, string)> TracedMessages
{
get
Expand All @@ -87,53 +81,52 @@ public void ConfirmDurable(Event evt)
public override void OnReadComplete(TrackedObject s, Partition partition)
{
var historyState = (HistoryState)s;
OrchestrationWorkItem workItem;


if (this.ForceNewExecution || historyState == null)
{
// we either have no previous instance, or want to replace the previous instance
workItem = new OrchestrationWorkItem(partition, this, previousHistory: null);
workItem.Type = OrchestrationWorkItem.ExecutionType.Fresh;
workItem.HistorySize = 0;
this.workItem = new OrchestrationWorkItem(partition, this, previousHistory: null);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.Fresh;
this.workItem.HistorySize = 0;
}
else if (historyState.CachedOrchestrationWorkItem != null)
{
// we have a cached cursor and can resume where we left off
workItem = historyState.CachedOrchestrationWorkItem;
workItem.SetNextMessageBatch(this);
workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromCursor;
workItem.HistorySize = workItem.OrchestrationRuntimeState?.Events?.Count ?? 0;
this.workItem = historyState.CachedOrchestrationWorkItem;
this.workItem.SetNextMessageBatch(this);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromCursor;
this.workItem.HistorySize = this.workItem.OrchestrationRuntimeState?.Events?.Count ?? 0;

// sanity check: it appears cursor is sometimes corrupted, in that case, construct fresh
// TODO investigate reasons and fix root cause
if (workItem.HistorySize != historyState.History?.Count
|| workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != historyState.ExecutionId)
if (this.workItem.HistorySize != historyState.History?.Count
|| this.workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != historyState.ExecutionId)
{
partition.EventTraceHelper.TraceEventProcessingWarning($"Fixing bad workitem cache instance={this.InstanceId} batch={this.WorkItemId} expected_size={historyState.History?.Count} actual_size={workItem.OrchestrationRuntimeState?.Events?.Count} expected_executionid={historyState.ExecutionId} actual_executionid={workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}");
partition.EventTraceHelper.TraceEventProcessingWarning($"Fixing bad workitem cache instance={this.InstanceId} batch={this.WorkItemId} expected_size={historyState.History?.Count} actual_size={this.workItem.OrchestrationRuntimeState?.Events?.Count} expected_executionid={historyState.ExecutionId} actual_executionid={this.workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}");

// we create a new work item and rehydrate the instance from its history
workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
workItem.HistorySize = historyState.History?.Count ?? 0;
this.workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
this.workItem.HistorySize = historyState.History?.Count ?? 0;
}
}
else
{
// we have to rehydrate the instance from its history
workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
workItem.HistorySize = historyState.History?.Count ?? 0;
this.workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
this.workItem.HistorySize = historyState.History?.Count ?? 0;
}

if (!this.IsExecutableInstance(workItem, out var reason))
if (!this.IsExecutableInstance(this.workItem, out var reason))
{
// this instance cannot be executed, so we are discarding the messages
for (int i = 0; i < workItem.MessageBatch.MessagesToProcess.Count; i++)
for (int i = 0; i < this.workItem.MessageBatch.MessagesToProcess.Count; i++)
{
partition.WorkItemTraceHelper.TraceTaskMessageDiscarded(
this.PartitionId,
workItem.MessageBatch.MessagesToProcess[i],
workItem.MessageBatch.MessagesToProcessOrigin[i],
this.workItem.MessageBatch.MessagesToProcess[i],
this.workItem.MessageBatch.MessagesToProcessOrigin[i],
reason);
}

Expand All @@ -151,11 +144,24 @@ public override void OnReadComplete(TrackedObject s, Partition partition)
}
else
{
// the work item is ready to process - put it into the OrchestrationWorkItemQueue
partition.EnqueueOrchestrationWorkItem(workItem);
if (this.waitForDequeueCountPersistence != null)
{
// process the work item once the filing event is persisted
DurabilityListeners.Register(this.waitForDequeueCountPersistence, this);
}
else
{
// the work item is ready to process now - put it into the OrchestrationWorkItemQueue
partition.EnqueueOrchestrationWorkItem(this.workItem);
}
}
}

public void ConfirmDurable(Event evt)
{
this.workItem.Partition.EnqueueOrchestrationWorkItem(this.workItem);
}

bool IsExecutableInstance(TaskOrchestrationWorkItem workItem, out string message)
{
if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null
Expand Down
25 changes: 18 additions & 7 deletions src/DurableTask.Netherite/OrchestrationService/Partition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,14 @@ public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler

this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

errorHandler.Token.Register(() => this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, ""));
errorHandler.Token.Register(() => {
this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, "");

if (!this.ErrorHandler.NormalTermination && this.Settings.TestHooks != null && this.Settings.TestHooks.FaultInjector == null)
{
this.Settings.TestHooks.Error("Partition", "Unexpected partition termination during test");
}
});

await MaxConcurrentStarts.WaitAsync();

Expand Down Expand Up @@ -136,18 +143,22 @@ public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler
}

[Conditional("DEBUG")]
public void Assert(bool condition)
public void Assert(bool condition, string message)
{
if (!condition)
{
if (System.Diagnostics.Debugger.IsAttached)
if (this.Settings.TestHooks != null)
{
this.Settings.TestHooks.Error("Partition.Assert", $"Assertion Failed: {message}");
}
else if (System.Diagnostics.Debugger.IsAttached)
{
System.Diagnostics.Debugger.Break();
}

var stacktrace = System.Environment.StackTrace;

this.ErrorHandler.HandleError(stacktrace, "Assertion failed", null, false, false);
this.ErrorHandler.HandleError(stacktrace, $"Assertion Failed: {message}", null, false, false);
}
}

Expand Down Expand Up @@ -176,7 +187,7 @@ public async Task StopAsync(bool quickly)
}

// at this point, the partition has been terminated (either cleanly or by exception)
this.Assert(this.ErrorHandler.IsTerminated);
this.Assert(this.ErrorHandler.IsTerminated, "expect termination to be complete");

// tell the load publisher to send all buffered info
if (this.LoadPublisher != null)
Expand Down Expand Up @@ -257,7 +268,7 @@ public void SubmitEvent(PartitionReadEvent readEvent)

public void SubmitParallelEvent(PartitionEvent partitionEvent)
{
this.Assert(!(partitionEvent is PartitionUpdateEvent));
this.Assert(!(partitionEvent is PartitionUpdateEvent), "SubmitParallelEvent must not be called with update event");
partitionEvent.ReceivedTimestamp = this.CurrentTimeMs;
this.State.SubmitParallelEvent(partitionEvent);
}
Expand All @@ -279,7 +290,7 @@ public void SubmitEvents(IList<PartitionEvent> partitionEvents)

public void EnqueueActivityWorkItem(ActivityWorkItem item)
{
this.Assert(!string.IsNullOrEmpty(item.OriginWorkItem));
this.Assert(!string.IsNullOrEmpty(item.OriginWorkItem), "null work item in EnqueueActivityWorkItem");

this.WorkItemTraceHelper.TraceWorkItemQueued(
this.PartitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class PartitionErrorHandler : IPartitionErrorHandler
public CancellationToken Token => this.cts.Token;
public bool IsTerminated => this.cts.Token.IsCancellationRequested;

public bool NormalTermination { get; private set; }

public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelLimit, string storageAccountName, string taskHubName)
{
this.cts = new CancellationTokenSource();
Expand Down Expand Up @@ -62,6 +64,7 @@ void TraceError(bool isWarning, string context, string message, Exception except

public void TerminateNormally()
{
this.NormalTermination = true;
this.Terminate();
}

Expand Down
7 changes: 5 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/TestHooks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ namespace DurableTask.Netherite
/// </summary>
public class TestHooks
{
internal Faster.CacheDebugger CacheDebugger { get; set; } = null;
internal Faster.CacheDebugger CacheDebugger { get; set; }

internal Faster.ReplayChecker ReplayChecker { get; set; }

public Faster.FaultInjector FaultInjector { get; set; }

internal Faster.ReplayChecker ReplayChecker { get; set; } = null;

internal event Action<string> OnError;

Expand Down
Loading

0 comments on commit 0499ba5

Please sign in to comment.