diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt index 006ccb90da5..e42cf746047 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt @@ -65,3 +65,9 @@ context0 => Convert(context0.Extensions.Behaviors[0]).Invoke(context0, value(Sys context9 => Convert(context9.Extensions.Behaviors[9]).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), context10 => Convert(context10.Extensions.Behaviors[10]).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), context11 => Convert(context11.Extensions.Behaviors[11]).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), + +context0 => Convert(context0.Extensions.Behaviors[0]).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context1 => Convert(context1.Extensions.Behaviors[1]).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context2 => Convert(context2.Extensions.Behaviors[2]).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context3 => Convert(context3.Extensions.Behaviors[3]).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])), + context4 => Convert(context4.Extensions.Behaviors[4]).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt index 8214ff93fb5..47897918028 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt @@ -65,3 +65,9 @@ context0 => Convert(context0.Extensions.Behaviors[0], CaptureExceptionBehavior). context9 => Convert(context9.Extensions.Behaviors[9], InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), context10 => Convert(context10.Extensions.Behaviors[10], LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), context11 => Convert(context11.Extensions.Behaviors[11], InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), + +context0 => Convert(context0.Extensions.Behaviors[0], RecoverabilityRoutingConnector).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context1 => Convert(context1.Extensions.Behaviors[1], ThrowIfCannotDeferMessageBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context2 => Convert(context2.Extensions.Behaviors[2], AttachSenderRelatedInfoOnMessageBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context3 => Convert(context3.Extensions.Behaviors[3], RoutingToDispatchConnector).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])), + context4 => Convert(context4.Extensions.Behaviors[4], ImmediateDispatchTerminator).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), diff --git a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt index 1991f0cf6f8..0bf74bc4498 100644 --- a/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt +++ b/src/NServiceBus.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt @@ -65,3 +65,9 @@ context0 => Convert(context0.Extensions.Behaviors[0], CaptureExceptionBehavior). context9 => Convert(context9.Extensions.Behaviors[9], InferredMessageTypeEnricherBehavior).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), context10 => Convert(context10.Extensions.Behaviors[10], LoadHandlersConnector).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), context11 => Convert(context11.Extensions.Behaviors[11], InvokeHandlerTerminator).Invoke(context11, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), + +context0 => Convert(context0.Extensions.Behaviors[0], RecoverabilityRoutingConnector).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context1 => Convert(context1.Extensions.Behaviors[1], ThrowIfCannotDeferMessageBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context2 => Convert(context2.Extensions.Behaviors[2], AttachSenderRelatedInfoOnMessageBehavior).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context3 => Convert(context3.Extensions.Behaviors[3], RoutingToDispatchConnector).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])), + context4 => Convert(context4.Extensions.Behaviors[4], ImmediateDispatchTerminator).Invoke(context4, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.AcceptanceTests/Core/Diagnostics/When_a_message_is_faulted.cs b/src/NServiceBus.AcceptanceTests/Core/Diagnostics/When_a_message_is_faulted.cs index e34b2913ba2..35fd9785b30 100644 --- a/src/NServiceBus.AcceptanceTests/Core/Diagnostics/When_a_message_is_faulted.cs +++ b/src/NServiceBus.AcceptanceTests/Core/Diagnostics/When_a_message_is_faulted.cs @@ -12,7 +12,7 @@ public class When_a_message_is_faulted : NServiceBusAcceptanceTest public async Task Should_add_host_related_headers() { var context = await Scenario.Define() - .WithEndpoint(b => b.When((session, c) => session.SendLocal(new MessageThatFails())).DoNotFailOnErrorMessages()) + .WithEndpoint(b => b.When((session, c) => session.SendLocal(new MessageThatFails())).DoNotFailOnErrorMessages()) .WithEndpoint() .Done(c => c.Done) .Run(); @@ -32,9 +32,9 @@ public class Context : ScenarioContext public string Machine { get; set; } } - public class EndpointWithAuditOn : EndpointConfigurationBuilder + public class EndpointWithFailingMessage : EndpointConfigurationBuilder { - public EndpointWithAuditOn() + public EndpointWithFailingMessage() { EndpointSetup(c => { @@ -58,9 +58,9 @@ public EndpointThatHandlesErrorMessages() EndpointSetup(); } - public class MessageToBeAuditedHandler : IHandleMessages + public class MessageThatFailsHandler : IHandleMessages { - public MessageToBeAuditedHandler(Context context) + public MessageThatFailsHandler(Context context) { testContext = context; } diff --git a/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_applying_message_recoverability.cs b/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_applying_message_recoverability.cs new file mode 100644 index 00000000000..346b785a248 --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_applying_message_recoverability.cs @@ -0,0 +1,133 @@ +namespace NServiceBus.AcceptanceTests.Core.Recoverability +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using EndpointTemplates; + using NServiceBus.Pipeline; + using NUnit.Framework; + + public class When_applying_message_recoverability : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_allow_for_alternate_move_to_error_action() + { + var onMessageSentToErrorQueueTriggered = false; + var context = await Scenario.Define() + .WithEndpoint(b => b + .DoNotFailOnErrorMessages() + .CustomConfig(config => + { + config.Recoverability() + .Failed(f => f.OnMessageSentToErrorQueue((_, __) => + { + onMessageSentToErrorQueueTriggered = true; + return Task.CompletedTask; + })); + }) + .When((session, ctx) => session.SendLocal(new InitiatingMessage())) + ) + .WithEndpoint() + .Done(c => c.MessageMovedToErrorQueue) + .Run(); + + Assert.True(context.MessageBodyWasEmpty); + Assert.True(onMessageSentToErrorQueueTriggered); + } + + class Context : ScenarioContext + { + public bool MessageMovedToErrorQueue { get; set; } + public bool MessageBodyWasEmpty { get; set; } + } + + class EndpointWithFailingHandler : EndpointConfigurationBuilder + { + static readonly string ErrorQueueAddress = Conventions.EndpointNamingConvention(typeof(ErrorSpy)); + + public EndpointWithFailingHandler() + { + EndpointSetup((config, context) => + { + config.SendFailedMessagesTo(ErrorQueueAddress); + config.Pipeline.Register(typeof(CustomRecoverabilityActionBehavior), "Applies a custom recoverability actions"); + }); + } + + public class CustomRecoverabilityActionBehavior : Behavior + { + public override Task Invoke(IRecoverabilityContext context, Func next) + { + if (context.RecoverabilityAction is MoveToError) + { + //Here we could store the body, headers and error metadata elsewhere + + context.RecoverabilityAction = new CustomOnErrorAction(context.RecoverabilityConfiguration.Failed.ErrorQueue); + } + + return next(); + } + + class CustomOnErrorAction : MoveToError + { + public CustomOnErrorAction(string errorQueue) : base(errorQueue) + { + } + + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + var routingContexts = base.GetRoutingContexts(context); + + // show how we just send an empty message with the message id to the error queue + // headers are preserved to make sure the necessary acceptance test infrastructure is still present + foreach (var routingContext in routingContexts) + { + routingContext.Message.UpdateBody(ReadOnlyMemory.Empty); + } + + return routingContexts; + } + } + } + + class InitiatingHandler : IHandleMessages + { + public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context) + { + throw new SimulatedException("Some failure"); + } + } + } + + class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() + { + EndpointSetup(c => c.Pipeline.Register(typeof(ErrorMessageDetector), "Detect incoming error messages")); + } + + class ErrorMessageDetector : IBehavior + { + public ErrorMessageDetector(Context testContext) + { + this.testContext = testContext; + } + + public Task Invoke(ITransportReceiveContext context, Func next) + { + testContext.MessageBodyWasEmpty = context.Message.Body.IsEmpty; + testContext.MessageMovedToErrorQueue = true; + return next(context); + } + + Context testContext; + } + } + + public class InitiatingMessage : IMessage + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_failing_mutated_message.cs b/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_failing_mutated_message.cs index 096b4786d76..cd0d070965d 100644 --- a/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_failing_mutated_message.cs +++ b/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_failing_mutated_message.cs @@ -5,7 +5,6 @@ using System.Threading.Tasks; using AcceptanceTesting; using EndpointTemplates; - using Features; using MessageMutator; using NUnit.Framework; diff --git a/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_message_is_dispatched_to_error_queue.cs b/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_message_is_dispatched_to_error_queue.cs new file mode 100644 index 00000000000..72ba911fefa --- /dev/null +++ b/src/NServiceBus.AcceptanceTests/Core/Recoverability/When_message_is_dispatched_to_error_queue.cs @@ -0,0 +1,105 @@ +namespace NServiceBus.AcceptanceTests.Core.Recoverability +{ + using System; + using System.Threading.Tasks; + using AcceptanceTesting; + using AcceptanceTesting.Customization; + using EndpointTemplates; + using NServiceBus.Pipeline; + using NServiceBus.Routing; + using NUnit.Framework; + + public class When_message_is_dispatched_to_error_queue : NServiceBusAcceptanceTest + { + [Test] + public async Task Should_allow_body_to_be_manipulated() + { + var context = await Scenario.Define() + .WithEndpoint(b => b + .DoNotFailOnErrorMessages() + .When((session, ctx) => session.SendLocal(new InitiatingMessage())) + ) + .WithEndpoint() + .Done(c => c.MessageMovedToErrorQueue) + .Run(); + + Assert.True(context.MessageBodyWasEmpty); + } + + class Context : ScenarioContext + { + public bool MessageMovedToErrorQueue { get; set; } + public bool MessageBodyWasEmpty { get; internal set; } + } + + class EndpointWithFailingHandler : EndpointConfigurationBuilder + { + static string errorQueueAddress = Conventions.EndpointNamingConvention(typeof(ErrorSpy)); + + public EndpointWithFailingHandler() + { + EndpointSetup((config, context) => + { + config.SendFailedMessagesTo(errorQueueAddress); + config.Pipeline.Register(typeof(ErrorBodyStorageBehavior), "Simulate writing the body to a separate storage and pass a null body to the transport"); + }); + } + + public class ErrorBodyStorageBehavior : Behavior + { + public override Task Invoke(IDispatchContext context, Func next) + { + foreach (var operation in context.Operations) + { + var unicastAddress = operation.AddressTag as UnicastAddressTag; + + if (unicastAddress?.Destination != errorQueueAddress) + { + continue; + } + + operation.Message.UpdateBody(ReadOnlyMemory.Empty); + } + return next(); + } + } + + class InitiatingHandler : IHandleMessages + { + public Task Handle(InitiatingMessage initiatingMessage, IMessageHandlerContext context) + { + throw new SimulatedException("Some failure"); + } + } + } + + class ErrorSpy : EndpointConfigurationBuilder + { + public ErrorSpy() + { + EndpointSetup(c => c.Pipeline.Register(typeof(ErrorMessageDetector), "Detect incoming error messages")); + } + + class ErrorMessageDetector : IBehavior + { + public ErrorMessageDetector(Context testContext) + { + this.testContext = testContext; + } + + public Task Invoke(ITransportReceiveContext context, Func next) + { + testContext.MessageBodyWasEmpty = context.Message.Body.IsEmpty; + testContext.MessageMovedToErrorQueue = true; + return next(context); + } + + Context testContext; + } + } + + public class InitiatingMessage : IMessage + { + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj index 8f4eca00208..62dcaddbc05 100644 --- a/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj +++ b/src/NServiceBus.AcceptanceTests/NServiceBus.AcceptanceTests.csproj @@ -32,6 +32,7 @@ + diff --git a/src/NServiceBus.Core.Analyzer.Tests/ForwardCancellationToken/ForwardFromPipelineTests.cs b/src/NServiceBus.Core.Analyzer.Tests/ForwardCancellationToken/ForwardFromPipelineTests.cs index 82c55ff9241..1678959861d 100644 --- a/src/NServiceBus.Core.Analyzer.Tests/ForwardCancellationToken/ForwardFromPipelineTests.cs +++ b/src/NServiceBus.Core.Analyzer.Tests/ForwardCancellationToken/ForwardFromPipelineTests.cs @@ -31,7 +31,7 @@ public void EachTypeHasABasicTest() .Where(t => t.IsPublic && !t.IsSealed && !t.GetCustomAttributes(true).OfType().Any()) .Where(t => t.IsInterface || t.IsAbstract) .Where(t => !ignoredTypes.Contains(t)) - .Where(HasMethodWithContextParameter) + .Where(HasTaskReturningMethodWithContextParameter) .OrderBy(t => t.FullName) .ToArray(); @@ -112,7 +112,7 @@ public class TestTimeout {}"; return Assert(ForwardCancellationTokenAnalyzer.DiagnosticId, code); } - static bool HasMethodWithContextParameter(Type type) + static bool HasTaskReturningMethodWithContextParameter(Type type) { var typeList = type.GetInterfaces().ToList(); typeList.Add(type); @@ -121,6 +121,13 @@ static bool HasMethodWithContextParameter(Type type) { foreach (var method in typeOrInterface.GetMethods()) { + var isAwaitable = method.ReturnType.GetMethod(nameof(Task.GetAwaiter)) != null; + + if (!isAwaitable) + { + continue; + } + foreach (var param in method.GetParameters()) { if (typeof(ICancellableContext).IsAssignableFrom(param.ParameterType)) diff --git a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt index 04819400447..4d67b4612a7 100644 --- a/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt +++ b/src/NServiceBus.Core.Tests/ApprovalFiles/APIApprovals.ApproveNServiceBus.approved.txt @@ -256,9 +256,12 @@ namespace NServiceBus public NServiceBus.DelayedRetriesSettings OnMessageBeingRetried(System.Func notificationCallback) { } public NServiceBus.DelayedRetriesSettings TimeIncrease(System.TimeSpan timeIncrease) { } } - public sealed class DelayedRetry : NServiceBus.RecoverabilityAction + public class DelayedRetry : NServiceBus.RecoverabilityAction { + protected DelayedRetry(System.TimeSpan delay) { } public System.TimeSpan Delay { get; } + public override NServiceBus.Transport.ErrorHandleResult ErrorHandleResult { get; } + public override System.Collections.Generic.IReadOnlyCollection GetRoutingContexts(NServiceBus.Pipeline.IRecoverabilityActionContext context) { } } public enum DependencyLifecycle { @@ -275,9 +278,12 @@ namespace NServiceBus public static void CustomDiagnosticsWriter(this NServiceBus.EndpointConfiguration config, System.Func customDiagnosticsWriter) { } public static void SetDiagnosticsPath(this NServiceBus.EndpointConfiguration config, string path) { } } - public sealed class Discard : NServiceBus.RecoverabilityAction + public class Discard : NServiceBus.RecoverabilityAction { + public Discard(string reason) { } + public override NServiceBus.Transport.ErrorHandleResult ErrorHandleResult { get; } public string Reason { get; } + public override System.Collections.Generic.IReadOnlyCollection GetRoutingContexts(NServiceBus.Pipeline.IRecoverabilityActionContext context) { } } public class DistributionPolicy : NServiceBus.IDistributionPolicy { @@ -590,7 +596,12 @@ namespace NServiceBus public NServiceBus.ImmediateRetriesSettings OnMessageBeingRetried(System.Func notificationCallback) { } public NServiceBus.ImmediateRetriesSettings OnMessageBeingRetried(System.Func notificationCallback) { } } - public sealed class ImmediateRetry : NServiceBus.RecoverabilityAction { } + public class ImmediateRetry : NServiceBus.RecoverabilityAction + { + protected ImmediateRetry() { } + public override NServiceBus.Transport.ErrorHandleResult ErrorHandleResult { get; } + public override System.Collections.Generic.IReadOnlyCollection GetRoutingContexts(NServiceBus.Pipeline.IRecoverabilityActionContext context) { } + } [System.Obsolete("Gateway persistence has been moved to the NServiceBus.Gateway dedicated package. " + "Will be removed in version 9.0.0.", true)] public static class InMemoryGatewayPersistenceConfigurationExtensions @@ -776,9 +787,12 @@ namespace NServiceBus public static System.Threading.Tasks.Task Unsubscribe(this NServiceBus.IMessageSession session, System.Type messageType, System.Threading.CancellationToken cancellationToken = default) { } public static System.Threading.Tasks.Task Unsubscribe(this NServiceBus.IMessageSession session, System.Threading.CancellationToken cancellationToken = default) { } } - public sealed class MoveToError : NServiceBus.RecoverabilityAction + public class MoveToError : NServiceBus.RecoverabilityAction { + protected MoveToError(string errorQueue) { } + public override NServiceBus.Transport.ErrorHandleResult ErrorHandleResult { get; } public string ErrorQueue { get; } + public override System.Collections.Generic.IReadOnlyCollection GetRoutingContexts(NServiceBus.Pipeline.IRecoverabilityActionContext context) { } } public class NServiceBusMarkerInterfaceConvention : NServiceBus.IMessageConvention { @@ -894,6 +908,8 @@ namespace NServiceBus public abstract class RecoverabilityAction { protected RecoverabilityAction() { } + public abstract NServiceBus.Transport.ErrorHandleResult ErrorHandleResult { get; } + public abstract System.Collections.Generic.IReadOnlyCollection GetRoutingContexts(NServiceBus.Pipeline.IRecoverabilityActionContext context); public static NServiceBus.DelayedRetry DelayedRetry(System.TimeSpan timeSpan) { } public static NServiceBus.Discard Discard(string reason) { } public static NServiceBus.ImmediateRetry ImmediateRetry() { } @@ -2004,6 +2020,19 @@ namespace NServiceBus.Pipeline { NServiceBus.Pipeline.OutgoingLogicalMessage Message { get; } } + public interface IRecoverabilityActionContext : NServiceBus.Extensibility.IExtendable, NServiceBus.ICancellableContext, NServiceBus.Pipeline.IBehaviorContext + { + NServiceBus.Transport.ErrorContext ErrorContext { get; } + System.Collections.Generic.IReadOnlyDictionary Metadata { get; } + } + public interface IRecoverabilityContext : NServiceBus.Extensibility.IExtendable, NServiceBus.ICancellableContext, NServiceBus.Pipeline.IBehaviorContext + { + NServiceBus.Transport.ErrorContext ErrorContext { get; } + System.Collections.Generic.Dictionary Metadata { get; } + NServiceBus.RecoverabilityAction RecoverabilityAction { get; set; } + NServiceBus.RecoverabilityConfig RecoverabilityConfiguration { get; } + NServiceBus.Pipeline.IRecoverabilityActionContext PreventChanges(); + } public interface IRoutingContext : NServiceBus.Extensibility.IExtendable, NServiceBus.ICancellableContext, NServiceBus.Pipeline.IBehaviorContext { NServiceBus.Transport.OutgoingMessage Message { get; } @@ -2123,6 +2152,13 @@ namespace NServiceBus.Pipeline public static bool TryGetIncomingPhysicalMessage(this NServiceBus.Pipeline.IOutgoingReplyContext context, out NServiceBus.Transport.IncomingMessage message) { } } } +namespace NServiceBus.Recoverability +{ + public static class RecoverabilityContextExtensions + { + public static NServiceBus.Pipeline.IRoutingContext CreateRoutingContext(this NServiceBus.Pipeline.IRecoverabilityActionContext context, NServiceBus.Transport.OutgoingMessage outgoingMessage, NServiceBus.Routing.RoutingStrategy routingStrategy) { } + } +} namespace NServiceBus.Routing { public abstract class AddressTag @@ -2493,10 +2529,10 @@ namespace NServiceBus.Transport } public class ErrorContext { - public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary headers, string nativeMessageId, System.ReadOnlyMemory body, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.IReadOnlyContextBag context) { } + public ErrorContext(System.Exception exception, System.Collections.Generic.Dictionary headers, string nativeMessageId, System.ReadOnlyMemory body, NServiceBus.Transport.TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, NServiceBus.Extensibility.ContextBag context) { } public int DelayedDeliveriesPerformed { get; } public System.Exception Exception { get; } - public NServiceBus.Extensibility.IReadOnlyContextBag Extensions { get; } + public NServiceBus.Extensibility.ContextBag Extensions { get; } public int ImmediateProcessingFailures { get; } public NServiceBus.Transport.IncomingMessage Message { get; } public string ReceiveAddress { get; } diff --git a/src/NServiceBus.Core.Tests/Fakes/TestableContextChecker.cs b/src/NServiceBus.Core.Tests/Fakes/TestableContextChecker.cs index fb0f76d3236..a7ec3bfad94 100644 --- a/src/NServiceBus.Core.Tests/Fakes/TestableContextChecker.cs +++ b/src/NServiceBus.Core.Tests/Fakes/TestableContextChecker.cs @@ -21,7 +21,8 @@ public void ShouldProvideTestableImplementationForAllBehaviorContexts() .Where(x => !x.GetCustomAttributes().Any(att => att.GetType() == typeof(ObsoleteAttribute))) .Except(new[] { - typeof(PipelineTerminator<>.ITerminatingContext) + typeof(PipelineTerminator<>.ITerminatingContext), + typeof(IRecoverabilityActionContext) }); foreach (var behaviorContextInterface in behaviorContextInterfaces) diff --git a/src/NServiceBus.Core.Tests/Recoverability/DelayedRetryExecutorTests.cs b/src/NServiceBus.Core.Tests/Recoverability/DelayedRetryExecutorTests.cs deleted file mode 100644 index 6d585d22205..00000000000 --- a/src/NServiceBus.Core.Tests/Recoverability/DelayedRetryExecutorTests.cs +++ /dev/null @@ -1,122 +0,0 @@ -namespace NServiceBus.Core.Tests.Recoverability -{ - using System; - using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using NServiceBus.Extensibility; - using NServiceBus.Transport; - using NUnit.Framework; - - [TestFixture] - public class DelayedRetryExecutorTests - { - [SetUp] - public void Setup() - { - dispatcher = new FakeDispatcher(); - } - - [Test] - public async Task Should_float_transport_transaction_to_dispatcher() - { - var delayedRetryExecutor = CreateExecutor(); - var errorContext = CreateErrorContext(); - - await delayedRetryExecutor.Retry(errorContext, TimeSpan.Zero); - - Assert.AreEqual(dispatcher.Transaction, errorContext.TransportTransaction); - } - - [Test] - public async Task When_native_delayed_delivery_should_add_delivery_constraint() - { - var delayedRetryExecutor = CreateExecutor(); - var errorContext = CreateErrorContext(); - var delay = TimeSpan.FromSeconds(42); - - await delayedRetryExecutor.Retry(errorContext, delay); - - var transportOperation = dispatcher.UnicastTransportOperations.Single(); - var deliveryConstraint = transportOperation.Properties.DelayDeliveryWith; - - Assert.AreEqual(transportOperation.Destination, errorContext.ReceiveAddress); - Assert.IsNotNull(deliveryConstraint); - Assert.AreEqual(delay, deliveryConstraint.Delay); - } - - [Test] - public async Task Should_update_retry_headers_when_present() - { - var delayedRetryExecutor = CreateExecutor(); - var originalHeadersTimestamp = DateTimeOffsetHelper.ToWireFormattedString(new DateTimeOffset(2012, 12, 12, 0, 0, 0, TimeSpan.Zero)); - - var errorContext = CreateErrorContext(new Dictionary - { - {Headers.DelayedRetries, "2"}, - {Headers.DelayedRetriesTimestamp, originalHeadersTimestamp} - }); - - var now = DateTimeOffset.UtcNow; - await delayedRetryExecutor.Retry(errorContext, TimeSpan.Zero); - - var incomingMessage = errorContext.Message; - - var outgoingMessageHeaders = dispatcher.UnicastTransportOperations.Single().Message.Headers; - - Assert.AreEqual("3", outgoingMessageHeaders[Headers.DelayedRetries]); - Assert.AreEqual("2", incomingMessage.Headers[Headers.DelayedRetries]); - - var utcDateTime = DateTimeOffsetHelper.ToDateTimeOffset(outgoingMessageHeaders[Headers.DelayedRetriesTimestamp]); - // the serialization removes precision which may lead to now being greater than the deserialized header value - var adjustedNow = DateTimeOffsetHelper.ToDateTimeOffset(DateTimeOffsetHelper.ToWireFormattedString(now)); - Assert.That(utcDateTime, Is.GreaterThanOrEqualTo(adjustedNow)); - Assert.AreEqual(originalHeadersTimestamp, incomingMessage.Headers[Headers.DelayedRetriesTimestamp]); - } - - [Test] - public async Task Should_add_retry_headers_when_not_present() - { - var delayedRetryExecutor = CreateExecutor(); - var errorContext = CreateErrorContext(); - - await delayedRetryExecutor.Retry(errorContext, TimeSpan.Zero); - - var outgoingMessageHeaders = dispatcher.TransportOperations.UnicastTransportOperations.Single().Message.Headers; - - Assert.AreEqual("1", outgoingMessageHeaders[Headers.DelayedRetries]); - Assert.IsFalse(errorContext.Message.Headers.ContainsKey(Headers.DelayedRetries)); - Assert.IsTrue(outgoingMessageHeaders.ContainsKey(Headers.DelayedRetriesTimestamp)); - Assert.IsFalse(errorContext.Message.Headers.ContainsKey(Headers.DelayedRetriesTimestamp)); - } - - DelayedRetryExecutor CreateExecutor() - { - return new DelayedRetryExecutor(dispatcher); - } - - ErrorContext CreateErrorContext(Dictionary headers = null) - { - return new ErrorContext(new Exception(), headers ?? new Dictionary(), "messageId", new byte[0], new TransportTransaction(), 0, "my-queue", new ContextBag()); - } - - FakeDispatcher dispatcher; - - class FakeDispatcher : IMessageDispatcher - { - public TransportOperations TransportOperations { get; private set; } - - public List UnicastTransportOperations => TransportOperations.UnicastTransportOperations; - - public TransportTransaction Transaction { get; private set; } - - public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken = default) - { - TransportOperations = outgoingMessages; - Transaction = transaction; - return Task.FromResult(0); - } - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Recoverability/DelayedRetryRecoverabilityActionTests.cs b/src/NServiceBus.Core.Tests/Recoverability/DelayedRetryRecoverabilityActionTests.cs new file mode 100644 index 00000000000..404039f1a24 --- /dev/null +++ b/src/NServiceBus.Core.Tests/Recoverability/DelayedRetryRecoverabilityActionTests.cs @@ -0,0 +1,84 @@ +namespace NServiceBus.Core.Tests.Recoverability +{ + using System; + using System.Collections.Generic; + using System.Linq; + using NServiceBus.Extensibility; + using NServiceBus.Routing; + using NServiceBus.Transport; + using NUnit.Framework; + using Testing; + + [TestFixture] + public class DelayedRetryRecoverabilityActionTests + { + [Test] + public void When_delay_message_retry() + { + var recoverabilityContext = CreateRecoverabilityContext(); + var delay = TimeSpan.FromSeconds(42); + var delayedRetryAction = new DelayedRetry(delay); + + var routingContext = delayedRetryAction.GetRoutingContexts(recoverabilityContext) + .Single(); + + var routingStrategy = routingContext.RoutingStrategies.Single() as UnicastRoutingStrategy; + + Assert.AreEqual(recoverabilityContext.ErrorContext.ReceiveAddress, (routingStrategy.Apply(new Dictionary()) as UnicastAddressTag).Destination); + Assert.AreEqual(delay, routingContext.Extensions.Get().DelayDeliveryWith.Delay); + Assert.AreEqual(ErrorHandleResult.Handled, delayedRetryAction.ErrorHandleResult); + } + + [Test] + public void Should_update_retry_headers_when_present() + { + var delayedRetryAction = new DelayedRetry(TimeSpan.Zero); + var originalHeadersTimestamp = DateTimeOffsetHelper.ToWireFormattedString(new DateTimeOffset(2012, 12, 12, 0, 0, 0, TimeSpan.Zero)); + + var recoverabilityContext = CreateRecoverabilityContext(new Dictionary + { + {Headers.DelayedRetries, "2"}, + {Headers.DelayedRetriesTimestamp, originalHeadersTimestamp} + }); + + var now = DateTimeOffset.UtcNow; + var routingContexts = delayedRetryAction.GetRoutingContexts(recoverabilityContext); + + var incomingMessage = recoverabilityContext.ErrorContext.Message; + + var outgoingMessageHeaders = routingContexts.Single().Message.Headers; + + Assert.AreEqual("3", outgoingMessageHeaders[Headers.DelayedRetries]); + Assert.AreEqual("2", incomingMessage.Headers[Headers.DelayedRetries]); + + var utcDateTime = DateTimeOffsetHelper.ToDateTimeOffset(outgoingMessageHeaders[Headers.DelayedRetriesTimestamp]); + // the serialization removes precision which may lead to now being greater than the deserialized header value + var adjustedNow = DateTimeOffsetHelper.ToDateTimeOffset(DateTimeOffsetHelper.ToWireFormattedString(now)); + Assert.That(utcDateTime, Is.GreaterThanOrEqualTo(adjustedNow)); + Assert.AreEqual(originalHeadersTimestamp, incomingMessage.Headers[Headers.DelayedRetriesTimestamp]); + } + + [Test] + public void Should_add_retry_headers_when_not_present() + { + var delayedRetryAction = new DelayedRetry(TimeSpan.Zero); + var recoverabilityContext = CreateRecoverabilityContext(); + + var routingContexts = delayedRetryAction.GetRoutingContexts(recoverabilityContext); + + var outgoingMessageHeaders = routingContexts.Single().Message.Headers; + + Assert.AreEqual("1", outgoingMessageHeaders[Headers.DelayedRetries]); + Assert.IsFalse(recoverabilityContext.ErrorContext.Message.Headers.ContainsKey(Headers.DelayedRetries)); + Assert.IsTrue(outgoingMessageHeaders.ContainsKey(Headers.DelayedRetriesTimestamp)); + Assert.IsFalse(recoverabilityContext.ErrorContext.Message.Headers.ContainsKey(Headers.DelayedRetriesTimestamp)); + } + + static TestableRecoverabilityContext CreateRecoverabilityContext(Dictionary headers = null) + { + var errorContext = new ErrorContext(new Exception(), headers ?? new Dictionary(), + "messageId", Array.Empty(), new TransportTransaction(), 0, "my-queue", new ContextBag()); + return new TestableRecoverabilityContext { ErrorContext = errorContext }; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Recoverability/FaultMetadataExtractorTests.cs b/src/NServiceBus.Core.Tests/Recoverability/FaultMetadataExtractorTests.cs new file mode 100644 index 00000000000..4e7768fc93c --- /dev/null +++ b/src/NServiceBus.Core.Tests/Recoverability/FaultMetadataExtractorTests.cs @@ -0,0 +1,104 @@ +namespace NServiceBus.Core.Tests.Recoverability +{ + using System; + using System.Collections.Generic; + using System.Runtime.CompilerServices; + using NServiceBus.Extensibility; + using NServiceBus.Faults; + using NServiceBus.Transport; + using NUnit.Framework; + + [TestFixture] + public class FaultMetadataExtractorTests + { + [Test] + public void VerifyExceptionHeadersAreSet() + { + var exception = GetAnException(); + + var extractor = new FaultMetadataExtractor(new Dictionary(), _ => { }); + + var metadata = extractor.Extract(CreateErrorContext(exception)); + + Assert.AreEqual("System.AggregateException", metadata["NServiceBus.ExceptionInfo.ExceptionType"]); + Assert.AreEqual(exception.ToString(), metadata["NServiceBus.ExceptionInfo.StackTrace"]); + Assert.IsTrue(metadata.ContainsKey("NServiceBus.TimeOfFailure")); + + Assert.AreEqual("System.Exception", metadata["NServiceBus.ExceptionInfo.InnerExceptionType"]); + Assert.AreEqual("A fake help link", metadata["NServiceBus.ExceptionInfo.HelpLink"]); + Assert.AreEqual("NServiceBus.Core.Tests", metadata["NServiceBus.ExceptionInfo.Source"]); + Assert.AreEqual("my-address", metadata[FaultsHeaderKeys.FailedQ]); + } + + [Test] + public void ExceptionMessageIsTruncated() + { + var exception = new Exception(new string('x', (int)Math.Pow(2, 15))); + var extractor = new FaultMetadataExtractor(new Dictionary(), _ => { }); + + var metadata = extractor.Extract(CreateErrorContext(exception)); + + Assert.AreEqual((int)Math.Pow(2, 14), metadata["NServiceBus.ExceptionInfo.Message"].Length); + } + + [Test] + public void ShouldApplyStaticMetadata() + { + var extractor = new FaultMetadataExtractor(new Dictionary { { "static-key", "some value" } }, _ => { }); + + var metadata = extractor.Extract(CreateErrorContext()); + + Assert.AreEqual("some value", metadata["static-key"]); + } + + [Test] + public void ShouldApplyCustomizations() + { + var extractor = new FaultMetadataExtractor(new Dictionary { { "static-key", "some value" } }, m => + { + m["static-key"] = "some other value"; + }); + + var metadata = extractor.Extract(CreateErrorContext()); + + Assert.AreEqual("some other value", metadata["static-key"]); + } + + ErrorContext CreateErrorContext(Exception exception = null) + { + return new ErrorContext(exception ?? GetAnException(), new Dictionary(), "some-id", new byte[0], new TransportTransaction(), 0, "my-address", new ContextBag()); + } + + Exception GetAnException() + { + try + { + MethodThatThrows1(); + } + catch (Exception e) + { + return e; + } + return null; + } + + [MethodImpl(MethodImplOptions.NoInlining)] + public void MethodThatThrows1() + { + try + { + MethodThatThrows2(); + } + catch (Exception exception) + { + throw new AggregateException("My Exception", exception) { HelpLink = "A fake help link" }; + } + } + + [MethodImpl(MethodImplOptions.NoInlining)] + void MethodThatThrows2() + { + throw new Exception("My Inner Exception"); + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Recoverability/MoveToErrorsExecutorTests.cs b/src/NServiceBus.Core.Tests/Recoverability/MoveToErrorsExecutorTests.cs index fa4d7077700..a21e16a5169 100644 --- a/src/NServiceBus.Core.Tests/Recoverability/MoveToErrorsExecutorTests.cs +++ b/src/NServiceBus.Core.Tests/Recoverability/MoveToErrorsExecutorTests.cs @@ -3,44 +3,34 @@ using System; using System.Collections.Generic; using System.Linq; - using System.Threading; - using System.Threading.Tasks; using NServiceBus.Extensibility; - using NServiceBus.Faults; + using NServiceBus.Routing; using NUnit.Framework; + using Testing; using Transport; [TestFixture] public class MoveToErrorsExecutorTests { - [SetUp] - public void Setup() - { - dispatcher = new FakeDispatcher(); - staticFaultMetadata = new Dictionary(); - moveToErrorsExecutor = new MoveToErrorsExecutor(dispatcher, staticFaultMetadata, headers => { }); - } - [Test] - public async Task MoveToErrorQueue_should_dispatch_message_to_error_queue() + public void MoveToErrorQueue_should_route_message_to_error_queue() { var customErrorQueue = "random_error_queue"; - var errorContext = CreateErrorContext(); + var recoverabilityContext = CreateRecoverabilityContext(); + var moveToErrorAction = new MoveToError(customErrorQueue); + var routingContext = moveToErrorAction.GetRoutingContexts(recoverabilityContext) + .Single(); - await moveToErrorsExecutor.MoveToErrorQueue(customErrorQueue, errorContext); + var addressTag = (UnicastAddressTag)((UnicastRoutingStrategy)routingContext.RoutingStrategies.Single()) + .Apply(new Dictionary()); - Assert.That(dispatcher.TransportOperations.MulticastTransportOperations.Count(), Is.EqualTo(0)); - Assert.That(dispatcher.TransportOperations.UnicastTransportOperations.Count(), Is.EqualTo(1)); - Assert.That(dispatcher.Transaction, Is.EqualTo(errorContext.TransportTransaction)); - - var outgoingMessage = dispatcher.TransportOperations.UnicastTransportOperations.Single(); - Assert.That(outgoingMessage.Destination, Is.EqualTo(customErrorQueue)); - Assert.That(outgoingMessage.Message.MessageId, Is.EqualTo(errorContext.Message.MessageId)); + Assert.AreEqual(customErrorQueue, addressTag.Destination); + Assert.AreEqual(ErrorHandleResult.Handled, moveToErrorAction.ErrorHandleResult); } [Test] - public async Task MoveToErrorQueue_should_preserve_incoming_message_headers() + public void MoveToErrorQueue_should_preserve_incoming_message_headers() { var incomingMessageHeaders = new Dictionary { @@ -48,16 +38,19 @@ public async Task MoveToErrorQueue_should_preserve_incoming_message_headers() {"key2", "value2"} }; - var errorContext = CreateErrorContext(messageHeaders: incomingMessageHeaders); + var recoverabilityContext = CreateRecoverabilityContext(messageHeaders: incomingMessageHeaders); + + var moveToErrorAction = new MoveToError(ErrorQueueAddress); + var routingContext = moveToErrorAction.GetRoutingContexts(recoverabilityContext) + .Single(); - await moveToErrorsExecutor.MoveToErrorQueue(ErrorQueueAddress, errorContext); + var outgoingMessageHeaders = routingContext.Message.Headers; - var outgoingMessage = dispatcher.TransportOperations.UnicastTransportOperations.Single(); - Assert.That(errorContext.Message.Headers, Is.SubsetOf(outgoingMessage.Message.Headers)); + Assert.That(recoverabilityContext.ErrorContext.Message.Headers, Is.SubsetOf(outgoingMessageHeaders)); } [Test] - public async Task MoveToErrorQueue_should_remove_known_retry_headers() + public void MoveToErrorQueue_should_remove_known_retry_headers() { var retryHeaders = new Dictionary { @@ -65,101 +58,49 @@ public async Task MoveToErrorQueue_should_remove_known_retry_headers() {Headers.DelayedRetries, "21"} }; - var errorContext = CreateErrorContext(messageHeaders: retryHeaders); - - await moveToErrorsExecutor.MoveToErrorQueue(ErrorQueueAddress, errorContext); - - var outgoingMessage = dispatcher.TransportOperations.UnicastTransportOperations.Single(); - Assert.That(outgoingMessage.Message.Headers.Keys, Does.Not.Contain(Headers.ImmediateRetries)); - Assert.That(outgoingMessage.Message.Headers.Keys, Does.Not.Contain(Headers.DelayedRetries)); - } - - [Test] - public async Task MoveToErrorQueue_should_add_exception_headers() - { - var exception = new InvalidOperationException("test exception"); - var errorContext = CreateErrorContext(raisedException: exception); - - await moveToErrorsExecutor.MoveToErrorQueue(ErrorQueueAddress, errorContext); - - var outgoingMessageHeaders = dispatcher.TransportOperations.UnicastTransportOperations.Single().Message.Headers; - // we only test presence of some exception headers set by ExceptionHeaderHelper - Assert.That(outgoingMessageHeaders, Contains.Key("NServiceBus.ExceptionInfo.ExceptionType")); - Assert.That(outgoingMessageHeaders, Contains.Key("NServiceBus.ExceptionInfo.Message")); - Assert.That(outgoingMessageHeaders, Contains.Key("NServiceBus.ExceptionInfo.StackTrace")); - // check for leaking headers - Assert.That(errorContext.Message.Headers.ContainsKey("NServiceBus.ExceptionInfo.ExceptionType"), Is.False); - } - - - [Test] - public async Task MoveToErrorQueue_should_add_failed_queue_header() - { - var errorContext = CreateErrorContext(); - - await moveToErrorsExecutor.MoveToErrorQueue(ErrorQueueAddress, errorContext); + var recoverabilityContext = CreateRecoverabilityContext(messageHeaders: retryHeaders); - var outgoingMessageHeaders = dispatcher.TransportOperations.UnicastTransportOperations.Single().Message.Headers; + var moveToErrorAction = new MoveToError(ErrorQueueAddress); + var transportOperation = moveToErrorAction.GetRoutingContexts(recoverabilityContext) + .Single(); + var outgoingMessageHeaders = transportOperation.Message.Headers; - Assert.That(outgoingMessageHeaders, Contains.Key(FaultsHeaderKeys.FailedQ)); - Assert.AreEqual(outgoingMessageHeaders[FaultsHeaderKeys.FailedQ], ReceiveAddress); + Assert.That(outgoingMessageHeaders.Keys, Does.Not.Contain(Headers.ImmediateRetries)); + Assert.That(outgoingMessageHeaders.Keys, Does.Not.Contain(Headers.DelayedRetries)); } [Test] - public async Task MoveToErrorQueue_should_add_static_fault_info_to_headers() + public void MoveToErrorQueue_should_add_metadata_to_headers() { - staticFaultMetadata.Add("staticFaultMetadataKey", "staticFaultMetadataValue"); + var recoverabilityContext = CreateRecoverabilityContext(metadata: new Dictionary { { "staticFaultMetadataKey", "staticFaultMetadataValue" } }); - var errorContext = CreateErrorContext(); + var moveToErrorAction = new MoveToError(ErrorQueueAddress); + var transportOperation = moveToErrorAction.GetRoutingContexts(recoverabilityContext) + .Single(); + var outgoingMessageHeaders = transportOperation.Message.Headers; - await moveToErrorsExecutor.MoveToErrorQueue(ErrorQueueAddress, errorContext); - - var outgoingMessageHeaders = dispatcher.TransportOperations.UnicastTransportOperations.Single().Message.Headers; Assert.That(outgoingMessageHeaders, Contains.Item(new KeyValuePair("staticFaultMetadataKey", "staticFaultMetadataValue"))); // check for leaking headers - Assert.That(errorContext.Message.Headers.ContainsKey("staticFaultMetadataKey"), Is.False); - } - - [Test] - public async Task MoveToErrorQueue_should_apply_header_customizations_before_dispatch() - { - staticFaultMetadata.Add("staticFaultMetadataKey", "staticFaultMetadataValue"); - var exception = new InvalidOperationException("test exception"); - - Dictionary passedInHeaders = null; - moveToErrorsExecutor = new MoveToErrorsExecutor(dispatcher, staticFaultMetadata, headers => { passedInHeaders = headers; }); - - await moveToErrorsExecutor.MoveToErrorQueue(ErrorQueueAddress, CreateErrorContext(raisedException: exception)); - - Assert.NotNull(passedInHeaders); - Assert.That(passedInHeaders, Contains.Key("staticFaultMetadataKey")); - Assert.That(passedInHeaders, Contains.Key("NServiceBus.ExceptionInfo.Message")); + Assert.That(recoverabilityContext.ErrorContext.Message.Headers.ContainsKey("staticFaultMetadataKey"), Is.False); } - static ErrorContext CreateErrorContext(Exception raisedException = null, string exceptionMessage = "default-message", string messageId = "default-id", int numberOfDeliveryAttempts = 1, Dictionary messageHeaders = default) + static TestableRecoverabilityContext CreateRecoverabilityContext(Exception raisedException = null, string exceptionMessage = "default-message", string messageId = "default-id", int numberOfDeliveryAttempts = 1, Dictionary messageHeaders = default, Dictionary metadata = default) { - return new ErrorContext(raisedException ?? new Exception(exceptionMessage), messageHeaders ?? new Dictionary(), messageId, new byte[0], new TransportTransaction(), numberOfDeliveryAttempts, ReceiveAddress, new ContextBag()); + var errorContext = new ErrorContext(raisedException ?? new Exception(exceptionMessage), + messageHeaders ?? new Dictionary(), messageId, Array.Empty(), + new TransportTransaction(), numberOfDeliveryAttempts, ReceiveAddress, new ContextBag()); + var recoverabilityContext = new TestableRecoverabilityContext + { + ErrorContext = errorContext, + }; + if (metadata != default) + { + recoverabilityContext.Metadata = metadata; + } + return recoverabilityContext; } - - MoveToErrorsExecutor moveToErrorsExecutor; - FakeDispatcher dispatcher; - Dictionary staticFaultMetadata; const string ErrorQueueAddress = "errorQ"; const string ReceiveAddress = "my-endpoint"; - - class FakeDispatcher : IMessageDispatcher - { - public TransportOperations TransportOperations { get; private set; } - - public TransportTransaction Transaction { get; private set; } - - public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken = default) - { - TransportOperations = outgoingMessages; - Transaction = transaction; - return Task.FromResult(0); - } - } } } \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Recoverability/RecoverabilityExecutorTests.cs b/src/NServiceBus.Core.Tests/Recoverability/RecoverabilityExecutorTests.cs index 0559d65d647..acb399a2032 100644 --- a/src/NServiceBus.Core.Tests/Recoverability/RecoverabilityExecutorTests.cs +++ b/src/NServiceBus.Core.Tests/Recoverability/RecoverabilityExecutorTests.cs @@ -2,280 +2,25 @@ { using System; using System.Collections.Generic; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; using NServiceBus.Extensibility; using NUnit.Framework; + using Testing; using Transport; [TestFixture] - public class RecoverabilityExecutorTests + public class DiscardRecoverabilityActionTests { - [SetUp] - public void SetUp() - { - dispatcher = new FakeDispatcher(); - } - - [Test] - public async Task When_notification_turned_off_no_notification_should_be_raised() - { - var policy = RetryPolicy.Return( - actions: new RecoverabilityAction[] - { - RecoverabilityAction.ImmediateRetry(), - RecoverabilityAction.DelayedRetry(TimeSpan.FromSeconds(10)), - RecoverabilityAction.MoveToError("errorQueue") - }); - var executor = CreateExecutor(policy, raiseNotifications: false); - var errorContext = CreateErrorContext(); - - await executor.Invoke(errorContext); //force retry - await executor.Invoke(errorContext); //force delayed retry - await executor.Invoke(errorContext); //force move to errors - - Assert.IsEmpty(messageRetriedNotifications); - Assert.IsEmpty(messageFaultedNotifications); - } - - [Test] - public async Task When_failure_is_handled_with_immediate_retries_notification_should_be_raised() - { - var recoverabilityExecutor = CreateExecutor(RetryPolicy.AlwaysRetry()); - var errorContext = CreateErrorContext(numberOfDeliveryAttempts: 1, exceptionMessage: "test", messageId: "message-id"); - - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageRetriedNotifications.Single(); - - Assert.AreEqual(0, failure.Attempt); - Assert.IsTrue(failure.IsImmediateRetry); - Assert.AreEqual("test", failure.Exception.Message); - Assert.AreEqual("message-id", failure.Message.MessageId); - } - - [Test] - public async Task When_failure_is_handled_with_delayed_retries_notification_should_be_raised() - { - var recoverabilityExecutor = CreateExecutor(RetryPolicy.AlwaysDelay(TimeSpan.FromSeconds(10))); - var errorContext = CreateErrorContext(numberOfDeliveryAttempts: 1, exceptionMessage: "test", messageId: "message-id"); - - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageRetriedNotifications.Single(); - - Assert.AreEqual(1, failure.Attempt); - Assert.IsFalse(failure.IsImmediateRetry); - Assert.AreEqual("test", failure.Exception.Message); - Assert.AreEqual("message-id", failure.Message.MessageId); - } - - [Test] - public async Task When_failure_is_handled_by_moving_to_errors_notification_should_be_raised() - { - var recoverabilityExecutor = CreateExecutor(RetryPolicy.AlwaysMoveToErrors()); - var errorContext = CreateErrorContext(exceptionMessage: "test", messageId: "message-id"); - - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageFaultedNotifications.Single(); - - Assert.AreEqual("test", failure.Exception.Message); - Assert.AreEqual("message-id", failure.Message.MessageId); - } - - [Test] - public async Task When_delayed_retries_not_supported_but_policy_demands_it_should_move_to_errors() - { - var recoverabilityExecutor = CreateExecutor( - RetryPolicy.AlwaysDelay(TimeSpan.FromDays(1)), - delayedRetriesSupported: false); - var errorContext = CreateErrorContext(messageId: "message-id"); - - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageFaultedNotifications.Single(); - - Assert.IsEmpty(messageRetriedNotifications); - Assert.AreEqual("message-id", failure.Message.MessageId); - } - - [Test] - public async Task When_immediate_retries_not_supported_but_policy_demands_it_should_move_to_errors() - { - var recoverabilityExecutor = CreateExecutor( - RetryPolicy.AlwaysRetry(), - immediateRetriesSupported: false); - var errorContext = CreateErrorContext(messageId: "message-id"); - - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageFaultedNotifications.Single(); - - Assert.IsEmpty(messageRetriedNotifications); - Assert.AreEqual("message-id", failure.Message.MessageId); - } - [Test] - public async Task When_unsupported_action_returned_should_move_to_errors() + public void Discard_action_should_discard_message() { - var recoverabilityExecutor = CreateExecutor( - RetryPolicy.Unsupported()); - var errorContext = CreateErrorContext(messageId: "message-id"); + var discardAction = new Discard("not needed anymore"); + var errorContext = new ErrorContext(new Exception(""), new Dictionary(), "some-id", Array.Empty(), new TransportTransaction(), 1, "my-endpoint", new ContextBag()); + var actionContext = new TestableRecoverabilityContext { ErrorContext = errorContext }; - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageFaultedNotifications.Single(); - - Assert.IsEmpty(messageRetriedNotifications); - Assert.AreEqual("message-id", failure.Message.MessageId); - } - - [Test] - public async Task When_discard_action_returned_should_discard_message() - { - var recoverabilityExecutor = CreateExecutor( - RetryPolicy.Discard("not needed anymore")); - var errorContext = CreateErrorContext(messageId: "message-id"); - - var result = await recoverabilityExecutor.Invoke(errorContext); - - Assert.AreEqual(ErrorHandleResult.Handled, result); - Assert.IsEmpty(messageRetriedNotifications); - Assert.IsEmpty(messageFaultedNotifications); - } - - [Test] - public async Task When_moving_to_custom_error_queue_custom_error_queue_address_should_be_set_on_notification() - { - var customErrorQueueAddress = "custom-error-queue"; - var recoverabilityExecutor = CreateExecutor(RetryPolicy.AlwaysMoveToErrors(customErrorQueueAddress)); - var errorContext = CreateErrorContext(); - - await recoverabilityExecutor.Invoke(errorContext); - - var failure = messageFaultedNotifications.Single(); - - Assert.IsEmpty(messageRetriedNotifications); - Assert.AreEqual(customErrorQueueAddress, failure.ErrorQueue); - } - - static ErrorContext CreateErrorContext(Exception raisedException = null, string exceptionMessage = "default-message", string messageId = "default-id", int numberOfDeliveryAttempts = 1) - { - return new ErrorContext(raisedException ?? new Exception(exceptionMessage), new Dictionary(), messageId, new byte[0], new TransportTransaction(), numberOfDeliveryAttempts, "my-endpoint", new ContextBag()); - } - - RecoverabilityExecutor CreateExecutor(Func policy, bool delayedRetriesSupported = true, bool immediateRetriesSupported = true, bool raiseNotifications = true) - { - messageRetriedNotifications = new List(); - var messageRetryNotification = new Notification(); - messageRetryNotification.Subscribe((e, _) => - { - messageRetriedNotifications.Add(e); - return Task.FromResult(0); - }); - - messageFaultedNotifications = new List(); - var messageFaultedNotification = new Notification(); - messageFaultedNotification.Subscribe((e, _) => - { - messageFaultedNotifications.Add(e); - return Task.FromResult(0); - }); - - return new RecoverabilityExecutor( - raiseNotifications, - immediateRetriesSupported, - delayedRetriesSupported, - policy, - new RecoverabilityConfig(new ImmediateConfig(0), new DelayedConfig(0, TimeSpan.Zero), new FailedConfig(ErrorQueueAddress, new HashSet())), - delayedRetriesSupported ? new DelayedRetryExecutor(dispatcher) : null, - new MoveToErrorsExecutor(dispatcher, new Dictionary(), headers => { }), - messageRetryNotification, - messageFaultedNotification); - } - - FakeDispatcher dispatcher; - - List messageRetriedNotifications; - List messageFaultedNotifications; - - static string ErrorQueueAddress = "error-queue"; - - class RetryPolicy - { - RetryPolicy(RecoverabilityAction[] actions) - { - this.actions = new Queue(actions); - } - - public RecoverabilityAction Invoke(RecoverabilityConfig config, ErrorContext errorContext) - { - return actions.Dequeue(); - } - - public static Func AlwaysDelay(TimeSpan delay) - { - return new RetryPolicy(new[] - { - RecoverabilityAction.DelayedRetry(delay) - }).Invoke; - } - - public static Func AlwaysMoveToErrors(string errorQueueAddress = "errorQueue") - { - return new RetryPolicy(new[] - { - RecoverabilityAction.MoveToError(errorQueueAddress) - }).Invoke; - } - - public static Func AlwaysRetry() - { - return new RetryPolicy(new[] - { - RecoverabilityAction.ImmediateRetry() - }).Invoke; - } - - public static Func Return(RecoverabilityAction[] actions) - { - return new RetryPolicy(actions).Invoke; - } - - public static Func Unsupported() - { - return new RetryPolicy(new[] - { - new UnsupportedAction() - }).Invoke; - } - - public static Func Discard(string reason) - { - return new RetryPolicy(new[] - { - new Discard(reason), - }).Invoke; - } - - Queue actions; - } - - class UnsupportedAction : RecoverabilityAction - { - } - - class FakeDispatcher : IMessageDispatcher - { - public TransportOperations TransportOperations { get; private set; } + var routingContexts = discardAction.GetRoutingContexts(actionContext); - public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, CancellationToken cancellationToken = default) - { - TransportOperations = outgoingMessages; - return Task.CompletedTask; - } + CollectionAssert.IsEmpty(routingContexts); + Assert.AreEqual(discardAction.ErrorHandleResult, ErrorHandleResult.Handled); } } } \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Recoverability/TransportCapabilityAdjustmentTests.cs b/src/NServiceBus.Core.Tests/Recoverability/TransportCapabilityAdjustmentTests.cs new file mode 100644 index 00000000000..de7eae94c3a --- /dev/null +++ b/src/NServiceBus.Core.Tests/Recoverability/TransportCapabilityAdjustmentTests.cs @@ -0,0 +1,31 @@ +namespace NServiceBus.Core.Tests.Recoverability +{ + using System; + using NUnit.Framework; + + [TestFixture] + public class TransportCapabilityAdjustmentTests + { + [Test] + public void When_delayed_retries_not_supported_but_policy_demands_it_should_move_to_errors() + { + var recoverabilityAction = RecoverabilityComponent.AdjustForTransportCapabilities(ErrorQueueAddress, false, false, RecoverabilityAction.DelayedRetry(TimeSpan.FromSeconds(1))); + + var errorAction = recoverabilityAction as MoveToError; + Assert.NotNull(errorAction); + Assert.AreEqual(ErrorQueueAddress, errorAction.ErrorQueue); + } + + [Test] + public void When_immediate_retries_not_supported_but_policy_demands_it_should_move_to_errors() + { + var recoverabilityAction = RecoverabilityComponent.AdjustForTransportCapabilities(ErrorQueueAddress, false, false, RecoverabilityAction.ImmediateRetry()); + + var errorAction = recoverabilityAction as MoveToError; + Assert.NotNull(errorAction); + Assert.AreEqual(ErrorQueueAddress, errorAction.ErrorQueue); + } + + static string ErrorQueueAddress = "error-queue"; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core.Tests/Utils/ExceptionHeaderHelperTests.cs b/src/NServiceBus.Core.Tests/Utils/ExceptionHeaderHelperTests.cs deleted file mode 100644 index 538cbf02f15..00000000000 --- a/src/NServiceBus.Core.Tests/Utils/ExceptionHeaderHelperTests.cs +++ /dev/null @@ -1,71 +0,0 @@ -namespace NServiceBus.Core.Tests.Utils -{ - using System; - using System.Collections.Generic; - using System.Runtime.CompilerServices; - using NUnit.Framework; - - [TestFixture] - public class ExceptionHeaderHelperTests - { - [Test] - public void VerifyHeadersAreSet() - { - var exception = GetAnException(); - var dictionary = new Dictionary(); - - ExceptionHeaderHelper.SetExceptionHeaders(dictionary, exception); - - Assert.AreEqual("System.AggregateException", dictionary["NServiceBus.ExceptionInfo.ExceptionType"]); - Assert.AreEqual(exception.ToString(), dictionary["NServiceBus.ExceptionInfo.StackTrace"]); - Assert.IsTrue(dictionary.ContainsKey("NServiceBus.TimeOfFailure")); - - Assert.AreEqual("System.Exception", dictionary["NServiceBus.ExceptionInfo.InnerExceptionType"]); - Assert.AreEqual("A fake help link", dictionary["NServiceBus.ExceptionInfo.HelpLink"]); - Assert.AreEqual("NServiceBus.Core.Tests", dictionary["NServiceBus.ExceptionInfo.Source"]); - } - - Exception GetAnException() - { - try - { - MethodThatThrows1(); - } - catch (Exception e) - { - return e; - } - return null; - } - - [MethodImpl(MethodImplOptions.NoInlining)] - public void MethodThatThrows1() - { - try - { - MethodThatThrows2(); - } - catch (Exception exception) - { - throw new AggregateException("My Exception", exception) { HelpLink = "A fake help link" }; - } - } - - [MethodImpl(MethodImplOptions.NoInlining)] - void MethodThatThrows2() - { - throw new Exception("My Inner Exception"); - } - - [Test] - public void ExceptionMessageIsTruncated() - { - var exception = new Exception(new string('x', (int)Math.Pow(2, 15))); - var dictionary = new Dictionary(); - - ExceptionHeaderHelper.SetExceptionHeaders(dictionary, exception); - - Assert.AreEqual((int)Math.Pow(2, 14), dictionary["NServiceBus.ExceptionInfo.Message"].Length); - } - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core/EndpointCreator.cs b/src/NServiceBus.Core/EndpointCreator.cs index 3efb0b556b2..9564e279eed 100644 --- a/src/NServiceBus.Core/EndpointCreator.cs +++ b/src/NServiceBus.Core/EndpointCreator.cs @@ -60,7 +60,11 @@ void Configure() featureComponent.Initalize(featureConfigurationContext); - recoverabilityComponent.Initialize(receiveConfiguration, hostingConfiguration, transportSeam); + recoverabilityComponent.Initialize( + receiveConfiguration, + hostingConfiguration, + transportSeam, + pipelineSettings); var routingComponent = RoutingComponent.Initialize( routingConfiguration, diff --git a/src/NServiceBus.Core/Notifications/CompositeNotification.cs b/src/NServiceBus.Core/Notifications/CompositeNotification.cs new file mode 100644 index 00000000000..f5ca5af3fdd --- /dev/null +++ b/src/NServiceBus.Core/Notifications/CompositeNotification.cs @@ -0,0 +1,50 @@ +namespace NServiceBus +{ + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + + class CompositeNotification + { + public void Register(INotificationSubscriptions notification) => + notifications.Add(new Notifier(notification)); + + // Currently we only have class notifications and no structs or records so object is fine. + public Task Raise(object @event, CancellationToken cancellationToken = default) + { + if (@event is null) + { + return Task.CompletedTask; + } + + for (int i = 0; i < notifications.Count; i++) + { + if (notifications[i].Handle(@event)) + { + return notifications[i].Raise(@event, cancellationToken); + } + } + return Task.CompletedTask; + } + + List notifications = new List(); + + interface INotifier + { + bool Handle(object @event); + Task Raise(object @event, CancellationToken cancellationToken = default); + } + + class Notifier : INotifier + { + public Notifier(INotificationSubscriptions notifier) => this.notifier = notifier; + + public bool Handle(object @event) => @event is TEvent; + + public Task Raise(object @event, CancellationToken cancellationToken = default) => + notifier.Raise((TEvent)@event, cancellationToken); + + readonly INotificationSubscriptions notifier; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Notifications/Notification.cs b/src/NServiceBus.Core/Notifications/Notification.cs index 3425cf56ac0..4f8fea2ff74 100644 --- a/src/NServiceBus.Core/Notifications/Notification.cs +++ b/src/NServiceBus.Core/Notifications/Notification.cs @@ -8,16 +8,11 @@ class Notification : INotificationSubscriptions { - public void Subscribe(Func subscription) - { - subscriptions.Add(subscription); - } + public void Subscribe(Func subscription) => subscriptions.Add(subscription); - List> subscriptions = new List>(); + Task INotificationSubscriptions.Raise(TEvent @event, CancellationToken cancellationToken) => + Task.WhenAll(subscriptions.Select(s => s.Invoke(@event, cancellationToken))); - Task INotificationSubscriptions.Raise(TEvent @event, CancellationToken cancellationToken) - { - return Task.WhenAll(subscriptions.Select(s => s.Invoke(@event, cancellationToken))); - } + List> subscriptions = new List>(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs new file mode 100644 index 00000000000..3fcb435da15 --- /dev/null +++ b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingContextExtensions.cs @@ -0,0 +1,23 @@ +namespace NServiceBus +{ + using Pipeline; + using Routing; + using Transport; + + static class RoutingContextExtensions + { + public static TransportOperation ToTransportOperation(this IRoutingContext context, RoutingStrategy strategy, DispatchConsistency dispatchConsistency) + { + var addressLabel = strategy.Apply(context.Message.Headers); + var message = new OutgoingMessage(context.Message.MessageId, context.Message.Headers, context.Message.Body); + + if (!context.Extensions.TryGet(out DispatchProperties dispatchProperties)) + { + dispatchProperties = new DispatchProperties(); + } + + var transportOperation = new TransportOperation(message, addressLabel, dispatchProperties, dispatchConsistency); + return transportOperation; + } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs index 15b85c57951..7729bde974c 100644 --- a/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs +++ b/src/NServiceBus.Core/Pipeline/Outgoing/RoutingToDispatchConnector.cs @@ -19,15 +19,7 @@ public override Task Invoke(IRoutingContext context, Func(builder); var mainPipelineExecutor = new MainPipelineExecutor(builder, pipelineCache, messageOperations, configuration.PipelineCompletedSubscribers, receivePipeline); - var recoverabilityExecutorFactory = recoverabilityComponent.GetRecoverabilityExecutorFactory(builder); - var recoverability = recoverabilityExecutorFactory - .CreateDefault(); + var recoverabilityPipelineExecutor = recoverabilityComponent.CreateRecoverabilityPipelineExecutor( + builder, + pipelineCache, + pipelineComponent, + messageOperations); + + await mainPump.Initialize( + configuration.PushRuntimeSettings, + mainPipelineExecutor.Invoke, + recoverabilityPipelineExecutor.Invoke, + cancellationToken).ConfigureAwait(false); - await mainPump.Initialize(configuration.PushRuntimeSettings, mainPipelineExecutor.Invoke, - recoverability.Invoke, cancellationToken).ConfigureAwait(false); receivers.Add(mainPump); if (transportInfrastructure.Receivers.TryGetValue(InstanceSpecificReceiverId, out var instanceSpecificPump)) { var instancePump = CreateReceiver(consecutiveFailuresConfiguration, instanceSpecificPump); - var instanceSpecificRecoverabilityExecutor = recoverabilityExecutorFactory.CreateDefault(); - await instancePump.Initialize(configuration.PushRuntimeSettings, mainPipelineExecutor.Invoke, - instanceSpecificRecoverabilityExecutor.Invoke, cancellationToken).ConfigureAwait(false); + await instancePump.Initialize( + configuration.PushRuntimeSettings, + mainPipelineExecutor.Invoke, + recoverabilityPipelineExecutor.Invoke, + cancellationToken).ConfigureAwait(false); receivers.Add(instancePump); } @@ -185,10 +193,15 @@ await instancePump.Initialize(configuration.PushRuntimeSettings, mainPipelineExe { var satellitePump = CreateReceiver(consecutiveFailuresConfiguration, transportInfrastructure.Receivers[satellite.Name]); var satellitePipeline = new SatellitePipelineExecutor(builder, satellite); - var satelliteRecoverabilityExecutor = recoverabilityExecutorFactory.Create(satellite.RecoverabilityPolicy); + var satelliteRecoverabilityExecutor = recoverabilityComponent.CreateSatelliteRecoverabilityExecutor(builder, satellite.RecoverabilityPolicy); + + await satellitePump.Initialize( + satellite.RuntimeSettings, + satellitePipeline.Invoke, + satelliteRecoverabilityExecutor.Invoke, + cancellationToken) + .ConfigureAwait(false); - await satellitePump.Initialize(satellite.RuntimeSettings, satellitePipeline.Invoke, - satelliteRecoverabilityExecutor.Invoke, cancellationToken).ConfigureAwait(false); receivers.Add(satellitePump); } catch (Exception ex) when (!ex.IsCausedBy(cancellationToken)) diff --git a/src/NServiceBus.Core/Recoverability/DelayedRetry.cs b/src/NServiceBus.Core/Recoverability/DelayedRetry.cs index 33b1c00ee0f..b89c760bbb6 100644 --- a/src/NServiceBus.Core/Recoverability/DelayedRetry.cs +++ b/src/NServiceBus.Core/Recoverability/DelayedRetry.cs @@ -1,20 +1,66 @@ namespace NServiceBus { using System; + using System.Collections.Generic; + using DelayedDelivery; + using Logging; + using Pipeline; + using Recoverability; + using Routing; + using Transport; /// /// Indicates recoverability is required to delay retry the current message. /// - public sealed class DelayedRetry : RecoverabilityAction + public class DelayedRetry : RecoverabilityAction { - internal DelayedRetry(TimeSpan delay) - { - Delay = delay; - } + /// + /// Creates the action with the requested delay. + /// + protected internal DelayedRetry(TimeSpan delay) => Delay = delay; /// /// The retry delay. /// public TimeSpan Delay { get; } + + /// + /// The ErrorHandleResult that should be passed to the transport. + /// + public override ErrorHandleResult ErrorHandleResult => ErrorHandleResult.Handled; + + /// + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + var errorContext = context.ErrorContext; + var message = errorContext.Message; + + Logger.Warn($"Delayed Retry will reschedule message '{message.MessageId}' after a delay of {Delay} because of an exception:", errorContext.Exception); + + var outgoingMessage = new OutgoingMessage(message.MessageId, new Dictionary(message.Headers), message.Body); + + var currentDelayedRetriesAttempt = message.GetDelayedDeliveriesPerformed() + 1; + + if (context is IRecoverabilityActionContextNotifications notifications) + { + notifications.Add(new MessageToBeRetried( + attempt: currentDelayedRetriesAttempt, + delay: Delay, + immediateRetry: false, + errorContext: errorContext)); + } + + outgoingMessage.SetCurrentDelayedDeliveries(currentDelayedRetriesAttempt); + outgoingMessage.SetDelayedDeliveryTimestamp(DateTimeOffset.UtcNow); + + var routingContext = context.CreateRoutingContext(outgoingMessage, new UnicastRoutingStrategy(errorContext.ReceiveAddress)); + routingContext.Extensions.Set(new DispatchProperties + { + DelayDeliveryWith = new DelayDeliveryWith(Delay) + }); + return new[] { routingContext }; + } + + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/DelayedRetryExecutor.cs b/src/NServiceBus.Core/Recoverability/DelayedRetryExecutor.cs deleted file mode 100644 index d60b52d2334..00000000000 --- a/src/NServiceBus.Core/Recoverability/DelayedRetryExecutor.cs +++ /dev/null @@ -1,43 +0,0 @@ -namespace NServiceBus -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using DelayedDelivery; - using Routing; - using Transport; - - class DelayedRetryExecutor - { - public DelayedRetryExecutor(IMessageDispatcher dispatcher) - { - this.dispatcher = dispatcher; - } - - public async Task Retry(ErrorContext errorContext, TimeSpan delay, CancellationToken cancellationToken = default) - { - var message = errorContext.Message; - var outgoingMessage = new OutgoingMessage(message.MessageId, new Dictionary(message.Headers), message.Body); - - var currentDelayedRetriesAttempt = message.GetDelayedDeliveriesPerformed() + 1; - - outgoingMessage.SetCurrentDelayedDeliveries(currentDelayedRetriesAttempt); - outgoingMessage.SetDelayedDeliveryTimestamp(DateTimeOffset.UtcNow); - - var dispatchProperties = new DispatchProperties - { - DelayDeliveryWith = new DelayDeliveryWith(delay) - }; - var messageDestination = new UnicastAddressTag(errorContext.ReceiveAddress); - - var transportOperations = new TransportOperations(new TransportOperation(outgoingMessage, messageDestination, dispatchProperties)); - - await dispatcher.Dispatch(transportOperations, errorContext.TransportTransaction, cancellationToken).ConfigureAwait(false); - - return currentDelayedRetriesAttempt; - } - - IMessageDispatcher dispatcher; - } -} diff --git a/src/NServiceBus.Core/Recoverability/Discard.cs b/src/NServiceBus.Core/Recoverability/Discard.cs index ddce2ef52e6..a090e6ab9d5 100644 --- a/src/NServiceBus.Core/Recoverability/Discard.cs +++ b/src/NServiceBus.Core/Recoverability/Discard.cs @@ -1,11 +1,20 @@ namespace NServiceBus { + using System; + using System.Collections.Generic; + using Logging; + using Transport; + using Pipeline; + /// /// Indicates recoverability is required to discard/ignore the current message. /// - public sealed class Discard : RecoverabilityAction + public class Discard : RecoverabilityAction { - internal Discard(string reason) + /// + /// Creates the action with the stated reason. + /// + public Discard(string reason) { Reason = reason; } @@ -14,5 +23,20 @@ internal Discard(string reason) /// The reason why a message was discarded. /// public string Reason { get; } + + /// + /// How to handle the message from a transport perspective. + /// + public override ErrorHandleResult ErrorHandleResult => ErrorHandleResult.Handled; + + /// + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + var errorContext = context.ErrorContext; + Logger.Info($"Discarding message with id '{errorContext.Message.MessageId}'. Reason: {Reason}", errorContext.Exception); + return Array.Empty(); + } + + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Utils/ExceptionHeaderHelper.cs b/src/NServiceBus.Core/Recoverability/FaultMetadataExtractor.cs similarity index 52% rename from src/NServiceBus.Core/Utils/ExceptionHeaderHelper.cs rename to src/NServiceBus.Core/Recoverability/FaultMetadataExtractor.cs index c6f2fa209ef..c25cae0f924 100644 --- a/src/NServiceBus.Core/Utils/ExceptionHeaderHelper.cs +++ b/src/NServiceBus.Core/Recoverability/FaultMetadataExtractor.cs @@ -3,10 +3,32 @@ using System; using System.Collections; using System.Collections.Generic; + using NServiceBus.Faults; + using NServiceBus.Transport; - static class ExceptionHeaderHelper + class FaultMetadataExtractor { - public static void SetExceptionHeaders(Dictionary headers, Exception e) + public FaultMetadataExtractor(Dictionary staticFaultMetadata, Action> headerCustomizations) + { + this.staticFaultMetadata = staticFaultMetadata; + this.headerCustomizations = headerCustomizations; + } + + public Dictionary Extract(ErrorContext errorContext) + { + var metadata = new Dictionary(staticFaultMetadata) + { + [FaultsHeaderKeys.FailedQ] = errorContext.ReceiveAddress + }; + + SetExceptionMetadata(metadata, errorContext.Exception); + + headerCustomizations(metadata); + + return metadata; + } + + static void SetExceptionMetadata(Dictionary headers, Exception e) { headers["NServiceBus.ExceptionInfo.ExceptionType"] = e.GetType().FullName; @@ -16,7 +38,7 @@ public static void SetExceptionHeaders(Dictionary headers, Excep } headers["NServiceBus.ExceptionInfo.HelpLink"] = e.HelpLink; - headers["NServiceBus.ExceptionInfo.Message"] = e.GetMessage().Truncate(16384); + headers["NServiceBus.ExceptionInfo.Message"] = Truncate(e.GetMessage(), 16384); headers["NServiceBus.ExceptionInfo.Source"] = e.Source; headers["NServiceBus.ExceptionInfo.StackTrace"] = e.ToString(); headers["NServiceBus.TimeOfFailure"] = DateTimeOffsetHelper.ToWireFormattedString(DateTimeOffset.UtcNow); @@ -25,9 +47,7 @@ public static void SetExceptionHeaders(Dictionary headers, Excep { return; } -#pragma warning disable DE0006 foreach (DictionaryEntry entry in e.Data) -#pragma warning restore DE0006 { if (entry.Value == null) { @@ -37,11 +57,15 @@ public static void SetExceptionHeaders(Dictionary headers, Excep } } - static string Truncate(this string value, int maxLength) => + static string Truncate(string value, int maxLength) => string.IsNullOrEmpty(value) ? value : (value.Length <= maxLength ? value : value.Substring(0, maxLength)); + + + readonly Dictionary staticFaultMetadata; + readonly Action> headerCustomizations; } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/IRecoverabilityActionContext.cs b/src/NServiceBus.Core/Recoverability/IRecoverabilityActionContext.cs new file mode 100644 index 00000000000..3e2a3243601 --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/IRecoverabilityActionContext.cs @@ -0,0 +1,21 @@ +namespace NServiceBus.Pipeline +{ + using System.Collections.Generic; + using Transport; + + /// + /// Provide context to recoverability actions. + /// + public interface IRecoverabilityActionContext : IBehaviorContext + { + /// + /// Context for the message that failed processing. + /// + ErrorContext ErrorContext { get; } + + /// + /// Metadata for this message. + /// + IReadOnlyDictionary Metadata { get; } + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/IRecoverabilityActionContextNotifications.cs b/src/NServiceBus.Core/Recoverability/IRecoverabilityActionContextNotifications.cs new file mode 100644 index 00000000000..00360d31531 --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/IRecoverabilityActionContextNotifications.cs @@ -0,0 +1,11 @@ +namespace NServiceBus +{ + using System.Collections.Generic; + + // This is deliberately internal and sneaky. We have a hunch with the introduction of the recoverability pipeline + // many of the cases that today require notifications can be obsoleted over time. + interface IRecoverabilityActionContextNotifications : IEnumerable + { + void Add(object notification); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/IRecoverabilityContext.cs b/src/NServiceBus.Core/Recoverability/IRecoverabilityContext.cs new file mode 100644 index 00000000000..a0ab257cc1c --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/IRecoverabilityContext.cs @@ -0,0 +1,36 @@ +namespace NServiceBus.Pipeline +{ + using System.Collections.Generic; + using Transport; + + /// + /// Provide context to behaviors on the recoverability pipeline. + /// + public interface IRecoverabilityContext : IBehaviorContext + { + /// + /// Context for the message that failed processing. + /// + ErrorContext ErrorContext { get; } + + /// + /// The recoverability configuration for the endpoint. + /// + public RecoverabilityConfig RecoverabilityConfiguration { get; } + + /// + /// The recoverability action to take for this message. + /// + RecoverabilityAction RecoverabilityAction { get; set; } + + /// + /// Metadata for this message. + /// + Dictionary Metadata { get; } + + /// + /// Locks the recoverability action for further changes. + /// + IRecoverabilityActionContext PreventChanges(); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/IRecoverabilityPipelineExecutor.cs b/src/NServiceBus.Core/Recoverability/IRecoverabilityPipelineExecutor.cs new file mode 100644 index 00000000000..98cd0f40793 --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/IRecoverabilityPipelineExecutor.cs @@ -0,0 +1,11 @@ +namespace NServiceBus +{ + using System.Threading; + using System.Threading.Tasks; + using Transport; + + interface IRecoverabilityPipelineExecutor + { + Task Invoke(ErrorContext errorContext, CancellationToken cancellationToken = default); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/ImmediateRetry.cs b/src/NServiceBus.Core/Recoverability/ImmediateRetry.cs index db121e3681d..04a7e3a2769 100644 --- a/src/NServiceBus.Core/Recoverability/ImmediateRetry.cs +++ b/src/NServiceBus.Core/Recoverability/ImmediateRetry.cs @@ -1,10 +1,45 @@ namespace NServiceBus { + using System; + using System.Collections.Generic; + using NServiceBus.Logging; + using NServiceBus.Transport; + using Pipeline; + /// /// Indicates recoverability is required to immediately retry the current message. /// - public sealed class ImmediateRetry : RecoverabilityAction + public class ImmediateRetry : RecoverabilityAction { - internal ImmediateRetry() { } + /// + /// Creates an immediate retry action. + /// + protected internal ImmediateRetry() + { + } + + /// + /// The ErrorHandleResult that should be passed to the transport. + /// + public override ErrorHandleResult ErrorHandleResult => ErrorHandleResult.RetryRequired; + + /// + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + var errorContext = context.ErrorContext; + + Logger.Info($"Immediate Retry is going to retry message '{errorContext.Message.MessageId}' because of an exception:", errorContext.Exception); + if (context is IRecoverabilityActionContextNotifications notifications) + { + notifications.Add(new MessageToBeRetried( + attempt: errorContext.ImmediateProcessingFailures - 1, + delay: TimeSpan.Zero, + immediateRetry: true, + errorContext: errorContext)); + } + return Array.Empty(); + } + + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/MoveToError.cs b/src/NServiceBus.Core/Recoverability/MoveToError.cs index 077432f2236..4b2a82a1f60 100644 --- a/src/NServiceBus.Core/Recoverability/MoveToError.cs +++ b/src/NServiceBus.Core/Recoverability/MoveToError.cs @@ -1,18 +1,62 @@ namespace NServiceBus { + using System.Collections.Generic; + using Logging; + using Pipeline; + using Recoverability; + using Routing; + using Transport; + /// /// Indicates that recoverability is required to move the current message to the error queue. /// - public sealed class MoveToError : RecoverabilityAction + public class MoveToError : RecoverabilityAction { - internal MoveToError(string errorQueue) - { - ErrorQueue = errorQueue; - } + /// + /// Creates the action with the target error queue. + /// + protected internal MoveToError(string errorQueue) => ErrorQueue = errorQueue; /// /// Defines the error queue where the message should be move to. /// public string ErrorQueue { get; } + + /// + /// The ErrorHandleResult that should be passed to the transport. + /// + public override ErrorHandleResult ErrorHandleResult => ErrorHandleResult.Handled; + + /// + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + var errorContext = context.ErrorContext; + var metadata = context.Metadata; + var message = errorContext.Message; + + Logger.Error($"Moving message '{message.MessageId}' to the error queue '{ErrorQueue}' because processing failed due to an exception:", errorContext.Exception); + + if (context is IRecoverabilityActionContextNotifications notifications) + { + notifications.Add(new MessageFaulted(errorContext, ErrorQueue)); + } + + var outgoingMessage = new OutgoingMessage(message.MessageId, new Dictionary(message.Headers), message.Body); + + var headers = outgoingMessage.Headers; + headers.Remove(Headers.DelayedRetries); + headers.Remove(Headers.ImmediateRetries); + + foreach (var faultMetadata in metadata) + { + headers[faultMetadata.Key] = faultMetadata.Value; + } + return new[] + { + context.CreateRoutingContext(outgoingMessage, new UnicastRoutingStrategy(ErrorQueue)) + }; + } + + static readonly ILog Logger = LogManager.GetLogger(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/MoveToErrorsExecutor.cs b/src/NServiceBus.Core/Recoverability/MoveToErrorsExecutor.cs deleted file mode 100644 index dbba6863e90..00000000000 --- a/src/NServiceBus.Core/Recoverability/MoveToErrorsExecutor.cs +++ /dev/null @@ -1,49 +0,0 @@ -namespace NServiceBus -{ - using System; - using System.Collections.Generic; - using System.Threading; - using System.Threading.Tasks; - using NServiceBus.Faults; - using Routing; - using Transport; - - class MoveToErrorsExecutor - { - public MoveToErrorsExecutor(IMessageDispatcher dispatcher, Dictionary staticFaultMetadata, Action> headerCustomizations) - { - this.dispatcher = dispatcher; - this.staticFaultMetadata = staticFaultMetadata; - this.headerCustomizations = headerCustomizations; - } - - public Task MoveToErrorQueue(string errorQueueAddress, ErrorContext errorContext, CancellationToken cancellationToken = default) - { - var message = errorContext.Message; - var outgoingMessage = new OutgoingMessage(message.MessageId, new Dictionary(message.Headers), message.Body); - - var headers = outgoingMessage.Headers; - headers.Remove(Headers.DelayedRetries); - headers.Remove(Headers.ImmediateRetries); - - headers[FaultsHeaderKeys.FailedQ] = errorContext.ReceiveAddress; - - ExceptionHeaderHelper.SetExceptionHeaders(headers, errorContext.Exception); - - foreach (var faultMetadata in staticFaultMetadata) - { - headers[faultMetadata.Key] = faultMetadata.Value; - } - - headerCustomizations(headers); - - var transportOperations = new TransportOperations(new TransportOperation(outgoingMessage, new UnicastAddressTag(errorQueueAddress))); - - return dispatcher.Dispatch(transportOperations, errorContext.TransportTransaction, cancellationToken); - } - - IMessageDispatcher dispatcher; - Dictionary staticFaultMetadata; - Action> headerCustomizations; - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityAction.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityAction.cs index 724a10e50e3..bfbb72daf9d 100644 --- a/src/NServiceBus.Core/Recoverability/RecoverabilityAction.cs +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityAction.cs @@ -1,6 +1,9 @@ namespace NServiceBus { using System; + using System.Collections.Generic; + using Transport; + using Pipeline; /// /// Abstraction representing any recoverability action. @@ -18,10 +21,12 @@ protected internal RecoverabilityAction() /// Creates an immediate retry recoverability action. /// /// Immediate retry action. - public static ImmediateRetry ImmediateRetry() - { - return CachedImmediateRetry; - } + public static ImmediateRetry ImmediateRetry() => CachedImmediateRetry; + + /// + /// Returns the routing contexts derived from the provided recoverability context. + /// + public abstract IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context); /// /// Creates a new delayed retry recoverability action. @@ -57,6 +62,11 @@ public static Discard Discard(string reason) return new Discard(reason); } - static ImmediateRetry CachedImmediateRetry = new ImmediateRetry(); + /// + /// The ErrorHandleResult that should be passed to the transport. + /// + public abstract ErrorHandleResult ErrorHandleResult { get; } + + static readonly ImmediateRetry CachedImmediateRetry = new ImmediateRetry(); } } \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs index 04c7e6c2a2f..f39ba03aa45 100644 --- a/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityComponent.cs @@ -3,10 +3,11 @@ using System; using System.Collections.Generic; using System.Linq; - using Hosting; - using Microsoft.Extensions.DependencyInjection; + using NServiceBus.Hosting; + using NServiceBus.Logging; + using NServiceBus.Pipeline; + using NServiceBus.Support; using Settings; - using Support; using Transport; class RecoverabilityComponent @@ -15,8 +16,8 @@ public RecoverabilityComponent(SettingsHolder settings) { this.settings = settings; var configuration = settings.Get(); - MessageRetryNotification = configuration.MessageRetryNotification; - MessageFaultedNotification = configuration.MessageFaultedNotification; + messageRetryNotification = configuration.MessageRetryNotification; + messageFaultedNotification = configuration.MessageFaultedNotification; settings.SetDefault(NumberOfDelayedRetries, DefaultNumberOfRetries); settings.SetDefault(DelayedRetriesTimeIncrease, DefaultTimeIncrease); settings.SetDefault(NumberOfImmediateRetries, 5); @@ -24,17 +25,11 @@ public RecoverabilityComponent(SettingsHolder settings) settings.AddUnrecoverableException(typeof(MessageDeserializationException)); } - public RecoverabilityExecutorFactory GetRecoverabilityExecutorFactory(IServiceProvider builder) - { - if (recoverabilityExecutorFactory == null) - { - recoverabilityExecutorFactory = CreateRecoverabilityExecutorFactory(builder); - } - - return recoverabilityExecutorFactory; - } - - public void Initialize(ReceiveComponent.Configuration receiveConfiguration, HostingComponent.Configuration hostingConfiguration, TransportSeam transportSeam) + public void Initialize( + ReceiveComponent.Configuration receiveConfiguration, + HostingComponent.Configuration hostingConfiguration, + TransportSeam transportSeam, + PipelineSettings pipelineSettings) { if (receiveConfiguration.IsSendOnlyEndpoint) { @@ -44,8 +39,9 @@ public void Initialize(ReceiveComponent.Configuration receiveConfiguration, Host hostInformation = hostingConfiguration.HostInformation; this.transportSeam = transportSeam; - transactionsOn = transportSeam.TransportDefinition.TransportTransactionMode != TransportTransactionMode.None; + delayedRetriesAvailable = transactionsOn && transportSeam.TransportDefinition.SupportsDelayedDelivery; + immediateRetriesAvailable = transactionsOn; var errorQueue = settings.ErrorQueueAddress(); transportSeam.QueueBindings.BindSending(errorQueue); @@ -58,6 +54,10 @@ public void Initialize(ReceiveComponent.Configuration receiveConfiguration, Host recoverabilityConfig = new RecoverabilityConfig(immediateRetryConfig, delayedRetryConfig, failedConfig); + faultMetadataExtractor = CreateFaultMetadataExtractor(); + + pipelineSettings.Register(new RecoverabilityRoutingConnector(messageRetryNotification, messageFaultedNotification), "Executes the configured retry policy"); + hostingConfiguration.AddStartupDiagnosticsSection("Recoverability", new { ImmediateRetries = recoverabilityConfig.Immediate.MaxNumberOfRetries, @@ -68,50 +68,89 @@ public void Initialize(ReceiveComponent.Configuration receiveConfiguration, Host }); } - RecoverabilityExecutorFactory CreateRecoverabilityExecutorFactory(IServiceProvider builder) + public IRecoverabilityPipelineExecutor CreateRecoverabilityPipelineExecutor( + IServiceProvider serviceProvider, + IPipelineCache pipelineCache, + PipelineComponent pipeline, + MessageOperations messageOperations) { - var delayedRetriesAvailable = transactionsOn && transportSeam.TransportDefinition.SupportsDelayedDelivery; - var immediateRetriesAvailable = transactionsOn; + var recoverabilityPipeline = pipeline.CreatePipeline(serviceProvider); - Func moveToErrorsExecutorFactory = () => + if (!settings.TryGet(PolicyOverride, out Func policy)) { - var staticFaultMetadata = new Dictionary - { - {Headers.ProcessingMachine, RuntimeEnvironment.MachineName}, - {Headers.ProcessingEndpoint, settings.EndpointName()}, - {Headers.HostId, hostInformation.HostId.ToString("N")}, - {Headers.HostDisplayName, hostInformation.DisplayName} - }; - - var headerCustomizations = settings.Get>>(FaultHeaderCustomization); - - return new MoveToErrorsExecutor(builder.GetRequiredService(), staticFaultMetadata, headerCustomizations); + policy = (config, context) => DefaultRecoverabilityPolicy.Invoke(config, context); }; - Func delayedRetryExecutorFactory = () => - { - if (delayedRetriesAvailable) + return new RecoverabilityPipelineExecutor<(RecoverabilityComponent, + Func)>( + serviceProvider, + pipelineCache, + messageOperations, + recoverabilityConfig, + (errorContext, state) => { - return new DelayedRetryExecutor(builder.GetRequiredService()); - } + var (@this, localPolicy) = state; + return AdjustForTransportCapabilities( + @this.recoverabilityConfig.Failed.ErrorQueue, + @this.immediateRetriesAvailable, + @this.delayedRetriesAvailable, + localPolicy(@this.recoverabilityConfig, errorContext)); + }, + recoverabilityPipeline, + faultMetadataExtractor, + (this, policy)); + } - return null; - }; + public IRecoverabilityPipelineExecutor CreateSatelliteRecoverabilityExecutor( + IServiceProvider serviceProvider, + Func recoverabilityPolicy) => + new SatelliteRecoverabilityExecutor<(RecoverabilityComponent, Func)>( + serviceProvider, + faultMetadataExtractor, + (errorContext, state) => + { + var (@this, policy) = state; + return AdjustForTransportCapabilities( + @this.recoverabilityConfig.Failed.ErrorQueue, + @this.immediateRetriesAvailable, + @this.delayedRetriesAvailable, + policy(@this.recoverabilityConfig, errorContext)); + }, (this, recoverabilityPolicy)); + + public static RecoverabilityAction AdjustForTransportCapabilities( + string errorQueue, + bool immediateRetriesAvailable, + bool delayedRetriesAvailable, + RecoverabilityAction selectedAction) + { + if (selectedAction is ImmediateRetry && !immediateRetriesAvailable) + { + Logger.Warn("Recoverability policy requested ImmediateRetry however immediate retries are not available with the current endpoint configuration. Moving message to error queue instead."); + return RecoverabilityAction.MoveToError(errorQueue); + } - if (!settings.TryGet(PolicyOverride, out Func policy)) + if (selectedAction is DelayedRetry && !delayedRetriesAvailable) { - policy = DefaultRecoverabilityPolicy.Invoke; + Logger.Warn("Recoverability policy requested DelayedRetry however delayed delivery capability is not available with the current endpoint configuration. Moving message to error queue instead."); + return RecoverabilityAction.MoveToError(errorQueue); } - return new RecoverabilityExecutorFactory( - policy, - recoverabilityConfig, - delayedRetryExecutorFactory, - moveToErrorsExecutorFactory, - immediateRetriesAvailable, - delayedRetriesAvailable, - MessageRetryNotification, - MessageFaultedNotification); + return selectedAction; + } + + FaultMetadataExtractor CreateFaultMetadataExtractor() + { + var staticFaultMetadata = new Dictionary + { + {Headers.ProcessingMachine, RuntimeEnvironment.MachineName}, + {Headers.ProcessingEndpoint, settings.EndpointName()}, + {Headers.HostId, hostInformation.HostId.ToString("N")}, + {Headers.HostDisplayName, hostInformation.DisplayName} + }; + + var headerCustomizations = settings.Get>>(FaultHeaderCustomization); + + return new FaultMetadataExtractor(staticFaultMetadata, headerCustomizations); } ImmediateConfig GetImmediateRetryConfig() @@ -147,14 +186,15 @@ DelayedConfig GetDelayedRetryConfig() return new DelayedConfig(numberOfRetries, timeIncrease); } - public Notification MessageRetryNotification; - public Notification MessageFaultedNotification; - - IReadOnlySettings settings; - bool transactionsOn; + Notification messageRetryNotification; + Notification messageFaultedNotification; RecoverabilityConfig recoverabilityConfig; - RecoverabilityExecutorFactory recoverabilityExecutorFactory; + FaultMetadataExtractor faultMetadataExtractor; HostInformation hostInformation; + IReadOnlySettings settings; + bool transactionsOn; + bool delayedRetriesAvailable; + bool immediateRetriesAvailable; public const string NumberOfDelayedRetries = "Recoverability.Delayed.DefaultPolicy.Retries"; public const string DelayedRetriesTimeIncrease = "Recoverability.Delayed.DefaultPolicy.Timespan"; @@ -165,6 +205,7 @@ DelayedConfig GetDelayedRetryConfig() static int DefaultNumberOfRetries = 3; static TimeSpan DefaultTimeIncrease = TimeSpan.FromSeconds(10); + static ILog Logger = LogManager.GetLogger(); TransportSeam transportSeam; public class Configuration diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityContext.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityContext.cs new file mode 100644 index 00000000000..1116cd6e784 --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityContext.cs @@ -0,0 +1,65 @@ +namespace NServiceBus +{ + using System; + using System.Collections; + using System.Collections.Generic; + using System.Linq; + using Pipeline; + using Transport; + + class RecoverabilityContext : BehaviorContext, IRecoverabilityContext, IRecoverabilityActionContext, IRecoverabilityActionContextNotifications + { + public RecoverabilityContext( + ErrorContext errorContext, + RecoverabilityConfig recoverabilityConfig, + Dictionary metadata, + RecoverabilityAction recoverabilityAction, + IBehaviorContext parent) : base(parent) + { + ErrorContext = errorContext; + RecoverabilityConfiguration = recoverabilityConfig; + Metadata = metadata; + RecoverabilityAction = recoverabilityAction; + } + + public ErrorContext ErrorContext { get; } + + IReadOnlyDictionary IRecoverabilityActionContext.Metadata => Metadata; + + public RecoverabilityConfig RecoverabilityConfiguration { get; } + + public Dictionary Metadata { get; } + + public RecoverabilityAction RecoverabilityAction + { + get => recoverabilityAction; + set + { + if (locked) + { + throw new InvalidOperationException("The RecoverabilityAction has already been executed and can't be changed"); + } + recoverabilityAction = value; + } + } + + public IRecoverabilityActionContext PreventChanges() + { + locked = true; + return this; + } + + public void Add(object notification) + { + notifications ??= new List(); + notifications.Add(notification); + } + + public IEnumerator GetEnumerator() => notifications?.GetEnumerator() ?? Enumerable.Empty().GetEnumerator(); + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + RecoverabilityAction recoverabilityAction; + bool locked; + List notifications; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityContextExtensions.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityContextExtensions.cs new file mode 100644 index 00000000000..f47edb0ccc3 --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityContextExtensions.cs @@ -0,0 +1,18 @@ +namespace NServiceBus.Recoverability +{ + using Pipeline; + using Routing; + using Transport; + + /// + /// Allows the dispatch pipeline to be invoked from the recoverability pipeline. + /// + public static class RecoverabilityContextExtensions + { + /// + /// Creates a based on the current context. + /// + public static IRoutingContext CreateRoutingContext(this IRecoverabilityActionContext context, OutgoingMessage outgoingMessage, RoutingStrategy routingStrategy) => + new RoutingContext(outgoingMessage, routingStrategy, context); + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityExecutor.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityExecutor.cs deleted file mode 100644 index c7b0b7d7c09..00000000000 --- a/src/NServiceBus.Core/Recoverability/RecoverabilityExecutor.cs +++ /dev/null @@ -1,148 +0,0 @@ -namespace NServiceBus -{ - using System; - using System.Threading; - using System.Threading.Tasks; - using Logging; - using Transport; - - class RecoverabilityExecutor - { - public RecoverabilityExecutor( - bool raiseRecoverabilityNotifications, - bool immediateRetriesAvailable, - bool delayedRetriesAvailable, - Func recoverabilityPolicy, - RecoverabilityConfig configuration, - DelayedRetryExecutor delayedRetryExecutor, - MoveToErrorsExecutor moveToErrorsExecutor, - INotificationSubscriptions messageRetryNotification, - INotificationSubscriptions messageFaultedNotification) - { - this.configuration = configuration; - this.recoverabilityPolicy = recoverabilityPolicy; - this.delayedRetryExecutor = delayedRetryExecutor; - this.moveToErrorsExecutor = moveToErrorsExecutor; - this.messageRetryNotification = messageRetryNotification; - this.messageFaultedNotification = messageFaultedNotification; - this.immediateRetriesAvailable = immediateRetriesAvailable; - this.delayedRetriesAvailable = delayedRetriesAvailable; - - raiseNotifications = raiseRecoverabilityNotifications; - } - - public Task Invoke(ErrorContext errorContext, CancellationToken cancellationToken = default) - { - var recoveryAction = recoverabilityPolicy(configuration, errorContext); - - if (recoveryAction is Discard discard) - { - Logger.Info($"Discarding message with id '{errorContext.Message.MessageId}'. Reason: {discard.Reason}", errorContext.Exception); - return HandledTask; - } - - // When we can't do immediate retries and policy did not honor MaxNumberOfRetries for ImmediateRetries - if (recoveryAction is ImmediateRetry && !immediateRetriesAvailable) - { - Logger.Warn("Recoverability policy requested ImmediateRetry however immediate retries are not available with the current endpoint configuration. Moving message to error queue instead."); - return MoveToError(errorContext, configuration.Failed.ErrorQueue, cancellationToken); - } - - if (recoveryAction is ImmediateRetry) - { - return RaiseImmediateRetryNotifications(errorContext, cancellationToken); - } - - // When we can't do delayed retries, a policy customization probably didn't honor MaxNumberOfRetries for DelayedRetries - if (recoveryAction is DelayedRetry && !delayedRetriesAvailable) - { - Logger.Warn("Recoverability policy requested DelayedRetry however delayed delivery capability is not available with the current endpoint configuration. Moving message to error queue instead."); - return MoveToError(errorContext, configuration.Failed.ErrorQueue, cancellationToken); - } - - if (recoveryAction is DelayedRetry delayedRetryAction) - { - return DeferMessage(delayedRetryAction, errorContext, cancellationToken); - } - - if (recoveryAction is MoveToError moveToError) - { - return MoveToError(errorContext, moveToError.ErrorQueue, cancellationToken); - } - - Logger.Warn("Recoverability policy returned an unsupported recoverability action. Moving message to error queue instead."); - return MoveToError(errorContext, configuration.Failed.ErrorQueue, cancellationToken); - } - - async Task RaiseImmediateRetryNotifications(ErrorContext errorContext, CancellationToken cancellationToken) - { - Logger.Info($"Immediate Retry is going to retry message '{errorContext.Message.MessageId}' because of an exception:", errorContext.Exception); - - if (raiseNotifications) - { - await messageRetryNotification.Raise( - new MessageToBeRetried( - attempt: errorContext.ImmediateProcessingFailures - 1, - delay: TimeSpan.Zero, - immediateRetry: true, - errorContext: errorContext), - cancellationToken) - .ConfigureAwait(false); - } - - return ErrorHandleResult.RetryRequired; - } - - async Task MoveToError(ErrorContext errorContext, string errorQueue, CancellationToken cancellationToken) - { - var message = errorContext.Message; - - Logger.Error($"Moving message '{message.MessageId}' to the error queue '{errorQueue}' because processing failed due to an exception:", errorContext.Exception); - - await moveToErrorsExecutor.MoveToErrorQueue(errorQueue, errorContext, cancellationToken).ConfigureAwait(false); - - if (raiseNotifications) - { - await messageFaultedNotification.Raise(new MessageFaulted(errorContext, errorQueue), cancellationToken).ConfigureAwait(false); - } - - return ErrorHandleResult.Handled; - } - - async Task DeferMessage(DelayedRetry action, ErrorContext errorContext, CancellationToken cancellationToken) - { - var message = errorContext.Message; - - Logger.Warn($"Delayed Retry will reschedule message '{message.MessageId}' after a delay of {action.Delay} because of an exception:", errorContext.Exception); - - var currentDelayedRetriesAttempts = await delayedRetryExecutor.Retry(errorContext, action.Delay, cancellationToken).ConfigureAwait(false); - - if (raiseNotifications) - { - await messageRetryNotification.Raise( - new MessageToBeRetried( - attempt: currentDelayedRetriesAttempts, - delay: action.Delay, - immediateRetry: false, - errorContext: errorContext), - cancellationToken) - .ConfigureAwait(false); - } - - return ErrorHandleResult.Handled; - } - - readonly INotificationSubscriptions messageRetryNotification; - readonly INotificationSubscriptions messageFaultedNotification; - Func recoverabilityPolicy; - DelayedRetryExecutor delayedRetryExecutor; - MoveToErrorsExecutor moveToErrorsExecutor; - bool raiseNotifications; - bool immediateRetriesAvailable; - bool delayedRetriesAvailable; - RecoverabilityConfig configuration; - - static Task HandledTask = Task.FromResult(ErrorHandleResult.Handled); - static ILog Logger = LogManager.GetLogger(); - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityExecutorFactory.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityExecutorFactory.cs deleted file mode 100644 index d241353693d..00000000000 --- a/src/NServiceBus.Core/Recoverability/RecoverabilityExecutorFactory.cs +++ /dev/null @@ -1,65 +0,0 @@ -namespace NServiceBus -{ - using System; - using Transport; - - class RecoverabilityExecutorFactory - { - public RecoverabilityExecutorFactory( - Func defaultRecoverabilityPolicy, - RecoverabilityConfig configuration, - Func delayedRetryExecutorFactory, - Func moveToErrorsExecutorFactory, - bool immediateRetriesAvailable, - bool delayedRetriesAvailable, - INotificationSubscriptions messageRetryNotification, - INotificationSubscriptions messageFaultedNotification) - { - this.configuration = configuration; - this.defaultRecoverabilityPolicy = defaultRecoverabilityPolicy; - this.delayedRetryExecutorFactory = delayedRetryExecutorFactory; - this.moveToErrorsExecutorFactory = moveToErrorsExecutorFactory; - this.immediateRetriesAvailable = immediateRetriesAvailable; - this.delayedRetriesAvailable = delayedRetriesAvailable; - this.messageRetryNotification = messageRetryNotification; - this.messageFaultedNotification = messageFaultedNotification; - } - - public RecoverabilityExecutor CreateDefault() - { - return Create(defaultRecoverabilityPolicy, raiseNotifications: true); - } - - public RecoverabilityExecutor Create(Func customRecoverabilityPolicy) - { - return Create(customRecoverabilityPolicy, raiseNotifications: false); - } - - RecoverabilityExecutor Create(Func customRecoverabilityPolicy, bool raiseNotifications) - { - var delayedRetryExecutor = delayedRetryExecutorFactory(); - var moveToErrorsExecutor = moveToErrorsExecutorFactory(); - - return new RecoverabilityExecutor( - raiseNotifications, - immediateRetriesAvailable, - delayedRetriesAvailable, - customRecoverabilityPolicy, - configuration, - delayedRetryExecutor, - moveToErrorsExecutor, - messageRetryNotification, - messageFaultedNotification); - } - - readonly bool immediateRetriesAvailable; - readonly bool delayedRetriesAvailable; - readonly INotificationSubscriptions messageRetryNotification; - readonly INotificationSubscriptions messageFaultedNotification; - - Func defaultRecoverabilityPolicy; - Func delayedRetryExecutorFactory; - Func moveToErrorsExecutorFactory; - RecoverabilityConfig configuration; - } -} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityPipelineExecutor.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityPipelineExecutor.cs new file mode 100644 index 00000000000..81cab6d678e --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityPipelineExecutor.cs @@ -0,0 +1,65 @@ +namespace NServiceBus +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Microsoft.Extensions.DependencyInjection; + using NServiceBus.Pipeline; + using Transport; + + class RecoverabilityPipelineExecutor : IRecoverabilityPipelineExecutor + { + public RecoverabilityPipelineExecutor( + IServiceProvider serviceProvider, + IPipelineCache pipelineCache, + MessageOperations messageOperations, + RecoverabilityConfig recoverabilityConfig, + Func recoverabilityPolicy, + Pipeline recoverabilityPipeline, + FaultMetadataExtractor faultMetadataExtractor, + TState state) + { + this.state = state; + this.serviceProvider = serviceProvider; + this.pipelineCache = pipelineCache; + this.messageOperations = messageOperations; + this.recoverabilityConfig = recoverabilityConfig; + this.recoverabilityPolicy = recoverabilityPolicy; + this.recoverabilityPipeline = recoverabilityPipeline; + this.faultMetadataExtractor = faultMetadataExtractor; + } + + public async Task Invoke(ErrorContext errorContext, CancellationToken cancellationToken = default) + { + using (var childScope = serviceProvider.CreateScope()) + { + var rootContext = new RootContext(childScope.ServiceProvider, messageOperations, pipelineCache, cancellationToken); + rootContext.Extensions.Merge(errorContext.Extensions); + + var recoverabilityAction = recoverabilityPolicy(errorContext, state); + + var metadata = faultMetadataExtractor.Extract(errorContext); + + var recoverabilityContext = new RecoverabilityContext( + errorContext, + recoverabilityConfig, + metadata, + recoverabilityAction, + rootContext); + + await recoverabilityPipeline.Invoke(recoverabilityContext).ConfigureAwait(false); + + return recoverabilityContext.RecoverabilityAction.ErrorHandleResult; + } + } + + readonly IServiceProvider serviceProvider; + readonly IPipelineCache pipelineCache; + readonly MessageOperations messageOperations; + readonly RecoverabilityConfig recoverabilityConfig; + readonly Func recoverabilityPolicy; + readonly Pipeline recoverabilityPipeline; + readonly FaultMetadataExtractor faultMetadataExtractor; + readonly TState state; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs b/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs new file mode 100644 index 00000000000..acac6807042 --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/RecoverabilityRoutingConnector.cs @@ -0,0 +1,42 @@ +namespace NServiceBus +{ + using System; + using System.Threading.Tasks; + using Pipeline; + + class RecoverabilityRoutingConnector : StageConnector + { + public RecoverabilityRoutingConnector( + INotificationSubscriptions messageRetryNotification, + INotificationSubscriptions messageFaultedNotification) + { + notifications = new CompositeNotification(); + notifications.Register(messageRetryNotification); + notifications.Register(messageFaultedNotification); + } + + public override async Task Invoke(IRecoverabilityContext context, Func stage) + { + var recoverabilityActionContext = context.PreventChanges(); + + RecoverabilityAction recoverabilityAction = context.RecoverabilityAction; + var routingContexts = recoverabilityAction + .GetRoutingContexts(recoverabilityActionContext); + + foreach (var routingContext in routingContexts) + { + await stage(routingContext).ConfigureAwait(false); + } + + if (context is IRecoverabilityActionContextNotifications events) + { + foreach (object @event in events) + { + await notifications.Raise(@event, context.CancellationToken).ConfigureAwait(false); + } + } + } + + readonly CompositeNotification notifications; + } +} diff --git a/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs b/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs new file mode 100644 index 00000000000..1a6b13f0cff --- /dev/null +++ b/src/NServiceBus.Core/Recoverability/SatelliteRecoverabilityExecutor.cs @@ -0,0 +1,85 @@ +namespace NServiceBus +{ + using System; + using System.Collections.Generic; + using System.Threading; + using System.Threading.Tasks; + using Extensibility; + using Microsoft.Extensions.DependencyInjection; + using Transport; + using Pipeline; + + class SatelliteRecoverabilityExecutor : IRecoverabilityPipelineExecutor + { + public SatelliteRecoverabilityExecutor( + IServiceProvider serviceProvider, + FaultMetadataExtractor faultMetadataExtractor, + Func recoverabilityPolicy, + TState state) + { + this.state = state; + this.serviceProvider = serviceProvider; + this.faultMetadataExtractor = faultMetadataExtractor; + this.recoverabilityPolicy = recoverabilityPolicy; + } + + public async Task Invoke( + ErrorContext errorContext, + CancellationToken cancellationToken = default) + { + var recoverabilityAction = recoverabilityPolicy(errorContext, state); + var metadata = faultMetadataExtractor.Extract(errorContext); + + var actionContext = new BehaviorActionContext(errorContext, metadata, serviceProvider, cancellationToken); + + List transportOperations = null; + var routingContexts = recoverabilityAction.GetRoutingContexts(actionContext); + + foreach (var routingContext in routingContexts) + { + // using the count here is not entirely accurate because of the way we duplicate based on the strategies + // but in many cases it is a good approximation. + transportOperations ??= new List(routingContexts.Count); + foreach (var strategy in routingContext.RoutingStrategies) + { + var transportOperation = routingContext.ToTransportOperation(strategy, DispatchConsistency.Default); + transportOperations.Add(transportOperation); + } + } + + if (transportOperations == null) + { + return recoverabilityAction.ErrorHandleResult; + } + + var dispatcher = serviceProvider.GetRequiredService(); + await dispatcher.Dispatch(new TransportOperations(transportOperations.ToArray()), errorContext.TransportTransaction, cancellationToken).ConfigureAwait(false); + + return recoverabilityAction.ErrorHandleResult; + } + + class BehaviorActionContext : IRecoverabilityActionContext + { + public BehaviorActionContext(ErrorContext errorContext, IReadOnlyDictionary metadata, IServiceProvider serviceProvider, CancellationToken cancellationToken) + { + ErrorContext = errorContext; + Metadata = metadata; + CancellationToken = cancellationToken; + Builder = serviceProvider; + } + public CancellationToken CancellationToken { get; } + + public ContextBag Extensions => contextBag ??= new ContextBag(); + public IServiceProvider Builder { get; } + public ErrorContext ErrorContext { get; } + public IReadOnlyDictionary Metadata { get; } + + ContextBag contextBag; + } + + readonly IServiceProvider serviceProvider; + readonly FaultMetadataExtractor faultMetadataExtractor; + readonly Func recoverabilityPolicy; + readonly TState state; + } +} \ No newline at end of file diff --git a/src/NServiceBus.Core/Transports/ErrorContext.cs b/src/NServiceBus.Core/Transports/ErrorContext.cs index b20136039f4..3a5566ec473 100644 --- a/src/NServiceBus.Core/Transports/ErrorContext.cs +++ b/src/NServiceBus.Core/Transports/ErrorContext.cs @@ -20,7 +20,7 @@ public class ErrorContext /// Number of failed immediate processing attempts. /// The receive address. /// A which can be used to extend the current object. - public ErrorContext(Exception exception, Dictionary headers, string nativeMessageId, ReadOnlyMemory body, TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, IReadOnlyContextBag context) + public ErrorContext(Exception exception, Dictionary headers, string nativeMessageId, ReadOnlyMemory body, TransportTransaction transportTransaction, int immediateProcessingFailures, string receiveAddress, ContextBag context) { Guard.AgainstNull(nameof(exception), exception); Guard.AgainstNull(nameof(transportTransaction), transportTransaction); @@ -73,6 +73,6 @@ public ErrorContext(Exception exception, Dictionary headers, str /// /// A collection of additional information provided by the transport. /// - public IReadOnlyContextBag Extensions { get; } + public ContextBag Extensions { get; } } } diff --git a/src/NServiceBus.Core/Transports/MessageContext.cs b/src/NServiceBus.Core/Transports/MessageContext.cs index 29d8505b9c4..968d05234cb 100644 --- a/src/NServiceBus.Core/Transports/MessageContext.cs +++ b/src/NServiceBus.Core/Transports/MessageContext.cs @@ -59,6 +59,7 @@ public MessageContext(string nativeMessageId, Dictionary headers /// Transport address that received the failed message. /// public string ReceiveAddress { get; } + /// /// A which can be used to extend the current object. /// diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt index ce3ebf00b9d..c466da4a111 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net472.approved.txt @@ -61,3 +61,8 @@ context0 => Convert(context0.Extensions.Behaviors[0]).Invoke(context0, value(Sys context8 => Convert(context8.Extensions.Behaviors[8]).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), context9 => Convert(context9.Extensions.Behaviors[9]).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), context10 => Convert(context10.Extensions.Behaviors[10]).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), + +context0 => Convert(context0.Extensions.Behaviors[0]).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context1 => Convert(context1.Extensions.Behaviors[1]).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context2 => Convert(context2.Extensions.Behaviors[2]).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])), + context3 => Convert(context3.Extensions.Behaviors[3]).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt index a0ee0773c25..ba3ffa75b49 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.net6.0.approved.txt @@ -61,3 +61,8 @@ context0 => Convert(context0.Extensions.Behaviors[0], CaptureExceptionBehavior). context8 => Convert(context8.Extensions.Behaviors[8], InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), context9 => Convert(context9.Extensions.Behaviors[9], LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), context10 => Convert(context10.Extensions.Behaviors[10], InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), + +context0 => Convert(context0.Extensions.Behaviors[0], RecoverabilityRoutingConnector).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context1 => Convert(context1.Extensions.Behaviors[1], AttachSenderRelatedInfoOnMessageBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context2 => Convert(context2.Extensions.Behaviors[2], RoutingToDispatchConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])), + context3 => Convert(context3.Extensions.Behaviors[3], ImmediateDispatchTerminator).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task`1[System.Threading.Tasks.VoidTaskResult]])), diff --git a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt index a9e0c2d1e4d..13e763fba92 100644 --- a/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt +++ b/src/NServiceBus.Learning.AcceptanceTests/ApprovalFiles/When_pipelines_are_built.Should_preserve_order.netcoreapp3.1.approved.txt @@ -61,3 +61,8 @@ context0 => Convert(context0.Extensions.Behaviors[0], CaptureExceptionBehavior). context8 => Convert(context8.Extensions.Behaviors[8], InferredMessageTypeEnricherBehavior).Invoke(context8, value(System.Func`2[NServiceBus.Pipeline.IIncomingLogicalMessageContext,System.Threading.Tasks.Task])), context9 => Convert(context9.Extensions.Behaviors[9], LoadHandlersConnector).Invoke(context9, value(System.Func`2[NServiceBus.Pipeline.IInvokeHandlerContext,System.Threading.Tasks.Task])), context10 => Convert(context10.Extensions.Behaviors[10], InvokeHandlerTerminator).Invoke(context10, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IInvokeHandlerContext],System.Threading.Tasks.Task])), + +context0 => Convert(context0.Extensions.Behaviors[0], RecoverabilityRoutingConnector).Invoke(context0, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context1 => Convert(context1.Extensions.Behaviors[1], AttachSenderRelatedInfoOnMessageBehavior).Invoke(context1, value(System.Func`2[NServiceBus.Pipeline.IRoutingContext,System.Threading.Tasks.Task])), + context2 => Convert(context2.Extensions.Behaviors[2], RoutingToDispatchConnector).Invoke(context2, value(System.Func`2[NServiceBus.Pipeline.IDispatchContext,System.Threading.Tasks.Task])), + context3 => Convert(context3.Extensions.Behaviors[3], ImmediateDispatchTerminator).Invoke(context3, value(System.Func`2[NServiceBus.Pipeline.PipelineTerminator`1+ITerminatingContext[NServiceBus.Pipeline.IDispatchContext],System.Threading.Tasks.Task])), diff --git a/src/NServiceBus.Testing.Fakes/TestableRecoverabilityContext.cs b/src/NServiceBus.Testing.Fakes/TestableRecoverabilityContext.cs new file mode 100644 index 00000000000..dbaefe33a3c --- /dev/null +++ b/src/NServiceBus.Testing.Fakes/TestableRecoverabilityContext.cs @@ -0,0 +1,64 @@ +namespace NServiceBus.Testing +{ + using System; + using System.Collections.Generic; + using Extensibility; + using Pipeline; + using Transport; + + /// + /// A testable implementation of . + /// + public partial class TestableRecoverabilityContext : TestableBehaviorContext, IRecoverabilityContext, IRecoverabilityActionContext + { + /// + /// The message that failed processing. + /// + public ErrorContext ErrorContext { get; set; } = new ErrorContext( + new Exception(), + new Dictionary(), + Guid.NewGuid().ToString(), + ReadOnlyMemory.Empty, + new TransportTransaction(), + 0, + "receive-address", + new ContextBag()); + + /// + /// Metadata for this message. + /// + IReadOnlyDictionary IRecoverabilityActionContext.Metadata => Metadata; + + /// + /// The recoverability configuration for the endpoint. + /// + public RecoverabilityConfig RecoverabilityConfiguration { get; set; } = new RecoverabilityConfig( + new ImmediateConfig(0), + new DelayedConfig(0, TimeSpan.Zero), + new FailedConfig("error", new HashSet())); + + /// + /// The recoverability action to take for this message. + /// + public RecoverabilityAction RecoverabilityAction { get; set; } + + /// + /// Metadata for this message. + /// + public Dictionary Metadata { get; set; } = new Dictionary(); + + /// + /// Locks the recoverability action for further changes. + /// + public IRecoverabilityActionContext PreventChanges() + { + IsLocked = true; + return this; + } + + /// + /// True if the recoverability action was locked. + /// + public bool IsLocked { get; set; } = false; + } +} \ No newline at end of file