Skip to content

Commit

Permalink
Removing Saga from Dapr Workflows
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Ciocanu <ciocanu@adobe.com>
  • Loading branch information
Artur Ciocanu committed Feb 28, 2025
1 parent bd3a54d commit 4c130c6
Show file tree
Hide file tree
Showing 17 changed files with 12 additions and 1,570 deletions.
44 changes: 1 addition & 43 deletions sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@

package io.dapr.workflows;

import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOptions;

/**
* Common interface for workflow implementations.
*/
Expand All @@ -39,43 +34,6 @@ public interface Workflow {
default void run(WorkflowContext ctx) {
WorkflowStub stub = this.create();

if (!this.isSagaEnabled()) {
// saga disabled
stub.run(ctx);
} else {
// saga enabled
try {
stub.run(ctx);
} catch (OrchestratorBlockedException | ContinueAsNewInterruption e) {
throw e;
} catch (SagaCompensationException e) {
// Saga compensation is triggered gracefully but failed in exception
// don't need to trigger compensation again
throw e;
} catch (Exception e) {
try {
ctx.getSagaContext().compensate();
} catch (Exception se) {
se.addSuppressed(e);
throw se;
}

throw e;
}
}
}

default boolean isSagaEnabled() {
return this.getSagaOption() != null;
}

/**
* get saga configuration.
*
* @return saga configuration
*/
default SagaOptions getSagaOption() {
// by default, saga is disabled
return null;
stub.run(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -530,12 +529,4 @@ default void continueAsNew(Object input) {
default UUID newUuid() {
throw new RuntimeException("No implementation found.");
}

/**
* get saga context.
*
* @return saga context
* @throws UnsupportedOperationException if saga is not enabled.
*/
SagaContext getSagaContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import io.dapr.workflows.runtime.saga.DefaultSagaContext;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.NOPLogger;
Expand All @@ -39,7 +36,6 @@
public class DefaultWorkflowContext implements WorkflowContext {
private final TaskOrchestrationContext innerContext;
private final Logger logger;
private final Saga saga;

/**
* Constructor for DaprWorkflowContextImpl.
Expand All @@ -58,23 +54,7 @@ public DefaultWorkflowContext(TaskOrchestrationContext context) throws IllegalAr
* @param logger Logger
* @throws IllegalArgumentException if context or logger is null
*/
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
this(context, logger, null);
}

public DefaultWorkflowContext(TaskOrchestrationContext context, Saga saga) throws IllegalArgumentException {
this(context, LoggerFactory.getLogger(WorkflowContext.class), saga);
}

/**
* Constructor for DaprWorkflowContextImpl.
*
* @param context TaskOrchestrationContext
* @param logger Logger
* @param saga saga object, if null, saga is disabled
* @throws IllegalArgumentException if context or logger is null
*/
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, Saga saga)
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger)
throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
Expand All @@ -85,7 +65,6 @@ public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, S

this.innerContext = context;
this.logger = logger;
this.saga = saga;
}

/**
Expand Down Expand Up @@ -249,15 +228,6 @@ public UUID newUuid() {
return this.innerContext.newUUID();
}

@Override
public SagaContext getSagaContext() {
if (this.saga == null) {
throw new UnsupportedOperationException("Saga is not enabled");
}

return new DefaultSagaContext(this.saga, this);
}

private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -30,6 +29,7 @@ class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFacto

public WorkflowClassWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();

try {
this.workflowConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
Expand All @@ -48,6 +48,7 @@ public String getName() {
public TaskOrchestration create() {
return ctx -> {
T workflow;

try {
workflow = this.workflowConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
Expand All @@ -56,13 +57,7 @@ public TaskOrchestration create() {
);
}

if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
workflow.run(new DefaultWorkflowContext(ctx));
};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

/**
* Wrapper for Durable Task Framework orchestration factory.
Expand All @@ -37,13 +36,6 @@ public String getName() {

@Override
public TaskOrchestration create() {
return ctx -> {
if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
};
return ctx -> workflow.run(new DefaultWorkflowContext(ctx));
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 4c130c6

Please sign in to comment.