Skip to content

Commit

Permalink
Add promise table
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 14, 2024
1 parent fc6f69b commit 212f115
Show file tree
Hide file tree
Showing 11 changed files with 580 additions and 10 deletions.
3 changes: 3 additions & 0 deletions crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub enum KeyKind {
ServiceStatus,
State,
Timers,
Promise,
}

impl KeyKind {
Expand Down Expand Up @@ -73,6 +74,7 @@ impl KeyKind {
KeyKind::ServiceStatus => b"ss",
KeyKind::State => b"st",
KeyKind::Timers => b"ti",
KeyKind::Promise => b"pr",
}
}

Expand All @@ -96,6 +98,7 @@ impl KeyKind {
b"ss" => Some(KeyKind::ServiceStatus),
b"st" => Some(KeyKind::State),
b"ti" => Some(KeyKind::Timers),
b"pr" => Some(KeyKind::Promise),
_ => None,
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/partition-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod outbox_table;
mod owned_iter;
mod partition_store;
mod partition_store_manager;
pub mod promise_table;
pub mod scan;
pub mod service_status_table;
pub mod state_table;
Expand Down
2 changes: 2 additions & 0 deletions crates/partition-store/src/partition_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub enum TableKind {
Idempotency,
Inbox,
Journal,
Promise,
}

impl TableKind {
Expand All @@ -96,6 +97,7 @@ impl TableKind {
Self::PartitionStateMachine => &[KeyKind::Fsm],
Self::Timers => &[KeyKind::Timers],
Self::Journal => &[KeyKind::Journal],
Self::Promise => &[KeyKind::Promise],
}
}

Expand Down
151 changes: 151 additions & 0 deletions crates/partition-store/src/promise_table/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::keys::{define_table_key, KeyKind, TableKey};
use crate::owned_iter::OwnedIterator;
use crate::scan::TableScan;
use crate::{PartitionStore, TableKind, TableScanIterationDecision};
use crate::{RocksDBTransaction, StorageAccess};
use anyhow::anyhow;
use bytes::Bytes;
use bytestring::ByteString;
use futures::Stream;
use futures_util::stream;
use restate_storage_api::promise_table::{
OwnedPromiseRow, PromiseMetadata, PromiseTable, ReadOnlyPromiseTable,
};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{PartitionKey, ServiceId, WithPartitionKey};
use restate_types::storage::StorageCodec;
use std::ops::RangeInclusive;

define_table_key!(
TableKind::Promise,
KeyKind::Promise,
PromiseKey(
partition_key: PartitionKey,
service_name: ByteString,
service_key: Bytes,
key: ByteString
)
);

fn create_key(service_id: &ServiceId, key: &ByteString) -> PromiseKey {
PromiseKey::default()
.partition_key(service_id.partition_key())
.service_name(service_id.service_name.clone())
.service_key(service_id.key.as_bytes().clone())
.key(key.clone())
}

fn get_promise_metadata<S: StorageAccess>(
storage: &mut S,
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<PromiseMetadata>> {
storage.get_value(create_key(service_id, key))
}

fn all_promise_metadata<S: StorageAccess>(
storage: &mut S,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<OwnedPromiseRow>> + Send + '_ {
let iter = storage.iterator_from(TableScan::FullScanPartitionKeyRange::<PromiseKey>(range));
stream::iter(OwnedIterator::new(iter).map(|(mut k, mut v)| {
let key = PromiseKey::deserialize_from(&mut k)?;
let metadata = StorageCodec::decode::<PromiseMetadata, _>(&mut v)
.map_err(|err| StorageError::Generic(err.into()))?;

Ok(OwnedPromiseRow {
service_id: ServiceId::with_partition_key(
*key.partition_key_ok_or()?,
key.service_name_ok_or()?.clone(),
ByteString::try_from(key.service_key_ok_or()?.clone())
.map_err(|e| anyhow!("Cannot convert to string {e}"))?,
),
key: key.key_ok_or()?.clone(),
metadata,
})
}))
}

fn put_promise_metadata<S: StorageAccess>(
storage: &mut S,
service_id: &ServiceId,
key: &ByteString,
metadata: PromiseMetadata,
) {
storage.put_kv(create_key(service_id, key), metadata);
}

fn delete_all_promises_metadata<S: StorageAccess>(storage: &mut S, service_id: &ServiceId) {
let prefix_key = PromiseKey::default()
.partition_key(service_id.partition_key())
.service_name(service_id.service_name.clone())
.service_key(service_id.key.as_bytes().clone());

let keys = storage.for_each_key_value_in_place(
TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), prefix_key),
|k, _| TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))),
);

for k in keys {
storage.delete_cf(TableKind::Promise, &k.unwrap());
}
}

impl ReadOnlyPromiseTable for PartitionStore {
async fn get_promise(
&mut self,
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<PromiseMetadata>> {
get_promise_metadata(self, service_id, key)
}

fn all_promises(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<OwnedPromiseRow>> + Send {
all_promise_metadata(self, range)
}
}

impl<'a> ReadOnlyPromiseTable for RocksDBTransaction<'a> {
async fn get_promise(
&mut self,
service_id: &ServiceId,
key: &ByteString,
) -> Result<Option<PromiseMetadata>> {
get_promise_metadata(self, service_id, key)
}

fn all_promises(
&mut self,
range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = Result<OwnedPromiseRow>> + Send {
all_promise_metadata(self, range)
}
}

impl<'a> PromiseTable for RocksDBTransaction<'a> {
async fn put_promise(
&mut self,
service_id: &ServiceId,
key: &ByteString,
metadata: PromiseMetadata,
) {
put_promise_metadata(self, service_id, key, metadata)
}

async fn delete_all_promises(&mut self, service_id: &ServiceId) {
delete_all_promises_metadata(self, service_id)
}
}
1 change: 1 addition & 0 deletions crates/partition-store/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod inbox_table_test;
mod invocation_status_table_test;
mod journal_table_test;
mod outbox_table_test;
mod promise_table_test;
mod state_table_test;
mod timer_table_test;
mod virtual_object_status_table_test;
Expand Down
129 changes: 129 additions & 0 deletions crates/partition-store/tests/promise_table_test/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// Unfortunately we need this because of https://github.com/rust-lang/rust-clippy/issues/9801
#![allow(clippy::borrow_interior_mutable_const)]
#![allow(clippy::declare_interior_mutable_const)]

use crate::storage_test_environment;
use bytes::Bytes;
use bytestring::ByteString;
use restate_storage_api::promise_table::{
PromiseMetadata, PromiseState, PromiseTable, ReadOnlyPromiseTable,
};
use restate_storage_api::Transaction;
use restate_types::identifiers::{InvocationId, InvocationUuid, JournalEntryId, ServiceId};
use restate_types::journal::EntryResult;

const SERVICE_ID_1: ServiceId = ServiceId::from_static(10, "MySvc", "a");
const SERVICE_ID_2: ServiceId = ServiceId::from_static(11, "MySvc", "b");

const PROMISE_KEY_1: ByteString = ByteString::from_static("prom1");
const PROMISE_KEY_2: ByteString = ByteString::from_static("prom2");
const PROMISE_KEY_3: ByteString = ByteString::from_static("prom3");

const PROMISE_METADATA_COMPLETED: PromiseMetadata = PromiseMetadata {
state: PromiseState::Completed(EntryResult::Success(Bytes::from_static(b"{}"))),
};

#[tokio::test]
async fn test_promise_table() {
let mut rocksdb = storage_test_environment().await;

let promise_metadata_not_completed = PromiseMetadata {
state: PromiseState::NotCompleted(vec![
JournalEntryId::from_parts(
InvocationId::from_parts(
10,
InvocationUuid::from_parts(1706027034946, 12345678900001),
),
1,
),
JournalEntryId::from_parts(
InvocationId::from_parts(
11,
InvocationUuid::from_parts(1706027034946, 12345678900021),
),
2,
),
]),
};

// Fill in some data
let mut txn = rocksdb.transaction();
txn.put_promise(&SERVICE_ID_1, &PROMISE_KEY_1, PROMISE_METADATA_COMPLETED)
.await;
txn.put_promise(
&SERVICE_ID_1,
&PROMISE_KEY_2,
promise_metadata_not_completed.clone(),
)
.await;
txn.put_promise(&SERVICE_ID_2, &PROMISE_KEY_3, PROMISE_METADATA_COMPLETED)
.await;
txn.commit().await.unwrap();

// Query
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_1,)
.await
.unwrap(),
Some(PROMISE_METADATA_COMPLETED)
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_2,)
.await
.unwrap(),
Some(promise_metadata_not_completed)
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_2, &PROMISE_KEY_3,)
.await
.unwrap(),
Some(PROMISE_METADATA_COMPLETED)
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_3,)
.await
.unwrap(),
None
);

// Delete and query afterwards
let mut txn = rocksdb.transaction();
txn.delete_all_promises(&SERVICE_ID_1).await;
txn.commit().await.unwrap();

assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_1,)
.await
.unwrap(),
None
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_1, &PROMISE_KEY_2,)
.await
.unwrap(),
None
);
assert_eq!(
rocksdb
.get_promise(&SERVICE_ID_2, &PROMISE_KEY_3,)
.await
.unwrap(),
Some(PROMISE_METADATA_COMPLETED)
);
}
39 changes: 38 additions & 1 deletion crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ message SequenceNumber {
uint64 sequence_number = 1;
}

message JournalEntryId {
uint64 partition_key = 1;
bytes invocation_uuid = 2;
uint32 entry_index = 3;
}

message EntryResult {
message Failure {
uint32 error_code = 1;
bytes message = 2;
}

oneof result {
bytes value = 1;
Failure failure = 2;
}
}

// ---------------------------------------------------------------------
// Service Invocation
// ---------------------------------------------------------------------
Expand Down Expand Up @@ -500,4 +518,23 @@ message IdempotencyMetadata {
message IdempotentRequestMetadata {
string key = 1;
Duration retention = 2;
}
}

// ---------------------------------------------------------------------
// Promises
// ---------------------------------------------------------------------

message PromiseMetadata {
message CompletedState {
EntryResult result = 1;
}

message NotCompletedState {
repeated JournalEntryId listening_journal_entries = 1;
}

oneof state {
CompletedState completed_state = 1;
NotCompletedState not_completed_state = 2;
}
}
Loading

0 comments on commit 212f115

Please sign in to comment.