Skip to content

Commit

Permalink
Expose instance to workflows/activities and client to activities (#393)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz authored Jan 16, 2025
1 parent 9673804 commit 3dd6cca
Show file tree
Hide file tree
Showing 12 changed files with 378 additions and 56 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ jobs:
- os: ubuntu-latest
docsTarget: true
cloudTestTarget: true
# This is here alongside docsTarget because newer docfx doesn't work
# with .NET 6.
dotNetVersionOverride: |
6.x
8.x
- os: ubuntu-arm
runsOn: ubuntu-24.04-arm64-2-core
- os: macos-intel
Expand Down
68 changes: 68 additions & 0 deletions src/Temporalio.Extensions.Hosting/ActivityScope.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Threading;
using Microsoft.Extensions.DependencyInjection;

namespace Temporalio.Extensions.Hosting
{
/// <summary>
/// Information and ability to control the activity DI scope.
/// </summary>
public static class ActivityScope
{
private static readonly AsyncLocal<IServiceScope?> ServiceScopeLocal = new();
private static readonly AsyncLocal<object?> ScopedInstanceLocal = new();

/// <summary>
/// Gets or sets the current scope for this activity.
/// </summary>
/// <remarks>
/// This is backed by an async local. By default, when the activity invocation starts
/// (meaning inside the interceptor, not before), a new service scope is created and set on
/// this value. This means it will not be present in the primary execute-activity
/// interceptor
/// (<see cref="Worker.Interceptors.ActivityInboundInterceptor.ExecuteActivityAsync"/>) call
/// but will be available everywhere else the ActivityExecutionContext is. When set by the
/// internal code, it is also disposed by the internal code. See the next remark for how to
/// control the scope.
/// </remarks>
/// <remarks>
/// In situations where a user wants to control the service scope from the primary
/// execute-activity interceptor, this can be set to the result of <c>CreateScope</c> or
/// <c>CreateAsyncScope</c> of a service provider. The internal code will then use this
/// instead of creating its own, and will therefore not dispose it. This should never be set
/// anywhere but inside the primary execute-activity interceptor, and it no matter the value
/// it will be set to null before the <c>base</c> call returns from the primary
/// execute-activity interceptor.
/// </remarks>
public static IServiceScope? ServiceScope
{
get => ServiceScopeLocal.Value;
set => ServiceScopeLocal.Value = value;
}

/// <summary>
/// Gets or sets the scoped instance for non-static activity methods.
/// </summary>
/// <remarks>
/// This is backed by an async local. By default, when the activity invocation starts
/// (meaning inside the interceptor, not before) for a non-static method, an instance is
/// obtained from the service provider and set on this value. This means it will not be
/// present in the primary execute-activity interceptor
/// (<see cref="Worker.Interceptors.ActivityInboundInterceptor.ExecuteActivityAsync"/>) call
/// but will be available everywhere else the ActivityExecutionContext is. See the next
/// remark for how to control the instance.
/// </remarks>
/// <remarks>
/// In situations where a user wants to control the instance from the primary
/// execute-activity interceptor, this can be set to the result of <c>GetRequiredService</c>
/// of a service provider. The internal code will then use this instead of creating its own.
/// This should never be set anywhere but inside the primary execute-activity interceptor,
/// and it no matter the value it will be set to null before the <c>base</c> call returns
/// from the primary execute-activity interceptor.
/// </remarks>
public static object? ScopedInstance
{
get => ScopedInstanceLocal.Value;
set => ScopedInstanceLocal.Value = value;
}
}
}
37 changes: 30 additions & 7 deletions src/Temporalio.Extensions.Hosting/ServiceProviderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,31 @@ public static ActivityDefinition CreateTemporalActivityDefinition(
// Invoker can be async (i.e. returns Task<object?>)
async Task<object?> Invoker(object?[] args)
{
// Wrap in a scope (even for statics to keep logic simple)
// Wrap in a scope if scope doesn't already exist. Keep track of whether we created
// it so we can dispose of it.
var scope = ActivityScope.ServiceScope;
var createdScopeOurselves = scope == null;
if (scope == null)
{
#if NET6_0_OR_GREATER
var scope = provider.CreateAsyncScope();
scope = provider.CreateAsyncScope();
#else
var scope = provider.CreateScope();
scope = provider.CreateScope();
#endif
ActivityScope.ServiceScope = scope;
}

// Run
try
{
object? result;
try
{
// Invoke static or non-static
// Create the instance if not static and not already created
var instance = method.IsStatic
? null
: scope.ServiceProvider.GetRequiredService(instanceType);
: ActivityScope.ScopedInstance ?? scope.ServiceProvider.GetRequiredService(instanceType);
ActivityScope.ScopedInstance = instance;

result = method.Invoke(instance, args);
}
Expand Down Expand Up @@ -111,11 +121,24 @@ public static ActivityDefinition CreateTemporalActivityDefinition(
}
finally
{
// Dispose of scope if we created it
if (createdScopeOurselves)
{
#if NET6_0_OR_GREATER
await scope.DisposeAsync().ConfigureAwait(false);
if (scope is AsyncServiceScope asyncScope)
{
await asyncScope.DisposeAsync().ConfigureAwait(false);
}
else
{
scope.Dispose();
}
#else
scope.Dispose();
scope.Dispose();
#endif
}
ActivityScope.ServiceScope = null;
ActivityScope.ScopedInstance = null;
}
}
return ActivityDefinition.Create(method, Invoker);
Expand Down
19 changes: 18 additions & 1 deletion src/Temporalio/Activities/ActivityExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Threading;
using Google.Protobuf;
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Common;
using Temporalio.Converters;

Expand All @@ -16,6 +17,7 @@ namespace Temporalio.Activities
public class ActivityExecutionContext
{
private readonly Lazy<MetricMeter> metricMeter;
private readonly ITemporalClient? temporalClient;

/// <summary>
/// Initializes a new instance of the <see cref="ActivityExecutionContext"/> class.
Expand All @@ -27,6 +29,7 @@ public class ActivityExecutionContext
/// <param name="logger">Logger.</param>
/// <param name="payloadConverter">Payload converter.</param>
/// <param name="runtimeMetricMeter">Runtime-level metric meter.</param>
/// <param name="temporalClient">Temporal client.</param>
#pragma warning disable CA1068 // We don't require cancellation token as last param
internal ActivityExecutionContext(
ActivityInfo info,
Expand All @@ -35,7 +38,8 @@ internal ActivityExecutionContext(
ByteString taskToken,
ILogger logger,
IPayloadConverter payloadConverter,
Lazy<MetricMeter> runtimeMetricMeter)
Lazy<MetricMeter> runtimeMetricMeter,
ITemporalClient? temporalClient)
{
Info = info;
CancellationToken = cancellationToken;
Expand All @@ -52,6 +56,7 @@ internal ActivityExecutionContext(
{ "activity_type", info.ActivityType },
});
});
this.temporalClient = temporalClient;
}
#pragma warning restore CA1068

Expand Down Expand Up @@ -107,6 +112,18 @@ internal ActivityExecutionContext(
/// </summary>
public MetricMeter MetricMeter => metricMeter.Value;

/// <summary>
/// Gets the Temporal client for use within the activity.
/// </summary>
/// <exception cref="InvalidOperationException">If this is running in a
/// <see cref="Testing.ActivityEnvironment"/> and no client was provided.</exception>
/// <exception cref="InvalidOperationException">If the client the worker was created with is
/// not an <c>ITemporalClient</c>.</exception>
public ITemporalClient TemporalClient => temporalClient ??
throw new InvalidOperationException("No Temporal client available. " +
"This could either be a test environment without a client set, or the worker was " +
"created in an advanced way without an ITemporalClient instance.");

/// <summary>
/// Gets the async local current value.
/// </summary>
Expand Down
10 changes: 9 additions & 1 deletion src/Temporalio/Testing/ActivityEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using Microsoft.Extensions.Logging.Abstractions;
using Temporalio.Activities;
using Temporalio.Api.Common.V1;
using Temporalio.Client;
using Temporalio.Common;
using Temporalio.Converters;

Expand Down Expand Up @@ -60,6 +61,12 @@ public record ActivityEnvironment
/// </summary>
public MetricMeter? MetricMeter { get; init; }

/// <summary>
/// Gets or inits the Temporal client accessible from the activity context. If unset, an
/// exception is thrown when the client is accessed.
/// </summary>
public ITemporalClient? TemporalClient { get; init; }

/// <summary>
/// Gets or sets the cancel reason. Callers may prefer <see cref="Cancel" /> instead.
/// </summary>
Expand Down Expand Up @@ -134,7 +141,8 @@ public async Task<T> RunAsync<T>(Func<Task<T>> activity)
taskToken: ByteString.Empty,
logger: Logger,
payloadConverter: PayloadConverter,
runtimeMetricMeter: new(() => MetricMeter ?? MetricMeterNoop.Instance))
runtimeMetricMeter: new(() => MetricMeter ?? MetricMeterNoop.Instance),
temporalClient: TemporalClient)
{
Heartbeater = Heartbeater,
CancelReasonRef = CancelReasonRef,
Expand Down
4 changes: 3 additions & 1 deletion src/Temporalio/Worker/ActivityWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Google.Protobuf.WellKnownTypes;
using Microsoft.Extensions.Logging;
using Temporalio.Activities;
using Temporalio.Client;
using Temporalio.Converters;
using Temporalio.Exceptions;
using Temporalio.Worker.Interceptors;
Expand Down Expand Up @@ -191,7 +192,8 @@ private void StartActivity(Bridge.Api.ActivityTask.ActivityTask tsk)
taskToken: tsk.TaskToken,
logger: worker.LoggerFactory.CreateLogger($"Temporalio.Activity:{info.ActivityType}"),
payloadConverter: worker.Client.Options.DataConverter.PayloadConverter,
runtimeMetricMeter: worker.MetricMeter);
runtimeMetricMeter: worker.MetricMeter,
temporalClient: worker.Client as ITemporalClient);

// Start task
using (context.Logger.BeginScope(info.LoggerScope))
Expand Down
27 changes: 13 additions & 14 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ public WorkflowUpdateDefinition? DynamicUpdate
/// <inheritdoc />
public WorkflowInfo Info { get; private init; }

/// <inheritdoc />
///
/// This is lazily created and should never be called outside of the scheduler
public object Instance
{
get
{
// We create this lazily because we want the constructor in a workflow context
instance ??= Definition.CreateWorkflowInstance(startArgs!.Value);
return instance;
}
}

/// <inheritdoc />
public bool IsReplaying { get; private set; }

Expand Down Expand Up @@ -322,20 +335,6 @@ public WorkflowUpdateDefinition? DynamicUpdate
/// </summary>
internal WorkflowDefinition Definition { get; private init; }

/// <summary>
/// Gets the instance, lazily creating if needed. This should never be called outside this
/// scheduler.
/// </summary>
private object Instance
{
get
{
// We create this lazily because we want the constructor in a workflow context
instance ??= Definition.CreateWorkflowInstance(startArgs!.Value);
return instance;
}
}

/// <inheritdoc/>
public ContinueAsNewException CreateContinueAsNewException(
string workflow, IReadOnlyCollection<object?> args, ContinueAsNewOptions? options) =>
Expand Down
5 changes: 5 additions & 0 deletions src/Temporalio/Workflows/IWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ internal interface IWorkflowContext
/// </summary>
WorkflowInfo Info { get; }

/// <summary>
/// Gets value for <see cref="Workflow.Instance" />.
/// </summary>
object Instance { get; }

/// <summary>
/// Gets a value indicating whether <see cref="Workflow.Unsafe.IsReplaying" /> is true.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public static WorkflowUpdateDefinition? DynamicUpdate
/// </summary>
public static WorkflowInfo Info => Context.Info;

/// <summary>
/// Gets the instance of the current workflow class.
/// </summary>
public static object Instance => Context.Instance;

/// <summary>
/// Gets a value indicating whether this code is currently running in a workflow.
/// </summary>
Expand Down
Loading

0 comments on commit 3dd6cca

Please sign in to comment.