From 4215303ba6308a24158f2ac72945082a964ef5eb Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Wed, 20 Mar 2024 12:02:00 +0100 Subject: [PATCH] Add RequestOptions to ingress requests. --- .../src/main/resources/templates/Client.hbs | 18 +++- .../src/main/resources/templates/Client.hbs | 10 ++- .../sdk/client/DefaultIngressClient.java | 29 ++++-- .../dev/restate/sdk/client/IngressClient.java | 5 +- .../restate/sdk/client/RequestOptions.java | 90 +++++++++++++++++++ .../workflow/impl/WorkflowCodegenUtil.java | 19 ++-- 6 files changed, 152 insertions(+), 19 deletions(-) create mode 100644 sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java diff --git a/sdk-api-gen/src/main/resources/templates/Client.hbs b/sdk-api-gen/src/main/resources/templates/Client.hbs index d90dff2f..fd835a55 100644 --- a/sdk-api-gen/src/main/resources/templates/Client.hbs +++ b/sdk-api-gen/src/main/resources/templates/Client.hbs @@ -97,11 +97,18 @@ public class {{generatedClassSimpleName}} { {{#handlers}} public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { + {{^outputEmpty}}return {{/outputEmpty}}this.{{name}}( + {{^inputEmpty}}req, {{/inputEmpty}} + dev.restate.sdk.client.RequestOptions.DEFAULT); + } + + public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{name}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) { {{^outputEmpty}}return {{/outputEmpty}}this.ingressClient.call( {{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, {{outputSerdeFieldName}}, - {{#if inputEmpty}}null{{else}}req{{/if}}); + {{#if inputEmpty}}null{{else}}req{{/if}}, + requestOptions); }{{/handlers}} public Send send() { @@ -111,10 +118,17 @@ public class {{generatedClassSimpleName}} { public class Send { {{#handlers}} public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) { + return this.{{name}}( + {{^inputEmpty}}req, {{/inputEmpty}} + dev.restate.sdk.client.RequestOptions.DEFAULT); + } + + public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) { return IngressClient.this.ingressClient.send( {{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, - {{#if inputEmpty}}null{{else}}req{{/if}}); + {{#if inputEmpty}}null{{else}}req{{/if}}, + requestOptions); }{{/handlers}} } } diff --git a/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs b/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs index f0d88264..3c36cc97 100644 --- a/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs +++ b/sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs @@ -72,12 +72,13 @@ object {{generatedClassSimpleName}} { class IngressClient(private val ingressClient: dev.restate.sdk.client.IngressClient{{#isObject}}, private val key: String{{/isObject}}) { {{#handlers}} - suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}): {{{boxedOutputFqcn}}} { + suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): {{{boxedOutputFqcn}}} { return this.ingressClient.call( {{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, {{outputSerdeFieldName}}, - {{#if inputEmpty}}null{{else}}req{{/if}}); + {{#if inputEmpty}}null{{else}}req{{/if}}, + requestOptions); }{{/handlers}} fun send(): Send { @@ -86,11 +87,12 @@ object {{generatedClassSimpleName}} { inner class Send { {{#handlers}} - suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}): String { + suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String { return this@IngressClient.ingressClient.send( {{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}}, {{inputSerdeFieldName}}, - {{#if inputEmpty}}null{{else}}req{{/if}}); + {{#if inputEmpty}}null{{else}}req{{/if}}, + requestOptions); }{{/handlers}} } } diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java index 32c1a533..97ffb32b 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/DefaultIngressClient.java @@ -34,8 +34,13 @@ public DefaultIngressClient(HttpClient httpClient, String baseUri) { } @Override - public Res call(Target target, Serde reqSerde, Serde resSerde, Req req) { - HttpRequest request = prepareHttpRequest(target, false, reqSerde, req); + public Res call( + Target target, + Serde reqSerde, + Serde resSerde, + Req req, + RequestOptions requestOptions) { + HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions); HttpResponse response; try { response = httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray()); @@ -54,8 +59,8 @@ public Res call(Target target, Serde reqSerde, Serde resSer } @Override - public String send(Target target, Serde reqSerde, Req req) { - HttpRequest request = prepareHttpRequest(target, true, reqSerde, req); + public String send(Target target, Serde reqSerde, Req req, RequestOptions options) { + HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options); HttpResponse response; try { response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream()); @@ -93,11 +98,25 @@ private URI toRequestURI(Target target, boolean isSend) { } private HttpRequest prepareHttpRequest( - Target target, boolean isSend, Serde reqSerde, Req req) { + Target target, boolean isSend, Serde reqSerde, Req req, RequestOptions options) { var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend)); if (reqSerde.contentType() != null) { reqBuilder.header("content-type", reqSerde.contentType()); } + + // Add idempotency key and period + if (options.getIdempotencyKey() != null) { + reqBuilder.header("idempotency-key", options.getIdempotencyKey()); + } + if (options.getIdempotencyRetainPeriod() != null) { + reqBuilder.header( + "idempotency-retention-period", + String.valueOf(options.getIdempotencyRetainPeriod().toSeconds())); + } + + // Add additional headers + options.getAdditionalHeaders().forEach(reqBuilder::header); + return reqBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(reqSerde.serialize(req))).build(); } diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java index 2b6d9125..9a2e6218 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java +++ b/sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java @@ -13,9 +13,10 @@ import java.net.http.HttpClient; public interface IngressClient { - Res call(Target target, Serde reqSerde, Serde resSerde, Req req); + Res call( + Target target, Serde reqSerde, Serde resSerde, Req req, RequestOptions options); - String send(Target target, Serde reqSerde, Req req); + String send(Target target, Serde reqSerde, Req req, RequestOptions options); static IngressClient defaultClient(String baseUri) { return new DefaultIngressClient(HttpClient.newHttpClient(), baseUri); diff --git a/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java b/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java new file mode 100644 index 00000000..71a3a2d1 --- /dev/null +++ b/sdk-common/src/main/java/dev/restate/sdk/client/RequestOptions.java @@ -0,0 +1,90 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH +// +// This file is part of the Restate Java SDK, +// which is released under the MIT license. +// +// You can find a copy of the license in file LICENSE in the root +// directory of this repository or package, or at +// https://github.com/restatedev/sdk-java/blob/main/LICENSE +package dev.restate.sdk.client; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class RequestOptions { + + public static final RequestOptions DEFAULT = new RequestOptions(); + + private String idempotencyKey; + private Duration idempotencyRetainPeriod; + private final Map additionalHeaders = new HashMap<>(); + + public RequestOptions withIdempotency(String idempotencyKey) { + this.idempotencyKey = idempotencyKey; + return this; + } + + public RequestOptions withIdempotency(String idempotencyKey, Duration idempotencyRetainPeriod) { + this.idempotencyKey = idempotencyKey; + this.idempotencyRetainPeriod = idempotencyRetainPeriod; + return this; + } + + public RequestOptions withHeader(String name, String value) { + this.additionalHeaders.put(name, value); + return this; + } + + public RequestOptions withHeaders(Map additionalHeaders) { + this.additionalHeaders.putAll(additionalHeaders); + return this; + } + + public String getIdempotencyKey() { + return idempotencyKey; + } + + public Duration getIdempotencyRetainPeriod() { + return idempotencyRetainPeriod; + } + + public Map getAdditionalHeaders() { + return additionalHeaders; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RequestOptions that = (RequestOptions) o; + + if (!Objects.equals(idempotencyKey, that.idempotencyKey)) return false; + if (!Objects.equals(idempotencyRetainPeriod, that.idempotencyRetainPeriod)) return false; + return Objects.equals(additionalHeaders, that.additionalHeaders); + } + + @Override + public int hashCode() { + int result = idempotencyKey != null ? idempotencyKey.hashCode() : 0; + result = + 31 * result + (idempotencyRetainPeriod != null ? idempotencyRetainPeriod.hashCode() : 0); + result = 31 * result + (additionalHeaders != null ? additionalHeaders.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return "RequestOptions{" + + "idempotencyKey='" + + idempotencyKey + + '\'' + + ", idempotencyRetainPeriod=" + + idempotencyRetainPeriod + + ", additionalHeaders=" + + additionalHeaders + + '}'; + } +} diff --git a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java index aa068616..b6887694 100644 --- a/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java +++ b/sdk-workflow-api/src/main/java/dev/restate/sdk/workflow/impl/WorkflowCodegenUtil.java @@ -13,6 +13,7 @@ import dev.restate.sdk.Awaitable; import dev.restate.sdk.Context; import dev.restate.sdk.client.IngressClient; +import dev.restate.sdk.client.RequestOptions; import dev.restate.sdk.common.*; import dev.restate.sdk.workflow.WorkflowExecutionState; import dev.restate.sdk.workflow.generated.GetOutputResponse; @@ -153,7 +154,8 @@ public static WorkflowExecutionState submit( Target.service(workflowName, "submit"), WorkflowImpl.INVOKE_REQUEST_SERDE, WorkflowImpl.WORKFLOW_EXECUTION_STATE_SERDE, - InvokeRequest.fromAny(workflowKey, payload)); + InvokeRequest.fromAny(workflowKey, payload), + RequestOptions.DEFAULT); } public static Optional getOutput( @@ -164,7 +166,8 @@ public static Optional getOutput( workflowManagerObjectName(workflowName), workflowKey, "getOutput"), CoreSerdes.VOID, WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE, - null); + null, + RequestOptions.DEFAULT); if (response.hasNotCompleted()) { return Optional.empty(); } @@ -184,7 +187,8 @@ public static boolean isCompleted( workflowManagerObjectName(workflowName), workflowKey, "getOutput"), CoreSerdes.VOID, WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE, - null); + null, + RequestOptions.DEFAULT); if (response.hasFailure()) { throw new TerminalException( TerminalException.Code.fromValue(response.getFailure().getCode()), @@ -204,7 +208,8 @@ public static T invokeShared( Target.service(workflowName, handlerName), WorkflowImpl.INVOKE_REQUEST_SERDE, resSerde, - InvokeRequest.fromAny(workflowKey, payload)); + InvokeRequest.fromAny(workflowKey, payload), + RequestOptions.DEFAULT); } public static void invokeSharedSend( @@ -216,7 +221,8 @@ public static void invokeSharedSend( ingressClient.send( Target.service(workflowName, handlerName), WorkflowImpl.INVOKE_REQUEST_SERDE, - InvokeRequest.fromAny(workflowKey, payload)); + InvokeRequest.fromAny(workflowKey, payload), + RequestOptions.DEFAULT); } public static Optional getState( @@ -227,7 +233,8 @@ public static Optional getState( workflowManagerObjectName(workflowName), workflowKey, "getState"), CoreSerdes.JSON_STRING, WorkflowImpl.GET_STATE_RESPONSE_SERDE, - key.name()); + key.name(), + RequestOptions.DEFAULT); if (response.hasEmpty()) { return Optional.empty(); }