From c1f45fe4e9c32193e092e0179ea4eeb046a8497a Mon Sep 17 00:00:00 2001 From: Antonello Provenzano Date: Wed, 26 Apr 2023 09:40:03 +0200 Subject: [PATCH] Adding the option to handle webhooks in parallel threads --- .../Deveel.Webhooks.Receiver.AspNetCore.xml | 31 +++++++++++++++ .../Webhooks/ApplicationBuilderExtensions.cs | 1 + .../Webhooks/HandlerExecutionMode.cs | 19 +++++++++ .../Webhooks/LoggerExtensions.cs | 4 ++ .../Webhooks/WebhookReceiverMiddleware.cs | 39 ++++++++++++++++--- .../Webhooks/WebhookReceiverOptions.cs | 14 +++++++ 6 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/HandlerExecutionMode.cs diff --git a/src/Deveel.Webhooks.Receiver.AspNetCore/Deveel.Webhooks.Receiver.AspNetCore.xml b/src/Deveel.Webhooks.Receiver.AspNetCore/Deveel.Webhooks.Receiver.AspNetCore.xml index 9c18005..ff2ed90 100644 --- a/src/Deveel.Webhooks.Receiver.AspNetCore/Deveel.Webhooks.Receiver.AspNetCore.xml +++ b/src/Deveel.Webhooks.Receiver.AspNetCore/Deveel.Webhooks.Receiver.AspNetCore.xml @@ -91,6 +91,7 @@ The type of the webhook to be received The application builder instance The relative path to listen for webhook posts + The options for the execution of the handlers The middleware will listen only for POST requests to the given path using @@ -210,6 +211,22 @@ webhooks posted to the given path. + + + Enumerates the possible execution modes for webhook handles + when received and processed. + + + + + The handlers are executed sequentially, one at a time. + + + + + The handlers are executed in parallel, all at the same time. + + Provides functions for handling webhooks of a specific type. @@ -974,6 +991,20 @@ from the sender is invalid (400 by default). + + + Gets or sets the execution mode for the handlers + during the processing of a received webhook + (default: ). + + + + + Gets or sets the maximum number of threads to use when + executing the handlers in parallel. By default the number + of processors in the machine is used. + + A default implementation of a verifier of a webhook request that performs diff --git a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/ApplicationBuilderExtensions.cs b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/ApplicationBuilderExtensions.cs index 1b04ad4..79f048f 100644 --- a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/ApplicationBuilderExtensions.cs +++ b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/ApplicationBuilderExtensions.cs @@ -28,6 +28,7 @@ public static class ApplicationBuilderExtensions { /// The type of the webhook to be received /// The application builder instance /// The relative path to listen for webhook posts + /// The options for the execution of the handlers /// /// /// The middleware will listen only for POST requests to the given path using diff --git a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/HandlerExecutionMode.cs b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/HandlerExecutionMode.cs new file mode 100644 index 0000000..e95a90b --- /dev/null +++ b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/HandlerExecutionMode.cs @@ -0,0 +1,19 @@ +using System; + +namespace Deveel.Webhooks { + /// + /// Enumerates the possible execution modes for webhook handles + /// when received and processed. + /// + public enum HandlerExecutionMode { + /// + /// The handlers are executed sequentially, one at a time. + /// + Sequential, + + /// + /// The handlers are executed in parallel, all at the same time. + /// + Parallel + } +} diff --git a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/LoggerExtensions.cs b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/LoggerExtensions.cs index b9486e7..2aec2b5 100644 --- a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/LoggerExtensions.cs +++ b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/LoggerExtensions.cs @@ -57,5 +57,9 @@ static partial class LoggerExtensions { [LoggerMessage(EventId = -20223, Level = LogLevel.Error, Message = "It was not possible to receive a webhook for an unhandled error")] public static partial void LogUnhandledReceiveError(this ILogger logger, Exception error); + + [LoggerMessage(EventId = -20227, Level = LogLevel.Error, + Message = "Unhandled error while executing the handler of type '{HandlerType}' for webhooks of type '{WebhookType}'")] + public static partial void LogUnhandledHandlerError(this ILogger logger, Exception error, Type handlerType, Type webhookType); } } diff --git a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverMiddleware.cs b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverMiddleware.cs index 2fb11aa..6cea9f0 100644 --- a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverMiddleware.cs +++ b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverMiddleware.cs @@ -41,6 +41,39 @@ public WebhookReceiverMiddleware( private int InvalidStatusCode => options.InvalidStatusCode ?? 400; + private async Task HandleWebhookAsync(TWebhook webhook, CancellationToken cancellationToken) { + if (handlers == null) + return; + + var mode = options.ExecutionMode ?? HandlerExecutionMode.Parallel; + + switch (mode) { + case HandlerExecutionMode.Sequential: + foreach (var handler in handlers) { + await ExecuteAsync(handler, webhook, cancellationToken); + } + break; + case HandlerExecutionMode.Parallel: + var parallelOptions = new ParallelOptions { + CancellationToken = cancellationToken, + MaxDegreeOfParallelism = options.MaxParallelThreads ?? Environment.ProcessorCount + }; + await Parallel.ForEachAsync(handlers, parallelOptions, async (handler, token) => { + await ExecuteAsync(handler, webhook, token); + }); + + break; + } + } + + private async Task ExecuteAsync(IWebhookHandler handler, TWebhook webhook, CancellationToken cancellationToken) { + try { + await handler.HandleAsync(webhook, cancellationToken); + } catch (Exception ex) { + logger.LogUnhandledHandlerError(ex, handler.GetType(), typeof(TWebhook)); + } + } + public async Task InvokeAsync(HttpContext context, RequestDelegate next) { try { logger.TraceWebhookArrived(); @@ -56,11 +89,7 @@ public async Task InvokeAsync(HttpContext context, RequestDelegate next) { } if (handlers != null && result.Successful && result.Webhook != null) { - foreach (var handler in handlers) { - await handler.HandleAsync(result.Webhook, context.RequestAborted); - - logger.TraceWebhookHandled(handler.GetType()); - } + await HandleWebhookAsync(result.Webhook, context.RequestAborted); } await next.Invoke(context); diff --git a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverOptions.cs b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverOptions.cs index 7ef4eb4..1613cbd 100644 --- a/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverOptions.cs +++ b/src/Deveel.Webhooks.Receiver.AspNetCore/Webhooks/WebhookReceiverOptions.cs @@ -47,5 +47,19 @@ public class WebhookReceiverOptions { /// from the sender is invalid (400 by default). /// public int? InvalidStatusCode { get; set; } = 400; + + /// + /// Gets or sets the execution mode for the handlers + /// during the processing of a received webhook + /// (default: ). + /// + public HandlerExecutionMode? ExecutionMode { get; set; } = HandlerExecutionMode.Parallel; + + /// + /// Gets or sets the maximum number of threads to use when + /// executing the handlers in parallel. By default the number + /// of processors in the machine is used. + /// + public int? MaxParallelThreads { get; set; } } }