Skip to content

Commit

Permalink
Refactor ProtoActor and workflow testing logic
Browse files Browse the repository at this point in the history
Reorganized ProtoActor configuration to be conditionally applied based on runtime settings. Updated test workflows to simplify signal handling and improved test structure by introducing scoped dependencies and removing unused event subscriptions.
  • Loading branch information
sfmskywalker committed Dec 14, 2024
1 parent 08f7c2b commit 4793c6e
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 52 deletions.
58 changes: 31 additions & 27 deletions src/apps/Elsa.Server.Web/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,33 +213,6 @@
management.UseReadOnlyMode(useReadOnlyMode);
management.AddVariableTypeAndAlias<OrderReceived>("Application");
})
.UseProtoActor(proto =>
{
proto
.EnableMetrics()
.EnableTracing();

proto.PersistenceProvider = _ =>
{
if (sqlDatabaseProvider == SqlDatabaseProvider.SqlServer)
return new SqlServerProvider(sqlServerConnectionString!, true, "", "proto_actor");
return new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString));
};

if (configuration["KUBERNETES_SERVICE_HOST"] != null)
{
var kubernetesConfig = new KubernetesProviderConfig();
var clusterProvider = new KubernetesProvider(kubernetesConfig);

var remoteConfig = GrpcNetRemoteConfig
.BindToAllInterfaces(advertisedHost: configuration["ProtoActor:AdvertisedHost"]) // Environment variable to be provided by Kubernetes using pod.status.podIP.
.WithLogLevelForDeserializationErrors(LogLevel.Critical)
.WithRemoteDiagnostics(true);

proto.CreateClusterProvider = _ => clusterProvider;
proto.ConfigureRemoteConfig = _ => remoteConfig;
}
})
.UseWorkflowRuntime(runtime =>
{
if (persistenceProvider == PersistenceProvider.MongoDb)
Expand Down Expand Up @@ -457,6 +430,37 @@
});
}

if (distributedCachingTransport == DistributedCachingTransport.ProtoActor || workflowRuntime == WorkflowRuntime.ProtoActor)
{
elsa.UseProtoActor(proto =>
{
proto
.EnableMetrics()
.EnableTracing();

proto.PersistenceProvider = _ =>
{
if (sqlDatabaseProvider == SqlDatabaseProvider.SqlServer)
return new SqlServerProvider(sqlServerConnectionString!, true, "", "proto_actor");
return new SqliteProvider(new SqliteConnectionStringBuilder(sqliteConnectionString));
};

if (configuration["KUBERNETES_SERVICE_HOST"] != null)
{
var kubernetesConfig = new KubernetesProviderConfig();
var clusterProvider = new KubernetesProvider(kubernetesConfig);

var remoteConfig = GrpcNetRemoteConfig
.BindToAllInterfaces(advertisedHost: configuration["ProtoActor:AdvertisedHost"]) // Environment variable to be provided by Kubernetes using pod.status.podIP.
.WithLogLevelForDeserializationErrors(LogLevel.Critical)
.WithRemoteDiagnostics(true);

proto.CreateClusterProvider = _ => clusterProvider;
proto.ConfigureRemoteConfig = _ => remoteConfig;
}
});
}

if (useAzureServiceBus)
{
elsa.UseAzureServiceBus(azureServiceBusConnectionString, asb =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected override void ConfigureWebHost(IWebHostBuilder builder)
elsa.UseMassTransit(massTransit =>
{
//massTransit.UseRabbitMq(rabbitMqConnectionString);
massTransit.Services.AddSingleton<WorkflowDefinitionEvents>();
massTransit.AddConsumer<WorkflowDefinitionEventConsumer>("elsa-test-workflow-definition-updates", true);
});
elsa.UseIdentity(identity => identity.UseEntityFrameworkCore(ef => ef.UsePostgreSql(dbConnectionString)));
Expand Down Expand Up @@ -122,8 +123,8 @@ protected override void ConfigureWebHost(IWebHostBuilder builder)
{
services
.AddSingleton<SignalManager>()
.AddSingleton<WorkflowEvents>()
.AddSingleton<WorkflowDefinitionEvents>()
.AddScoped<WorkflowEvents>()
.AddScoped<WorkflowDefinitionEvents>()
.AddSingleton<TriggerChangeTokenSignalEvents>()
.AddScoped<IWorkflowMaterializer, TestWorkflowMaterializer>()
.AddNotificationHandlersFrom<WorkflowServer>()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using Elsa.Common.Models;
using Elsa.Testing.Shared;
using Elsa.Testing.Shared.Services;
using Elsa.Workflows.ComponentTests.Abstractions;
using Elsa.Workflows.ComponentTests.Fixtures;
Expand All @@ -13,17 +12,13 @@ namespace Elsa.Workflows.ComponentTests.Scenarios.BulkDispatchWorkflows;

public class BulkDispatchWorkflowsTests : AppComponentTest
{
private readonly WorkflowEvents _workflowEvents;
private readonly SignalManager _signalManager;
private readonly IWorkflowRuntime _workflowRuntime;
private readonly object _greetEmployeesWorkflowCompletedSignal = new();

public BulkDispatchWorkflowsTests(App app) : base(app)
{
_workflowRuntime = Scope.ServiceProvider.GetRequiredService<IWorkflowRuntime>();
_workflowEvents = Scope.ServiceProvider.GetRequiredService<WorkflowEvents>();
_signalManager = Scope.ServiceProvider.GetRequiredService<SignalManager>();
_workflowEvents.WorkflowInstanceSaved += OnWorkflowInstanceSaved;
}

/// <summary>
Expand All @@ -38,9 +33,7 @@ await workflowClient.CreateInstanceAsync(new CreateWorkflowInstanceRequest
WorkflowDefinitionHandle = WorkflowDefinitionHandle.ByDefinitionId(GreetEmployeesWorkflow.DefinitionId, VersionOptions.Published)
});
await workflowClient.RunInstanceAsync(RunWorkflowInstanceRequest.Empty);
var parentWorkflowInstanceArgs = await _signalManager.WaitAsync<WorkflowInstanceSavedEventArgs>(_greetEmployeesWorkflowCompletedSignal);

Assert.Equal(WorkflowStatus.Finished, parentWorkflowInstanceArgs.WorkflowInstance.Status);
await _signalManager.WaitAsync<string>("Completed");
}

/// <summary>
Expand All @@ -60,18 +53,4 @@ public async Task DispatchWorkflows_ChildWorkflowsShouldReceiveCurrentItem()
await _signalManager.WaitAsync("Banana");
await _signalManager.WaitAsync("Cherry");
}

private void OnWorkflowInstanceSaved(object? sender, WorkflowInstanceSavedEventArgs e)
{
if (e.WorkflowInstance.Status != WorkflowStatus.Finished)
return;

if (e.WorkflowInstance.DefinitionId == GreetEmployeesWorkflow.DefinitionId)
_signalManager.Trigger(_greetEmployeesWorkflowCompletedSignal, e);
}

protected override void OnDispose()
{
_workflowEvents.WorkflowInstanceSaved -= OnWorkflowInstanceSaved;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using Elsa.Testing.Shared.Activities;
using Elsa.Workflows.Activities;

namespace Elsa.Workflows.ComponentTests.Scenarios.BulkDispatchWorkflows.Workflows;
Expand Down Expand Up @@ -28,7 +29,8 @@ protected override void Build(IWorkflowBuilder builder)
WorkflowDefinitionId = new(EmployeeGreetingWorkflow.DefinitionId),
Items = new(inputEntries),
WaitForCompletion = new(true)
}
},
new TriggerSignal("Completed")
}
};
}
Expand Down

0 comments on commit 4793c6e

Please sign in to comment.