Skip to content

Commit

Permalink
Removing Saga from Dapr Workflows (#1216)
Browse files Browse the repository at this point in the history
  • Loading branch information
artur-ciocanu authored Feb 28, 2025
1 parent bd3a54d commit 510679e
Show file tree
Hide file tree
Showing 24 changed files with 29 additions and 1,582 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.junit.jupiter.api.Assertions.*;

/**
Expand Down Expand Up @@ -65,7 +65,7 @@ public class DaprKeyValueRepositoryIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("postgresql-repository-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
Expand All @@ -39,14 +37,14 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -82,7 +80,7 @@ public class MySQLDaprKeyValueTemplateIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("mysql-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.mysql", "v1", STATE_STORE_PROPERTIES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.data.keyvalue.core.query.KeyValueQuery;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
Expand All @@ -38,6 +36,7 @@

import static io.dapr.it.spring.data.DaprSpringDataConstants.BINDING_NAME;
import static io.dapr.it.spring.data.DaprSpringDataConstants.STATE_STORE_NAME;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

Expand Down Expand Up @@ -68,7 +67,7 @@ public class PostgreSQLDaprKeyValueTemplateIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("postgresql-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component(STATE_STORE_NAME, "state.postgresql", "v1", STATE_STORE_PROPERTIES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Collections;
import java.util.List;

import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest(
Expand All @@ -60,7 +61,7 @@ public class DaprSpringMessagingIT {

@Container
@ServiceConnection
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("messaging-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.dapr.it.testcontainers;

public interface DaprContainerConstants {
String IMAGE_TAG = "daprio/daprd:1.14.1";
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -61,7 +62,7 @@ public class DaprContainerIT {
private static final String PUBSUB_TOPIC_NAME = "topic";

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("dapr-app")
.withAppPort(8081)
.withAppChannelAddress("host.testcontainers.internal");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Collections;
import java.util.Map;

import static io.dapr.it.testcontainers.DaprContainerConstants.IMAGE_TAG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

Expand All @@ -56,7 +57,7 @@ public class DaprWorkflowsIT {
private static final Network DAPR_NETWORK = Network.newNetwork();

@Container
private static final DaprContainer DAPR_CONTAINER = new DaprContainer("daprio/daprd:1.13.2")
private static final DaprContainer DAPR_CONTAINER = new DaprContainer(IMAGE_TAG)
.withAppName("workflow-dapr-app")
.withNetwork(DAPR_NETWORK)
.withComponent(new Component("kvstore", "state.in-memory", "v1",
Expand Down
44 changes: 1 addition & 43 deletions sdk-workflows/src/main/java/io/dapr/workflows/Workflow.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@

package io.dapr.workflows;

import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
import io.dapr.workflows.saga.SagaCompensationException;
import io.dapr.workflows.saga.SagaOptions;

/**
* Common interface for workflow implementations.
*/
Expand All @@ -39,43 +34,6 @@ public interface Workflow {
default void run(WorkflowContext ctx) {
WorkflowStub stub = this.create();

if (!this.isSagaEnabled()) {
// saga disabled
stub.run(ctx);
} else {
// saga enabled
try {
stub.run(ctx);
} catch (OrchestratorBlockedException | ContinueAsNewInterruption e) {
throw e;
} catch (SagaCompensationException e) {
// Saga compensation is triggered gracefully but failed in exception
// don't need to trigger compensation again
throw e;
} catch (Exception e) {
try {
ctx.getSagaContext().compensate();
} catch (Exception se) {
se.addSuppressed(e);
throw se;
}

throw e;
}
}
}

default boolean isSagaEnabled() {
return this.getSagaOption() != null;
}

/**
* get saga configuration.
*
* @return saga configuration
*/
default SagaOptions getSagaOption() {
// by default, saga is disabled
return null;
stub.run(ctx);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import com.microsoft.durabletask.TaskFailedException;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -530,12 +529,4 @@ default void continueAsNew(Object input) {
default UUID newUuid() {
throw new RuntimeException("No implementation found.");
}

/**
* get saga context.
*
* @return saga context
* @throws UnsupportedOperationException if saga is not enabled.
*/
SagaContext getSagaContext();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import io.dapr.workflows.WorkflowContext;
import io.dapr.workflows.WorkflowTaskOptions;
import io.dapr.workflows.WorkflowTaskRetryPolicy;
import io.dapr.workflows.runtime.saga.DefaultSagaContext;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.NOPLogger;
Expand All @@ -39,7 +36,6 @@
public class DefaultWorkflowContext implements WorkflowContext {
private final TaskOrchestrationContext innerContext;
private final Logger logger;
private final Saga saga;

/**
* Constructor for DaprWorkflowContextImpl.
Expand All @@ -58,23 +54,7 @@ public DefaultWorkflowContext(TaskOrchestrationContext context) throws IllegalAr
* @param logger Logger
* @throws IllegalArgumentException if context or logger is null
*/
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
this(context, logger, null);
}

public DefaultWorkflowContext(TaskOrchestrationContext context, Saga saga) throws IllegalArgumentException {
this(context, LoggerFactory.getLogger(WorkflowContext.class), saga);
}

/**
* Constructor for DaprWorkflowContextImpl.
*
* @param context TaskOrchestrationContext
* @param logger Logger
* @param saga saga object, if null, saga is disabled
* @throws IllegalArgumentException if context or logger is null
*/
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, Saga saga)
public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger)
throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
Expand All @@ -85,7 +65,6 @@ public DefaultWorkflowContext(TaskOrchestrationContext context, Logger logger, S

this.innerContext = context;
this.logger = logger;
this.saga = saga;
}

/**
Expand Down Expand Up @@ -249,15 +228,6 @@ public UUID newUuid() {
return this.innerContext.newUUID();
}

@Override
public SagaContext getSagaContext() {
if (this.saga == null) {
throw new UnsupportedOperationException("Saga is not enabled");
}

return new DefaultSagaContext(this.saga, this);
}

private static TaskOptions toTaskOptions(WorkflowTaskOptions options) {
if (options == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
Expand All @@ -30,6 +29,7 @@ class WorkflowClassWrapper<T extends Workflow> implements TaskOrchestrationFacto

public WorkflowClassWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();

try {
this.workflowConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
Expand All @@ -48,6 +48,7 @@ public String getName() {
public TaskOrchestration create() {
return ctx -> {
T workflow;

try {
workflow = this.workflowConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
Expand All @@ -56,13 +57,7 @@ public TaskOrchestration create() {
);
}

if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
workflow.run(new DefaultWorkflowContext(ctx));
};

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import com.microsoft.durabletask.TaskOrchestration;
import com.microsoft.durabletask.TaskOrchestrationFactory;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.saga.Saga;

/**
* Wrapper for Durable Task Framework orchestration factory.
Expand All @@ -37,13 +36,6 @@ public String getName() {

@Override
public TaskOrchestration create() {
return ctx -> {
if (workflow.getSagaOption() != null) {
Saga saga = new Saga(workflow.getSagaOption());
workflow.run(new DefaultWorkflowContext(ctx, saga));
} else {
workflow.run(new DefaultWorkflowContext(ctx));
}
};
return ctx -> workflow.run(new DefaultWorkflowContext(ctx));
}
}
Loading

0 comments on commit 510679e

Please sign in to comment.