Skip to content

Commit

Permalink
Rename ctx.sideEffect in ctx.run
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed Apr 5, 2024
1 parent 45fc2d0 commit a2330bc
Show file tree
Hide file tree
Showing 17 changed files with 61 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void run(WorkflowContext ctx, LoanRequest loanRequest) {
LOG.info("Loan request submitted");

// 2. Ask human approval
ctx.sideEffect(() -> askHumanApproval(ctx.workflowKey()));
ctx.run(() -> askHumanApproval(ctx.workflowKey()));
ctx.set(STATUS, Status.WAITING_HUMAN_APPROVAL);

// 3. Wait human approval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,7 @@ internal class ContextImpl internal constructor(private val syscalls: Syscalls)
}
}

override suspend fun <T : Any?> sideEffect(
serde: Serde<T>,
sideEffectAction: suspend () -> T
): T {
override suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteString>>
->
Expand Down
33 changes: 16 additions & 17 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import kotlin.time.Duration

/**
* This interface exposes the Restate functionalities to Restate services. It can be used to
* interact with other Restate services, record side effects, execute timers and synchronize with
* external systems.
* interact with other Restate services, record non-deterministic closures, execute timers and
* synchronize with external systems.
*
* To use it within your Restate service, implement [RestateKtComponent] and get an instance with
* [RestateKtComponent.restateContext].
Expand Down Expand Up @@ -114,9 +114,9 @@ sealed interface Context {
* Errors occurring within this closure won't be propagated to the caller, unless they are
* [TerminalException]. Consider the following code:
* ```
* // Bad usage of try-catch outside the side effect
* // Bad usage of try-catch outside the run
* try {
* ctx.sideEffect {
* ctx.run {
* throw IllegalStateException();
* };
* } catch (e: IllegalStateException) {
Expand All @@ -125,29 +125,28 @@ sealed interface Context {
* // following the invocation retry policy.
* }
*
* // Good usage of try-catch outside the side effect
* // Good usage of try-catch outside the run
* try {
* ctx.sideEffect {
* ctx.run {
* throw TerminalException("my error");
* };
* } catch (e: TerminalException) {
* // This is invoked
* }
* ```
*
* To propagate side effects failures to the side effect call-site, make sure to wrap them in
* [TerminalException].
* To propagate failures to the run call-site, make sure to wrap them in [TerminalException].
*
* @param serde the type tag of the return value, used to serialize/deserialize it.
* @param action to execute for its side effects.
* @param action closure to execute.
* @param T type of the return value.
* @return value of the side effect operation.
* @return value of the run operation.
*/
suspend fun <T : Any?> sideEffect(serde: Serde<T>, sideEffectAction: suspend () -> T): T
suspend fun <T : Any?> run(serde: Serde<T>, sideEffectAction: suspend () -> T): T

/** Like [sideEffect] without a return value. */
suspend fun sideEffect(sideEffectAction: suspend () -> Unit) {
sideEffect(KtSerdes.UNIT, sideEffectAction)
/** Like [run] without a return value. */
suspend fun run(sideEffectAction: suspend () -> Unit) {
run(KtSerdes.UNIT, sideEffectAction)
}

/**
Expand Down Expand Up @@ -177,9 +176,9 @@ sealed interface Context {
*
* This instance is useful to generate identifiers, idempotency keys, and for uniform sampling
* from a set of options. If a cryptographically secure value is needed, please generate that
* externally using [sideEffect].
* externally using [run].
*
* You MUST NOT use this [Random] instance inside a [sideEffect].
* You MUST NOT use this [Random] instance inside a [run].
*
* @return the [Random] instance.
*/
Expand Down Expand Up @@ -234,7 +233,7 @@ class RestateRandom(seed: Long, private val syscalls: Syscalls) : Random() {
private val r = Random(seed)

override fun nextBits(bitCount: Int): Int {
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside a side effect!" }
check(!syscalls.isInsideSideEffect) { "You can't use RestateRandom inside ctx.run!" }
return r.nextBits(bitCount)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RandomTest : RandomTestSuite() {

override fun randomInsideSideEffect(): TestInvocationBuilder =
testDefinitionForService<Unit, Int>("RandomInsideSideEffect") { ctx, _: Unit ->
ctx.sideEffect { ctx.random().nextInt() }
ctx.run { ctx.random().nextInt() }
throw IllegalStateException("This should not unreachable")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ class SideEffectTest : SideEffectTestSuite() {

override fun sideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("SideEffect") { ctx, _: Unit ->
val result = ctx.sideEffect(CoreSerdes.JSON_STRING) { sideEffectOutput }
val result = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
"Hello $result"
}

override fun consecutiveSideEffect(sideEffectOutput: String): TestInvocationBuilder =
testDefinitionForService("ConsecutiveSideEffect") { ctx, _: Unit ->
val firstResult = ctx.sideEffect(CoreSerdes.JSON_STRING) { sideEffectOutput }
val firstResult = ctx.run(CoreSerdes.JSON_STRING) { sideEffectOutput }
val secondResult =
ctx.sideEffect(CoreSerdes.JSON_STRING) { firstResult.uppercase(Locale.getDefault()) }
ctx.run(CoreSerdes.JSON_STRING) { firstResult.uppercase(Locale.getDefault()) }
"Hello $secondResult"
}

Expand All @@ -43,9 +43,7 @@ class SideEffectTest : SideEffectTestSuite() {
Dispatchers.Unconfined + CoroutineName("CheckContextSwitchingTestCoroutine"))) {
handler("run") { ctx, _: Unit ->
val sideEffectCoroutine =
ctx.sideEffect(CoreSerdes.JSON_STRING) {
coroutineContext[CoroutineName]!!.name
}
ctx.run(CoreSerdes.JSON_STRING) { coroutineContext[CoroutineName]!!.name }
check(sideEffectCoroutine == "CheckContextSwitchingTestCoroutine") {
"Side effect thread is not running within the same coroutine context of the handler method: $sideEffectCoroutine"
}
Expand All @@ -56,7 +54,7 @@ class SideEffectTest : SideEffectTestSuite() {

override fun sideEffectGuard(): TestInvocationBuilder =
testDefinitionForService<Unit, String>("SideEffectGuard") { ctx, _: Unit ->
ctx.sideEffect { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
ctx.run { ctx.send(GREETER_SERVICE_TARGET, KtSerdes.json(), "something") }
throw IllegalStateException("This point should not be reached")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class StateMachineFailuresTest : StateMachineFailuresTestSuite() {

override fun sideEffectFailure(serde: Serde<Int>): TestInvocationBuilder =
testDefinitionForService("SideEffectFailure") { ctx, _: Unit ->
ctx.sideEffect(serde) { 0 }
ctx.run(serde) { 0 }
"Francesco"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class UserFailuresTest : UserFailuresTestSuite() {
): TestInvocationBuilder =
testDefinitionForService<Unit, Unit>("SideEffectThrowIllegalStateException") { ctx, _: Unit ->
try {
ctx.sideEffect { throw IllegalStateException("Whatever") }
ctx.run { throw IllegalStateException("Whatever") }
} catch (e: Throwable) {
if (e !is CancellationException && e !is TerminalException) {
nonTerminalExceptionsSeen.addAndGet(1)
Expand All @@ -44,7 +44,7 @@ class UserFailuresTest : UserFailuresTestSuite() {

override fun sideEffectThrowTerminalException(code: Int, message: String): TestInvocationBuilder =
testDefinitionForService<Unit, Unit>("SideEffectThrowTerminalException") { ctx, _: Unit ->
ctx.sideEffect { throw TerminalException(code, message) }
ctx.run { throw TerminalException(code, message) }
throw IllegalStateException("Not expected to reach this point")
}
}
28 changes: 14 additions & 14 deletions sdk-api/src/main/java/dev/restate/sdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@

/**
* This interface exposes the Restate functionalities to Restate services. It can be used to
* interact with other Restate services, record side effects, execute timers and synchronize with
* external systems.
* interact with other Restate services, record non-deterministic closures, execute timers and
* synchronize with external systems.
*
* <p>All methods of this interface, and related interfaces, throws either {@link TerminalException}
* or {@link AbortedExecutionException}, where the former can be caught and acted upon, while the
Expand Down Expand Up @@ -109,9 +109,9 @@ default void sleep(Duration duration) {
* TerminalException}. Consider the following code:
*
* <pre>{@code
* // Bad usage of try-catch outside the side effect
* // Bad usage of try-catch outside the run
* try {
* ctx.sideEffect(() -> {
* ctx.run(() -> {
* throw new IllegalStateException();
* });
* } catch (IllegalStateException e) {
Expand All @@ -120,29 +120,29 @@ default void sleep(Duration duration) {
* // following the invocation retry policy.
* }
*
* // Good usage of try-catch outside the side effect
* // Good usage of try-catch outside the run
* try {
* ctx.sideEffect(() -> {
* ctx.run(() -> {
* throw new TerminalException("my error");
* });
* } catch (TerminalException e) {
* // This is invoked
* }
* }</pre>
*
* To propagate side effects failures to the side effect call-site, make sure to wrap them in
* {@link TerminalException}.
* To propagate run failures to the call-site, make sure to wrap them in {@link
* TerminalException}.
*
* @param serde the type tag of the return value, used to serialize/deserialize it.
* @param action to execute for its side effects.
* @param action closure to execute.
* @param <T> type of the return value.
* @return value of the side effect operation.
* @return value of the run operation.
*/
<T> T sideEffect(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;
<T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;

/** Like {@link #sideEffect(Serde, ThrowingSupplier)}, but without returning a value. */
default void sideEffect(ThrowingRunnable runnable) throws TerminalException {
sideEffect(
/** Like {@link #run(Serde, ThrowingSupplier)}, but without returning a value. */
default void run(ThrowingRunnable runnable) throws TerminalException {
run(
CoreSerdes.VOID,
() -> {
runnable.run();
Expand Down
2 changes: 1 addition & 1 deletion sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration d
}

@Override
public <T> T sideEffect(Serde<T> serde, ThrowingSupplier<T> action) {
public <T> T run(Serde<T> serde, ThrowingSupplier<T> action) {
CompletableFuture<CompletableFuture<ByteString>> enterFut = new CompletableFuture<>();
syscalls.enterSideEffectBlock(
new EnterSideEffectSyscallCallback() {
Expand Down
7 changes: 3 additions & 4 deletions sdk-api/src/main/java/dev/restate/sdk/RestateRandom.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
*
* <p>This instance is useful to generate identifiers, idempotency keys, and for uniform sampling
* from a set of options. If a cryptographically secure value is needed, please generate that
* externally using {@link ObjectContext#sideEffect(Serde, ThrowingSupplier)}.
* externally using {@link ObjectContext#run(Serde, ThrowingSupplier)}.
*
* <p>You MUST NOT use this object inside a {@link ObjectContext#sideEffect(Serde,
* ThrowingSupplier)}.
* <p>You MUST NOT use this object inside a {@link ObjectContext#run(Serde, ThrowingSupplier)}.
*/
public class RestateRandom extends Random {

Expand Down Expand Up @@ -58,7 +57,7 @@ public UUID nextUUID() {
@Override
protected int next(int bits) {
if (this.syscalls.isInsideSideEffect()) {
throw new IllegalStateException("You can't use RestateRandom inside a side effect!");
throw new IllegalStateException("You can't use RestateRandom inside ctx.run!");
}

return super.next(bits);
Expand Down
2 changes: 1 addition & 1 deletion sdk-api/src/test/java/dev/restate/sdk/RandomTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected TestInvocationBuilder randomInsideSideEffect() {
CoreSerdes.VOID,
CoreSerdes.JSON_INT,
(ctx, unused) -> {
ctx.sideEffect(() -> ctx.random().nextInt());
ctx.run(() -> ctx.random().nextInt());
throw new IllegalStateException("This should not unreachable");
});
}
Expand Down
10 changes: 5 additions & 5 deletions sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ protected TestInvocationBuilder sideEffect(String sideEffectOutput) {
CoreSerdes.VOID,
CoreSerdes.JSON_STRING,
(ctx, unused) -> {
String result = ctx.sideEffect(CoreSerdes.JSON_STRING, () -> sideEffectOutput);
String result = ctx.run(CoreSerdes.JSON_STRING, () -> sideEffectOutput);
return "Hello " + result;
});
}
Expand All @@ -35,8 +35,8 @@ protected TestInvocationBuilder consecutiveSideEffect(String sideEffectOutput) {
CoreSerdes.VOID,
CoreSerdes.JSON_STRING,
(ctx, unused) -> {
String firstResult = ctx.sideEffect(CoreSerdes.JSON_STRING, () -> sideEffectOutput);
String secondResult = ctx.sideEffect(CoreSerdes.JSON_STRING, firstResult::toUpperCase);
String firstResult = ctx.run(CoreSerdes.JSON_STRING, () -> sideEffectOutput);
String secondResult = ctx.run(CoreSerdes.JSON_STRING, firstResult::toUpperCase);

return "Hello " + secondResult;
});
Expand All @@ -51,7 +51,7 @@ protected TestInvocationBuilder checkContextSwitching() {
String currentThread = Thread.currentThread().getName();

String sideEffectThread =
ctx.sideEffect(CoreSerdes.JSON_STRING, () -> Thread.currentThread().getName());
ctx.run(CoreSerdes.JSON_STRING, () -> Thread.currentThread().getName());

if (!Objects.equals(currentThread, sideEffectThread)) {
throw new IllegalStateException(
Expand All @@ -71,7 +71,7 @@ protected TestInvocationBuilder sideEffectGuard() {
CoreSerdes.VOID,
CoreSerdes.JSON_STRING,
(ctx, unused) -> {
ctx.sideEffect(() -> ctx.send(GREETER_SERVICE_TARGET, new byte[] {}));
ctx.run(() -> ctx.send(GREETER_SERVICE_TARGET, new byte[] {}));
throw new IllegalStateException("This point should not be reached");
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected TestInvocationBuilder sideEffectFailure(Serde<Integer> serde) {
CoreSerdes.VOID,
CoreSerdes.JSON_STRING,
(ctx, unused) -> {
ctx.sideEffect(serde, () -> 0);
ctx.run(serde, () -> 0);
return "Francesco";
});
}
Expand Down
4 changes: 2 additions & 2 deletions sdk-api/src/test/java/dev/restate/sdk/UserFailuresTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected TestInvocationBuilder sideEffectThrowIllegalStateException(
CoreSerdes.VOID,
(ctx, unused) -> {
try {
ctx.sideEffect(
ctx.run(
() -> {
throw new IllegalStateException("Whatever");
});
Expand Down Expand Up @@ -72,7 +72,7 @@ protected TestInvocationBuilder sideEffectThrowTerminalException(int code, Strin
CoreSerdes.VOID,
CoreSerdes.VOID,
(ctx, unused) -> {
ctx.sideEffect(
ctx.run(
() -> {
throw new TerminalException(code, message);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public Stream<TestDefinition> definitions() {
.withInput(startMessage(1).setDebugId(debugId), ProtoUtils.inputMessage())
.assertingOutput(
containsOnlyExactErrorMessage(
new IllegalStateException(
"You can't use RestateRandom inside a side effect!"))));
new IllegalStateException("You can't use RestateRandom inside ctx.run!"))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
LOG.info("I am on the thread I am before executing side effect")
check(Vertx.currentContext() == null)
check(coroutineContext[CoroutineName] == nonBlockingCoroutineName)
ctx.sideEffect {
ctx.run {
LOG.info("I am on the thread I am when executing side effect")
check(coroutineContext[CoroutineName] == nonBlockingCoroutineName)
check(Vertx.currentContext() == null)
Expand All @@ -52,7 +52,7 @@ class VertxExecutorsTest : TestDefinitions.TestSuite {
): Void? {
val id = Thread.currentThread().id
check(Vertx.currentContext() == null)
ctx.sideEffect {
ctx.run {
check(Thread.currentThread().id == id)
check(Vertx.currentContext() == null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration d
}

@Override
public <T> T sideEffect(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException {
return ctx.sideEffect(serde, action);
public <T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException {
return ctx.run(serde, action);
}

@Override
public void sideEffect(ThrowingRunnable runnable) throws TerminalException {
ctx.sideEffect(runnable);
public void run(ThrowingRunnable runnable) throws TerminalException {
ctx.run(runnable);
}

@Override
Expand Down

0 comments on commit a2330bc

Please sign in to comment.