Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fault injection test, fix bugs #101

Merged
merged 19 commits into from
Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
6d94621
implement fault injection tests
sebastianburckhardt Dec 22, 2021
4afe223
fix tests
sebastianburckhardt Dec 22, 2021
6b82f43
Merge branch 'dev' into pr/replaytests
sebastianburckhardt Jan 5, 2022
bece01c
update fault injection tests
sebastianburckhardt Jan 5, 2022
d5ed504
revise MemoryTransport failure handling
sebastianburckhardt Jan 5, 2022
fbc2459
fix serialization of RecoveryCompleted
sebastianburckhardt Jan 6, 2022
d9fbc02
refactor concurrent tests into separate file
sebastianburckhardt Jan 6, 2022
10a3bd6
revise tracing and timeouts in tests
sebastianburckhardt Jan 6, 2022
f9da402
fix fault injection tests
sebastianburckhardt Jan 6, 2022
7989324
change timeout behavior of wait for orchestration to bring it in line…
sebastianburckhardt Jan 6, 2022
7dc35ba
fix bug that causes test to hang when recovering
sebastianburckhardt Jan 6, 2022
c29fd4d
revise the recovery mechanism in MemoryTransport
sebastianburckhardt Jan 6, 2022
53b0955
connect Partition.Assert to TestHooks so it can fail unit tests
sebastianburckhardt Jan 6, 2022
d2adf16
fix missing check for replaying when processing RecoveryCompleted event
sebastianburckhardt Jan 6, 2022
19aff27
revise fault injector: add timeout to startup and do not inject lease…
sebastianburckhardt Jan 6, 2022
c360461
annotate tests with whether they support any transport
sebastianburckhardt Jan 6, 2022
da19fdd
adjust timeouts and update pipeline
sebastianburckhardt Jan 6, 2022
11b9ebb
add labels to all the assertions and hook them up so they trip unit t…
sebastianburckhardt Jan 6, 2022
df10d1f
remove non-fixture query test from "AnyTransport" test category
sebastianburckhardt Jan 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pool:
variables:
solution: 'DurableTask.Netherite.sln'
buildPlatform: 'Any CPU'
buildConfiguration: 'Release'
buildConfiguration: 'Debug'

steps:
- task: NuGetToolInstaller@1
Expand Down Expand Up @@ -51,6 +51,6 @@ steps:
inputs:
command: 'test'
projects: '**/DurableTask.Netherite*Tests.csproj'
arguments: '--configuration $(buildConfiguration) --verbosity normal'
arguments: '--configuration $(buildConfiguration) --verbosity normal --filter "AnyTransport=true"'
testRunTitle: 'Netherite Unit Tests on storageAccount/EventHubs'

Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void SetCurrentUpdateEvent(PartitionUpdateEvent updateEvent)

protected abstract void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning);

public abstract void Assert(bool condition);
public abstract void Assert(bool condition, string message);

public abstract uint PartitionId { get; }

Expand Down Expand Up @@ -103,7 +103,7 @@ public async Task ProcessUpdate(PartitionUpdateEvent updateEvent)
{
this.EventDetailTracer?.TraceEventProcessingStarted(commitLogPosition, updateEvent, EventTraceHelper.EventCategory.UpdateEvent, this.IsReplaying);

this.Assert(updateEvent != null);
this.Assert(updateEvent != null, "null event in ProcessUpdate");

this.SetCurrentUpdateEvent(updateEvent);

Expand Down Expand Up @@ -165,7 +165,7 @@ async ValueTask ProcessRecursively()
public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key, TrackedObject target)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // read events are never part of the replay
this.Assert(!this.IsReplaying, "read events are not part of the replay");
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, readEvent.EventIdString))
Expand Down Expand Up @@ -223,7 +223,7 @@ public void ProcessReadResult(PartitionReadEvent readEvent, TrackedObjectKey key
public async Task ProcessQueryResultAsync(PartitionQueryEvent queryEvent, IAsyncEnumerable<OrchestrationState> instances)
{
(long commitLogPosition, long inputQueuePosition) = this.GetPositions();
this.Assert(!this.IsReplaying); // query events are never part of the replay
this.Assert(!this.IsReplaying, "query events are never part of the replay");
double startedTimestamp = this.CurrentTimeMs;

using (EventTraceContext.MakeContext(commitLogPosition, queryEvent.EventIdString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public interface IPartitionErrorHandler
/// </summary>
bool IsTerminated { get; }

/// <summary>
/// A boolean indicating that normal termination has been initiated as part of a shutdown.
/// </summary>
bool NormalTermination { get; }

/// <summary>
/// Error handling for the partition.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public override EventTraceHelper EventDetailTracer
protected override void HandleError(string where, string message, Exception e, bool terminatePartition, bool reportAsWarning)
=> this.Partition.ErrorHandler.HandleError(where, message, e, terminatePartition, reportAsWarning);

public override void Assert(bool condition)
=> this.Partition.Assert(condition);
public override void Assert(bool condition, string message)
=> this.Partition.Assert(condition, message);

public override uint PartitionId
=> this.Partition.PartitionId;
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.Netherite/Events/Event.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public static IEnumerable<Type> KnownTypes()
yield return typeof(BatchProcessed);
yield return typeof(OffloadDecision);
yield return typeof(PurgeBatchIssued);
yield return typeof(RecoveryCompleted);
yield return typeof(SendConfirmed);
yield return typeof(TimerFired);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ protected override void ExtraTraceInformation(StringBuilder s)
s.Append(this.MaxSessionDequeueCount);
}

/// <summary>
/// Emits an event-processing detail trace of the form <c>Submitted {this}</c>
/// through the trace helper of the given partition.
/// </summary>
/// <param name="partition">The partition whose <c>EventTraceHelper</c> receives the trace.</param>
public override void OnSubmit(Partition partition)
    => partition.EventTraceHelper.TraceEventProcessingDetail($"Submitted {this}");

[IgnoreDataMember]
public override EventId EventId => EventId.MakePartitionInternalEventId($"Recovered-{this.WorkerId}-{this.Timestamp:o}");

Expand Down
11 changes: 9 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,15 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
TimeoutUtc = this.GetTimeoutBucket(timeout),
};

var response = await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
return ((WaitResponseReceived)response)?.OrchestrationState;
try
{
var response = await this.PerformRequestWithTimeoutAndCancellation(cancellationToken, request, false).ConfigureAwait(false);
return ((WaitResponseReceived)response)?.OrchestrationState;
}
catch(TimeoutException)
{
return null; // to match semantics of other backends, wait returns null when timing out
}
}

public async Task<OrchestrationState> GetOrchestrationStateAsync(uint partitionId, string instanceId, bool fetchInput = true, bool fetchOutput = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,11 +608,6 @@ async Task<TaskOrchestrationWorkItem> IOrchestrationService.LockNextTaskOrchestr

if (nextOrchestrationWorkItem != null)
{
if (nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence != null)
{
await nextOrchestrationWorkItem.MessageBatch.WaitForDequeueCountPersistence.Task;
}

nextOrchestrationWorkItem.MessageBatch.WaitingSince = null;

this.workItemTraceHelper.TraceWorkItemStarted(
Expand Down Expand Up @@ -653,8 +648,8 @@ Task IOrchestrationService.CompleteTaskOrchestrationWorkItemAsync(

// all continue as new requests are processed immediately (DurableTask.Core always uses "fast" continue-as-new)
// so by the time we get here, it is not a continue as new
partition.Assert(continuedAsNewMessage == null);
partition.Assert(workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew);
partition.Assert(continuedAsNewMessage == null, "unexpected continueAsNew message");
partition.Assert(workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.ContinuedAsNew, "unexpected continueAsNew status");

// we assign sequence numbers to all outgoing messages, to help us track them using unique message ids
long sequenceNumber = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ class OrchestrationMessageBatch : InternalReadEvent, TransportAbstraction.IDurab
public string WorkItemId;
public double? WaitingSince; // measures time waiting to execute

// enforces that the activity cannot start executing before the issuing event is persisted
public readonly TaskCompletionSource<object> WaitForDequeueCountPersistence;
readonly PartitionUpdateEvent waitForDequeueCountPersistence;
OrchestrationWorkItem workItem;

public override EventId EventId => EventId.MakePartitionInternalEventId(this.WorkItemId);

Expand All @@ -46,23 +46,17 @@ public OrchestrationMessageBatch(string instanceId, SessionsState.Session sessio

session.CurrentBatch = this;

if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem)
if (partition.Settings.PersistDequeueCountBeforeStartingWorkItem || filingEvent is RecoveryCompleted)
{
this.WaitForDequeueCountPersistence = new TaskCompletionSource<object>();
DurabilityListeners.Register(filingEvent, this);
this.waitForDequeueCountPersistence = filingEvent;
}

partition.EventDetailTracer?.TraceEventProcessingDetail($"OrchestrationMessageBatch is prefetching instance={this.InstanceId} batch={this.WorkItemId}");

// continue when we have the history state loaded, which gives us the latest state and/or cursor
partition.SubmitEvent(this);
}

public void ConfirmDurable(Event evt)
{
this.WaitForDequeueCountPersistence.TrySetResult(null);
}

public IEnumerable<(TaskMessage, string)> TracedMessages
{
get
Expand All @@ -87,53 +81,52 @@ public void ConfirmDurable(Event evt)
public override void OnReadComplete(TrackedObject s, Partition partition)
{
var historyState = (HistoryState)s;
OrchestrationWorkItem workItem;


if (this.ForceNewExecution || historyState == null)
{
// we either have no previous instance, or want to replace the previous instance
workItem = new OrchestrationWorkItem(partition, this, previousHistory: null);
workItem.Type = OrchestrationWorkItem.ExecutionType.Fresh;
workItem.HistorySize = 0;
this.workItem = new OrchestrationWorkItem(partition, this, previousHistory: null);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.Fresh;
this.workItem.HistorySize = 0;
}
else if (historyState.CachedOrchestrationWorkItem != null)
{
// we have a cached cursor and can resume where we left off
workItem = historyState.CachedOrchestrationWorkItem;
workItem.SetNextMessageBatch(this);
workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromCursor;
workItem.HistorySize = workItem.OrchestrationRuntimeState?.Events?.Count ?? 0;
this.workItem = historyState.CachedOrchestrationWorkItem;
this.workItem.SetNextMessageBatch(this);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromCursor;
this.workItem.HistorySize = this.workItem.OrchestrationRuntimeState?.Events?.Count ?? 0;

// sanity check: it appears cursor is sometimes corrupted, in that case, construct fresh
// TODO investigate reasons and fix root cause
if (workItem.HistorySize != historyState.History?.Count
|| workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != historyState.ExecutionId)
if (this.workItem.HistorySize != historyState.History?.Count
|| this.workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId != historyState.ExecutionId)
{
partition.EventTraceHelper.TraceEventProcessingWarning($"Fixing bad workitem cache instance={this.InstanceId} batch={this.WorkItemId} expected_size={historyState.History?.Count} actual_size={workItem.OrchestrationRuntimeState?.Events?.Count} expected_executionid={historyState.ExecutionId} actual_executionid={workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}");
partition.EventTraceHelper.TraceEventProcessingWarning($"Fixing bad workitem cache instance={this.InstanceId} batch={this.WorkItemId} expected_size={historyState.History?.Count} actual_size={this.workItem.OrchestrationRuntimeState?.Events?.Count} expected_executionid={historyState.ExecutionId} actual_executionid={this.workItem.OrchestrationRuntimeState?.OrchestrationInstance?.ExecutionId}");

// we create a new work item and rehydrate the instance from its history
workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
workItem.HistorySize = historyState.History?.Count ?? 0;
this.workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
this.workItem.HistorySize = historyState.History?.Count ?? 0;
}
}
else
{
// we have to rehydrate the instance from its history
workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
workItem.HistorySize = historyState.History?.Count ?? 0;
this.workItem = new OrchestrationWorkItem(partition, this, previousHistory: historyState.History);
this.workItem.Type = OrchestrationWorkItem.ExecutionType.ContinueFromHistory;
this.workItem.HistorySize = historyState.History?.Count ?? 0;
}

if (!this.IsExecutableInstance(workItem, out var reason))
if (!this.IsExecutableInstance(this.workItem, out var reason))
{
// this instance cannot be executed, so we are discarding the messages
for (int i = 0; i < workItem.MessageBatch.MessagesToProcess.Count; i++)
for (int i = 0; i < this.workItem.MessageBatch.MessagesToProcess.Count; i++)
{
partition.WorkItemTraceHelper.TraceTaskMessageDiscarded(
this.PartitionId,
workItem.MessageBatch.MessagesToProcess[i],
workItem.MessageBatch.MessagesToProcessOrigin[i],
this.workItem.MessageBatch.MessagesToProcess[i],
this.workItem.MessageBatch.MessagesToProcessOrigin[i],
reason);
}

Expand All @@ -151,11 +144,24 @@ public override void OnReadComplete(TrackedObject s, Partition partition)
}
else
{
// the work item is ready to process - put it into the OrchestrationWorkItemQueue
partition.EnqueueOrchestrationWorkItem(workItem);
if (this.waitForDequeueCountPersistence != null)
{
// process the work item once the filing event is persisted
DurabilityListeners.Register(this.waitForDequeueCountPersistence, this);
}
else
{
// the work item is ready to process now - put it into the OrchestrationWorkItemQueue
partition.EnqueueOrchestrationWorkItem(this.workItem);
}
}
}

/// <summary>
/// Durability callback: places the work item prepared for this batch into the
/// orchestration work item queue of the partition that owns it.
/// </summary>
/// <param name="evt">The event whose durability is being confirmed; not inspected here.</param>
public void ConfirmDurable(Event evt)
{
    // NOTE(review): relies on OnReadComplete having assigned this.workItem before
    // this listener was registered - confirm no other registration path exists.
    var readyWorkItem = this.workItem;
    readyWorkItem.Partition.EnqueueOrchestrationWorkItem(readyWorkItem);
}

bool IsExecutableInstance(TaskOrchestrationWorkItem workItem, out string message)
{
if (workItem.OrchestrationRuntimeState.ExecutionStartedEvent == null
Expand Down
25 changes: 18 additions & 7 deletions src/DurableTask.Netherite/OrchestrationService/Partition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,14 @@ public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler

this.TraceHelper.TracePartitionProgress("Starting", ref this.LastTransition, this.CurrentTimeMs, "");

errorHandler.Token.Register(() => this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, ""));
errorHandler.Token.Register(() => {
this.TraceHelper.TracePartitionProgress("Terminated", ref this.LastTransition, this.CurrentTimeMs, "");

if (!this.ErrorHandler.NormalTermination && this.Settings.TestHooks != null && this.Settings.TestHooks.FaultInjector == null)
{
this.Settings.TestHooks.Error("Partition", "Unexpected partition termination during test");
}
});

await MaxConcurrentStarts.WaitAsync();

Expand Down Expand Up @@ -136,18 +143,22 @@ public async Task<long> CreateOrRestoreAsync(IPartitionErrorHandler errorHandler
}

[Conditional("DEBUG")]
public void Assert(bool condition)
public void Assert(bool condition, string message)
{
if (!condition)
{
if (System.Diagnostics.Debugger.IsAttached)
if (this.Settings.TestHooks != null)
{
this.Settings.TestHooks.Error("Partition.Assert", $"Assertion Failed: {message}");
}
else if (System.Diagnostics.Debugger.IsAttached)
{
System.Diagnostics.Debugger.Break();
}

var stacktrace = System.Environment.StackTrace;

this.ErrorHandler.HandleError(stacktrace, "Assertion failed", null, false, false);
this.ErrorHandler.HandleError(stacktrace, $"Assertion Failed: {message}", null, false, false);
}
}

Expand Down Expand Up @@ -176,7 +187,7 @@ public async Task StopAsync(bool quickly)
}

// at this point, the partition has been terminated (either cleanly or by exception)
this.Assert(this.ErrorHandler.IsTerminated);
this.Assert(this.ErrorHandler.IsTerminated, "expect termination to be complete");

// tell the load publisher to send all buffered info
if (this.LoadPublisher != null)
Expand Down Expand Up @@ -257,7 +268,7 @@ public void SubmitEvent(PartitionReadEvent readEvent)

public void SubmitParallelEvent(PartitionEvent partitionEvent)
{
this.Assert(!(partitionEvent is PartitionUpdateEvent));
this.Assert(!(partitionEvent is PartitionUpdateEvent), "SubmitParallelEvent must not be called with update event");
partitionEvent.ReceivedTimestamp = this.CurrentTimeMs;
this.State.SubmitParallelEvent(partitionEvent);
}
Expand All @@ -279,7 +290,7 @@ public void SubmitEvents(IList<PartitionEvent> partitionEvents)

public void EnqueueActivityWorkItem(ActivityWorkItem item)
{
this.Assert(!string.IsNullOrEmpty(item.OriginWorkItem));
this.Assert(!string.IsNullOrEmpty(item.OriginWorkItem), "null work item in EnqueueActivityWorkItem");

this.WorkItemTraceHelper.TraceWorkItemQueued(
this.PartitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class PartitionErrorHandler : IPartitionErrorHandler
public CancellationToken Token => this.cts.Token;
public bool IsTerminated => this.cts.Token.IsCancellationRequested;

public bool NormalTermination { get; private set; }

public PartitionErrorHandler(int partitionId, ILogger logger, LogLevel logLevelLimit, string storageAccountName, string taskHubName)
{
this.cts = new CancellationTokenSource();
Expand Down Expand Up @@ -62,6 +64,7 @@ void TraceError(bool isWarning, string context, string message, Exception except

/// <summary>
/// Marks the termination as normal and then terminates the partition.
/// </summary>
public void TerminateNormally()
{
// Set the flag before calling Terminate(), so that code running as a result of the
// termination can already observe NormalTermination == true.
// NOTE(review): presumably Terminate() cancels the token and runs registered callbacks - confirm.
this.NormalTermination = true;
this.Terminate();
}

Expand Down
7 changes: 5 additions & 2 deletions src/DurableTask.Netherite/OrchestrationService/TestHooks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ namespace DurableTask.Netherite
/// </summary>
public class TestHooks
{
internal Faster.CacheDebugger CacheDebugger { get; set; } = null;
internal Faster.CacheDebugger CacheDebugger { get; set; }

internal Faster.ReplayChecker ReplayChecker { get; set; }

public Faster.FaultInjector FaultInjector { get; set; }

internal Faster.ReplayChecker ReplayChecker { get; set; } = null;

internal event Action<string> OnError;

Expand Down
Loading