Skip to content

Commit

Permalink
Merge send and sendDelayed. Fix #262
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Mar 25, 2024
1 parent 34537fb commit 828d131
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 67 deletions.
22 changes: 6 additions & 16 deletions sdk-api-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -49,38 +49,28 @@ public class {{generatedClassSimpleName}} {
}{{/handlers}}

public Send send() {
return new Send();
return new Send(null);
}

public SendDelayed sendDelayed(Duration delay) {
return new SendDelayed(delay);
public Send send(Duration delay) {
return new Send(delay);
}

public class Send {
{{#handlers}}
public void {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
ContextClient.this.ctx.send(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, ContextClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}});
}{{/handlers}}
}

public class SendDelayed {

private final Duration delay;

SendDelayed(Duration delay) {
Send(Duration delay) {
this.delay = delay;
}

{{#handlers}}
public void {{name}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
ContextClient.this.ctx.sendDelayed(
ContextClient.this.ctx.send(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, ContextClient.this.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}null{{else}}req{{/if}},
this.delay);
delay);
}{{/handlers}}
}
}
Expand Down
23 changes: 4 additions & 19 deletions sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,18 @@ object {{generatedClassSimpleName}} {
{{#if inputEmpty}}Unit{{else}}req{{/if}})
}{{/handlers}}

fun send(): Send {
return Send()
}

fun sendDelayed(delay: Duration): SendDelayed {
return SendDelayed(delay)
fun send(delay: Duration = Duration.ZERO): Send {
return Send(delay)
}

inner class Send {
inner class Send(private val delay: Duration) {
{{#handlers}}
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}) {
this@ContextClient.ctx.send(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@ContextClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}Unit{{else}}req{{/if}});
}{{/handlers}}
}

inner class SendDelayed(private val delay: Duration) {

{{#handlers}}
suspend fun {{name}}({{^inputEmpty}}req: {{{inputFqcn}}}{{/inputEmpty}}) {
this@ContextClient.ctx.sendDelayed(
{{#if isObject}}Target.virtualObject(COMPONENT_NAME, this@ContextClient.key, "{{name}}"){{else}}Target.service(COMPONENT_NAME, "{{name}}"){{/if}},
{{inputSerdeFieldName}},
{{#if inputEmpty}}Unit{{else}}req{{/if}},
this.delay);
delay);
}{{/handlers}}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
return SingleSerdeAwaitableImpl(syscalls, deferred, outputSerde)
}

override suspend fun <T : Any> send(target: Target, inputSerde: Serde<T>, parameter: T) {
val input = inputSerde.serializeWrappingException(syscalls, parameter)

return suspendCancellableCoroutine { cont: CancellableContinuation<Unit> ->
syscalls.send(target, input, null, completingUnitContinuation(cont))
}
}

override suspend fun <T : Any> sendDelayed(
override suspend fun <T : Any> send(
target: Target,
inputSerde: Serde<T>,
parameter: T,
Expand Down
16 changes: 2 additions & 14 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -91,25 +91,13 @@ sealed interface Context {
* @param target the address of the callee
* @param inputSerde Input serde
* @param parameter the invocation request parameter.
*/
suspend fun <T : Any> send(target: Target, inputSerde: Serde<T>, parameter: T)

/**
* Invoke another Restate service without waiting for the response after the provided `delay` has
* elapsed.
*
* This method returns immediately, as the timer is executed and awaited on Restate.
*
* @param target the address of the callee
* @param inputSerde Input serde
* @param parameter the invocation request parameter.
* @param delay time to wait before executing the call
*/
suspend fun <T : Any> sendDelayed(
suspend fun <T : Any> send(
target: Target,
inputSerde: Serde<T>,
parameter: T,
delay: Duration
delay: Duration = Duration.ZERO
)

/**
Expand Down
8 changes: 4 additions & 4 deletions sdk-api/src/main/java/dev/restate/sdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ default void send(Target target, byte[] parameter) {
* @param parameter the invocation request parameter.
* @param delay time to wait before executing the call.
*/
<T> void sendDelayed(Target target, Serde<T> inputSerde, T parameter, Duration delay);
<T> void send(Target target, Serde<T> inputSerde, T parameter, Duration delay);

/** Like {@link #sendDelayed(Target, Serde, Object, Duration)} with raw input. */
default void sendDelayed(Target target, byte[] parameter, Duration delay) {
sendDelayed(target, CoreSerdes.RAW, parameter, delay);
/** Like {@link #send(Target, Serde, Object, Duration)} with raw input. */
default void send(Target target, byte[] parameter, Duration delay) {
send(target, CoreSerdes.RAW, parameter, delay);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter) {
}

@Override
public <T> void sendDelayed(Target target, Serde<T> inputSerde, T parameter, Duration delay) {
public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration delay) {
ByteString input = Util.serializeWrappingException(syscalls, inputSerde, parameter);
Util.<Void>blockOnSyscall(cb -> syscalls.send(target, input, delay, cb));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public void send(
if (target.getKey() != null) {
builder.setKey(target.getKey());
}
if (delay != null) {
if (delay != null && !delay.isZero()) {
builder.setInvokeTime(Instant.now().toEpochMilli() + delay.toMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static void invokeSharedSendDelayed(
String workflowKey,
@Nullable Object payload,
Duration delay) {
ctx.sendDelayed(
ctx.send(
Target.service(workflowName, handlerName),
WorkflowImpl.INVOKE_REQUEST_SERDE,
InvokeRequest.fromAny(workflowKey, payload),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter) {
}

@Override
public <T> void sendDelayed(Target target, Serde<T> inputSerde, T parameter, Duration delay) {
ctx.sendDelayed(target, inputSerde, parameter, delay);
public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration delay) {
ctx.send(target, inputSerde, parameter, delay);
}

@Override
Expand Down

0 comments on commit 828d131

Please sign in to comment.