Skip to content

Commit

Permalink
Introduce a recoverability pipeline (#6238)
Browse files Browse the repository at this point in the history
* Introduce a dispatch pipline to outgoing errors

* Temp

* Naming

* Revert

* Revert

* Adjust context

* Split executors

* FIrst draft of a pipeline

* Fix filename

* Fix names

* Invoke policy before pipeline

* Move errorhandle result to the recoverability action

* Pull recoverability events out to a behavior

* Make transport adjustments in one place

* Make satellites use the same executor

* Make actions return the transport operations

* Remove unused file

* Fix logging

* Add the satellite executor back

* Make static headers work again

* Move events to terminator

* Cleanup

* Add test to show alternate recoverability actions

* Make actions extendable

* Make test more realistic

* Cleanup test

* Better name

* Approvals

* Allow the actions to return the notification with an internal API

* Make things protected internal to enforce normal creation via the provided methods but allow inheritance

* Fix the notifications

* Switch to routing context design

* Fix tests and approvals

* Remove the dispatch method from error context and partially undo the extensibility changes

* Remove the closure allocations from the recoverability action invocations

* Additional comment

* Update src/NServiceBus.Core/Recoverability/IRecoverabilityActionContext.cs

Co-authored-by: Andreas Öhlund <andreas.ohlund@particular.net>

* Proper typing for the executors

Co-authored-by: danielmarbach <daniel.marbach@openplace.net>
Co-authored-by: Daniel Marbach <daniel.marbach@nservicebus.com>
  • Loading branch information
3 people authored Feb 8, 2022
1 parent 846a2b4 commit 7044043
Show file tree
Hide file tree
Showing 50 changed files with 1,438 additions and 996 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,9 @@ context0 => Convert(context0.Extensions.Behaviors[0]).Invoke(context0, value(Sys
context9 => Convert(context9.Extensions.Behaviors[9]).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => Convert(context10.Extensions.Behaviors[10]).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => Convert(context11.Extensions.Behaviors[11]).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])),

context0 => Convert(context0.Extensions.Behaviors[0]).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context1 => Convert(context1.Extensions.Behaviors[1]).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context2 => Convert(context2.Extensions.Behaviors[2]).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context3 => Convert(context3.Extensions.Behaviors[3]).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])),
context4 => Convert(context4.Extensions.Behaviors[4]).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task])),
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,9 @@ context0 => Convert(context0.Extensions.Behaviors[0], CaptureExceptionBehavior).
context9 => Convert(context9.Extensions.Behaviors[9], InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => Convert(context10.Extensions.Behaviors[10], LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => Convert(context11.Extensions.Behaviors[11], InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),

context0 => Convert(context0.Extensions.Behaviors[0], RecoverabilityRoutingConnector).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context1 => Convert(context1.Extensions.Behaviors[1], ThrowIfCannotDeferMessageBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context2 => Convert(context2.Extensions.Behaviors[2], AttachSenderRelatedInfoOnMessageBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context3 => Convert(context3.Extensions.Behaviors[3], RoutingToDispatchConnector).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])),
context4 => Convert(context4.Extensions.Behaviors[4], ImmediateDispatchTerminator).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])),
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,9 @@ context0 => Convert(context0.Extensions.Behaviors[0], CaptureExceptionBehavior).
context9 => Convert(context9.Extensions.Behaviors[9], InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])),
context10 => Convert(context10.Extensions.Behaviors[10], LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])),
context11 => Convert(context11.Extensions.Behaviors[11], InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])),

context0 => Convert(context0.Extensions.Behaviors[0], RecoverabilityRoutingConnector).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context1 => Convert(context1.Extensions.Behaviors[1], ThrowIfCannotDeferMessageBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context2 => Convert(context2.Extensions.Behaviors[2], AttachSenderRelatedInfoOnMessageBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])),
context3 => Convert(context3.Extensions.Behaviors[3], RoutingToDispatchConnector).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])),
context4 => Convert(context4.Extensions.Behaviors[4], ImmediateDispatchTerminator).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task])),
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class When_a_message_is_faulted : NServiceBusAcceptanceTest
public async Task Should_add_host_related_headers()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithAuditOn>(b => b.When((session, c) => session.SendLocal(new MessageThatFails())).DoNotFailOnErrorMessages())
.WithEndpoint<EndpointWithFailingMessage>(b => b.When((session, c) => session.SendLocal(new MessageThatFails())).DoNotFailOnErrorMessages())
.WithEndpoint<EndpointThatHandlesErrorMessages>()
.Done(c => c.Done)
.Run();
Expand All @@ -32,9 +32,9 @@ public class Context : ScenarioContext
public string Machine { get; set; }
}

public class EndpointWithAuditOn : EndpointConfigurationBuilder
public class EndpointWithFailingMessage : EndpointConfigurationBuilder
{
public EndpointWithAuditOn()
public EndpointWithFailingMessage()
{
EndpointSetup<DefaultServer>(c =>
{
Expand All @@ -58,9 +58,9 @@ public EndpointThatHandlesErrorMessages()
EndpointSetup<DefaultServer>();
}

public class MessageToBeAuditedHandler : IHandleMessages<MessageThatFails>
public class MessageThatFailsHandler : IHandleMessages<MessageThatFails>
{
public MessageToBeAuditedHandler(Context context)
public MessageThatFailsHandler(Context context)
{
testContext = context;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
namespace NServiceBus.AcceptanceTests.Core.Recoverability
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using NServiceBus.Pipeline;
using NUnit.Framework;

public class When_applying_message_recoverability : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_allow_for_alternate_move_to_error_action()
{
var onMessageSentToErrorQueueTriggered = false;
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithFailingHandler>(b => b
.DoNotFailOnErrorMessages()
.CustomConfig(config =>
{
config.Recoverability()
.Failed(f => f.OnMessageSentToErrorQueue((_, __) =>
{
onMessageSentToErrorQueueTriggered = true;
return Task.CompletedTask;
}));
})
.When((session, ctx) => session.SendLocal(new InitiatingMessage()))
)
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToErrorQueue)
.Run();

Assert.True(context.MessageBodyWasEmpty);
Assert.True(onMessageSentToErrorQueueTriggered);
}

class Context : ScenarioContext
{
public bool MessageMovedToErrorQueue { get; set; }
public bool MessageBodyWasEmpty { get; set; }
}

class EndpointWithFailingHandler : EndpointConfigurationBuilder
{
static readonly string ErrorQueueAddress = Conventions.EndpointNamingConvention(typeof(ErrorSpy));

public EndpointWithFailingHandler()
{
EndpointSetup<DefaultServer>((config, context) =>
{
config.SendFailedMessagesTo(ErrorQueueAddress);
config.Pipeline.Register(typeof(CustomRecoverabilityActionBehavior), "Applies a custom recoverability actions");
});
}

public class CustomRecoverabilityActionBehavior : Behavior<IRecoverabilityContext>
{
public override Task Invoke(IRecoverabilityContext context, Func<Task> next)
{
if (context.RecoverabilityAction is MoveToError)
{
//Here we could store the body, headers and error metadata elsewhere

context.RecoverabilityAction = new CustomOnErrorAction(context.RecoverabilityConfiguration.Failed.ErrorQueue);
}

return next();
}

class CustomOnErrorAction : MoveToError
{
public CustomOnErrorAction(string errorQueue) : base(errorQueue)
{
}

public override IReadOnlyCollection<IRoutingContext> GetRoutingContexts(IRecoverabilityActionContext context)
{
var routingContexts = base.GetRoutingContexts(context);

// show how we just send an empty message with the message id to the error queue
// headers are preserved to make sure the necessary acceptance test infrastructure is still present
foreach (var routingContext in routingContexts)
{
routingContext.Message.UpdateBody(ReadOnlyMemory<byte>.Empty);
}

return routingContexts;
}
}
}

class InitiatingHandler : IHandleMessages<InitiatingMessage>
{
public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context)
{
throw new SimulatedException("Some failure");
}
}
}

class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
EndpointSetup<DefaultServer>(c => c.Pipeline.Register(typeof(ErrorMessageDetector), "Detect incoming error messages"));
}

class ErrorMessageDetector : IBehavior<ITransportReceiveContext, ITransportReceiveContext>
{
public ErrorMessageDetector(Context testContext)
{
this.testContext = testContext;
}

public Task Invoke(ITransportReceiveContext context, Func<ITransportReceiveContext, Task> next)
{
testContext.MessageBodyWasEmpty = context.Message.Body.IsEmpty;
testContext.MessageMovedToErrorQueue = true;
return next(context);
}

Context testContext;
}
}

public class InitiatingMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Features;
using MessageMutator;
using NUnit.Framework;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
namespace NServiceBus.AcceptanceTests.Core.Recoverability
{
using System;
using System.Threading.Tasks;
using AcceptanceTesting;
using AcceptanceTesting.Customization;
using EndpointTemplates;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NUnit.Framework;

public class When_message_is_dispatched_to_error_queue : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_allow_body_to_be_manipulated()
{
var context = await Scenario.Define<Context>()
.WithEndpoint<EndpointWithFailingHandler>(b => b
.DoNotFailOnErrorMessages()
.When((session, ctx) => session.SendLocal(new InitiatingMessage()))
)
.WithEndpoint<ErrorSpy>()
.Done(c => c.MessageMovedToErrorQueue)
.Run();

Assert.True(context.MessageBodyWasEmpty);
}

class Context : ScenarioContext
{
public bool MessageMovedToErrorQueue { get; set; }
public bool MessageBodyWasEmpty { get; internal set; }
}

class EndpointWithFailingHandler : EndpointConfigurationBuilder
{
static string errorQueueAddress = Conventions.EndpointNamingConvention(typeof(ErrorSpy));

public EndpointWithFailingHandler()
{
EndpointSetup<DefaultServer>((config, context) =>
{
config.SendFailedMessagesTo(errorQueueAddress);
config.Pipeline.Register(typeof(ErrorBodyStorageBehavior), "Simulate writing the body to a separate storage and pass a null body to the transport");
});
}

public class ErrorBodyStorageBehavior : Behavior<IDispatchContext>
{
public override Task Invoke(IDispatchContext context, Func<Task> next)
{
foreach (var operation in context.Operations)
{
var unicastAddress = operation.AddressTag as UnicastAddressTag;

if (unicastAddress?.Destination != errorQueueAddress)
{
continue;
}

operation.Message.UpdateBody(ReadOnlyMemory<byte>.Empty);
}
return next();
}
}

class InitiatingHandler : IHandleMessages<InitiatingMessage>
{
public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context)
{
throw new SimulatedException("Some failure");
}
}
}

class ErrorSpy : EndpointConfigurationBuilder
{
public ErrorSpy()
{
EndpointSetup<DefaultServer>(c => c.Pipeline.Register(typeof(ErrorMessageDetector), "Detect incoming error messages"));
}

class ErrorMessageDetector : IBehavior<ITransportReceiveContext, ITransportReceiveContext>
{
public ErrorMessageDetector(Context testContext)
{
this.testContext = testContext;
}

public Task Invoke(ITransportReceiveContext context, Func<ITransportReceiveContext, Task> next)
{
testContext.MessageBodyWasEmpty = context.Message.Body.IsEmpty;
testContext.MessageMovedToErrorQueue = true;
return next(context);
}

Context testContext;
}
}

public class InitiatingMessage : IMessage
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

<ItemGroup>
<RemoveSourceFileFromPackage Include="Core\**\*.cs" />
<RemoveSourceFileFromPackage Remove="Core\Recoverability\When_applying_message_recoverability.cs" />
<RemoveSourceFileFromPackage Include="AssemblyInfo.cs" />
</ItemGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void EachTypeHasABasicTest()
.Where(t => t.IsPublic && !t.IsSealed && !t.GetCustomAttributes(true).OfType<ObsoleteAttribute>().Any())
.Where(t => t.IsInterface || t.IsAbstract)
.Where(t => !ignoredTypes.Contains(t))
.Where(HasMethodWithContextParameter)
.Where(HasTaskReturningMethodWithContextParameter)
.OrderBy(t => t.FullName)
.ToArray();

Expand Down Expand Up @@ -112,7 +112,7 @@ public class TestTimeout {}";
return Assert(ForwardCancellationTokenAnalyzer.DiagnosticId, code);
}

static bool HasMethodWithContextParameter(Type type)
static bool HasTaskReturningMethodWithContextParameter(Type type)
{
var typeList = type.GetInterfaces().ToList();
typeList.Add(type);
Expand All @@ -121,6 +121,13 @@ static bool HasMethodWithContextParameter(Type type)
{
foreach (var method in typeOrInterface.GetMethods())
{
var isAwaitable = method.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null;

if (!isAwaitable)
{
continue;
}

foreach (var param in method.GetParameters())
{
if (typeof(ICancellableContext).IsAssignableFrom(param.ParameterType))
Expand Down
Loading

0 comments on commit 7044043

Please sign in to comment.