Skip to content

Commit

Permalink
Add RequestOptions to ingress requests. (#250)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Mar 20, 2024
1 parent 6bbf851 commit aee2f71
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 21 deletions.
18 changes: 16 additions & 2 deletions sdk-api-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,18 @@ public class {{generatedClassSimpleName}} {

{{#handlers}}
public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
{{^outputEmpty}}return {{/outputEmpty}}this.{{name}}(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
}

public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{name}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
{{^outputEmpty}}return {{/outputEmpty}}this.ingressClient.call(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{outputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}});
{{#if inputEmpty}}null{{else}}req{{/if}},
requestOptions);
}{{/handlers}}

public Send send() {
Expand All @@ -111,10 +118,17 @@ public class {{generatedClassSimpleName}} {
public class Send {
{{#handlers}}
public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.{{name}}(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
}

public String {{name}}({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
return IngressClient.this.ingressClient.send(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, IngressClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}});
{{#if inputEmpty}}null{{else}}req{{/if}},
requestOptions);
}{{/handlers}}
}
}
Expand Down
10 changes: 6 additions & 4 deletions sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,13 @@ object {{generatedClassSimpleName}} {
class IngressClient(private val ingressClient: dev.restate.sdk.client.IngressClient{{#isObject}}, private val key: String{{/isObject}}) {

{{#handlers}}
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}): {{{boxedOutputFqcn}}} {
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): {{{boxedOutputFqcn}}} {
return this.ingressClient.call(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{outputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}});
{{#if inputEmpty}}null{{else}}req{{/if}},
requestOptions);
}{{/handlers}}

fun send(): Send {
Expand All @@ -86,11 +87,12 @@ object {{generatedClassSimpleName}} {

inner class Send {
{{#handlers}}
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}): String {
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): String {
return this@IngressClient.ingressClient.send(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@IngressClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}});
{{#if inputEmpty}}null{{else}}req{{/if}},
requestOptions);
}{{/handlers}}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,30 @@
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class DefaultIngressClient implements IngressClient {

private static final JsonFactory JSON_FACTORY = new JsonFactory();

private final HttpClient httpClient;
private final URI baseUri;
private final Map<String, String> headers;

public DefaultIngressClient(HttpClient httpClient, String baseUri) {
public DefaultIngressClient(HttpClient httpClient, String baseUri, Map<String, String> headers) {
this.httpClient = httpClient;
this.baseUri = URI.create(baseUri);
this.headers = headers;
}

@Override
public <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req) {
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req);
public <Req, Res> Res call(
Target target,
Serde<Req> reqSerde,
Serde<Res> resSerde,
Req req,
RequestOptions requestOptions) {
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, requestOptions);
HttpResponse<byte[]> response;
try {
response = httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray());
Expand All @@ -54,8 +62,8 @@ public <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSer
}

@Override
public <Req> String send(Target target, Serde<Req> reqSerde, Req req) {
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req);
public <Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options) {
HttpRequest request = prepareHttpRequest(target, true, reqSerde, req, options);
HttpResponse<InputStream> response;
try {
response = httpClient.send(request, HttpResponse.BodyHandlers.ofInputStream());
Expand Down Expand Up @@ -93,11 +101,30 @@ private URI toRequestURI(Target target, boolean isSend) {
}

private <Req> HttpRequest prepareHttpRequest(
Target target, boolean isSend, Serde<Req> reqSerde, Req req) {
Target target, boolean isSend, Serde<Req> reqSerde, Req req, RequestOptions options) {
var reqBuilder = HttpRequest.newBuilder().uri(toRequestURI(target, isSend));

// Add content-type
if (reqSerde.contentType() != null) {
reqBuilder.header("content-type", reqSerde.contentType());
}

// Add headers
this.headers.forEach(reqBuilder::header);

// Add idempotency key and period
if (options.getIdempotencyKey() != null) {
reqBuilder.header("idempotency-key", options.getIdempotencyKey());
}
if (options.getIdempotencyRetainPeriod() != null) {
reqBuilder.header(
"idempotency-retention-period",
String.valueOf(options.getIdempotencyRetainPeriod().toSeconds()));
}

// Add additional headers
options.getAdditionalHeaders().forEach(reqBuilder::header);

return reqBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(reqSerde.serialize(req))).build();
}

Expand Down
21 changes: 18 additions & 3 deletions sdk-common/src/main/java/dev/restate/sdk/client/IngressClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,28 @@
import dev.restate.sdk.common.Serde;
import dev.restate.sdk.common.Target;
import java.net.http.HttpClient;
import java.util.Collections;
import java.util.Map;

public interface IngressClient {
<Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req);
<Req, Res> Res call(
Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req, RequestOptions options);

<Req> String send(Target target, Serde<Req> reqSerde, Req req);
default <Req, Res> Res call(Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req) {
return call(target, reqSerde, resSerde, req, RequestOptions.DEFAULT);
}

<Req> String send(Target target, Serde<Req> reqSerde, Req req, RequestOptions options);

default <Req> String send(Target target, Serde<Req> reqSerde, Req req) {
return send(target, reqSerde, req, RequestOptions.DEFAULT);
}

static IngressClient defaultClient(String baseUri) {
return new DefaultIngressClient(HttpClient.newHttpClient(), baseUri);
return defaultClient(baseUri, Collections.emptyMap());
}

static IngressClient defaultClient(String baseUri, Map<String, String> headers) {
return new DefaultIngressClient(HttpClient.newHttpClient(), baseUri, headers);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate Java SDK,
// which is released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.client;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RequestOptions {

public static final RequestOptions DEFAULT = new RequestOptions();

private String idempotencyKey;
private Duration idempotencyRetainPeriod;
private final Map<String, String> additionalHeaders = new HashMap<>();

public RequestOptions withIdempotency(String idempotencyKey) {
this.idempotencyKey = idempotencyKey;
return this;
}

public RequestOptions withIdempotency(String idempotencyKey, Duration idempotencyRetainPeriod) {
this.idempotencyKey = idempotencyKey;
this.idempotencyRetainPeriod = idempotencyRetainPeriod;
return this;
}

public RequestOptions withHeader(String name, String value) {
this.additionalHeaders.put(name, value);
return this;
}

public RequestOptions withHeaders(Map<String, String> additionalHeaders) {
this.additionalHeaders.putAll(additionalHeaders);
return this;
}

public String getIdempotencyKey() {
return idempotencyKey;
}

public Duration getIdempotencyRetainPeriod() {
return idempotencyRetainPeriod;
}

public Map<String, String> getAdditionalHeaders() {
return additionalHeaders;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

RequestOptions that = (RequestOptions) o;

if (!Objects.equals(idempotencyKey, that.idempotencyKey)) return false;
if (!Objects.equals(idempotencyRetainPeriod, that.idempotencyRetainPeriod)) return false;
return Objects.equals(additionalHeaders, that.additionalHeaders);
}

@Override
public int hashCode() {
int result = idempotencyKey != null ? idempotencyKey.hashCode() : 0;
result =
31 * result + (idempotencyRetainPeriod != null ? idempotencyRetainPeriod.hashCode() : 0);
result = 31 * result + (additionalHeaders != null ? additionalHeaders.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "RequestOptions{"
+ "idempotencyKey='"
+ idempotencyKey
+ '\''
+ ", idempotencyRetainPeriod="
+ idempotencyRetainPeriod
+ ", additionalHeaders="
+ additionalHeaders
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import dev.restate.sdk.Awaitable;
import dev.restate.sdk.Context;
import dev.restate.sdk.client.IngressClient;
import dev.restate.sdk.client.RequestOptions;
import dev.restate.sdk.common.*;
import dev.restate.sdk.workflow.WorkflowExecutionState;
import dev.restate.sdk.workflow.generated.GetOutputResponse;
Expand Down Expand Up @@ -151,7 +152,8 @@ public static WorkflowExecutionState submit(
Target.service(workflowName, "submit"),
WorkflowImpl.INVOKE_REQUEST_SERDE,
WorkflowImpl.WORKFLOW_EXECUTION_STATE_SERDE,
InvokeRequest.fromAny(workflowKey, payload));
InvokeRequest.fromAny(workflowKey, payload),
RequestOptions.DEFAULT);
}

public static <T> Optional<T> getOutput(
Expand All @@ -162,7 +164,8 @@ public static <T> Optional<T> getOutput(
workflowManagerObjectName(workflowName), workflowKey, "getOutput"),
CoreSerdes.VOID,
WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE,
null);
null,
RequestOptions.DEFAULT);
if (response.hasNotCompleted()) {
return Optional.empty();
}
Expand All @@ -181,7 +184,8 @@ public static boolean isCompleted(
workflowManagerObjectName(workflowName), workflowKey, "getOutput"),
CoreSerdes.VOID,
WorkflowImpl.GET_OUTPUT_RESPONSE_SERDE,
null);
null,
RequestOptions.DEFAULT);
if (response.hasFailure()) {
throw new TerminalException(
response.getFailure().getCode(), response.getFailure().getMessage());
Expand All @@ -200,7 +204,8 @@ public static <T> T invokeShared(
Target.service(workflowName, handlerName),
WorkflowImpl.INVOKE_REQUEST_SERDE,
resSerde,
InvokeRequest.fromAny(workflowKey, payload));
InvokeRequest.fromAny(workflowKey, payload),
RequestOptions.DEFAULT);
}

public static void invokeSharedSend(
Expand All @@ -212,7 +217,8 @@ public static void invokeSharedSend(
ingressClient.send(
Target.service(workflowName, handlerName),
WorkflowImpl.INVOKE_REQUEST_SERDE,
InvokeRequest.fromAny(workflowKey, payload));
InvokeRequest.fromAny(workflowKey, payload),
RequestOptions.DEFAULT);
}

public static <T> Optional<T> getState(
Expand All @@ -223,7 +229,8 @@ public static <T> Optional<T> getState(
workflowManagerObjectName(workflowName), workflowKey, "getState"),
CoreSerdes.JSON_STRING,
WorkflowImpl.GET_STATE_RESPONSE_SERDE,
key.name());
key.name(),
RequestOptions.DEFAULT);
if (response.hasEmpty()) {
return Optional.empty();
}
Expand Down

0 comments on commit aee2f71

Please sign in to comment.