Skip to content

Commit

Permalink
Implement Promises for Workflows (#1515)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored May 20, 2024
1 parent dbf2f3e commit bdf0106
Show file tree
Hide file tree
Showing 25 changed files with 1,200 additions and 45 deletions.
9 changes: 9 additions & 0 deletions crates/invoker-api/src/entry_enricher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ pub mod mocks {
EnrichedEntryHeader::GetStateKeys { is_completed }
}
PlainEntryHeader::ClearAllState {} => EnrichedEntryHeader::ClearAllState {},
PlainEntryHeader::GetPromise { is_completed } => {
EnrichedEntryHeader::GetPromise { is_completed }
}
PlainEntryHeader::PeekPromise { is_completed } => {
EnrichedEntryHeader::PeekPromise { is_completed }
}
PlainEntryHeader::CompletePromise { is_completed } => {
EnrichedEntryHeader::CompletePromise { is_completed }
}
PlainEntryHeader::Sleep { is_completed } => {
EnrichedEntryHeader::Sleep { is_completed }
}
Expand Down
7 changes: 7 additions & 0 deletions crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum KeyKind {
ServiceStatus,
State,
Timers,
Promise,
}

impl KeyKind {
Expand Down Expand Up @@ -73,6 +74,7 @@ impl KeyKind {
KeyKind::ServiceStatus => b"ss",
KeyKind::State => b"st",
KeyKind::Timers => b"ti",
KeyKind::Promise => b"pr",
}
}

Expand All @@ -96,6 +98,7 @@ impl KeyKind {
b"ss" => Some(KeyKind::ServiceStatus),
b"st" => Some(KeyKind::State),
b"ti" => Some(KeyKind::Timers),
b"pr" => Some(KeyKind::Promise),
_ => None,
}
}
Expand Down Expand Up @@ -244,6 +247,10 @@ macro_rules! define_table_key {
pub fn into_inner(self) -> ($(Option<$ty>,)+) {
return ( $(self.$element,)+ )
}

pub fn into_inner_ok_or(self) -> crate::Result<($($ty,)+)> {
return crate::Result::Ok(( $(self.$element.ok_or_else(|| restate_storage_api::StorageError::DataIntegrityError)?,)+ ))
}
}

// serde
Expand Down
1 change: 1 addition & 0 deletions crates/partition-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod outbox_table;
mod owned_iter;
mod partition_store;
mod partition_store_manager;
pub mod promise_table;
pub mod scan;
pub mod service_status_table;
pub mod state_table;
Expand Down
2 changes: 2 additions & 0 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub enum TableKind {
Idempotency,
Inbox,
Journal,
Promise,
}

impl TableKind {
Expand All @@ -96,6 +97,7 @@ impl TableKind {
Self::PartitionStateMachine => &[KeyKind::Fsm],
Self::Timers => &[KeyKind::Timers],
Self::Journal => &[KeyKind::Journal],
Self::Promise => &[KeyKind::Promise],
}
}

Expand Down
148 changes: 148 additions & 0 deletions crates/partition-store/src/promise_table/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::keys::{define_table_key, KeyKind, TableKey};
use crate::owned_iter::OwnedIterator;
use crate::scan::TableScan;
use crate::{PartitionStore, TableKind, TableScanIterationDecision};
use crate::{RocksDBTransaction, StorageAccess};
use anyhow::anyhow;
use bytes::Bytes;
use bytestring::ByteString;
use futures::Stream;
use futures_util::stream;
use restate_storage_api::promise_table::{
OwnedPromiseRow, Promise, PromiseTable, ReadOnlyPromiseTable,
};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{PartitionKey, ServiceId, WithPartitionKey};
use restate_types::storage::StorageCodec;
use std::ops::RangeInclusive;

define_table_key!(
TableKind::Promise,
KeyKind::Promise,
PromiseKey(
partition_key: PartitionKey,
service_name: ByteString,
service_key: Bytes,
key: ByteString
)
);

fn create_key(service_id: &ServiceId, key: &ByteString) -> PromiseKey {
PromiseKey::default()
.partition_key(service_id.partition_key())
.service_name(service_id.service_name.clone())
.service_key(service_id.key.as_bytes().clone())
.key(key.clone())
}

fn get_promise<S: StorageAccess>(
storage: &mut S,
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<Promise>> {
storage.get_value(create_key(service_id, key))
}

fn all_promise<S: StorageAccess>(
storage: &mut S,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<OwnedPromiseRow>> + Send + '_ {
let iter = storage.iterator_from(TableScan::FullScanPartitionKeyRange::<PromiseKey>(range));
stream::iter(OwnedIterator::new(iter).map(|(mut k, mut v)| {
let key = PromiseKey::deserialize_from(&mut k)?;
let metadata = StorageCodec::decode::<Promise, _>(&mut v)
.map_err(|err| StorageError::Generic(err.into()))?;

let (partition_key, service_name, service_key, promise_key) = key.into_inner_ok_or()?;

Ok(OwnedPromiseRow {
service_id: ServiceId::with_partition_key(
partition_key,
service_name,
ByteString::try_from(service_key)
.map_err(|e| anyhow!("Cannot convert to string {e}"))?,
),
key: promise_key,
metadata,
})
}))
}

fn put_promise<S: StorageAccess>(
storage: &mut S,
service_id: &ServiceId,
key: &ByteString,
metadata: Promise,
) {
storage.put_kv(create_key(service_id, key), metadata);
}

fn delete_all_promises<S: StorageAccess>(storage: &mut S, service_id: &ServiceId) {
let prefix_key = PromiseKey::default()
.partition_key(service_id.partition_key())
.service_name(service_id.service_name.clone())
.service_key(service_id.key.as_bytes().clone());

let keys = storage.for_each_key_value_in_place(
TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), prefix_key),
|k, _| TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))),
);

for k in keys {
storage.delete_cf(TableKind::Promise, &k.unwrap());
}
}

impl ReadOnlyPromiseTable for PartitionStore {
async fn get_promise(
&mut self,
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<Promise>> {
get_promise(self, service_id, key)
}

fn all_promises(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<OwnedPromiseRow>> + Send {
all_promise(self, range)
}
}

impl<'a> ReadOnlyPromiseTable for RocksDBTransaction<'a> {
async fn get_promise(
&mut self,
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<Promise>> {
get_promise(self, service_id, key)
}

fn all_promises(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<OwnedPromiseRow>> + Send {
all_promise(self, range)
}
}

impl<'a> PromiseTable for RocksDBTransaction<'a> {
async fn put_promise(&mut self, service_id: &ServiceId, key: &ByteString, metadata: Promise) {
put_promise(self, service_id, key, metadata)
}

async fn delete_all_promises(&mut self, service_id: &ServiceId) {
delete_all_promises(self, service_id)
}
}
1 change: 1 addition & 0 deletions crates/partition-store/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod inbox_table_test;
mod invocation_status_table_test;
mod journal_table_test;
mod outbox_table_test;
mod promise_table_test;
mod state_table_test;
mod timer_table_test;
mod virtual_object_status_table_test;
Expand Down
125 changes: 125 additions & 0 deletions crates/partition-store/tests/promise_table_test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// Unfortunately we need this because of https://github.com/rust-lang/rust-clippy/issues/9801
#![allow(clippy::borrow_interior_mutable_const)]
#![allow(clippy::declare_interior_mutable_const)]

use crate::storage_test_environment;
use bytes::Bytes;
use bytestring::ByteString;
use restate_storage_api::promise_table::{
Promise, PromiseState, PromiseTable, ReadOnlyPromiseTable,
};
use restate_storage_api::Transaction;
use restate_types::identifiers::{InvocationId, InvocationUuid, JournalEntryId, ServiceId};
use restate_types::journal::EntryResult;

const SERVICE_ID_1: ServiceId = ServiceId::from_static(10, "MySvc", "a");
const SERVICE_ID_2: ServiceId = ServiceId::from_static(11, "MySvc", "b");

const PROMISE_KEY_1: ByteString = ByteString::from_static("prom1");
const PROMISE_KEY_2: ByteString = ByteString::from_static("prom2");
const PROMISE_KEY_3: ByteString = ByteString::from_static("prom3");

const PROMISE_COMPLETED: Promise = Promise {
state: PromiseState::Completed(EntryResult::Success(Bytes::from_static(b"{}"))),
};

#[tokio::test]
async fn test_promise_table() {
let mut rocksdb = storage_test_environment().await;

let promise_not_completed = Promise {
state: PromiseState::NotCompleted(vec![
JournalEntryId::from_parts(
InvocationId::from_parts(
10,
InvocationUuid::from_parts(1706027034946, 12345678900001),
),
1,
),
JournalEntryId::from_parts(
InvocationId::from_parts(
11,
InvocationUuid::from_parts(1706027034946, 12345678900021),
),
2,
),
]),
};

// Fill in some data
let mut txn = rocksdb.transaction();
txn.put_promise(&SERVICE_ID_1, &PROMISE_KEY_1, PROMISE_COMPLETED)
.await;
txn.put_promise(&SERVICE_ID_1, &PROMISE_KEY_2, promise_not_completed.clone())
.await;
txn.put_promise(&SERVICE_ID_2, &PROMISE_KEY_3, PROMISE_COMPLETED)
.await;
txn.commit().await.unwrap();

// Query
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_1,)
.await
.unwrap(),
Some(PROMISE_COMPLETED)
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_2,)
.await
.unwrap(),
Some(promise_not_completed)
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_2, &PROMISE_KEY_3,)
.await
.unwrap(),
Some(PROMISE_COMPLETED)
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_3,)
.await
.unwrap(),
None
);

// Delete and query afterwards
let mut txn = rocksdb.transaction();
txn.delete_all_promises(&SERVICE_ID_1).await;
txn.commit().await.unwrap();

assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_1,)
.await
.unwrap(),
None
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_2,)
.await
.unwrap(),
None
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_2, &PROMISE_KEY_3,)
.await
.unwrap(),
Some(PROMISE_COMPLETED)
);
}
13 changes: 9 additions & 4 deletions crates/service-protocol/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ impl RawEntryCodec for ProtobufRawEntryCodec {
ClearState,
ClearAllState,
GetStateKeys,
GetPromise,
PeekPromise,
CompletePromise,
Sleep,
Call,
OneWayCall,
Expand Down Expand Up @@ -172,7 +175,7 @@ mod mocks {
};
use restate_types::journal::{
AwakeableEntry, CompletableEntry, CompleteAwakeableEntry, EntryResult, GetStateKeysEntry,
GetStateKeysResult, GetStateResult, InputEntry, OutputEntry,
GetStateKeysResult, InputEntry, OutputEntry,
};
use restate_types::service_protocol::{
awakeable_entry_message, call_entry_message, complete_awakeable_entry_message,
Expand Down Expand Up @@ -219,11 +222,13 @@ mod mocks {
GetStateEntryMessage {
key: entry.key,
result: entry.value.map(|value| match value {
GetStateResult::Empty => {
CompletionResult::Empty => {
get_state_entry_message::Result::Empty(service_protocol::Empty {})
}
GetStateResult::Result(v) => get_state_entry_message::Result::Value(v),
GetStateResult::Failure(code, reason) => {
CompletionResult::Success(v) => {
get_state_entry_message::Result::Value(v)
}
CompletionResult::Failure(code, reason) => {
get_state_entry_message::Result::Failure(Failure {
code: code.into(),
message: reason.to_string(),
Expand Down
Loading

0 comments on commit bdf0106

Please sign in to comment.