Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize state as JSON. #208

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class Counter extends CounterRestate.CounterRestateImplBase {

private static final Logger LOG = LogManager.getLogger(Counter.class);

private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.LONG);
private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.JSON_LONG);

@Override
public void reset(RestateContext ctx, CounterRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class VanillaGrpcCounter extends CounterGrpc.CounterImplBase implements R

private static final Logger LOG = LogManager.getLogger(VanillaGrpcCounter.class);

private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.LONG);
private static final StateKey<Long> TOTAL = StateKey.of("total", CoreSerdes.JSON_LONG);

@Override
public void reset(CounterRequest request, StreamObserver<Empty> responseObserver) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CounterKt : CounterGrpcKt.CounterCoroutineImplBase(Dispatchers.Unconfined)

private val LOG = LogManager.getLogger(CounterKt::class.java)

private val TOTAL = StateKey.of("total", CoreSerdes.LONG)
private val TOTAL = StateKey.of("total", CoreSerdes.JSON_LONG)

override suspend fun reset(request: CounterRequest): Empty {
restateContext().clear(TOTAL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class AwakeableIdTest : AwakeableIdTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {

override suspend fun greet(request: GreetingRequest): GreetingResponse {
val id: String = restateContext().awakeable(CoreSerdes.STRING_UTF8).id
val id: String = restateContext().awakeable(CoreSerdes.JSON_STRING).id
return greetingResponse { message = id }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class DeferredTest : DeferredTestSuite() {
val a1 = ctx.callAsync(GreeterGrpcKt.greetMethod, greetingRequest { name = "Francesco" })
val a2 = ctx.callAsync(GreeterGrpcKt.greetMethod, greetingRequest { name = "Till" })
val a2Res = a2.await().getMessage()
ctx.set(StateKey.of("A2", CoreSerdes.STRING_UTF8), a2Res)
ctx.set(StateKey.of("A2", CoreSerdes.JSON_STRING), a2Res)
val a1Res = a1.await().getMessage()
return greetingResponse { message = "$a1Res-$a2Res" }
}
Expand Down Expand Up @@ -99,10 +99,10 @@ class DeferredTest : DeferredTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()
val a1 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a2 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a3 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a4 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a1 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a2 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a3 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a4 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a12 = Awaitable.any(a1, a2)
val a23 = Awaitable.any(a2, a3)
val a34 = Awaitable.any(a3, a4)
Expand All @@ -122,10 +122,10 @@ class DeferredTest : DeferredTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()
val a1 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a2 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a3 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a4 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a1 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a2 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a3 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a4 = ctx.awakeable(CoreSerdes.JSON_STRING)

return greetingResponse {
message = Awaitable.any(a1, Awaitable.all(a2, a3), a4).awaitIndex().toString()
Expand All @@ -141,8 +141,8 @@ class DeferredTest : DeferredTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()
val a1 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a2 = ctx.awakeable(CoreSerdes.STRING_UTF8)
val a1 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a2 = ctx.awakeable(CoreSerdes.JSON_STRING)
val a12 = Awaitable.all(a1, a2)
val a12and1 = Awaitable.all(a12, a1)
val a121and12 = Awaitable.all(a12and1, a12)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class EagerStateTest : EagerStateTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()
val stateIsEmpty = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)) == null
val stateIsEmpty = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)) == null
return greetingResponse { message = stateIsEmpty.toString() }
}
}
Expand All @@ -37,7 +37,7 @@ class EagerStateTest : EagerStateTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
return greetingResponse {
message = restateContext().get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))!!
message = restateContext().get(StateKey.of("STATE", CoreSerdes.JSON_STRING))!!
}
}
}
Expand All @@ -50,9 +50,9 @@ class EagerStateTest : EagerStateTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()
val oldState = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))!!
ctx.set(StateKey.of("STATE", CoreSerdes.STRING_UTF8), oldState + request.getName())
val newState = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))!!
val oldState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))!!
ctx.set(StateKey.of("STATE", CoreSerdes.JSON_STRING), oldState + request.getName())
val newState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))!!
return greetingResponse { message = newState }
}
}
Expand All @@ -65,9 +65,9 @@ class EagerStateTest : EagerStateTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()
val oldState = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))!!
ctx.clear(StateKey.of("STATE", CoreSerdes.STRING_UTF8))
assertThat(ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))).isNull()
val oldState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))!!
ctx.clear(StateKey.of("STATE", CoreSerdes.JSON_STRING))
assertThat(ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))).isNull()
return greetingResponse { message = oldState }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class SideEffectTest : SideEffectTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx: RestateContext = restateContext()
val result = ctx.sideEffect(CoreSerdes.STRING_UTF8) { sideEffectOutput }
val result = ctx.sideEffect(CoreSerdes.JSON_STRING) { sideEffectOutput }
return greetingResponse { message = "Hello $result" }
}
}
Expand All @@ -34,9 +34,9 @@ class SideEffectTest : SideEffectTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx: RestateContext = restateContext()
val firstResult = ctx.sideEffect(CoreSerdes.STRING_UTF8) { sideEffectOutput }
val firstResult = ctx.sideEffect(CoreSerdes.JSON_STRING) { sideEffectOutput }
val secondResult =
ctx.sideEffect(CoreSerdes.STRING_UTF8) { firstResult.uppercase(Locale.getDefault()) }
ctx.sideEffect(CoreSerdes.JSON_STRING) { firstResult.uppercase(Locale.getDefault()) }
return greetingResponse { message = "Hello $secondResult" }
}
}
Expand All @@ -52,7 +52,7 @@ class SideEffectTest : SideEffectTestSuite() {

override suspend fun greet(request: GreetingRequest): GreetingResponse {
val sideEffectThread =
restateContext().sideEffect(CoreSerdes.STRING_UTF8) { Thread.currentThread().name }
restateContext().sideEffect(CoreSerdes.JSON_STRING) { Thread.currentThread().name }
check(sideEffectThread.contains("CheckContextSwitchingTestCoroutine")) {
"Side effect thread is not running within the same coroutine context of the handler method: $sideEffectThread"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class StateTest : StateTestSuite() {
GreeterGrpcKt.GreeterCoroutineImplBase(Dispatchers.Unconfined), RestateKtService {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val state: String =
restateContext().get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)) ?: "Unknown"
restateContext().get(StateKey.of("STATE", CoreSerdes.JSON_STRING)) ?: "Unknown"
return greetingResponse { message = "Hello $state" }
}
}
Expand All @@ -37,8 +37,8 @@ class StateTest : StateTestSuite() {
override suspend fun greet(request: GreetingRequest): GreetingResponse {
val ctx = restateContext()

val state = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))!!
ctx.set(StateKey.of("STATE", CoreSerdes.STRING_UTF8), request.getName())
val state = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))!!
ctx.set(StateKey.of("STATE", CoreSerdes.JSON_STRING), request.getName())

return greetingResponse { message = "Hello $state" }
}
Expand Down
2 changes: 1 addition & 1 deletion sdk-api/src/test/java/dev/restate/sdk/AwakeableIdTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ private static class ReturnAwakeableId extends GreeterGrpc.GreeterImplBase

@Override
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
String id = restateContext().awakeable(CoreSerdes.STRING_UTF8).id();
String id = restateContext().awakeable(CoreSerdes.JSON_STRING).id();
responseObserver.onNext(greetingResponse(id));
responseObserver.onCompleted();
}
Expand Down
22 changes: 11 additions & 11 deletions sdk-api/src/test/java/dev/restate/sdk/DeferredTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void greet(GreetingRequest request, StreamObserver<GreetingResponse> resp
ctx.call(GreeterGrpc.getGreetMethod(), greetingRequest("Till"));

String a2Res = a2.await().getMessage();
ctx.set(StateKey.of("A2", CoreSerdes.STRING_UTF8), a2Res);
ctx.set(StateKey.of("A2", CoreSerdes.JSON_STRING), a2Res);

String a1Res = a1.await().getMessage();

Expand Down Expand Up @@ -121,10 +121,10 @@ private static class CombineAnyWithAll extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

Awaitable<String> a1 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a2 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a3 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a4 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a1 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a2 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a3 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a4 = ctx.awakeable(CoreSerdes.JSON_STRING);

Awaitable<Object> a12 = Awaitable.any(a1, a2);
Awaitable<Object> a23 = Awaitable.any(a2, a3);
Expand All @@ -146,10 +146,10 @@ private static class AwaitAnyIndex extends GreeterGrpc.GreeterImplBase implement
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

Awaitable<String> a1 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a2 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a3 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a4 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a1 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a2 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a3 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a4 = ctx.awakeable(CoreSerdes.JSON_STRING);

responseObserver.onNext(
greetingResponse(
Expand All @@ -169,8 +169,8 @@ private static class AwaitOnAlreadyResolvedAwaitables extends GreeterGrpc.Greete
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

Awaitable<String> a1 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a2 = ctx.awakeable(CoreSerdes.STRING_UTF8);
Awaitable<String> a1 = ctx.awakeable(CoreSerdes.JSON_STRING);
Awaitable<String> a2 = ctx.awakeable(CoreSerdes.JSON_STRING);

Awaitable<Void> a12 = Awaitable.all(a1, a2);
Awaitable<Void> a12and1 = Awaitable.all(a12, a1);
Expand Down
16 changes: 8 additions & 8 deletions sdk-api/src/test/java/dev/restate/sdk/EagerStateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private static class GetEmpty extends GreeterGrpc.GreeterImplBase implements Res
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

boolean stateIsEmpty = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).isEmpty();
boolean stateIsEmpty = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).isEmpty();

responseObserver.onNext(
GreetingResponse.newBuilder().setMessage(String.valueOf(stateIsEmpty)).build());
Expand All @@ -44,7 +44,7 @@ private static class Get extends GreeterGrpc.GreeterImplBase implements RestateS
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

String state = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).get();
String state = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).get();

responseObserver.onNext(GreetingResponse.newBuilder().setMessage(state).build());
responseObserver.onCompleted();
Expand All @@ -62,10 +62,10 @@ private static class GetAppendAndGet extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

String oldState = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).get();
ctx.set(StateKey.of("STATE", CoreSerdes.STRING_UTF8), oldState + request.getName());
String oldState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).get();
ctx.set(StateKey.of("STATE", CoreSerdes.JSON_STRING), oldState + request.getName());

String newState = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).get();
String newState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).get();

responseObserver.onNext(GreetingResponse.newBuilder().setMessage(newState).build());
responseObserver.onCompleted();
Expand All @@ -83,10 +83,10 @@ private static class GetClearAndGet extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

String oldState = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).get();
String oldState = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).get();

ctx.clear(StateKey.of("STATE", CoreSerdes.STRING_UTF8));
assertThat(ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8))).isEmpty();
ctx.clear(StateKey.of("STATE", CoreSerdes.JSON_STRING));
assertThat(ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING))).isEmpty();

responseObserver.onNext(GreetingResponse.newBuilder().setMessage(oldState).build());
responseObserver.onCompleted();
Expand Down
8 changes: 4 additions & 4 deletions sdk-api/src/test/java/dev/restate/sdk/SideEffectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private static class SideEffect extends GreeterGrpc.GreeterImplBase implements R
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

String result = ctx.sideEffect(CoreSerdes.STRING_UTF8, () -> this.sideEffectOutput);
String result = ctx.sideEffect(CoreSerdes.JSON_STRING, () -> this.sideEffectOutput);

responseObserver.onNext(GreetingResponse.newBuilder().setMessage("Hello " + result).build());
responseObserver.onCompleted();
Expand All @@ -58,8 +58,8 @@ private static class ConsecutiveSideEffect extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

String firstResult = ctx.sideEffect(CoreSerdes.STRING_UTF8, () -> this.sideEffectOutput);
String secondResult = ctx.sideEffect(CoreSerdes.STRING_UTF8, firstResult::toUpperCase);
String firstResult = ctx.sideEffect(CoreSerdes.JSON_STRING, () -> this.sideEffectOutput);
String secondResult = ctx.sideEffect(CoreSerdes.JSON_STRING, firstResult::toUpperCase);

responseObserver.onNext(
GreetingResponse.newBuilder().setMessage("Hello " + secondResult).build());
Expand All @@ -81,7 +81,7 @@ public void greet(GreetingRequest request, StreamObserver<GreetingResponse> resp

String sideEffectThread =
restateContext()
.sideEffect(CoreSerdes.STRING_UTF8, () -> Thread.currentThread().getName());
.sideEffect(CoreSerdes.JSON_STRING, () -> Thread.currentThread().getName());

if (!Objects.equals(currentThread, sideEffectThread)) {
throw new IllegalStateException(
Expand Down
6 changes: 3 additions & 3 deletions sdk-api/src/test/java/dev/restate/sdk/StateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private static class GetState extends GreeterGrpc.GreeterImplBase implements Res
@Override
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
String state =
restateContext().get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).orElse("Unknown");
restateContext().get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).orElse("Unknown");

responseObserver.onNext(GreetingResponse.newBuilder().setMessage("Hello " + state).build());
responseObserver.onCompleted();
Expand All @@ -44,9 +44,9 @@ private static class GetAndSetState extends GreeterGrpc.GreeterImplBase
public void greet(GreetingRequest request, StreamObserver<GreetingResponse> responseObserver) {
RestateContext ctx = restateContext();

String state = ctx.get(StateKey.of("STATE", CoreSerdes.STRING_UTF8)).get();
String state = ctx.get(StateKey.of("STATE", CoreSerdes.JSON_STRING)).get();

ctx.set(StateKey.of("STATE", CoreSerdes.STRING_UTF8), request.getName());
ctx.set(StateKey.of("STATE", CoreSerdes.JSON_STRING), request.getName());

responseObserver.onNext(GreetingResponse.newBuilder().setMessage("Hello " + state).build());
responseObserver.onCompleted();
Expand Down
2 changes: 2 additions & 0 deletions sdk-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ dependencies {
api(coreLibs.protobuf.java)
api(coreLibs.grpc.api)

implementation(platform(jacksonLibs.jackson.bom))
implementation(jacksonLibs.jackson.core)
testImplementation(testingLibs.junit.jupiter)
testImplementation(testingLibs.assertj)
}
Loading
Loading