diff --git a/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java b/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java index 5547d72f48..8629982a90 100644 --- a/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java +++ b/dapr-spring/dapr-spring-workflows/src/main/java/io/dapr/spring/workflows/config/DaprWorkflowsConfiguration.java @@ -17,7 +17,7 @@ public class DaprWorkflowsConfiguration implements ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(DaprWorkflowsConfiguration.class); - private WorkflowRuntimeBuilder workflowRuntimeBuilder; + private final WorkflowRuntimeBuilder workflowRuntimeBuilder; public DaprWorkflowsConfiguration(WorkflowRuntimeBuilder workflowRuntimeBuilder) { this.workflowRuntimeBuilder = workflowRuntimeBuilder; @@ -29,16 +29,21 @@ public DaprWorkflowsConfiguration(WorkflowRuntimeBuilder workflowRuntimeBuilder) */ private void registerWorkflowsAndActivities(ApplicationContext applicationContext) { LOGGER.info("Registering Dapr Workflows and Activities"); + Map workflowBeans = applicationContext.getBeansOfType(Workflow.class); - for (Workflow w : workflowBeans.values()) { - LOGGER.info("Dapr Workflow: '{}' registered", w.getClass().getName()); - workflowRuntimeBuilder.registerWorkflow(w.getClass()); + + for (Workflow workflow : workflowBeans.values()) { + LOGGER.info("Dapr Workflow: '{}' registered", workflow.getClass().getName()); + + workflowRuntimeBuilder.registerWorkflow(workflow); } Map workflowActivitiesBeans = applicationContext.getBeansOfType(WorkflowActivity.class); - for (WorkflowActivity a : workflowActivitiesBeans.values()) { - LOGGER.info("Dapr Workflow Activity: '{}' registered", a.getClass().getName()); - workflowRuntimeBuilder.registerActivity(a.getClass()); + + for (WorkflowActivity activity : workflowActivitiesBeans.values()) { + LOGGER.info("Dapr Workflow Activity: '{}' registered", activity.getClass().getName()); + + workflowRuntimeBuilder.registerActivity(activity); } try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) { diff --git a/daprdocs/content/en/java-sdk-docs/spring-boot/_index.md b/daprdocs/content/en/java-sdk-docs/spring-boot/_index.md index 204104a7cd..74f988fb4a 100644 --- a/daprdocs/content/en/java-sdk-docs/spring-boot/_index.md +++ b/daprdocs/content/en/java-sdk-docs/spring-boot/_index.md @@ -122,7 +122,7 @@ Besides the previous configuration (`DaprTestContainersConfig`) your tests shoul The Java SDK allows you to interface with all of the [Dapr building blocks]({{< ref building-blocks >}}). But if you want to leverage the Spring and Spring Boot programming model you can use the `dapr-spring-boot-starter` integration. This includes implementations of Spring Data (`KeyValueTemplate` and `CrudRepository`) as well as a `DaprMessagingTemplate` for producing and consuming messages -(similar to [Spring Kafka](https://spring.io/projects/spring-kafka), [Spring Pulsar](https://spring.io/projects/spring-pulsar) and [Spring AMQP for RabbitMQ](https://spring.io/projects/spring-amqp)). +(similar to [Spring Kafka](https://spring.io/projects/spring-kafka), [Spring Pulsar](https://spring.io/projects/spring-pulsar) and [Spring AMQP for RabbitMQ](https://spring.io/projects/spring-amqp)) and Dapr workflows. ## Using Spring Data `CrudRepository` and `KeyValueTemplate` @@ -277,6 +277,53 @@ public static void setup(){ You can check and run the [full example source code here](https://github.com/salaboy/dapr-spring-boot-docs-examples). +## Using Dapr Workflows with Spring Boot + +Following the same approach that we used for Spring Data and Spring Messaging, the `dapr-spring-boot-starter` brings Dapr Workflow integration for Spring Boot users. + +To work with Dapr Workflows you need to define and implement your workflows using code. The Dapr Spring Boot Starter makes your life easier by managing `Workflow`s and `WorkflowActivity`s as Spring beans. + +In order to enable the automatic bean discovery you can annotate your `@SpringBootApplication` with the `@EnableDaprWorkflows` annotation: + +``` +@SpringBootApplication +@EnableDaprWorkflows +public class MySpringBootApplication {} +``` + +By adding this annotation, all the `WorkflowActivity`s will be automatically managed by Spring and registered to the workflow engine. + +By having all `WorkflowActivity`s as managed beans we can use Spring `@Autowired` mechanism to inject any bean that our workflow activity might need to implement its functionality, for example the `@RestTemplate`: + +``` +public class MyWorkflowActivity implements WorkflowActivity { + + @Autowired + private RestTemplate restTemplate; +``` + +You can also `@Autowired` the `DaprWorkflowClient` to create new instances of your workflows. + +``` +@Autowired +private DaprWorkflowClient daprWorkflowClient; +``` + +This enable applications to schedule new workflow instances and raise events. + +``` +String instanceId = daprWorkflowClient.scheduleNewWorkflow(MyWorkflow.class, payload); +``` + +and + +``` +daprWorkflowClient.raiseEvent(instanceId, "MyEvenet", event); +``` + +Check the [Dapr Workflow documentation](https://docs.dapr.io/developing-applications/building-blocks/workflow/workflow-overview/) for more information about how to work with Dapr Workflows. + + ## Next steps Learn more about the [Dapr Java SDK packages available to add to your Java applications](https://dapr.github.io/java-sdk/). diff --git a/examples/pom.xml b/examples/pom.xml index 4611c69b87..c4d5698db3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -134,6 +134,11 @@ protobuf-java ${protobuf.version} + + com.squareup.okhttp3 + okhttp + 4.12.0 + diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java index cc250113d6..b2da80a409 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java @@ -19,12 +19,13 @@ import jakarta.servlet.DispatcherType; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; -import org.jetbrains.annotations.Nullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.servlet.HandlerInterceptor; import org.springframework.web.servlet.ModelAndView; +import javax.annotation.Nullable; + import java.util.Collections; @Component diff --git a/sdk-actors/src/test/java/io/dapr/client/DaprHttpProxy.java b/sdk-actors/src/test/java/io/dapr/client/DaprHttpProxy.java deleted file mode 100644 index a1c263dd9f..0000000000 --- a/sdk-actors/src/test/java/io/dapr/client/DaprHttpProxy.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright 2021 The Dapr Authors - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and -limitations under the License. -*/ - -package io.dapr.client; - -import okhttp3.OkHttpClient; - -public class DaprHttpProxy extends io.dapr.client.DaprHttp { - - public DaprHttpProxy(String hostname, int port, String daprApiToken, OkHttpClient httpClient) { - super(hostname, port, daprApiToken, httpClient); - } - -} diff --git a/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java b/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java index 13d0954703..c7dd5ccc56 100644 --- a/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/resiliency/WaitForSidecarIT.java @@ -21,8 +21,8 @@ import java.time.Duration; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; /** * Test SDK resiliency. @@ -43,7 +43,7 @@ public class WaitForSidecarIT extends BaseIT { @BeforeAll public static void init() throws Exception { daprRun = startDaprApp(WaitForSidecarIT.class.getSimpleName(), 5000); - daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName()+"NotRunning", 5000); + daprNotRunning = startDaprApp(WaitForSidecarIT.class.getSimpleName() + "NotRunning", 5000); daprNotRunning.stop(); toxiProxyRun = new ToxiProxyRun(daprRun, LATENCY, JITTER); @@ -61,24 +61,30 @@ public void waitSucceeds() throws Exception { public void waitTimeout() { int timeoutInMillis = (int)LATENCY.minusMillis(100).toMillis(); long started = System.currentTimeMillis(); + assertThrows(RuntimeException.class, () -> { try(var client = toxiProxyRun.newDaprClientBuilder().build()) { client.waitForSidecar(timeoutInMillis).block(); } }); + long duration = System.currentTimeMillis() - started; - assertTrue(duration >= timeoutInMillis); + + assertThat(duration).isGreaterThanOrEqualTo(timeoutInMillis); } @Test public void waitSlow() throws Exception { int timeoutInMillis = (int)LATENCY.plusMillis(100).toMillis(); long started = System.currentTimeMillis(); + try(var client = toxiProxyRun.newDaprClientBuilder().build()) { client.waitForSidecar(timeoutInMillis).block(); } + long duration = System.currentTimeMillis() - started; - assertTrue(duration >= LATENCY.toMillis()); + + assertThat(duration).isGreaterThanOrEqualTo(LATENCY.toMillis()); } @Test @@ -87,12 +93,15 @@ public void waitNotRunningTimeout() { // This has to do with a previous bug in the implementation. int timeoutMilliseconds = 5000; long started = System.currentTimeMillis(); + assertThrows(RuntimeException.class, () -> { try(var client = daprNotRunning.newDaprClientBuilder().build()) { client.waitForSidecar(timeoutMilliseconds).block(); } }); + long duration = System.currentTimeMillis() - started; - assertTrue(duration >= timeoutMilliseconds); + + assertThat(duration).isGreaterThanOrEqualTo(timeoutMilliseconds); } } diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java index fc03f4412a..1e41186df4 100644 --- a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/DaprSpringMessagingIT.java @@ -23,16 +23,14 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Disabled; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.testcontainers.service.connection.ServiceConnection; -import org.springframework.test.context.DynamicPropertyRegistry; -import org.springframework.test.context.DynamicPropertySource; import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -56,8 +54,9 @@ public class DaprSpringMessagingIT { private static final Logger logger = LoggerFactory.getLogger(DaprSpringMessagingIT.class); private static final String TOPIC = "mockTopic"; - private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final int APP_PORT = 8080; + private static final String SUBSCRIPTION_MESSAGE_PATTERN = ".*app is subscribed to the following topics.*"; @Container @ServiceConnection @@ -65,7 +64,8 @@ public class DaprSpringMessagingIT { .withAppName("messaging-dapr-app") .withNetwork(DAPR_NETWORK) .withComponent(new Component("pubsub", "pubsub.in-memory", "v1", Collections.emptyMap())) - .withAppPort(8080) + .withAppPort(APP_PORT) + .withAppHealthCheckPath("/ready") .withDaprLogLevel(DaprLogLevel.DEBUG) .withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String())) .withAppChannelAddress("host.testcontainers.internal"); @@ -78,16 +78,16 @@ public class DaprSpringMessagingIT { @BeforeAll public static void beforeAll(){ - org.testcontainers.Testcontainers.exposeHostPorts(8080); + org.testcontainers.Testcontainers.exposeHostPorts(APP_PORT); } @BeforeEach - public void beforeEach() throws InterruptedException { - Thread.sleep(1000); + public void beforeEach() { + // Ensure the subscriptions are registered + Wait.forLogMessage(SUBSCRIPTION_MESSAGE_PATTERN, 1).waitUntilReady(DAPR_CONTAINER); } @Test - @Disabled("Test is flaky due to global state in the spring test application.") public void testDaprMessagingTemplate() throws InterruptedException { for (int i = 0; i < 10; i++) { var msg = "ProduceAndReadWithPrimitiveMessageType:" + i; diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestRestController.java b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestRestController.java index a5d12093c0..963cf6b3f6 100644 --- a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestRestController.java +++ b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestRestController.java @@ -33,7 +33,7 @@ public class TestRestController { private static final Logger LOG = LoggerFactory.getLogger(TestRestController.class); private final List> events = new ArrayList<>(); - @GetMapping("/") + @GetMapping("/ready") public String ok() { return "OK"; } diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java similarity index 92% rename from sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityWrapper.java rename to sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java index 18f4eb55de..3dcb8ef6b6 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapper.java @@ -23,7 +23,7 @@ /** * Wrapper for Durable Task Framework task activity factory. */ -public class WorkflowActivityWrapper implements TaskActivityFactory { +public class WorkflowActivityClassWrapper implements TaskActivityFactory { private final Constructor activityConstructor; private final String name; @@ -32,7 +32,7 @@ public class WorkflowActivityWrapper implements Task * * @param clazz Class of the activity to wrap. */ - public WorkflowActivityWrapper(Class clazz) { + public WorkflowActivityClassWrapper(Class clazz) { this.name = clazz.getCanonicalName(); try { this.activityConstructor = clazz.getDeclaredConstructor(); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java new file mode 100644 index 0000000000..17d509924e --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapper.java @@ -0,0 +1,46 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskActivity; +import com.microsoft.durabletask.TaskActivityFactory; +import io.dapr.workflows.WorkflowActivity; + +/** + * Wrapper for Durable Task Framework task activity factory. + */ +public class WorkflowActivityInstanceWrapper implements TaskActivityFactory { + private final T activity; + private final String name; + + /** + * Constructor for WorkflowActivityWrapper. + * + * @param instance Instance of the activity to wrap. + */ + public WorkflowActivityInstanceWrapper(T instance) { + this.name = instance.getClass().getCanonicalName(); + this.activity = instance; + } + + @Override + public String getName() { + return name; + } + + @Override + public TaskActivity create() { + return ctx -> activity.run(new DefaultWorkflowActivityContext(ctx)); + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java similarity index 93% rename from sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowWrapper.java rename to sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java index 91f1dd8bce..9c0ed95a6d 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowWrapper.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowClassWrapper.java @@ -24,11 +24,11 @@ /** * Wrapper for Durable Task Framework orchestration factory. */ -class WorkflowWrapper implements TaskOrchestrationFactory { +class WorkflowClassWrapper implements TaskOrchestrationFactory { private final Constructor workflowConstructor; private final String name; - public WorkflowWrapper(Class clazz) { + public WorkflowClassWrapper(Class clazz) { this.name = clazz.getCanonicalName(); try { this.workflowConstructor = clazz.getDeclaredConstructor(); diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java new file mode 100644 index 0000000000..bda34d5974 --- /dev/null +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowInstanceWrapper.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskOrchestration; +import com.microsoft.durabletask.TaskOrchestrationFactory; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.saga.Saga; + +/** + * Wrapper for Durable Task Framework orchestration factory. + */ +class WorkflowInstanceWrapper implements TaskOrchestrationFactory { + private final T workflow; + private final String name; + + public WorkflowInstanceWrapper(T instance) { + this.name = instance.getClass().getCanonicalName(); + this.workflow = instance; + } + + @Override + public String getName() { + return name; + } + + @Override + public TaskOrchestration create() { + return ctx -> { + if (workflow.getSagaOption() != null) { + Saga saga = new Saga(workflow.getSagaOption()); + workflow.run(new DefaultWorkflowContext(ctx, saga)); + } else { + workflow.run(new DefaultWorkflowContext(ctx)); + } + }; + } +} diff --git a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java index 86d0cf1e09..397e58b30f 100644 --- a/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java +++ b/sdk-workflows/src/main/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilder.java @@ -92,11 +92,30 @@ public WorkflowRuntime build() { * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerWorkflow(Class clazz) { - this.builder.addOrchestration(new WorkflowWrapper<>(clazz)); + this.builder.addOrchestration(new WorkflowClassWrapper<>(clazz)); this.workflowSet.add(clazz.getCanonicalName()); this.workflows.add(clazz.getSimpleName()); - this.logger.info("Registered Workflow: " + clazz.getSimpleName()); + this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); + + return this; + } + + /** + * Registers a Workflow object. + * + * @param any Workflow type + * @param instance the workflow instance being registered + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerWorkflow(T instance) { + Class clazz = (Class) instance.getClass(); + + this.builder.addOrchestration(new WorkflowInstanceWrapper<>(instance)); + this.workflowSet.add(clazz.getCanonicalName()); + this.workflows.add(clazz.getSimpleName()); + + this.logger.info("Registered Workflow: {}", clazz.getSimpleName()); return this; } @@ -109,11 +128,30 @@ public WorkflowRuntimeBuilder registerWorkflow(Class cla * @return the WorkflowRuntimeBuilder */ public WorkflowRuntimeBuilder registerActivity(Class clazz) { - this.builder.addActivity(new WorkflowActivityWrapper<>(clazz)); + this.builder.addActivity(new WorkflowActivityClassWrapper<>(clazz)); + this.activitySet.add(clazz.getCanonicalName()); + this.activities.add(clazz.getSimpleName()); + + this.logger.info("Registered Activity: {}", clazz.getSimpleName()); + + return this; + } + + /** + * Registers an Activity object. + * + * @param any WorkflowActivity type + * @param instance the class instance being registered + * @return the WorkflowRuntimeBuilder + */ + public WorkflowRuntimeBuilder registerActivity(T instance) { + Class clazz = (Class) instance.getClass(); + + this.builder.addActivity(new WorkflowActivityInstanceWrapper<>(instance)); this.activitySet.add(clazz.getCanonicalName()); this.activities.add(clazz.getSimpleName()); - this.logger.info("Registered Activity: " + clazz.getSimpleName()); + this.logger.info("Registered Activity: {}", clazz.getSimpleName()); return this; } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index 32af9fc6f2..98b5dc23b2 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -23,12 +23,13 @@ import io.dapr.workflows.saga.Saga; import io.dapr.workflows.saga.SagaContext; -import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.slf4j.Logger; +import javax.annotation.Nullable; + import java.time.Duration; import java.time.Instant; import java.time.ZonedDateTime; diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java similarity index 61% rename from sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityWrapperTest.java rename to sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java index 754c02bd8b..0783176051 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityClassWrapperTest.java @@ -3,16 +3,15 @@ import com.microsoft.durabletask.TaskActivityContext; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; -import org.junit.Assert; import org.junit.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; - -public class WorkflowActivityWrapperTest { +public class WorkflowActivityClassWrapperTest { public static class TestActivity implements WorkflowActivity { @Override public Object run(WorkflowActivityContext ctx) { @@ -22,24 +21,26 @@ public Object run(WorkflowActivityContext ctx) { } @Test - public void getName() throws NoSuchMethodException { - WorkflowActivityWrapper wrapper = new WorkflowActivityWrapper<>( - WorkflowActivityWrapperTest.TestActivity.class); - Assert.assertEquals( - "io.dapr.workflows.runtime.WorkflowActivityWrapperTest.TestActivity", + public void getName() { + WorkflowActivityClassWrapper wrapper = new WorkflowActivityClassWrapper<>(TestActivity.class); + + assertEquals( + "io.dapr.workflows.runtime.WorkflowActivityClassWrapperTest.TestActivity", wrapper.getName() ); } @Test - public void createWithClass() throws NoSuchMethodException { + public void createWithClass() { TaskActivityContext mockContext = mock(TaskActivityContext.class); - WorkflowActivityWrapper wrapper = new WorkflowActivityWrapper<>( - WorkflowActivityWrapperTest.TestActivity.class); + WorkflowActivityClassWrapper wrapper = new WorkflowActivityClassWrapper<>(TestActivity.class); + when(mockContext.getInput(String.class)).thenReturn("Hello"); when(mockContext.getName()).thenReturn("TestActivityContext"); + Object result = wrapper.create().run(mockContext); + verify(mockContext, times(1)).getInput(String.class); - Assert.assertEquals("Hello world! from TestActivityContext", result); + assertEquals("Hello world! from TestActivityContext", result); } } diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java new file mode 100644 index 0000000000..bd8788bbdd --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowActivityInstanceWrapperTest.java @@ -0,0 +1,46 @@ +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskActivityContext; +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.junit.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class WorkflowActivityInstanceWrapperTest { + public static class TestActivity implements WorkflowActivity { + @Override + public Object run(WorkflowActivityContext ctx) { + String activityContextName = ctx.getName(); + return ctx.getInput(String.class) + " world! from " + activityContextName; + } + } + + @Test + public void getName() { + WorkflowActivityInstanceWrapper wrapper = new WorkflowActivityInstanceWrapper<>(new TestActivity()); + + assertEquals( + "io.dapr.workflows.runtime.WorkflowActivityInstanceWrapperTest.TestActivity", + wrapper.getName() + ); + } + + @Test + public void createWithInstance() { + TaskActivityContext mockContext = mock(TaskActivityContext.class); + WorkflowActivityInstanceWrapper wrapper = new WorkflowActivityInstanceWrapper<>(new TestActivity()); + + when(mockContext.getInput(String.class)).thenReturn("Hello"); + when(mockContext.getName()).thenReturn("TestActivityContext"); + + Object result = wrapper.create().run(mockContext); + + verify(mockContext, times(1)).getInput(String.class); + assertEquals("Hello world! from TestActivityContext", result); + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java similarity index 79% rename from sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowWrapperTest.java rename to sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java index 6066a7f7c0..a73b616bc2 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowWrapperTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowClassWrapperTest.java @@ -13,20 +13,19 @@ package io.dapr.workflows.runtime; - import com.microsoft.durabletask.TaskOrchestrationContext; import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowContext; import io.dapr.workflows.WorkflowStub; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -public class WorkflowWrapperTest { +public class WorkflowClassWrapperTest { public static class TestWorkflow implements Workflow { @Override public WorkflowStub create() { @@ -36,9 +35,10 @@ public WorkflowStub create() { @Test public void getName() { - WorkflowWrapper wrapper = new WorkflowWrapper<>(TestWorkflow.class); - Assertions.assertEquals( - "io.dapr.workflows.runtime.WorkflowWrapperTest.TestWorkflow", + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestWorkflow.class); + + assertEquals( + "io.dapr.workflows.runtime.WorkflowClassWrapperTest.TestWorkflow", wrapper.getName() ); } @@ -46,10 +46,11 @@ public void getName() { @Test public void createWithClass() { TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); - WorkflowWrapper wrapper = new WorkflowWrapper<>(TestWorkflow.class); + WorkflowClassWrapper wrapper = new WorkflowClassWrapper<>(TestWorkflow.class); + when(mockContext.getInstanceId()).thenReturn("uuid"); wrapper.create().run(mockContext); verify(mockContext, times(1)).getInstanceId(); } -} \ No newline at end of file +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java new file mode 100644 index 0000000000..22f315aa53 --- /dev/null +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowInstanceWrapperTest.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.workflows.runtime; + +import com.microsoft.durabletask.TaskOrchestrationContext; +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowContext; +import io.dapr.workflows.WorkflowStub; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class WorkflowInstanceWrapperTest { + public static class TestWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return WorkflowContext::getInstanceId; + } + } + + @Test + public void getName() { + WorkflowInstanceWrapper wrapper = new WorkflowInstanceWrapper<>(new TestWorkflow()); + + assertEquals( + "io.dapr.workflows.runtime.WorkflowInstanceWrapperTest.TestWorkflow", + wrapper.getName() + ); + } + + @Test + public void createWithInstance() { + TaskOrchestrationContext mockContext = mock(TaskOrchestrationContext.class); + WorkflowInstanceWrapper wrapper = new WorkflowInstanceWrapper<>(new TestWorkflow()); + + when(mockContext.getInstanceId()).thenReturn("uuid"); + wrapper.create().run(mockContext); + verify(mockContext, times(1)).getInstanceId(); + } + +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java index 81e3c30f18..c159930b91 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/runtime/WorkflowRuntimeBuilderTest.java @@ -12,16 +12,18 @@ */ package io.dapr.workflows.runtime; - import io.dapr.workflows.Workflow; import io.dapr.workflows.WorkflowActivity; import io.dapr.workflows.WorkflowActivityContext; import io.dapr.workflows.WorkflowStub; import org.junit.jupiter.api.Test; -import org.mockito.Mockito; import org.slf4j.Logger; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.ByteArrayOutputStream; import java.io.PrintStream; @@ -47,14 +49,30 @@ public void registerValidWorkflowClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(TestWorkflow.class)); } + @Test + public void registerValidWorkflowInstance() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerWorkflow(new TestWorkflow())); + } + @Test public void registerValidWorkflowActivityClass() { assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(TestActivity.class)); } + @Test + public void registerValidWorkflowActivityInstance() { + assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().registerActivity(new TestActivity())); + } + @Test public void buildTest() { - assertDoesNotThrow(() -> new WorkflowRuntimeBuilder().build()); + assertDoesNotThrow(() -> { + try (WorkflowRuntime runtime = new WorkflowRuntimeBuilder().build()) { + System.out.println("WorkflowRuntime created"); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); } @Test @@ -63,19 +81,20 @@ public void loggingOutputTest() { ByteArrayOutputStream outStreamCapture = new ByteArrayOutputStream(); System.setOut(new PrintStream(outStreamCapture)); - Logger testLogger = Mockito.mock(Logger.class); + Logger testLogger = mock(Logger.class); assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerWorkflow(TestWorkflow.class)); assertDoesNotThrow(() -> new WorkflowRuntimeBuilder(testLogger).registerActivity(TestActivity.class)); - WorkflowRuntimeBuilder wfRuntime = new WorkflowRuntimeBuilder(); + WorkflowRuntimeBuilder workflowRuntimeBuilder = new WorkflowRuntimeBuilder(); - wfRuntime.build(); + try (WorkflowRuntime runtime = workflowRuntimeBuilder.build()) { + verify(testLogger, times(1)) + .info(eq("Registered Workflow: {}"), eq("TestWorkflow")); - Mockito.verify(testLogger, Mockito.times(1)) - .info(Mockito.eq("Registered Workflow: TestWorkflow")); - Mockito.verify(testLogger, Mockito.times(1)) - .info(Mockito.eq("Registered Activity: TestActivity")); + verify(testLogger, times(1)) + .info(eq("Registered Activity: {}"), eq("TestActivity")); + } } } diff --git a/sdk/pom.xml b/sdk/pom.xml index 1c9817b87a..99ba186adc 100644 --- a/sdk/pom.xml +++ b/sdk/pom.xml @@ -44,17 +44,6 @@ reactor-core 3.5.0 - - com.squareup.okhttp3 - okhttp - 4.12.0 - - - org.jetbrains.kotlin - kotlin-stdlib-jdk8 - - - org.mockito mockito-core diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 1f39d7aad0..0c1264eb19 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -80,7 +80,6 @@ import io.grpc.Metadata; import io.grpc.stub.AbstractStub; import io.grpc.stub.StreamObserver; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; @@ -90,6 +89,8 @@ import reactor.util.context.ContextView; import reactor.util.retry.Retry; +import javax.annotation.Nonnull; + import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -441,7 +442,7 @@ public Subscription subscribeToEvents( return buildSubscription(listener, type, request); } - @NotNull + @Nonnull private Subscription buildSubscription( SubscriptionListener listener, TypeRef type, diff --git a/sdk/src/main/java/io/dapr/client/DaprHttp.java b/sdk/src/main/java/io/dapr/client/DaprHttp.java index d2478d87fd..5b23d733ec 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttp.java @@ -15,32 +15,28 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.dapr.client.domain.Metadata; -import io.dapr.config.Properties; import io.dapr.exceptions.DaprError; import io.dapr.exceptions.DaprException; import io.dapr.internal.exceptions.DaprHttpException; import io.dapr.utils.Version; -import okhttp3.Call; -import okhttp3.Callback; -import okhttp3.HttpUrl; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.RequestBody; -import okhttp3.ResponseBody; -import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; import reactor.util.context.ContextView; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -94,12 +90,12 @@ public enum HttpMethods { } public static class Response { - private byte[] body; - private Map headers; - private int statusCode; + private final byte[] body; + private final Map headers; + private final int statusCode; /** - * Represents an http response. + * Represents a HTTP response. * * @param body The body of the http response. * @param headers The headers of the http response. @@ -127,19 +123,17 @@ public int getStatusCode() { /** * Defines the standard application/json type for HTTP calls in Dapr. */ - private static final MediaType MEDIA_TYPE_APPLICATION_JSON = - MediaType.get("application/json; charset=utf-8"); + private static final String MEDIA_TYPE_APPLICATION_JSON = "application/json; charset=utf-8"; /** - * Shared object representing an empty request body in JSON. + * Empty input or output. */ - private static final RequestBody REQUEST_BODY_EMPTY_JSON = - RequestBody.Companion.create("", MEDIA_TYPE_APPLICATION_JSON); + private static final byte[] EMPTY_BYTES = new byte[0]; /** - * Empty input or output. + * Empty Body Publisher. */ - private static final byte[] EMPTY_BYTES = new byte[0]; + private static final HttpRequest.BodyPublisher EMPTY_BODY_PUBLISHER = HttpRequest.BodyPublishers.noBody(); /** * Endpoint used to communicate to Dapr's HTTP endpoint. @@ -147,38 +141,47 @@ public int getStatusCode() { private final URI uri; /** - * Http client used for all API calls. + * Dapr API Token required to interact with DAPR APIs. */ - private final OkHttpClient httpClient; + private final String daprApiToken; /** - * Dapr API Token required to interact with DAPR APIs. + * Http client request read timeout. */ - private final String daprApiToken; + private final Duration readTimeout; + + /** + * Http client used for all API calls. + */ + private final HttpClient httpClient; /** * Creates a new instance of {@link DaprHttp}. * * @param hostname Hostname for calling Dapr. (e.g. "127.0.0.1") * @param port Port for calling Dapr. (e.g. 3500) + * @param readTimeout HTTP request read timeout * @param httpClient RestClient used for all API calls in this new instance. */ - DaprHttp(String hostname, int port, String daprApiToken, OkHttpClient httpClient) { + DaprHttp(String hostname, int port, String daprApiToken, Duration readTimeout, HttpClient httpClient) { this.uri = URI.create(DEFAULT_HTTP_SCHEME + "://" + hostname + ":" + port); - this.httpClient = httpClient; this.daprApiToken = daprApiToken; + this.readTimeout = readTimeout; + this.httpClient = httpClient; } /** * Creates a new instance of {@link DaprHttp}. * - * @param uri Endpoint for calling Dapr. (e.g. "https://my-dapr-api.company.com") + * @param uri Endpoint for calling Dapr. + * @param readTimeout HTTP request read timeout * @param httpClient RestClient used for all API calls in this new instance. */ - DaprHttp(String uri, String daprApiToken, OkHttpClient httpClient) { + DaprHttp(String uri, String daprApiToken, Duration readTimeout, HttpClient httpClient) { this.uri = URI.create(uri); - this.httpClient = httpClient; this.daprApiToken = daprApiToken; + this.readTimeout = readTimeout; + this.httpClient = httpClient; } /** @@ -244,13 +247,13 @@ public Mono invokeApi( Map headers, ContextView context) { // fromCallable() is needed so the invocation does not happen early, causing a hot mono. - return Mono.fromCallable(() -> doInvokeApi(method, pathSegments, urlParameters, content, headers, context)) - .flatMap(f -> Mono.fromFuture(f)); + return Mono.fromCallable(() -> doInvokeApi(method, headers, pathSegments, urlParameters, content, context)) + .flatMap(Mono::fromFuture); } /** - * Shutdown call is not necessary for OkHttpClient. - * @see OkHttpClient + * Shutdown call is not necessary for HttpClient. + * @see HttpClient */ @Override public void close() { @@ -268,77 +271,155 @@ public void close() { * @param context OpenTelemetry's Context. * @return CompletableFuture for Response. */ - private CompletableFuture doInvokeApi(String method, + private CompletableFuture doInvokeApi( + String method, + Map headers, String[] pathSegments, Map> urlParameters, - byte[] content, Map headers, + byte[] content, ContextView context) { - final String requestId = UUID.randomUUID().toString(); - RequestBody body; - - String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null; - MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType); - if (content == null) { - body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON) - ? REQUEST_BODY_EMPTY_JSON - : RequestBody.Companion.create(new byte[0], mediaType); - } else { - body = RequestBody.Companion.create(content, mediaType); - } - HttpUrl.Builder urlBuilder = new HttpUrl.Builder(); - urlBuilder.scheme(uri.getScheme()) - .host(uri.getHost()); - if (uri.getPort() > 0) { - urlBuilder.port(uri.getPort()); - } - if (uri.getPath() != null) { - urlBuilder.addPathSegments(uri.getPath()); - } - for (String pathSegment : pathSegments) { - urlBuilder.addPathSegment(pathSegment); + HttpRequest.Builder requestBuilder = HttpRequest.newBuilder(); + + requestBuilder.uri(createUri(uri, pathSegments, urlParameters)); + addHeader(requestBuilder, Headers.DAPR_USER_AGENT, Version.getSdkVersion()); + addHeader(requestBuilder, HEADER_DAPR_REQUEST_ID, UUID.randomUUID().toString()); + addHeader(requestBuilder, "Content-Type", getContentType(headers)); + addHeaders(requestBuilder, headers); + + if (daprApiToken != null) { + addHeader(requestBuilder, Headers.DAPR_API_TOKEN, daprApiToken); } - Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream() - .forEach(urlParameter -> - Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream() - .forEach(urlParameterValue -> - urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue))); - - Request.Builder requestBuilder = new Request.Builder() - .url(urlBuilder.build()) - .addHeader(HEADER_DAPR_REQUEST_ID, requestId); + if (context != null) { context.stream() .filter(entry -> ALLOWED_CONTEXT_IN_HEADERS.contains(entry.getKey().toString().toLowerCase())) - .forEach(entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString())); + .forEach(entry -> addHeader(requestBuilder, entry.getKey().toString(), entry.getValue().toString())); } + + HttpRequest.BodyPublisher body = getBodyPublisher(content); + if (HttpMethods.GET.name().equals(method)) { - requestBuilder.get(); + requestBuilder.GET(); } else if (HttpMethods.DELETE.name().equals(method)) { - requestBuilder.delete(); + requestBuilder.DELETE(); } else if (HttpMethods.HEAD.name().equals(method)) { - requestBuilder.head(); + // HTTP HEAD is not exposed as a normal method + requestBuilder.method(HttpMethods.HEAD.name(), EMPTY_BODY_PUBLISHER); } else { requestBuilder.method(method, body); } - if (daprApiToken != null) { - requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken); + HttpRequest request = requestBuilder.timeout(readTimeout).build(); + + return httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray()) + .thenApply(this::createResponse); + } + + private static String getContentType(Map headers) { + String result = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null; + + return result == null ? MEDIA_TYPE_APPLICATION_JSON : result; + } + + private static URI createUri(URI uri, String[] pathSegments, Map> urlParameters) { + String path = createPath(uri, pathSegments); + String query = createQuery(urlParameters); + + try { + return new URI(uri.getScheme(), uri.getAuthority(), path, query, null); + } catch (URISyntaxException exception) { + throw new DaprException(exception); } - requestBuilder.addHeader(Headers.DAPR_USER_AGENT, Version.getSdkVersion()); + } + + private static String createPath(URI uri, String[] pathSegments) { + String basePath = uri.getPath(); - if (headers != null) { - Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream() - .forEach(header -> { - requestBuilder.addHeader(header.getKey(), header.getValue()); - }); + if (pathSegments == null || pathSegments.length == 0) { + return basePath; } - Request request = requestBuilder.build(); + StringBuilder pathBuilder = new StringBuilder(basePath); + if (!basePath.endsWith("/")) { // Add a "/" if needed + pathBuilder.append("/"); + } - CompletableFuture future = new CompletableFuture<>(); - this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future)); - return future; + for (String segment : pathSegments) { + pathBuilder.append(encodePathSegment(segment)).append("/"); // Encode each segment + } + + pathBuilder.deleteCharAt(pathBuilder.length() - 1); // Remove the trailing "/" + + return pathBuilder.toString(); + } + + private static String createQuery(Map> urlParameters) { + if (urlParameters == null || urlParameters.isEmpty()) { + return null; + } + + StringBuilder queryBuilder = new StringBuilder(); + + for (Map.Entry> entry : urlParameters.entrySet()) { + String key = entry.getKey(); + List values = entry.getValue(); + + for (String value : values) { + if (queryBuilder.length() > 0) { + queryBuilder.append("&"); + } + + queryBuilder.append(encodeQueryParam(key, value)); // Encode key and value + } + } + + return queryBuilder.toString(); + } + + private static String encodePathSegment(String segment) { + return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20"); // Encode and handle spaces + } + + private static String encodeQueryParam(String key, String value) { + return URLEncoder.encode(key, StandardCharsets.UTF_8) + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8); + } + + private static void addHeader(HttpRequest.Builder requestBuilder, String name, String value) { + requestBuilder.header(name, value); + } + + private static void addHeaders(HttpRequest.Builder requestBuilder, Map headers) { + if (headers == null || headers.isEmpty()) { + return; + } + + headers.forEach((k, v) -> addHeader(requestBuilder, k, v)); + } + + private static HttpRequest.BodyPublisher getBodyPublisher(byte[] content) { + return HttpRequest.BodyPublishers.ofByteArray(Objects.requireNonNullElse(content, EMPTY_BYTES)); + } + + private Response createResponse(HttpResponse httpResponse) { + Optional headerValue = httpResponse.headers().firstValue("Metadata.statuscode"); + int httpStatusCode = parseHttpStatusCode(headerValue, httpResponse.statusCode()); + byte[] body = getBodyBytesOrEmptyArray(httpResponse.body()); + + if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) { + DaprError error = parseDaprError(body); + + if (error != null) { + throw new DaprException(error, body, httpStatusCode); + } else { + throw new DaprException("UNKNOWN", "", body, httpStatusCode); + } + } + + Map responseHeaders = new HashMap<>(); + httpResponse.headers().map().forEach((k, v) -> responseHeaders.put(k, v.isEmpty() ? null : v.get(0))); + + return new Response(body, responseHeaders, httpStatusCode); } /** @@ -360,70 +441,18 @@ private static DaprError parseDaprError(byte[] json) { } } - - private static byte[] getBodyBytesOrEmptyArray(okhttp3.Response response) throws IOException { - ResponseBody body = response.body(); - if (body != null) { - return body.bytes(); - } - - return EMPTY_BYTES; - } - - /** - * Converts the okhttp3 response into the response object expected internally by the SDK. - */ - private static class ResponseFutureCallback implements Callback { - private final CompletableFuture future; - - public ResponseFutureCallback(CompletableFuture future) { - this.future = future; - } - - @Override - public void onFailure(Call call, IOException e) { - future.completeExceptionally(e); - } - - @Override - public void onResponse(@NotNull Call call, @NotNull okhttp3.Response response) throws IOException { - int httpStatusCode = parseHttpStatusCode(response.header("Metadata.statuscode"), response.code()); - if (!DaprHttpException.isSuccessfulHttpStatusCode(httpStatusCode)) { - try { - byte[] payload = getBodyBytesOrEmptyArray(response); - DaprError error = parseDaprError(payload); - - if (error != null) { - future.completeExceptionally(new DaprException(error, payload, httpStatusCode)); - return; - } - - future.completeExceptionally( - new DaprException("UNKNOWN", "", payload, httpStatusCode)); - return; - } catch (DaprException e) { - future.completeExceptionally(e); - return; - } - } - - Map mapHeaders = new HashMap<>(); - byte[] result = getBodyBytesOrEmptyArray(response); - response.headers().forEach(pair -> { - mapHeaders.put(pair.getFirst(), pair.getSecond()); - }); - future.complete(new Response(result, mapHeaders, httpStatusCode)); - } + private static byte[] getBodyBytesOrEmptyArray(byte[] body) { + return body == null ? EMPTY_BYTES : body; } - private static int parseHttpStatusCode(String headerValue, int defaultStatusCode) { - if ((headerValue == null) || headerValue.isEmpty()) { + private static int parseHttpStatusCode(Optional headerValue, int defaultStatusCode) { + if (headerValue.isEmpty()) { return defaultStatusCode; } // Metadata used to override status code with code received from HTTP binding. try { - int httpStatusCode = Integer.parseInt(headerValue); + int httpStatusCode = Integer.parseInt(headerValue.get()); if (DaprHttpException.isValidHttpStatusCode(httpStatusCode)) { return httpStatusCode; } diff --git a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java index 8ef163dd97..2de7fe6311 100644 --- a/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java +++ b/sdk/src/main/java/io/dapr/client/DaprHttpBuilder.java @@ -14,15 +14,13 @@ package io.dapr.client; import io.dapr.config.Properties; -import okhttp3.ConnectionPool; -import okhttp3.Dispatcher; -import okhttp3.OkHttpClient; +import java.net.http.HttpClient; import java.time.Duration; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import static io.dapr.config.Properties.API_TOKEN; -import static io.dapr.config.Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS; import static io.dapr.config.Properties.HTTP_CLIENT_MAX_REQUESTS; import static io.dapr.config.Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS; import static io.dapr.config.Properties.HTTP_ENDPOINT; @@ -34,24 +32,13 @@ */ public class DaprHttpBuilder { - /** - * Singleton OkHttpClient. - */ - private static volatile OkHttpClient OK_HTTP_CLIENT; + private static volatile HttpClient HTTP_CLIENT; /** * Static lock object. */ private static final Object LOCK = new Object(); - /** - * HTTP keep alive duration in seconds. - * - *

Just hard code to a reasonable value. - */ - private static final int KEEP_ALIVE_DURATION = 30; - - /** * Build an instance of the Http client based on the provided setup. * @param properties to configure the DaprHttp client @@ -68,38 +55,30 @@ public DaprHttp build(Properties properties) { * @return Instance of {@link DaprHttp} */ private DaprHttp buildDaprHttp(Properties properties) { - if (OK_HTTP_CLIENT == null) { + if (HTTP_CLIENT == null) { synchronized (LOCK) { - if (OK_HTTP_CLIENT == null) { - OkHttpClient.Builder builder = new OkHttpClient.Builder(); - Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS)); - builder.readTimeout(readTimeout); - - Dispatcher dispatcher = new Dispatcher(); - dispatcher.setMaxRequests(properties.getValue(HTTP_CLIENT_MAX_REQUESTS)); - // The maximum number of requests for each host to execute concurrently. - // Default value is 5 in okhttp which is totally UNACCEPTABLE! - // For sidecar case, set it the same as maxRequests. - dispatcher.setMaxRequestsPerHost(HTTP_CLIENT_MAX_REQUESTS.get()); - builder.dispatcher(dispatcher); - - ConnectionPool pool = new ConnectionPool(properties.getValue(HTTP_CLIENT_MAX_IDLE_CONNECTIONS), - KEEP_ALIVE_DURATION, TimeUnit.SECONDS); - builder.connectionPool(pool); - - OK_HTTP_CLIENT = builder.build(); + if (HTTP_CLIENT == null) { + int maxRequests = properties.getValue(HTTP_CLIENT_MAX_REQUESTS); + Executor executor = Executors.newFixedThreadPool(maxRequests); + HTTP_CLIENT = HttpClient.newBuilder() + .executor(executor) + .version(HttpClient.Version.HTTP_1_1) + .build(); } } } String endpoint = properties.getValue(HTTP_ENDPOINT); + String apiToken = properties.getValue(API_TOKEN); + Duration readTimeout = Duration.ofSeconds(properties.getValue(HTTP_CLIENT_READ_TIMEOUT_SECONDS)); + if ((endpoint != null) && !endpoint.isEmpty()) { - return new DaprHttp(endpoint, properties.getValue(API_TOKEN), OK_HTTP_CLIENT); + return new DaprHttp(endpoint, apiToken, readTimeout, HTTP_CLIENT); } - return new DaprHttp(properties.getValue(SIDECAR_IP), properties.getValue(HTTP_PORT), properties.getValue(API_TOKEN), - OK_HTTP_CLIENT); - + String sidecarIp = properties.getValue(SIDECAR_IP); + int port = properties.getValue(HTTP_PORT); + return new DaprHttp(sidecarIp, port, apiToken, readTimeout, HTTP_CLIENT); } } diff --git a/sdk/src/main/java/io/dapr/client/Subscription.java b/sdk/src/main/java/io/dapr/client/Subscription.java index 9d1a2a5035..53e89e8456 100644 --- a/sdk/src/main/java/io/dapr/client/Subscription.java +++ b/sdk/src/main/java/io/dapr/client/Subscription.java @@ -19,9 +19,10 @@ import io.dapr.v1.DaprGrpc; import io.dapr.v1.DaprProtos; import io.grpc.stub.StreamObserver; -import org.jetbrains.annotations.NotNull; import reactor.core.publisher.Mono; +import javax.annotation.Nonnull; + import java.io.Closeable; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -153,7 +154,7 @@ private static Mono onEvent( }).onErrorReturn(SubscriptionListener.Status.RETRY); } - @NotNull + @Nonnull private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest( String id, SubscriptionListener.Status status) { DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = diff --git a/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java b/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java index 5b7d546e72..7f763c05ac 100644 --- a/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java +++ b/sdk/src/main/java/io/dapr/client/domain/HttpExtension.java @@ -14,12 +14,12 @@ package io.dapr.client.domain; import io.dapr.client.DaprHttp; -import okhttp3.HttpUrl; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; /** * HTTP Extension class. @@ -67,17 +67,17 @@ public final class HttpExtension { /** * HTTP verb. */ - private DaprHttp.HttpMethods method; + private final DaprHttp.HttpMethods method; /** * HTTP query params. */ - private Map> queryParams; + private final Map> queryParams; /** * HTTP headers. */ - private Map headers; + private final Map headers; /** * Construct a HttpExtension object. @@ -126,18 +126,29 @@ public Map getHeaders() { * @return Encoded HTTP query string. */ public String encodeQueryString() { - if ((this.queryParams == null) || (this.queryParams.isEmpty())) { + if (queryParams == null || queryParams.isEmpty()) { return ""; } - HttpUrl.Builder urlBuilder = new HttpUrl.Builder(); - // Setting required values but we only need query params in the end. - urlBuilder.scheme("http").host("localhost"); - Optional.ofNullable(this.queryParams).orElse(Collections.emptyMap()).entrySet().stream() - .forEach(urlParameter -> - Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream() - .forEach(urlParameterValue -> - urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue))); - return urlBuilder.build().encodedQuery(); + StringBuilder queryBuilder = new StringBuilder(); + + for (Map.Entry> entry : queryParams.entrySet()) { + String key = entry.getKey(); + List values = entry.getValue(); + + for (String value : values) { + if (queryBuilder.length() > 0) { + queryBuilder.append("&"); + } + + queryBuilder.append(encodeQueryParam(key, value)); // Encode key and value + } + } + + return queryBuilder.toString(); + } + + private static String encodeQueryParam(String key, String value) { + return URLEncoder.encode(key, StandardCharsets.UTF_8) + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8); } } diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 44e36d06d0..6864a1ee07 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -13,24 +13,16 @@ package io.dapr.client; import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.dataformat.xml.XmlMapper; import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; -import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; import io.dapr.utils.TypeRef; import io.dapr.v1.DaprGrpc; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.ResponseBody; -import okhttp3.mock.Behavior; -import okhttp3.mock.MediaTypes; -import okhttp3.mock.MockInterceptor; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -40,13 +32,15 @@ import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import static io.dapr.utils.TestUtils.assertThrowsDaprException; import static io.dapr.utils.TestUtils.findFreePort; @@ -57,12 +51,19 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class DaprClientHttpTest { private final String EXPECTED_RESULT = "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}"; + + private static final int HTTP_NO_CONTENT = 204; + private static final int HTTP_NOT_FOUND = 404; + private static final int HTTP_SERVER_ERROR = 500; + private static final int HTTP_OK = 200; + private static final Duration READ_TIMEOUT = Duration.ofSeconds(60); private String sidecarIp; @@ -72,17 +73,14 @@ public class DaprClientHttpTest { private DaprHttp daprHttp; - private OkHttpClient okHttpClient; - - private MockInterceptor mockInterceptor; + private HttpClient httpClient; @BeforeEach public void setUp() { sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get()); daprApiToken = Properties.API_TOKEN.get(); - mockInterceptor = new MockInterceptor(Behavior.UNORDERED); - okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); - daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, okHttpClient); + httpClient = mock(HttpClient.class); + daprHttp = new DaprHttp(sidecarIp, 3000, daprApiToken, READ_TIMEOUT, httpClient); daprClientHttp = buildDaprClient(daprHttp); } @@ -100,14 +98,16 @@ private static DaprClient buildDaprClient(DaprHttp daprHttp) { @Test public void waitForSidecarTimeOutHealthCheck() throws Exception { - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, okHttpClient); + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3000, daprApiToken, READ_TIMEOUT, httpClient); DaprClient daprClientHttp = buildDaprClient(daprHttp); - mockInterceptor.addRule() - .get() - .path("/v1.0/healthz/outbound") - .delay(200) - .respond(204, ResponseBody.create("No Content", MediaType.get("application/json"))); + when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> { + Thread.sleep(200); + + return mockResponse; + }); StepVerifier.create(daprClientHttp.waitForSidecar(100)) .expectSubscription() @@ -123,15 +123,20 @@ public void waitForSidecarTimeOutHealthCheck() throws Exception { @Test public void waitForSidecarBadHealthCheck() throws Exception { + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NOT_FOUND); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); int port = findFreePort(); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, READ_TIMEOUT, httpClient); DaprClient daprClientHttp = buildDaprClient(daprHttp); + AtomicInteger count = new AtomicInteger(0); - mockInterceptor.addRule() - .get() - .path("/v1.0/healthz/outbound") - .times(6) - .respond(404, ResponseBody.create("Not Found", MediaType.get("application/json"))); + when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> { + if (count.getAndIncrement() < 6) { + return mockResponse; + } + + return CompletableFuture.failedFuture(new TimeoutException()); + }); // it will timeout. StepVerifier.create(daprClientHttp.waitForSidecar(5000)) @@ -143,24 +148,25 @@ public void waitForSidecarBadHealthCheck() throws Exception { @Test public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception { int port = findFreePort(); - daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, okHttpClient); + daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), port, daprApiToken, READ_TIMEOUT, httpClient); DaprClient daprClientHttp = buildDaprClient(daprHttp); + AtomicInteger count = new AtomicInteger(0); - // Simulate a slow response - mockInterceptor.addRule() - .get() - .path("/v1.0/healthz/outbound") - .delay(1000) - .times(2) - .respond(500, ResponseBody.create("Internal Server Error", MediaType.get("application/json"))); - - mockInterceptor.addRule() - .get() - .path("/v1.0/healthz/outbound") - .delay(1000) - .times(1) - .respond(204, ResponseBody.create("No Content", MediaType.get("application/json"))); + when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> { + if (count.getAndIncrement() < 2) { + Thread.sleep(1000); + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR); + return CompletableFuture.>completedFuture(mockHttpResponse); + } + + Thread.sleep(1000); + + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT); + return CompletableFuture.>completedFuture(mockHttpResponse); + }); + + // Simulate a slow response StepVerifier.create(daprClientHttp.waitForSidecar(5000)) .expectSubscription() .expectNext() @@ -170,14 +176,13 @@ public void waitForSidecarSlowSuccessfulHealthCheck() throws Exception { @Test public void waitForSidecarOK() throws Exception { + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); int port = findFreePort(); - daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient); + daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, READ_TIMEOUT, httpClient); DaprClient daprClientHttp = buildDaprClient(daprHttp); - mockInterceptor.addRule() - .get() - .path("/v1.0/healthz/outbound") - .respond(204); + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); StepVerifier.create(daprClientHttp.waitForSidecar(10000)) .expectSubscription() @@ -187,12 +192,14 @@ public void waitForSidecarOK() throws Exception { @Test public void waitForSidecarTimeoutOK() throws Exception { - mockInterceptor.addRule() - .get() - .path("/v1.0/healthz/outbound") - .respond(204); + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_NO_CONTENT); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + try (ServerSocket serverSocket = new ServerSocket(0)) { - final int port = serverSocket.getLocalPort(); + int port = serverSocket.getLocalPort(); + Thread t = new Thread(() -> { try { try (Socket socket = serverSocket.accept()) { @@ -201,7 +208,8 @@ public void waitForSidecarTimeoutOK() throws Exception { } }); t.start(); - daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, okHttpClient); + + daprHttp = new DaprHttp(sidecarIp, port, daprApiToken, READ_TIMEOUT, httpClient); DaprClient daprClientHttp = buildDaprClient(daprHttp); daprClientHttp.waitForSidecar(10000).block(); } @@ -209,80 +217,146 @@ public void waitForSidecarTimeoutOK() throws Exception { @Test public void invokeServiceVerbNull() { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/publish/A") - .respond(EXPECTED_RESULT); - String event = "{ \"message\": \"This is a test\" }"; + MockHttpResponse mockHttpResponse = new MockHttpResponse(EXPECTED_RESULT.getBytes(), HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); assertThrows(IllegalArgumentException.class, () -> - daprClientHttp.invokeMethod(null, "", "", null, null, (Class)null).block()); + daprClientHttp.invokeMethod( + null, + "", + "", + null, + null, + (Class)null + ).block()); } @Test public void invokeServiceIllegalArgumentException() { - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/badorder") - .respond("INVALID JSON"); + byte[] content = "INVALID JSON".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); assertThrows(IllegalArgumentException.class, () -> { // null HttpMethod - daprClientHttp.invokeMethod("1", "2", "3", new HttpExtension(null), null, (Class)null).block(); + daprClientHttp.invokeMethod( + "1", + "2", + "3", + new HttpExtension(null), + null, + (Class)null + ).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null HttpExtension - daprClientHttp.invokeMethod("1", "2", "3", null, null, (Class)null).block(); + daprClientHttp.invokeMethod( + "1", + "2", + "3", + null, + null, + (Class)null + ).block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty appId - daprClientHttp.invokeMethod("", "1", null, HttpExtension.GET, null, (Class)null).block(); + daprClientHttp.invokeMethod( + "", + "1", + null, + HttpExtension.GET, + null, + (Class)null + ).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null appId, empty method - daprClientHttp.invokeMethod(null, "", null, HttpExtension.POST, null, (Class)null).block(); + daprClientHttp.invokeMethod( + null, + "", + null, + HttpExtension.POST, + null, + (Class)null + ).block(); }); assertThrows(IllegalArgumentException.class, () -> { // empty method - daprClientHttp.invokeMethod("1", "", null, HttpExtension.PUT, null, (Class)null).block(); + daprClientHttp.invokeMethod( + "1", + "", + null, + HttpExtension.PUT, + null, + (Class)null + ).block(); }); assertThrows(IllegalArgumentException.class, () -> { // null method - daprClientHttp.invokeMethod("1", null, null, HttpExtension.DELETE, null, (Class)null).block(); + daprClientHttp.invokeMethod( + "1", + null, + null, + HttpExtension.DELETE, + null, + (Class)null + ).block(); }); assertThrowsDaprException(JsonParseException.class, () -> { // invalid JSON response - daprClientHttp.invokeMethod("41", "badorder", null, HttpExtension.GET, null, String.class).block(); + daprClientHttp.invokeMethod( + "41", + "badorder", + null, + HttpExtension.GET, + null, + String.class + ).block(); }); } @Test public void invokeServiceDaprError() { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod") - .respond(500, - ResponseBody.create( - "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}", - MediaTypes.MEDIATYPE_JSON)); + byte[] content = "{ \"errorCode\": \"MYCODE\", \"message\": \"My Message\"}".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); DaprException exception = assertThrows(DaprException.class, () -> { - daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + daprClientHttp.invokeMethod( + "myapp", + "mymethod", + "anything", + HttpExtension.POST + ).block(); }); assertEquals("MYCODE", exception.getErrorCode()); assertEquals("MYCODE: My Message (HTTP status code: 500)", exception.getMessage()); - assertEquals(500, exception.getHttpStatusCode()); + assertEquals(HTTP_SERVER_ERROR, exception.getHttpStatusCode()); } @Test public void invokeServiceDaprErrorFromGRPC() { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod") - .respond(500, - ResponseBody.create( - "{ \"code\": 7 }", - MediaTypes.MEDIATYPE_JSON)); + byte[] content = "{ \"code\": 7 }".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); DaprException exception = assertThrows(DaprException.class, () -> { - daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + daprClientHttp.invokeMethod( + "myapp", + "mymethod", + "anything", + HttpExtension.POST + ).block(); }); assertEquals("PERMISSION_DENIED", exception.getErrorCode()); @@ -291,12 +365,11 @@ public void invokeServiceDaprErrorFromGRPC() { @Test public void invokeServiceDaprErrorUnknownJSON() { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod") - .respond(500, - ResponseBody.create( - "{ \"anything\": 7 }", - MediaTypes.MEDIATYPE_JSON)); + byte[] content = "{ \"anything\": 7 }".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); DaprException exception = assertThrows(DaprException.class, () -> { daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); @@ -309,119 +382,203 @@ public void invokeServiceDaprErrorUnknownJSON() { @Test public void invokeServiceDaprErrorEmptyString() { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/invoke/myapp/method/mymethod") - .respond(500, - ResponseBody.create( - "", - MediaTypes.MEDIATYPE_JSON)); + byte[] content = "".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); DaprException exception = assertThrows(DaprException.class, () -> { - daprClientHttp.invokeMethod("myapp", "mymethod", "anything", HttpExtension.POST).block(); + daprClientHttp.invokeMethod( + "myapp", + "mymethod", + "anything", + HttpExtension.POST + ).block(); }); assertEquals("UNKNOWN", exception.getErrorCode()); assertEquals("UNKNOWN: HTTP status code: 500", exception.getMessage()); } - @Test public void invokeServiceMethodNull() { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/publish/A") - .respond(EXPECTED_RESULT); + byte[] content = EXPECTED_RESULT.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); assertThrows(IllegalArgumentException.class, () -> - daprClientHttp.invokeMethod("1", "", null, HttpExtension.POST, null, (Class)null).block()); + daprClientHttp.invokeMethod( + "1", + "", + null, + HttpExtension.POST, + null, + (Class)null + ).block()); } @Test public void invokeService() { - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond("\"hello world\""); + byte[] content = "\"hello world\"".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + null, + HttpExtension.GET, + null, + String.class + ); - Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class); assertEquals("hello world", mono.block()); } @Test public void invokeServiceNullResponse() { - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond(new byte[0]); + byte[] content = new byte[0]; + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + null, + HttpExtension.GET, + null, + String.class + ); - Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, null, String.class); assertNull(mono.block()); } @Test public void simpleInvokeService() { - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond(EXPECTED_RESULT); + byte[] content = EXPECTED_RESULT.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + null, + HttpExtension.GET, + byte[].class + ); - Mono mono = daprClientHttp.invokeMethod("41", "neworder", null, HttpExtension.GET, byte[].class); assertEquals(new String(mono.block()), EXPECTED_RESULT); } @Test public void invokeServiceWithMetadataMap() { - Map map = new HashMap<>(); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond(EXPECTED_RESULT); - - Mono mono = daprClientHttp.invokeMethod("41", "neworder", (byte[]) null, HttpExtension.GET, map); + Map map = Map.of(); + byte[] content = EXPECTED_RESULT.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + (byte[]) null, + HttpExtension.GET, + map + ); String monoString = new String(mono.block()); + assertEquals(monoString, EXPECTED_RESULT); } @Test public void invokeServiceWithOutRequest() { - Map map = new HashMap<>(); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond(EXPECTED_RESULT); + Map map = Map.of(); + byte[] content = EXPECTED_RESULT.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + HttpExtension.GET, + map + ); - Mono mono = daprClientHttp.invokeMethod("41", "neworder", HttpExtension.GET, map); assertNull(mono.block()); } @Test public void invokeServiceWithRequest() { - Map map = new HashMap<>(); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond(EXPECTED_RESULT); + Map map = Map.of(); + byte[] content = EXPECTED_RESULT.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + "", + HttpExtension.GET, + map + ); - Mono mono = daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map); assertNull(mono.block()); } @Test public void invokeServiceWithRequestAndQueryString() { - Map map = new HashMap<>(); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder?param1=1¶m2=a¶m2=b%2Fc") - .respond(EXPECTED_RESULT); - - Map> queryString = new HashMap<>(); - queryString.put("param1", Collections.singletonList("1")); - queryString.put("param2", Arrays.asList("a", "b/c")); + Map map = Map.of(); + Map> queryString = Map.of( + "param1", List.of("1"), + "param2", List.of("a", "b/c") + ); + byte[] content = EXPECTED_RESULT.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + HttpExtension httpExtension = new HttpExtension(DaprHttp.HttpMethods.GET, queryString, null); - Mono mono = daprClientHttp.invokeMethod("41", "neworder", "", httpExtension, map); + Mono mono = daprClientHttp.invokeMethod( + "41", + "neworder", + "", + httpExtension, + map + ); + assertNull(mono.block()); } @Test public void invokeServiceNoHotMono() { - Map map = new HashMap<>(); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .respond(500); - - daprClientHttp.invokeMethod("41", "neworder", "", HttpExtension.GET, map); + Map map = Map.of(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + daprClientHttp.invokeMethod( + "41", + "neworder", + "", + HttpExtension.GET, + map + ); // No exception should be thrown because did not call block() on mono above. } @@ -433,18 +590,27 @@ public void invokeServiceWithContext() { .put("traceparent", traceparent) .put("tracestate", tracestate) .put("not_added", "xyz"); - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3000/v1.0/invoke/41/method/neworder") - .header("traceparent", traceparent) - .header("tracestate", tracestate) - .respond(new byte[0]); + byte[] content = new byte[0]; + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder") .setBody("request") .setHttpExtension(HttpExtension.POST); Mono result = daprClientHttp.invokeMethod(req, TypeRef.get(Void.class)) .contextWrite(it -> it.putAll((ContextView) context)); + result.block(); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + + assertEquals(traceparent, request.headers().firstValue("traceparent").get()); + assertEquals(tracestate, request.headers().firstValue("tracestate").get()); } @Test @@ -467,23 +633,4 @@ public void close() throws Exception { daprClientHttp.close(); } - private static class XmlSerializer implements DaprObjectSerializer { - - private static final XmlMapper XML_MAPPER = new XmlMapper(); - - @Override - public byte[] serialize(Object o) throws IOException { - return XML_MAPPER.writeValueAsBytes(o); - } - - @Override - public T deserialize(byte[] data, TypeRef type) throws IOException { - return XML_MAPPER.readValue(data, new TypeReference() {}); - } - - @Override - public String getContentType() { - return "application/xml"; - } - } -} \ No newline at end of file +} diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java index 78470719cb..85863c046c 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpBuilderTest.java @@ -14,10 +14,10 @@ package io.dapr.client; import io.dapr.config.Properties; -import okhttp3.OkHttpClient; import org.junit.jupiter.api.Test; import java.lang.reflect.Field; +import java.net.http.HttpClient; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -30,14 +30,13 @@ public void singletonOkHttpClient() throws Exception { DaprHttp daprHttp = new DaprHttpBuilder().build(properties); DaprHttp anotherDaprHttp = new DaprHttpBuilder().build(properties); - assertSame(getOkHttpClient(daprHttp), getOkHttpClient(anotherDaprHttp)); + assertSame(getHttpClient(daprHttp), getHttpClient(anotherDaprHttp)); } - - private static OkHttpClient getOkHttpClient(DaprHttp daprHttp) throws Exception { + private static HttpClient getHttpClient(DaprHttp daprHttp) throws Exception { Field httpClientField = DaprHttp.class.getDeclaredField("httpClient"); httpClientField.setAccessible(true); - OkHttpClient okHttpClient = (OkHttpClient) httpClientField.get(daprHttp); + HttpClient okHttpClient = (HttpClient) httpClientField.get(daprHttp); assertNotNull(okHttpClient); return okHttpClient; } diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java index 2b38d78090..17d9205c29 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpStub.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpStub.java @@ -16,6 +16,7 @@ import reactor.core.publisher.Mono; import reactor.util.context.ContextView; +import java.time.Duration; import java.util.List; import java.util.Map; @@ -25,6 +26,8 @@ */ public class DaprHttpStub extends DaprHttp { + private static final Duration READ_TIMEOUT = Duration.ofSeconds(60); + public static class ResponseStub extends DaprHttp.Response { public ResponseStub(byte[] body, Map headers, int statusCode) { super(body, headers, statusCode); @@ -34,7 +37,7 @@ public ResponseStub(byte[] body, Map headers, int statusCode) { * Instantiates a stub for DaprHttp */ public DaprHttpStub() { - super(null, 3000, "stubToken", null); + super(null, 3000, "stubToken", READ_TIMEOUT, null); } /** diff --git a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java index bf21e64191..ad6753479a 100644 --- a/sdk/src/test/java/io/dapr/client/DaprHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprHttpTest.java @@ -16,14 +16,10 @@ import io.dapr.exceptions.DaprErrorDetails; import io.dapr.exceptions.DaprException; import io.dapr.utils.TypeRef; -import okhttp3.MediaType; -import okhttp3.OkHttpClient; -import okhttp3.ResponseBody; -import okhttp3.mock.Behavior; -import okhttp3.mock.MockInterceptor; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.util.context.Context; @@ -32,214 +28,407 @@ import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension; import java.io.IOException; -import java.net.HttpURLConnection; -import java.util.Collections; -import java.util.HashMap; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import static io.dapr.utils.TestUtils.formatIpAddress; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(SystemStubsExtension.class) public class DaprHttpTest { - @SystemStub - public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); - - private static final String STATE_PATH = DaprHttp.API_VERSION + "/state"; - + private static final int HTTP_OK = 200; + private static final int HTTP_SERVER_ERROR = 500; + private static final int HTTP_NO_CONTENT = 204; + private static final int HTTP_NOT_FOUND = 404; private static final String EXPECTED_RESULT = "{\"data\":\"ewoJCSJwcm9wZXJ0eUEiOiAidmFsdWVBIiwKCQkicHJvcGVydHlCIjogInZhbHVlQiIKCX0=\"}"; - + private static final Duration READ_TIMEOUT = Duration.ofSeconds(60); + + @SystemStub + private final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + private String sidecarIp; private String daprTokenApi; - private OkHttpClient okHttpClient; + private HttpClient httpClient; - private MockInterceptor mockInterceptor; - - private ObjectSerializer serializer = new ObjectSerializer(); + private final ObjectSerializer serializer = new ObjectSerializer(); @BeforeEach public void setUp() { sidecarIp = formatIpAddress(Properties.SIDECAR_IP.get()); daprTokenApi = Properties.API_TOKEN.get(); - mockInterceptor = new MockInterceptor(Behavior.UNORDERED); - okHttpClient = new OkHttpClient.Builder().addInterceptor(mockInterceptor).build(); + httpClient = mock(HttpClient.class); } @Test public void invokeApi_daprApiToken_present() throws IOException { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .hasHeader(Headers.DAPR_API_TOKEN) - .respond(serializer.serialize(EXPECTED_RESULT)); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + environmentVariables.set(Properties.API_TOKEN.getEnvName(), "xyz"); assertEquals("xyz", Properties.API_TOKEN.get()); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), okHttpClient); - Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty()); + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, Properties.API_TOKEN.get(), READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + (byte[]) null, + null, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("POST", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); + assertEquals("xyz", request.headers().firstValue(Headers.DAPR_API_TOKEN).get()); } @Test public void invokeApi_daprApiToken_absent() throws IOException { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .not() - .hasHeader(Headers.DAPR_API_TOKEN) - .respond(serializer.serialize(EXPECTED_RESULT)); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + assertNull(Properties.API_TOKEN.get()); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, null, Context.empty()); + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + (byte[]) null, + null, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("POST", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); + assertFalse(request.headers().map().containsKey(Headers.DAPR_API_TOKEN)); } @Test public void invokeMethod() throws IOException { - Map headers = new HashMap<>(); - headers.put("content-type", "text/html"); - headers.put("header1", "value1"); - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .respond(serializer.serialize(EXPECTED_RESULT)); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty()); + Map headers = Map.of( + "content-type", "text/html", + "header1", "value1" + ); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + (byte[]) null, + headers, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("POST", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); + assertEquals("text/html", request.headers().firstValue("content-type").get()); + assertEquals("value1", request.headers().firstValue("header1").get()); } @Test public void invokeMethodIPv6() throws IOException { sidecarIp = formatIpAddress("2001:db8:3333:4444:5555:6666:7777:8888"); - Map headers = new HashMap<>(); - headers.put("content-type", "text/html"); - headers.put("header1", "value1"); - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .respond(serializer.serialize(EXPECTED_RESULT)); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, (byte[]) null, headers, Context.empty()); + Map headers = Map.of( + "content-type", "text/html", + "header1", "value1" + ); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + (byte[]) null, + headers, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("POST", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); + assertEquals("text/html", request.headers().firstValue("content-type").get()); + assertEquals("value1", request.headers().firstValue("header1").get()); } @Test public void invokePostMethod() throws IOException { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .respond(serializer.serialize(EXPECTED_RESULT)) - .addHeader("Header", "Value"); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, "", null, Context.empty()); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + "", + null, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("POST", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); } @Test public void invokeDeleteMethod() throws IOException { - mockInterceptor.addRule() - .delete("http://" + sidecarIp + ":3500/v1.0/state") - .respond(serializer.serialize(EXPECTED_RESULT)); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("DELETE", "v1.0/state".split("/"), null, (String) null, null, Context.empty()); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "DELETE", + "v1.0/state".split("/"), + null, + (String) null, + null, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("DELETE", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); } @Test - public void invokeHEADMethod() throws IOException { - mockInterceptor.addRule().head("http://127.0.0.1:3500/v1.0/state").respond(HttpURLConnection.HTTP_OK); - DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("HEAD", "v1.0/state".split("/"), null, (String) null, null, Context.empty()); + public void invokeHeadMethod() { + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "HEAD", + "v1.0/state".split("/"), + null, + (String) null, + null, + Context.empty() + ); DaprHttp.Response response = mono.block(); - assertEquals(HttpURLConnection.HTTP_OK, response.getStatusCode()); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + + assertEquals("HEAD", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); + assertEquals(HTTP_OK, response.getStatusCode()); } - + @Test public void invokeGetMethod() throws IOException { - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3500/v1.0/get") - .respond(serializer.serialize(EXPECTED_RESULT)); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = daprHttp.invokeApi("GET", "v1.0/get".split("/"), null, null, Context.empty()); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "GET", + "v1.0/state".split("/"), + null, + null, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("GET", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state", request.uri().toString()); } @Test public void invokeMethodWithHeaders() throws IOException { - Map headers = new HashMap<>(); - headers.put("header", "value"); - headers.put("header1", "value1"); - Map> urlParameters = new HashMap<>(); - urlParameters.put("orderId", Collections.singletonList("41")); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41") - .respond(serializer.serialize(EXPECTED_RESULT)); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("GET", "v1.0/state/order".split("/"), urlParameters, headers, Context.empty()); + Map headers = Map.of( + "header", "value", + "header1", "value1" + ); + Map> urlParameters = Map.of( + "orderId", List.of("41") + ); + byte[] content = serializer.serialize(EXPECTED_RESULT); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_OK); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(HttpRequest.class); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "GET", + "v1.0/state/order".split("/"), + urlParameters, + headers, + Context.empty() + ); DaprHttp.Response response = mono.block(); String body = serializer.deserialize(response.getBody(), String.class); + + verify(httpClient).sendAsync(requestCaptor.capture(), any()); + + HttpRequest request = requestCaptor.getValue(); + assertEquals(EXPECTED_RESULT, body); + assertEquals("GET", request.method()); + assertEquals("http://" + sidecarIp + ":3500/v1.0/state/order?orderId=41", request.uri().toString()); + assertEquals("value", request.headers().firstValue("header").get()); + assertEquals("value1", request.headers().firstValue("header1").get()); } @Test - public void invokePostMethodRuntime() throws IOException { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .respond(500); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = - daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); + public void invokePostMethodRuntime() { + MockHttpResponse mockHttpResponse = new MockHttpResponse(HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + null, + Context.empty()); + StepVerifier.create(mono).expectError(RuntimeException.class).verify(); } @Test - public void invokePostDaprError() throws IOException { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .respond(500, ResponseBody.create(MediaType.parse("text"), - "{\"errorCode\":null,\"message\":null}")); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); + public void invokePostDaprError() { + byte[] content = "{\"errorCode\":null,\"message\":null}".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + null, + Context.empty() + ); + StepVerifier.create(mono).expectError(RuntimeException.class).verify(); } @Test - public void invokePostMethodUnknownError() throws IOException { - mockInterceptor.addRule() - .post("http://" + sidecarIp + ":3500/v1.0/state") - .respond(500, ResponseBody.create(MediaType.parse("application/json"), - "{\"errorCode\":\"null\",\"message\":\"null\"}")); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono mono = daprHttp.invokeApi("POST", "v1.0/state".split("/"), null, null, Context.empty()); + public void invokePostMethodUnknownError() { + byte[] content = "{\"errorCode\":null,\"message\":null}".getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/state".split("/"), + null, + null, + Context.empty() + ); + StepVerifier.create(mono).expectError(RuntimeException.class).verify(); } @Test public void validateExceptionParsing() { - final String payload = "{" + + String payload = "{" + "\"errorCode\":\"ERR_PUBSUB_NOT_FOUND\"," + "\"message\":\"pubsub abc is not found\"," + "\"details\":[" + @@ -249,14 +438,24 @@ public void validateExceptionParsing() { "\"metadata\":{}," + "\"reason\":\"DAPR_PUBSUB_NOT_FOUND\"" + "}]}"; - mockInterceptor.addRule() - .post("http://127.0.0.1:3500/v1.0/pubsub/publish") - .respond(500, ResponseBody.create(MediaType.parse("application/json"), - payload)); - DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, okHttpClient); - Mono mono = daprHttp.invokeApi("POST", "v1.0/pubsub/publish".split("/"), null, null, Context.empty()); + byte[] content = payload.getBytes(); + MockHttpResponse mockHttpResponse = new MockHttpResponse(content, HTTP_SERVER_ERROR); + CompletableFuture> mockResponse = CompletableFuture.completedFuture(mockHttpResponse); + + when(httpClient.sendAsync(any(), any())).thenReturn(mockResponse); + + DaprHttp daprHttp = new DaprHttp(Properties.SIDECAR_IP.get(), 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono mono = daprHttp.invokeApi( + "POST", + "v1.0/pubsub/publish".split("/"), + null, + null, + Context.empty() + ); + StepVerifier.create(mono).expectErrorMatches(e -> { assertEquals(DaprException.class, e.getClass()); + DaprException daprException = (DaprException)e; assertEquals("ERR_PUBSUB_NOT_FOUND", daprException.getErrorCode()); assertEquals("DAPR_PUBSUB_NOT_FOUND", @@ -267,15 +466,15 @@ public void validateExceptionParsing() { } /** - * The purpose of this test is to show that it doesn't matter when the client is called, the actual coll to DAPR + * The purpose of this test is to show that it doesn't matter when the client is called, the actual call to DAPR * will be done when the output Mono response call the Mono.block method. - * Like for instanche if you call getState, withouth blocking for the response, and then call delete for the same state - * you just retrived but block for the delete response, when later you block for the response of the getState, you will - * not found the state. + * Like for instance if you call getState, without blocking for the response, and then call delete for the same state + * you just retrieved but block for the delete response, when later you block for the response of the getState, you will + * not find the state. *

This test will execute the following flow:

*
    - *
  1. Exeucte client getState for Key=key1
  2. - *
  3. Block for result to the the state
  4. + *
  5. Execute client getState for Key=key1
  6. + *
  7. Block for result to the state
  8. *
  9. Assert the Returned State is the expected to key1
  10. *
  11. Execute client getState for Key=key2
  12. *
  13. Execute client deleteState for Key=key2
  14. @@ -285,35 +484,64 @@ public void validateExceptionParsing() { * * @throws IOException - Test will fail if any unexpected exception is being thrown */ - @Test() + @Test public void testCallbackCalledAtTheExpectedTimeTest() throws IOException { - String deletedStateKey = "deletedKey"; String existingState = "existingState"; - String urlDeleteState = STATE_PATH + "/" + deletedStateKey; - String urlExistingState = STATE_PATH + "/" + existingState; - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3500/" + urlDeleteState) - .respond(200, ResponseBody.create(MediaType.parse("application/json"), - deletedStateKey)); - mockInterceptor.addRule() - .delete("http://" + sidecarIp + ":3500/" + urlDeleteState) - .respond(204); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3500/" + urlExistingState) - .respond(200, ResponseBody.create(MediaType.parse("application/json"), - serializer.serialize(existingState))); - DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, okHttpClient); - Mono response = daprHttp.invokeApi("GET", urlExistingState.split("/"), null, null, Context.empty()); + String urlExistingState = "v1.0/state/" + existingState; + String deletedStateKey = "deletedKey"; + String urlDeleteState = "v1.0/state/" + deletedStateKey; + + when(httpClient.sendAsync(any(), any())).thenAnswer(invocation -> { + HttpRequest request = invocation.getArgument(0); + String url = request.uri().toString(); + + if (request.method().equals("GET") && url.contains(urlExistingState)) { + MockHttpResponse mockHttpResponse = new MockHttpResponse(serializer.serialize(existingState), HTTP_OK); + + return CompletableFuture.completedFuture(mockHttpResponse); + } + + if (request.method().equals("DELETE")) { + return CompletableFuture.completedFuture(new MockHttpResponse(HTTP_NO_CONTENT)); + } + + if (request.method().equals("GET")) { + byte [] content = "{\"errorCode\":\"404\",\"message\":\"State Not Found\"}".getBytes(); + + return CompletableFuture.completedFuture(new MockHttpResponse(content, HTTP_NOT_FOUND)); + } + + return CompletableFuture.failedFuture(new RuntimeException("Unexpected call")); + }); + + DaprHttp daprHttp = new DaprHttp(sidecarIp, 3500, daprTokenApi, READ_TIMEOUT, httpClient); + Mono response = daprHttp.invokeApi( + "GET", + urlExistingState.split("/"), + null, + null, + Context.empty() + ); + assertEquals(existingState, serializer.deserialize(response.block().getBody(), String.class)); - Mono responseDeleted = daprHttp.invokeApi("GET", urlDeleteState.split("/"), null, null, Context.empty()); - Mono responseDeleteKey = - daprHttp.invokeApi("DELETE", urlDeleteState.split("/"), null, null, Context.empty()); + + Mono responseDeleted = daprHttp.invokeApi( + "GET", + urlDeleteState.split("/"), + null, + null, + Context.empty() + ); + Mono responseDeleteKey = daprHttp.invokeApi( + "DELETE", + urlDeleteState.split("/"), + null, + null, + Context.empty() + ); + assertNull(serializer.deserialize(responseDeleteKey.block().getBody(), String.class)); - mockInterceptor.reset(); - mockInterceptor.addRule() - .get("http://" + sidecarIp + ":3500/" + urlDeleteState) - .respond(404, ResponseBody.create(MediaType.parse("application/json"), - "{\"errorCode\":\"404\",\"message\":\"State Not Found\"}")); + try { responseDeleted.block(); fail("Expected DaprException"); @@ -321,5 +549,4 @@ public void testCallbackCalledAtTheExpectedTimeTest() throws IOException { assertEquals(DaprException.class, ex.getClass()); } } - } diff --git a/sdk/src/test/java/io/dapr/client/MockHttpResponse.java b/sdk/src/test/java/io/dapr/client/MockHttpResponse.java new file mode 100644 index 0000000000..b5f0510a05 --- /dev/null +++ b/sdk/src/test/java/io/dapr/client/MockHttpResponse.java @@ -0,0 +1,80 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client; + +import javax.net.ssl.SSLSession; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.util.Collections; +import java.util.Optional; + +public class MockHttpResponse implements HttpResponse { + + private final byte[] body; + private final int statusCode; + + public MockHttpResponse(int statusCode) { + this.body = null; + this.statusCode = statusCode; + } + + public MockHttpResponse(byte[] body, int statusCode) { + this.body = body; + this.statusCode = statusCode; + } + + @Override + public int statusCode() { + return statusCode; + } + + @Override + public HttpRequest request() { + return null; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + @Override + public HttpHeaders headers() { + return HttpHeaders.of(Collections.emptyMap(), (a, b) -> true); + } + + @Override + public byte[] body() { + return body; + } + + @Override + public Optional sslSession() { + return Optional.empty(); + } + + @Override + public URI uri() { + return null; + } + + @Override + public HttpClient.Version version() { + return null; + } +} diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java index 9fce309346..145f61deaa 100644 --- a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java +++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java @@ -15,6 +15,7 @@ import io.dapr.testcontainers.converter.ComponentYamlConverter; import io.dapr.testcontainers.converter.ConfigurationYamlConverter; +import io.dapr.testcontainers.converter.HttpEndpointYamlConverter; import io.dapr.testcontainers.converter.SubscriptionYamlConverter; import io.dapr.testcontainers.converter.YamlConverter; import io.dapr.testcontainers.converter.YamlMapperFactory; @@ -48,6 +49,7 @@ public class DaprContainer extends GenericContainer { private static final Yaml YAML_MAPPER = YamlMapperFactory.create(); private static final YamlConverter COMPONENT_CONVERTER = new ComponentYamlConverter(YAML_MAPPER); private static final YamlConverter SUBSCRIPTION_CONVERTER = new SubscriptionYamlConverter(YAML_MAPPER); + private static final YamlConverter HTTPENDPOINT_CONVERTER = new HttpEndpointYamlConverter(YAML_MAPPER); private static final YamlConverter CONFIGURATION_CONVERTER = new ConfigurationYamlConverter( YAML_MAPPER); private static final WaitStrategy WAIT_STRATEGY = Wait.forHttp("/v1.0/healthz/outbound") @@ -56,6 +58,7 @@ public class DaprContainer extends GenericContainer { private final Set components = new HashSet<>(); private final Set subscriptions = new HashSet<>(); + private final Set httpEndpoints = new HashSet<>(); private DaprLogLevel daprLogLevel = DaprLogLevel.INFO; private String appChannelAddress = "localhost"; private String placementService = "placement"; @@ -65,6 +68,7 @@ public class DaprContainer extends GenericContainer { private DaprPlacementContainer placementContainer; private String appName; private Integer appPort; + private String appHealthCheckPath; private boolean shouldReusePlacement; /** @@ -99,6 +103,10 @@ public Set getSubscriptions() { return subscriptions; } + public Set getHttpEndpoints() { + return httpEndpoints; + } + public DaprContainer withAppPort(Integer port) { this.appPort = port; return this; @@ -109,6 +117,11 @@ public DaprContainer withAppChannelAddress(String appChannelAddress) { return this; } + public DaprContainer withAppHealthCheckPath(String appHealthCheckPath) { + this.appHealthCheckPath = appHealthCheckPath; + return this; + } + public DaprContainer withConfiguration(Configuration configuration) { this.configuration = configuration; return this; @@ -134,6 +147,11 @@ public DaprContainer withSubscription(Subscription subscription) { return this; } + public DaprContainer withHttpEndpoint(HttpEndpoint httpEndpoint) { + httpEndpoints.add(httpEndpoint); + return this; + } + public DaprContainer withPlacementImage(String placementDockerImageName) { this.placementDockerImageName = placementDockerImageName; return this; @@ -161,7 +179,7 @@ public DaprContainer withComponent(Component component) { */ public DaprContainer withComponent(Path path) { try { - Map component = this.YAML_MAPPER.loadAs(Files.newInputStream(path), Map.class); + Map component = YAML_MAPPER.loadAs(Files.newInputStream(path), Map.class); String type = (String) component.get("type"); Map metadata = (Map) component.get("metadata"); @@ -221,12 +239,12 @@ protected void configure() { List cmds = new ArrayList<>(); cmds.add("./daprd"); - cmds.add("-app-id"); + cmds.add("--app-id"); cmds.add(appName); cmds.add("--dapr-listen-addresses=0.0.0.0"); cmds.add("--app-protocol"); cmds.add(DAPR_PROTOCOL.getName()); - cmds.add("-placement-host-address"); + cmds.add("--placement-host-address"); cmds.add(placementService + ":50005"); if (appChannelAddress != null && !appChannelAddress.isEmpty()) { @@ -239,6 +257,12 @@ protected void configure() { cmds.add(Integer.toString(appPort)); } + if (appHealthCheckPath != null && !appHealthCheckPath.isEmpty()) { + cmds.add("--enable-app-health-check"); + cmds.add("--app-health-check-path"); + cmds.add(appHealthCheckPath); + } + if (configuration != null) { cmds.add("--config"); cmds.add("/dapr-resources/" + configuration.getName() + ".yaml"); @@ -291,6 +315,15 @@ protected void configure() { withCopyToContainer(Transferable.of(subscriptionYaml), "/dapr-resources/" + subscription.getName() + ".yaml"); } + for (HttpEndpoint endpoint : httpEndpoints) { + String endpointYaml = HTTPENDPOINT_CONVERTER.convert(endpoint); + + LOGGER.info("> HTTPEndpoint YAML: \n"); + LOGGER.info("\t\n" + endpointYaml + "\n"); + + withCopyToContainer(Transferable.of(endpointYaml), "/dapr-resources/" + endpoint.getName() + ".yaml"); + } + dependsOn(placementContainer); } diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/HttpEndpoint.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/HttpEndpoint.java new file mode 100644 index 0000000000..482cac9a76 --- /dev/null +++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/HttpEndpoint.java @@ -0,0 +1,19 @@ +package io.dapr.testcontainers; + +public class HttpEndpoint { + private String name; + private String baseUrl; + + public HttpEndpoint(String name, String baseUrl) { + this.name = name; + this.baseUrl = baseUrl; + } + + public String getName() { + return name; + } + + public String getBaseUrl() { + return baseUrl; + } +} diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/converter/HttpEndpointYamlConverter.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/converter/HttpEndpointYamlConverter.java new file mode 100644 index 0000000000..db4a9cba43 --- /dev/null +++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/converter/HttpEndpointYamlConverter.java @@ -0,0 +1,32 @@ +package io.dapr.testcontainers.converter; + +import io.dapr.testcontainers.HttpEndpoint; +import org.yaml.snakeyaml.Yaml; + +import java.util.LinkedHashMap; +import java.util.Map; + +public class HttpEndpointYamlConverter implements YamlConverter { + private final Yaml mapper; + + public HttpEndpointYamlConverter(Yaml mapper) { + this.mapper = mapper; + } + + @Override + public String convert(HttpEndpoint endpoint) { + Map endpointProps = new LinkedHashMap<>(); + endpointProps.put("apiVersion", "dapr.io/v1alpha1"); + endpointProps.put("kind", "HTTPEndpoint"); + + Map endpointMetadata = new LinkedHashMap<>(); + endpointMetadata.put("name", endpoint.getName()); + endpointProps.put("metadata", endpointMetadata); + + Map endpointSpec = new LinkedHashMap<>(); + endpointSpec.put("baseUrl", endpoint.getBaseUrl()); + endpointProps.put("spec", endpointSpec); + + return mapper.dumpAsMap(endpointProps); + } +} diff --git a/testcontainers-dapr/src/test/java/io/dapr/testcontainers/converter/HttpEndpointYamlConverterTest.java b/testcontainers-dapr/src/test/java/io/dapr/testcontainers/converter/HttpEndpointYamlConverterTest.java new file mode 100644 index 0000000000..ec2540fe92 --- /dev/null +++ b/testcontainers-dapr/src/test/java/io/dapr/testcontainers/converter/HttpEndpointYamlConverterTest.java @@ -0,0 +1,40 @@ +package io.dapr.testcontainers.converter; + +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.HttpEndpoint; +import org.junit.jupiter.api.Test; +import org.yaml.snakeyaml.Yaml; + +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HttpEndpointYamlConverterTest { + private final Yaml MAPPER = YamlMapperFactory.create(); + + private final HttpEndpointYamlConverter converter = new HttpEndpointYamlConverter(MAPPER); + + @Test + void testHttpEndpointToYaml() { + DaprContainer dapr = new DaprContainer("daprio/daprd") + .withAppName("dapr-app") + .withAppPort(8081) + .withHttpEndpoint(new HttpEndpoint("my-endpoint", "http://localhost:8080")) + .withAppChannelAddress("host.testcontainers.internal"); + + Set endpoints = dapr.getHttpEndpoints(); + assertEquals(1, endpoints.size()); + + HttpEndpoint endpoint = endpoints.iterator().next(); + String endpointYaml = converter.convert(endpoint); + String expectedEndpointYaml = + "apiVersion: dapr.io/v1alpha1\n" + + "kind: HTTPEndpoint\n" + + "metadata:\n" + + " name: my-endpoint\n" + + "spec:\n" + + " baseUrl: http://localhost:8080\n"; + + assertEquals(expectedEndpointYaml, endpointYaml); + } +}