diff --git a/crates/invoker-api/src/entry_enricher.rs b/crates/invoker-api/src/entry_enricher.rs index 8c6d3da0fd..dcbe02dd7c 100644 --- a/crates/invoker-api/src/entry_enricher.rs +++ b/crates/invoker-api/src/entry_enricher.rs @@ -56,6 +56,15 @@ pub mod mocks { EnrichedEntryHeader::GetStateKeys { is_completed } } PlainEntryHeader::ClearAllState {} => EnrichedEntryHeader::ClearAllState {}, + PlainEntryHeader::GetPromise { is_completed } => { + EnrichedEntryHeader::GetPromise { is_completed } + } + PlainEntryHeader::PeekPromise { is_completed } => { + EnrichedEntryHeader::PeekPromise { is_completed } + } + PlainEntryHeader::CompletePromise { is_completed } => { + EnrichedEntryHeader::CompletePromise { is_completed } + } PlainEntryHeader::Sleep { is_completed } => { EnrichedEntryHeader::Sleep { is_completed } } diff --git a/crates/partition-store/src/keys.rs b/crates/partition-store/src/keys.rs index c42d48516e..d7712346f5 100644 --- a/crates/partition-store/src/keys.rs +++ b/crates/partition-store/src/keys.rs @@ -35,6 +35,7 @@ pub enum KeyKind { ServiceStatus, State, Timers, + Promise, } impl KeyKind { @@ -73,6 +74,7 @@ impl KeyKind { KeyKind::ServiceStatus => b"ss", KeyKind::State => b"st", KeyKind::Timers => b"ti", + KeyKind::Promise => b"pr", } } @@ -96,6 +98,7 @@ impl KeyKind { b"ss" => Some(KeyKind::ServiceStatus), b"st" => Some(KeyKind::State), b"ti" => Some(KeyKind::Timers), + b"pr" => Some(KeyKind::Promise), _ => None, } } @@ -244,6 +247,10 @@ macro_rules! define_table_key { pub fn into_inner(self) -> ($(Option<$ty>,)+) { return ( $(self.$element,)+ ) } + + pub fn into_inner_ok_or(self) -> crate::Result<($($ty,)+)> { + return crate::Result::Ok(( $(self.$element.ok_or_else(|| restate_storage_api::StorageError::DataIntegrityError)?,)+ )) + } } // serde diff --git a/crates/partition-store/src/lib.rs b/crates/partition-store/src/lib.rs index 12855693ad..01430f0a8d 100644 --- a/crates/partition-store/src/lib.rs +++ b/crates/partition-store/src/lib.rs @@ -19,6 +19,7 @@ pub mod outbox_table; mod owned_iter; mod partition_store; mod partition_store_manager; +pub mod promise_table; pub mod scan; pub mod service_status_table; pub mod state_table; diff --git a/crates/partition-store/src/partition_store.rs b/crates/partition-store/src/partition_store.rs index 03c7f3e1ab..000ac07e41 100644 --- a/crates/partition-store/src/partition_store.rs +++ b/crates/partition-store/src/partition_store.rs @@ -81,6 +81,7 @@ pub enum TableKind { Idempotency, Inbox, Journal, + Promise, } impl TableKind { @@ -96,6 +97,7 @@ impl TableKind { Self::PartitionStateMachine => &[KeyKind::Fsm], Self::Timers => &[KeyKind::Timers], Self::Journal => &[KeyKind::Journal], + Self::Promise => &[KeyKind::Promise], } } diff --git a/crates/partition-store/src/promise_table/mod.rs b/crates/partition-store/src/promise_table/mod.rs new file mode 100644 index 0000000000..cae9b6177a --- /dev/null +++ b/crates/partition-store/src/promise_table/mod.rs @@ -0,0 +1,148 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::keys::{define_table_key, KeyKind, TableKey}; +use crate::owned_iter::OwnedIterator; +use crate::scan::TableScan; +use crate::{PartitionStore, TableKind, TableScanIterationDecision}; +use crate::{RocksDBTransaction, StorageAccess}; +use anyhow::anyhow; +use bytes::Bytes; +use bytestring::ByteString; +use futures::Stream; +use futures_util::stream; +use restate_storage_api::promise_table::{ + OwnedPromiseRow, Promise, PromiseTable, ReadOnlyPromiseTable, +}; +use restate_storage_api::{Result, StorageError}; +use restate_types::identifiers::{PartitionKey, ServiceId, WithPartitionKey}; +use restate_types::storage::StorageCodec; +use std::ops::RangeInclusive; + +define_table_key!( + TableKind::Promise, + KeyKind::Promise, + PromiseKey( + partition_key: PartitionKey, + service_name: ByteString, + service_key: Bytes, + key: ByteString + ) +); + +fn create_key(service_id: &ServiceId, key: &ByteString) -> PromiseKey { + PromiseKey::default() + .partition_key(service_id.partition_key()) + .service_name(service_id.service_name.clone()) + .service_key(service_id.key.as_bytes().clone()) + .key(key.clone()) +} + +fn get_promise( + storage: &mut S, + service_id: &ServiceId, + key: &ByteString, +) -> Result> { + storage.get_value(create_key(service_id, key)) +} + +fn all_promise( + storage: &mut S, + range: RangeInclusive, +) -> impl Stream> + Send + '_ { + let iter = storage.iterator_from(TableScan::FullScanPartitionKeyRange::(range)); + stream::iter(OwnedIterator::new(iter).map(|(mut k, mut v)| { + let key = PromiseKey::deserialize_from(&mut k)?; + let metadata = StorageCodec::decode::(&mut v) + .map_err(|err| StorageError::Generic(err.into()))?; + + let (partition_key, service_name, service_key, promise_key) = key.into_inner_ok_or()?; + + Ok(OwnedPromiseRow { + service_id: ServiceId::with_partition_key( + partition_key, + service_name, + ByteString::try_from(service_key) + .map_err(|e| anyhow!("Cannot convert to string {e}"))?, + ), + key: promise_key, + metadata, + }) + })) +} + +fn put_promise( + storage: &mut S, + service_id: &ServiceId, + key: &ByteString, + metadata: Promise, +) { + storage.put_kv(create_key(service_id, key), metadata); +} + +fn delete_all_promises(storage: &mut S, service_id: &ServiceId) { + let prefix_key = PromiseKey::default() + .partition_key(service_id.partition_key()) + .service_name(service_id.service_name.clone()) + .service_key(service_id.key.as_bytes().clone()); + + let keys = storage.for_each_key_value_in_place( + TableScan::SinglePartitionKeyPrefix(service_id.partition_key(), prefix_key), + |k, _| TableScanIterationDecision::Emit(Ok(Bytes::copy_from_slice(k))), + ); + + for k in keys { + storage.delete_cf(TableKind::Promise, &k.unwrap()); + } +} + +impl ReadOnlyPromiseTable for PartitionStore { + async fn get_promise( + &mut self, + service_id: &ServiceId, + key: &ByteString, + ) -> Result> { + get_promise(self, service_id, key) + } + + fn all_promises( + &mut self, + range: RangeInclusive, + ) -> impl Stream> + Send { + all_promise(self, range) + } +} + +impl<'a> ReadOnlyPromiseTable for RocksDBTransaction<'a> { + async fn get_promise( + &mut self, + service_id: &ServiceId, + key: &ByteString, + ) -> Result> { + get_promise(self, service_id, key) + } + + fn all_promises( + &mut self, + range: RangeInclusive, + ) -> impl Stream> + Send { + all_promise(self, range) + } +} + +impl<'a> PromiseTable for RocksDBTransaction<'a> { + async fn put_promise(&mut self, service_id: &ServiceId, key: &ByteString, metadata: Promise) { + put_promise(self, service_id, key, metadata) + } + + async fn delete_all_promises(&mut self, service_id: &ServiceId) { + delete_all_promises(self, service_id) + } +} diff --git a/crates/partition-store/tests/integration_test.rs b/crates/partition-store/tests/integration_test.rs index 2d60bb01cd..7f193149c0 100644 --- a/crates/partition-store/tests/integration_test.rs +++ b/crates/partition-store/tests/integration_test.rs @@ -31,6 +31,7 @@ mod inbox_table_test; mod invocation_status_table_test; mod journal_table_test; mod outbox_table_test; +mod promise_table_test; mod state_table_test; mod timer_table_test; mod virtual_object_status_table_test; diff --git a/crates/partition-store/tests/promise_table_test/mod.rs b/crates/partition-store/tests/promise_table_test/mod.rs new file mode 100644 index 0000000000..6eddc75a63 --- /dev/null +++ b/crates/partition-store/tests/promise_table_test/mod.rs @@ -0,0 +1,125 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +// Unfortunately we need this because of https://github.com/rust-lang/rust-clippy/issues/9801 +#![allow(clippy::borrow_interior_mutable_const)] +#![allow(clippy::declare_interior_mutable_const)] + +use crate::storage_test_environment; +use bytes::Bytes; +use bytestring::ByteString; +use restate_storage_api::promise_table::{ + Promise, PromiseState, PromiseTable, ReadOnlyPromiseTable, +}; +use restate_storage_api::Transaction; +use restate_types::identifiers::{InvocationId, InvocationUuid, JournalEntryId, ServiceId}; +use restate_types::journal::EntryResult; + +const SERVICE_ID_1: ServiceId = ServiceId::from_static(10, "MySvc", "a"); +const SERVICE_ID_2: ServiceId = ServiceId::from_static(11, "MySvc", "b"); + +const PROMISE_KEY_1: ByteString = ByteString::from_static("prom1"); +const PROMISE_KEY_2: ByteString = ByteString::from_static("prom2"); +const PROMISE_KEY_3: ByteString = ByteString::from_static("prom3"); + +const PROMISE_COMPLETED: Promise = Promise { + state: PromiseState::Completed(EntryResult::Success(Bytes::from_static(b"{}"))), +}; + +#[tokio::test] +async fn test_promise_table() { + let mut rocksdb = storage_test_environment().await; + + let promise_not_completed = Promise { + state: PromiseState::NotCompleted(vec![ + JournalEntryId::from_parts( + InvocationId::from_parts( + 10, + InvocationUuid::from_parts(1706027034946, 12345678900001), + ), + 1, + ), + JournalEntryId::from_parts( + InvocationId::from_parts( + 11, + InvocationUuid::from_parts(1706027034946, 12345678900021), + ), + 2, + ), + ]), + }; + + // Fill in some data + let mut txn = rocksdb.transaction(); + txn.put_promise(&SERVICE_ID_1, &PROMISE_KEY_1, PROMISE_COMPLETED) + .await; + txn.put_promise(&SERVICE_ID_1, &PROMISE_KEY_2, promise_not_completed.clone()) + .await; + txn.put_promise(&SERVICE_ID_2, &PROMISE_KEY_3, PROMISE_COMPLETED) + .await; + txn.commit().await.unwrap(); + + // Query + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_1, &PROMISE_KEY_1,) + .await + .unwrap(), + Some(PROMISE_COMPLETED) + ); + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_1, &PROMISE_KEY_2,) + .await + .unwrap(), + Some(promise_not_completed) + ); + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_2, &PROMISE_KEY_3,) + .await + .unwrap(), + Some(PROMISE_COMPLETED) + ); + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_1, &PROMISE_KEY_3,) + .await + .unwrap(), + None + ); + + // Delete and query afterwards + let mut txn = rocksdb.transaction(); + txn.delete_all_promises(&SERVICE_ID_1).await; + txn.commit().await.unwrap(); + + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_1, &PROMISE_KEY_1,) + .await + .unwrap(), + None + ); + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_1, &PROMISE_KEY_2,) + .await + .unwrap(), + None + ); + assert_eq!( + rocksdb + .get_promise(&SERVICE_ID_2, &PROMISE_KEY_3,) + .await + .unwrap(), + Some(PROMISE_COMPLETED) + ); +} diff --git a/crates/service-protocol/src/codec.rs b/crates/service-protocol/src/codec.rs index 74ed8e44d3..3aee35f286 100644 --- a/crates/service-protocol/src/codec.rs +++ b/crates/service-protocol/src/codec.rs @@ -87,6 +87,9 @@ impl RawEntryCodec for ProtobufRawEntryCodec { ClearState, ClearAllState, GetStateKeys, + GetPromise, + PeekPromise, + CompletePromise, Sleep, Call, OneWayCall, @@ -172,7 +175,7 @@ mod mocks { }; use restate_types::journal::{ AwakeableEntry, CompletableEntry, CompleteAwakeableEntry, EntryResult, GetStateKeysEntry, - GetStateKeysResult, GetStateResult, InputEntry, OutputEntry, + GetStateKeysResult, InputEntry, OutputEntry, }; use restate_types::service_protocol::{ awakeable_entry_message, call_entry_message, complete_awakeable_entry_message, @@ -219,11 +222,13 @@ mod mocks { GetStateEntryMessage { key: entry.key, result: entry.value.map(|value| match value { - GetStateResult::Empty => { + CompletionResult::Empty => { get_state_entry_message::Result::Empty(service_protocol::Empty {}) } - GetStateResult::Result(v) => get_state_entry_message::Result::Value(v), - GetStateResult::Failure(code, reason) => { + CompletionResult::Success(v) => { + get_state_entry_message::Result::Value(v) + } + CompletionResult::Failure(code, reason) => { get_state_entry_message::Result::Failure(Failure { code: code.into(), message: reason.to_string(), diff --git a/crates/service-protocol/src/message/encoding.rs b/crates/service-protocol/src/message/encoding.rs index a18e8058ea..53f48cc90c 100644 --- a/crates/service-protocol/src/message/encoding.rs +++ b/crates/service-protocol/src/message/encoding.rs @@ -302,6 +302,15 @@ fn message_header_to_raw_header(message_header: &MessageHeader) -> PlainEntryHea is_completed: expect_flag!(message_header, completed), }, MessageType::ClearAllStateEntry => PlainEntryHeader::ClearAllState {}, + MessageType::GetPromiseEntry => PlainEntryHeader::GetPromise { + is_completed: expect_flag!(message_header, completed), + }, + MessageType::PeekPromiseEntry => PlainEntryHeader::PeekPromise { + is_completed: expect_flag!(message_header, completed), + }, + MessageType::CompletePromiseEntry => PlainEntryHeader::CompletePromise { + is_completed: expect_flag!(message_header, completed), + }, MessageType::SleepEntry => PlainEntryHeader::Sleep { is_completed: expect_flag!(message_header, completed), }, @@ -332,6 +341,9 @@ fn raw_header_to_message_type(entry_header: &PlainEntryHeader) -> MessageType { PlainEntryHeader::ClearState { .. } => MessageType::ClearStateEntry, PlainEntryHeader::GetStateKeys { .. } => MessageType::GetStateKeysEntry, PlainEntryHeader::ClearAllState { .. } => MessageType::ClearAllStateEntry, + PlainEntryHeader::GetPromise { .. } => MessageType::GetPromiseEntry, + PlainEntryHeader::PeekPromise { .. } => MessageType::PeekPromiseEntry, + PlainEntryHeader::CompletePromise { .. } => MessageType::CompletePromiseEntry, PlainEntryHeader::Sleep { .. } => MessageType::SleepEntry, PlainEntryHeader::Call { .. } => MessageType::InvokeEntry, PlainEntryHeader::OneWayCall { .. } => MessageType::BackgroundInvokeEntry, diff --git a/crates/service-protocol/src/message/header.rs b/crates/service-protocol/src/message/header.rs index fb2179557a..349e33aca3 100644 --- a/crates/service-protocol/src/message/header.rs +++ b/crates/service-protocol/src/message/header.rs @@ -46,6 +46,9 @@ pub enum MessageType { AwakeableEntry, CompleteAwakeableEntry, SideEffectEntry, + GetPromiseEntry, + PeekPromiseEntry, + CompletePromiseEntry, CustomEntry(u16), } @@ -71,6 +74,9 @@ impl MessageType { MessageType::AwakeableEntry => MessageKind::Syscall, MessageType::CompleteAwakeableEntry => MessageKind::Syscall, MessageType::SideEffectEntry => MessageKind::Syscall, + MessageType::GetPromiseEntry => MessageKind::State, + MessageType::PeekPromiseEntry => MessageKind::State, + MessageType::CompletePromiseEntry => MessageKind::State, MessageType::CustomEntry(_) => MessageKind::CustomEntry, } } @@ -83,6 +89,9 @@ impl MessageType { | MessageType::SleepEntry | MessageType::InvokeEntry | MessageType::AwakeableEntry + | MessageType::GetPromiseEntry + | MessageType::PeekPromiseEntry + | MessageType::CompletePromiseEntry ) } @@ -107,6 +116,9 @@ const SET_STATE_ENTRY_MESSAGE_TYPE: u16 = 0x0801; const CLEAR_STATE_ENTRY_MESSAGE_TYPE: u16 = 0x0802; const CLEAR_ALL_STATE_ENTRY_MESSAGE_TYPE: u16 = 0x0803; const GET_STATE_KEYS_ENTRY_MESSAGE_TYPE: u16 = 0x0804; +const GET_PROMISE_ENTRY_MESSAGE_TYPE: u16 = 0x0808; +const PEEK_PROMISE_ENTRY_MESSAGE_TYPE: u16 = 0x0809; +const COMPLETE_PROMISE_ENTRY_MESSAGE_TYPE: u16 = 0x080A; const SLEEP_ENTRY_MESSAGE_TYPE: u16 = 0x0C00; const INVOKE_ENTRY_MESSAGE_TYPE: u16 = 0x0C01; const BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE: u16 = 0x0C02; @@ -136,6 +148,9 @@ impl From for MessageTypeId { MessageType::AwakeableEntry => AWAKEABLE_ENTRY_MESSAGE_TYPE, MessageType::CompleteAwakeableEntry => COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE, MessageType::SideEffectEntry => SIDE_EFFECT_ENTRY_MESSAGE_TYPE, + MessageType::GetPromiseEntry => GET_PROMISE_ENTRY_MESSAGE_TYPE, + MessageType::PeekPromiseEntry => PEEK_PROMISE_ENTRY_MESSAGE_TYPE, + MessageType::CompletePromiseEntry => COMPLETE_PROMISE_ENTRY_MESSAGE_TYPE, MessageType::CustomEntry(id) => id, } } @@ -168,6 +183,9 @@ impl TryFrom for MessageType { BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE => Ok(MessageType::BackgroundInvokeEntry), AWAKEABLE_ENTRY_MESSAGE_TYPE => Ok(MessageType::AwakeableEntry), COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE => Ok(MessageType::CompleteAwakeableEntry), + GET_PROMISE_ENTRY_MESSAGE_TYPE => Ok(MessageType::GetPromiseEntry), + PEEK_PROMISE_ENTRY_MESSAGE_TYPE => Ok(MessageType::PeekPromiseEntry), + COMPLETE_PROMISE_ENTRY_MESSAGE_TYPE => Ok(MessageType::CompletePromiseEntry), SIDE_EFFECT_ENTRY_MESSAGE_TYPE => Ok(MessageType::SideEffectEntry), v if ((v & CUSTOM_MESSAGE_MASK) != 0) => Ok(MessageType::CustomEntry(v)), v => Err(UnknownMessageType(v)), @@ -193,6 +211,9 @@ impl TryFrom for EntryType { MessageType::AwakeableEntry => Ok(EntryType::Awakeable), MessageType::CompleteAwakeableEntry => Ok(EntryType::CompleteAwakeable), MessageType::SideEffectEntry => Ok(EntryType::Run), + MessageType::GetPromiseEntry => Ok(EntryType::GetPromise), + MessageType::PeekPromiseEntry => Ok(EntryType::PeekPromise), + MessageType::CompletePromiseEntry => Ok(EntryType::CompletePromise), MessageType::CustomEntry(_) => Ok(EntryType::Custom), MessageType::Start | MessageType::Completion diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index a28cbbf98d..1536c0f0c2 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -54,6 +54,24 @@ message SequenceNumber { uint64 sequence_number = 1; } +message JournalEntryId { + uint64 partition_key = 1; + bytes invocation_uuid = 2; + uint32 entry_index = 3; +} + +message EntryResult { + message Failure { + uint32 error_code = 1; + bytes message = 2; + } + + oneof result { + bytes value = 1; + Failure failure = 2; + } +} + // --------------------------------------------------------------------- // Service Invocation // --------------------------------------------------------------------- @@ -281,6 +299,18 @@ message EnrichedEntryHeader { message ClearAllState { } + message GetPromise { + bool is_completed = 1; + } + + message PeekPromise { + bool is_completed = 1; + } + + message CompletePromise { + bool is_completed = 1; + } + message Sleep { bool is_completed = 1; } @@ -318,6 +348,9 @@ message EnrichedEntryHeader { ClearState clear_state = 5; ClearAllState clear_all_state = 12; GetStateKeys get_state_keys = 13; + GetPromise get_promise = 15; + PeekPromise peek_promise = 16; + CompletePromise complete_promise = 17; Sleep sleep = 6; Invoke invoke = 7; BackgroundCall background_call = 8; @@ -468,4 +501,23 @@ message DedupSequenceNumber { message IdempotencyMetadata { InvocationId invocation_id = 1; +} + +// --------------------------------------------------------------------- +// Promises +// --------------------------------------------------------------------- + +message Promise { + message CompletedState { + EntryResult result = 1; + } + + message NotCompletedState { + repeated JournalEntryId listening_journal_entries = 1; + } + + oneof state { + CompletedState completed_state = 1; + NotCompletedState not_completed_state = 2; + } } \ No newline at end of file diff --git a/crates/storage-api/src/lib.rs b/crates/storage-api/src/lib.rs index 1345f744c3..daf109fd59 100644 --- a/crates/storage-api/src/lib.rs +++ b/crates/storage-api/src/lib.rs @@ -32,6 +32,7 @@ pub mod inbox_table; pub mod invocation_status_table; pub mod journal_table; pub mod outbox_table; +pub mod promise_table; pub mod service_status_table; pub mod state_table; mod storage; @@ -56,6 +57,7 @@ pub trait Transaction: + fsm_table::FsmTable + timer_table::TimerTable + idempotency_table::IdempotencyTable + + promise_table::PromiseTable + Send { fn commit(self) -> impl Future> + Send; diff --git a/crates/storage-api/src/promise_table/mod.rs b/crates/storage-api/src/promise_table/mod.rs new file mode 100644 index 0000000000..813374351f --- /dev/null +++ b/crates/storage-api/src/promise_table/mod.rs @@ -0,0 +1,71 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use super::{protobuf_storage_encode_decode, Result}; + +use bytestring::ByteString; +use futures_util::Stream; +use restate_types::identifiers::{JournalEntryId, PartitionKey, ServiceId}; +use restate_types::journal::EntryResult; +use std::future::Future; +use std::ops::RangeInclusive; + +#[derive(Debug, Clone, PartialEq)] +pub enum PromiseState { + Completed(EntryResult), + NotCompleted( + // Journal entries listening for this promise to be completed + Vec, + ), +} + +impl Default for PromiseState { + fn default() -> Self { + PromiseState::NotCompleted(vec![]) + } +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct Promise { + pub state: PromiseState, +} + +protobuf_storage_encode_decode!(Promise); + +#[derive(Debug, Clone, PartialEq)] +pub struct OwnedPromiseRow { + pub service_id: ServiceId, + pub key: ByteString, + pub metadata: Promise, +} + +pub trait ReadOnlyPromiseTable { + fn get_promise( + &mut self, + service_id: &ServiceId, + key: &ByteString, + ) -> impl Future>> + Send; + + fn all_promises( + &mut self, + range: RangeInclusive, + ) -> impl Stream> + Send; +} + +pub trait PromiseTable: ReadOnlyPromiseTable { + fn put_promise( + &mut self, + service_id: &ServiceId, + key: &ByteString, + metadata: Promise, + ) -> impl Future + Send; + + fn delete_all_promises(&mut self, service_id: &ServiceId) -> impl Future + Send; +} diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index a7bc20bb14..973df7c6a9 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -81,7 +81,7 @@ pub mod v1 { use restate_types::deployment::PinnedDeployment; use restate_types::errors::{IdDecodeError, InvocationError}; - use restate_types::identifiers::{DeploymentId, WithPartitionKey}; + use restate_types::identifiers::{DeploymentId, WithInvocationId, WithPartitionKey}; use restate_types::invocation::{InvocationTermination, TerminationFlavor}; use restate_types::journal::enriched::AwakeableEnrichmentResult; use restate_types::service_protocol::ServiceProtocolVersion; @@ -93,8 +93,9 @@ pub mod v1 { use crate::storage::v1::dedup_sequence_number::Variant; use crate::storage::v1::enriched_entry_header::{ - Awakeable, BackgroundCall, ClearAllState, ClearState, CompleteAwakeable, Custom, - GetState, GetStateKeys, Input, Invoke, Output, SetState, SideEffect, Sleep, + Awakeable, BackgroundCall, ClearAllState, ClearState, CompleteAwakeable, + CompletePromise, Custom, GetPromise, GetState, GetStateKeys, Input, Invoke, Output, + PeekPromise, SetState, SideEffect, Sleep, }; use crate::storage::v1::invocation_status::{Completed, Free, Inboxed, Invoked, Suspended}; use crate::storage::v1::journal_entry::completion_result::{Empty, Failure, Success}; @@ -106,14 +107,15 @@ pub mod v1 { Ingress, PartitionProcessor, ResponseSink, }; use crate::storage::v1::{ - enriched_entry_header, inbox_entry, invocation_resolution_result, invocation_status, - invocation_target, outbox_message, response_result, source, span_relation, timer, - virtual_object_status, BackgroundCallResolutionResult, DedupSequenceNumber, Duration, - EnrichedEntryHeader, EpochSequenceNumber, Header, IdempotencyMetadata, InboxEntry, - InvocationId, InvocationResolutionResult, InvocationStatus, InvocationTarget, - JournalEntry, JournalMeta, KvPair, OutboxMessage, ResponseResult, SequenceNumber, - ServiceId, ServiceInvocation, ServiceInvocationResponseSink, Source, SpanContext, - SpanRelation, StateMutation, Timer, VirtualObjectStatus, + enriched_entry_header, entry_result, inbox_entry, invocation_resolution_result, + invocation_status, invocation_target, outbox_message, promise, response_result, source, + span_relation, timer, virtual_object_status, BackgroundCallResolutionResult, + DedupSequenceNumber, Duration, EnrichedEntryHeader, EntryResult, EpochSequenceNumber, + Header, IdempotencyMetadata, InboxEntry, InvocationId, InvocationResolutionResult, + InvocationStatus, InvocationTarget, JournalEntry, JournalEntryId, JournalMeta, KvPair, + OutboxMessage, Promise, ResponseResult, SequenceNumber, ServiceId, ServiceInvocation, + ServiceInvocationResponseSink, Source, SpanContext, SpanRelation, StateMutation, Timer, + VirtualObjectStatus, }; use crate::StorageError; @@ -219,6 +221,75 @@ pub mod v1 { } } + impl From for JournalEntryId { + fn from(value: restate_types::identifiers::JournalEntryId) -> Self { + JournalEntryId { + partition_key: value.partition_key(), + invocation_uuid: value + .invocation_id() + .invocation_uuid() + .to_bytes() + .to_vec() + .into(), + entry_index: value.journal_index(), + } + } + } + + impl TryFrom for restate_types::identifiers::JournalEntryId { + type Error = ConversionError; + + fn try_from(value: JournalEntryId) -> Result { + Ok(restate_types::identifiers::JournalEntryId::from_parts( + restate_types::identifiers::InvocationId::from_parts( + value.partition_key, + try_bytes_into_invocation_uuid(value.invocation_uuid)?, + ), + value.entry_index, + )) + } + } + + impl From for EntryResult { + fn from(value: restate_types::journal::EntryResult) -> Self { + match value { + restate_types::journal::EntryResult::Success(s) => EntryResult { + result: Some(entry_result::Result::Value(s)), + }, + restate_types::journal::EntryResult::Failure(code, message) => EntryResult { + result: Some(entry_result::Result::Failure(entry_result::Failure { + error_code: code.into(), + message: message.into_bytes(), + })), + }, + } + } + } + + impl TryFrom for restate_types::journal::EntryResult { + type Error = ConversionError; + + fn try_from(value: EntryResult) -> Result { + Ok( + match value + .result + .ok_or(ConversionError::missing_field("result"))? + { + entry_result::Result::Value(s) => { + restate_types::journal::EntryResult::Success(s) + } + entry_result::Result::Failure(entry_result::Failure { + error_code, + message, + }) => restate_types::journal::EntryResult::Failure( + error_code.into(), + ByteString::try_from(message).map_err(ConversionError::invalid_data)?, + ), + }, + ) + } + } + impl TryFrom for crate::invocation_status_table::InvocationStatus { type Error = ConversionError; @@ -1492,6 +1563,21 @@ pub mod v1 { is_completed: get_state_keys.is_completed, } } + enriched_entry_header::Kind::GetPromise(get_promise) => { + restate_types::journal::enriched::EnrichedEntryHeader::GetPromise { + is_completed: get_promise.is_completed, + } + } + enriched_entry_header::Kind::PeekPromise(peek_promise) => { + restate_types::journal::enriched::EnrichedEntryHeader::PeekPromise { + is_completed: peek_promise.is_completed, + } + } + enriched_entry_header::Kind::CompletePromise(complete_promise) => { + restate_types::journal::enriched::EnrichedEntryHeader::CompletePromise { + is_completed: complete_promise.is_completed, + } + } enriched_entry_header::Kind::Sleep(sleep) => { restate_types::journal::enriched::EnrichedEntryHeader::Sleep { is_completed: sleep.is_completed, @@ -1628,6 +1714,20 @@ pub mod v1 { } => enriched_entry_header::Kind::Custom(Custom { code: u32::from(code), }), + restate_types::journal::enriched::EnrichedEntryHeader::GetPromise { + is_completed, + .. + } => enriched_entry_header::Kind::GetPromise(GetPromise { is_completed }), + restate_types::journal::enriched::EnrichedEntryHeader::PeekPromise { + is_completed, + .. + } => enriched_entry_header::Kind::PeekPromise(PeekPromise { is_completed }), + restate_types::journal::enriched::EnrichedEntryHeader::CompletePromise { + is_completed, + .. + } => enriched_entry_header::Kind::CompletePromise(CompletePromise { + is_completed, + }), }; EnrichedEntryHeader { kind: Some(kind) } @@ -2083,6 +2183,54 @@ pub mod v1 { } } + impl From for Promise { + fn from(value: crate::promise_table::Promise) -> Self { + match value.state { + crate::promise_table::PromiseState::Completed(e) => Promise { + state: Some(promise::State::CompletedState(promise::CompletedState { + result: Some(e.into()), + })), + }, + crate::promise_table::PromiseState::NotCompleted(listeners) => Promise { + state: Some(promise::State::NotCompletedState( + promise::NotCompletedState { + listening_journal_entries: listeners + .into_iter() + .map(Into::into) + .collect(), + }, + )), + }, + } + } + } + + impl TryFrom for crate::promise_table::Promise { + type Error = ConversionError; + + fn try_from(value: Promise) -> Result { + Ok(crate::promise_table::Promise { + state: match value.state.ok_or(ConversionError::missing_field("state"))? { + promise::State::CompletedState(s) => { + crate::promise_table::PromiseState::Completed( + s.result + .ok_or(ConversionError::missing_field("result"))? + .try_into()?, + ) + } + promise::State::NotCompletedState(s) => { + crate::promise_table::PromiseState::NotCompleted( + s.listening_journal_entries + .into_iter() + .map(TryInto::try_into) + .collect::, _>>()?, + ) + } + }, + }) + } + } + impl From for SequenceNumber { fn from(value: crate::fsm_table::SequenceNumber) -> Self { SequenceNumber { diff --git a/crates/types/src/errors.rs b/crates/types/src/errors.rs index b4fefdc923..a23d66f418 100644 --- a/crates/types/src/errors.rs +++ b/crates/types/src/errors.rs @@ -78,6 +78,7 @@ pub mod codes { pub const GONE: InvocationErrorCode = InvocationErrorCode(410); pub const JOURNAL_MISMATCH: InvocationErrorCode = InvocationErrorCode(570); pub const PROTOCOL_VIOLATION: InvocationErrorCode = InvocationErrorCode(571); + pub const CONFLICT: InvocationErrorCode = InvocationErrorCode(409); } /// This struct represents errors arisen when processing a service invocation. @@ -204,6 +205,9 @@ pub const CANCELED_INVOCATION_ERROR: InvocationError = pub const GONE_INVOCATION_ERROR: InvocationError = InvocationError::new_static(codes::GONE, "gone"); +pub const ALREADY_COMPLETED_INVOCATION_ERROR: InvocationError = + InvocationError::new_static(codes::CONFLICT, "promise was already completed"); + /// Error parsing/decoding a resource ID. #[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)] pub enum IdDecodeError { diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index a78e0929e1..60005f09c6 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -381,6 +381,11 @@ pub struct InvocationId { inner: InvocationUuid, } +pub trait WithInvocationId { + /// Returns the invocation id + fn invocation_id(&self) -> InvocationId; +} + pub type EncodedInvocationId = [u8; InvocationId::SIZE_IN_BYTES]; impl InvocationId { @@ -495,6 +500,12 @@ impl WithPartitionKey for InvocationId { } } +impl WithPartitionKey for T { + fn partition_key(&self) -> PartitionKey { + self.invocation_id().partition_key + } +} + impl fmt::Display for InvocationId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { // encode the id such that it is possible to do a string prefix search for a @@ -621,6 +632,37 @@ fn encode_invocation_id( buf } +#[derive(Eq, Hash, PartialEq, Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] +pub struct JournalEntryId { + invocation_id: InvocationId, + journal_index: EntryIndex, +} + +impl JournalEntryId { + pub const fn from_parts(invocation_id: InvocationId, journal_index: EntryIndex) -> Self { + Self { + invocation_id, + journal_index, + } + } + + pub fn journal_index(&self) -> EntryIndex { + self.journal_index + } +} + +impl From<(InvocationId, EntryIndex)> for JournalEntryId { + fn from(value: (InvocationId, EntryIndex)) -> Self { + Self::from_parts(value.0, value.1) + } +} + +impl WithInvocationId for JournalEntryId { + fn invocation_id(&self) -> InvocationId { + self.invocation_id + } +} + #[derive(Debug, Clone, serde_with::SerializeDisplay, serde_with::DeserializeFromStr)] pub struct LambdaARN { partition: ByteString, @@ -799,6 +841,18 @@ mod mocks { Alphanumeric.sample_string(&mut rand::thread_rng(), 16), ) } + + pub const fn from_static( + partition_key: PartitionKey, + service_name: &'static str, + service_key: &'static str, + ) -> Self { + Self { + service_name: ByteString::from_static(service_name), + key: ByteString::from_static(service_key), + partition_key, + } + } } impl IdempotencyId { diff --git a/crates/types/src/journal/entries.rs b/crates/types/src/journal/entries.rs index d206ea8af7..2e8c65126b 100644 --- a/crates/types/src/journal/entries.rs +++ b/crates/types/src/journal/entries.rs @@ -29,6 +29,9 @@ pub enum Entry { ClearState(ClearStateEntry), GetStateKeys(GetStateKeysEntry), ClearAllState, + GetPromise(GetPromiseEntry), + PeekPromise(PeekPromiseEntry), + CompletePromise(CompletePromiseEntry), // Syscalls Sleep(SleepEntry), @@ -51,7 +54,7 @@ impl Entry { Entry::Output(OutputEntry { result }) } - pub fn get_state(key: impl Into, value: Option) -> Self { + pub fn get_state(key: impl Into, value: Option) -> Self { Entry::GetState(GetStateEntry { key: key.into(), value, @@ -134,6 +137,15 @@ impl From for CompletionResult { } } +impl From for CompletionResult { + fn from(value: EntryResult) -> Self { + match value { + EntryResult::Success(s) => CompletionResult::Success(s), + EntryResult::Failure(c, m) => CompletionResult::Failure(c, m), + } + } +} + impl From<&InvocationError> for CompletionResult { fn from(value: &InvocationError) -> Self { CompletionResult::Failure(value.code(), value.message().into()) @@ -149,6 +161,9 @@ pub enum EntryType { ClearState, GetStateKeys, ClearAllState, + GetPromise, + PeekPromise, + CompletePromise, Sleep, Call, OneWayCall, @@ -192,6 +207,9 @@ mod private { pub trait Sealed {} impl Sealed for GetStateEntry {} impl Sealed for GetStateKeysEntry {} + impl Sealed for GetPromiseEntry {} + impl Sealed for PeekPromiseEntry {} + impl Sealed for CompletePromiseEntry {} impl Sealed for SleepEntry {} impl Sealed for InvokeEntry {} impl Sealed for AwakeableEntry {} @@ -207,17 +225,10 @@ pub struct OutputEntry { pub result: EntryResult, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum GetStateResult { - Empty, - Result(Bytes), - Failure(InvocationErrorCode, ByteString), -} - #[derive(Debug, Clone, PartialEq, Eq)] pub struct GetStateEntry { pub key: Bytes, - pub value: Option, + pub value: Option, } impl CompletableEntry for GetStateEntry { @@ -254,6 +265,49 @@ impl CompletableEntry for GetStateKeysEntry { } } +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct GetPromiseEntry { + pub key: ByteString, + pub value: Option, +} + +impl CompletableEntry for GetPromiseEntry { + fn is_completed(&self) -> bool { + self.value.is_some() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PeekPromiseEntry { + pub key: ByteString, + pub value: Option, +} + +impl CompletableEntry for PeekPromiseEntry { + fn is_completed(&self) -> bool { + self.value.is_some() + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CompletePromiseEntry { + pub key: ByteString, + pub completion: EntryResult, + pub value: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CompleteResult { + Done, + Failure(InvocationErrorCode, ByteString), +} + +impl CompletableEntry for CompletePromiseEntry { + fn is_completed(&self) -> bool { + self.value.is_some() + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub enum SleepResult { Fired, diff --git a/crates/types/src/journal/raw.rs b/crates/types/src/journal/raw.rs index 76347a9ef6..f44da80309 100644 --- a/crates/types/src/journal/raw.rs +++ b/crates/types/src/journal/raw.rs @@ -117,6 +117,15 @@ pub enum EntryHeader { is_completed: bool, }, ClearAllState, + GetPromise { + is_completed: bool, + }, + PeekPromise { + is_completed: bool, + }, + CompletePromise { + is_completed: bool, + }, Sleep { is_completed: bool, }, @@ -158,6 +167,9 @@ impl EntryHeader::CompleteAwakeable { .. } => None, EntryHeader::Run { .. } => None, EntryHeader::Custom { .. } => None, + EntryHeader::GetPromise { is_completed } => Some(*is_completed), + EntryHeader::PeekPromise { is_completed } => Some(*is_completed), + EntryHeader::CompletePromise { is_completed } => Some(*is_completed), } } @@ -177,6 +189,9 @@ impl EntryHeader::CompleteAwakeable { .. } => {} EntryHeader::Run { .. } => {} EntryHeader::Custom { .. } => {} + EntryHeader::GetPromise { is_completed } => *is_completed = true, + EntryHeader::PeekPromise { is_completed } => *is_completed = true, + EntryHeader::CompletePromise { is_completed } => *is_completed = true, } } @@ -196,6 +211,9 @@ impl EntryHeader::CompleteAwakeable { .. } => EntryType::CompleteAwakeable, EntryHeader::Run { .. } => EntryType::Run, EntryHeader::Custom { .. } => EntryType::Custom, + EntryHeader::GetPromise { .. } => EntryType::GetPromise, + EntryHeader::PeekPromise { .. } => EntryType::PeekPromise, + EntryHeader::CompletePromise { .. } => EntryType::CompletePromise, } } @@ -224,6 +242,11 @@ impl }, EntryHeader::Run { .. } => EntryHeader::Run {}, EntryHeader::Custom { code } => EntryHeader::Custom { code }, + EntryHeader::GetPromise { is_completed } => EntryHeader::GetPromise { is_completed }, + EntryHeader::PeekPromise { is_completed } => EntryHeader::PeekPromise { is_completed }, + EntryHeader::CompletePromise { is_completed } => { + EntryHeader::CompletePromise { is_completed } + } } } } diff --git a/crates/types/src/service_protocol.rs b/crates/types/src/service_protocol.rs index 26ee5f921a..24a52e971f 100644 --- a/crates/types/src/service_protocol.rs +++ b/crates/types/src/service_protocol.rs @@ -63,9 +63,10 @@ mod pb_into { use super::*; use crate::journal::{ - AwakeableEntry, ClearStateEntry, CompleteAwakeableEntry, Entry, EntryResult, GetStateEntry, - GetStateKeysEntry, GetStateKeysResult, GetStateResult, InputEntry, InvokeEntry, - InvokeRequest, OneWayCallEntry, OutputEntry, RunEntry, SetStateEntry, SleepEntry, + AwakeableEntry, ClearStateEntry, CompleteAwakeableEntry, CompletePromiseEntry, + CompleteResult, CompletionResult, Entry, EntryResult, GetPromiseEntry, GetStateEntry, + GetStateKeysEntry, GetStateKeysResult, InputEntry, InvokeEntry, InvokeRequest, + OneWayCallEntry, OutputEntry, PeekPromiseEntry, RunEntry, SetStateEntry, SleepEntry, SleepResult, }; @@ -99,10 +100,10 @@ mod pb_into { Ok(Self::GetState(GetStateEntry { key: msg.key, value: msg.result.map(|v| match v { - get_state_entry_message::Result::Empty(_) => GetStateResult::Empty, - get_state_entry_message::Result::Value(b) => GetStateResult::Result(b), + get_state_entry_message::Result::Empty(_) => CompletionResult::Empty, + get_state_entry_message::Result::Value(b) => CompletionResult::Success(b), get_state_entry_message::Result::Failure(failure) => { - GetStateResult::Failure(failure.code.into(), failure.message.into()) + CompletionResult::Failure(failure.code.into(), failure.message.into()) } }), })) @@ -153,6 +154,63 @@ mod pb_into { } } + impl TryFrom for Entry { + type Error = &'static str; + + fn try_from(msg: GetPromiseEntryMessage) -> Result { + Ok(Self::GetPromise(GetPromiseEntry { + key: msg.key.into(), + value: msg.result.map(|v| match v { + get_promise_entry_message::Result::Value(b) => EntryResult::Success(b), + get_promise_entry_message::Result::Failure(failure) => { + EntryResult::Failure(failure.code.into(), failure.message.into()) + } + }), + })) + } + } + + impl TryFrom for Entry { + type Error = &'static str; + + fn try_from(msg: PeekPromiseEntryMessage) -> Result { + Ok(Self::PeekPromise(PeekPromiseEntry { + key: msg.key.into(), + value: msg.result.map(|v| match v { + peek_promise_entry_message::Result::Empty(_) => CompletionResult::Empty, + peek_promise_entry_message::Result::Value(b) => CompletionResult::Success(b), + peek_promise_entry_message::Result::Failure(failure) => { + CompletionResult::Failure(failure.code.into(), failure.message.into()) + } + }), + })) + } + } + + impl TryFrom for Entry { + type Error = &'static str; + + fn try_from(msg: CompletePromiseEntryMessage) -> Result { + Ok(Self::CompletePromise(CompletePromiseEntry { + key: msg.key.into(), + completion: match msg.completion.ok_or("completion")? { + complete_promise_entry_message::Completion::CompletionValue(b) => { + EntryResult::Success(b) + } + complete_promise_entry_message::Completion::CompletionFailure(failure) => { + EntryResult::Failure(failure.code.into(), failure.message.into()) + } + }, + value: msg.result.map(|v| match v { + complete_promise_entry_message::Result::Empty(_) => CompleteResult::Done, + complete_promise_entry_message::Result::Failure(failure) => { + CompleteResult::Failure(failure.code.into(), failure.message.into()) + } + }), + })) + } + } + impl TryFrom for Entry { type Error = &'static str; diff --git a/crates/worker/src/invoker_integration.rs b/crates/worker/src/invoker_integration.rs index bacf25d819..cb62df4148 100644 --- a/crates/worker/src/invoker_integration.rs +++ b/crates/worker/src/invoker_integration.rs @@ -16,14 +16,12 @@ use restate_service_protocol::awakeable_id::AwakeableIdentifier; use restate_types::errors::{codes, InvocationError}; use restate_types::identifiers::InvocationId; use restate_types::invocation::{ - InvocationTarget, InvocationTargetType, ServiceInvocationSpanContext, SpanRelation, + InvocationTarget, InvocationTargetType, ServiceInvocationSpanContext, ServiceType, SpanRelation, }; use restate_types::journal::enriched::{ AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry, }; -use restate_types::journal::raw::{ - EntryHeader, PlainEntryHeader, PlainRawEntry, RawEntry, RawEntryCodec, -}; +use restate_types::journal::raw::{PlainEntryHeader, PlainRawEntry, RawEntry, RawEntryCodec}; use restate_types::journal::{CompleteAwakeableEntry, Entry, InvokeEntry, OneWayCallEntry}; use restate_types::journal::{EntryType, InvokeRequest}; use std::marker::PhantomData; @@ -162,6 +160,27 @@ where )?; EnrichedEntryHeader::ClearAllState {} } + PlainEntryHeader::GetPromise { is_completed } => { + check_workflow_type( + &header.as_entry_type(), + ¤t_invocation_target.service_ty(), + )?; + EnrichedEntryHeader::GetPromise { is_completed } + } + PlainEntryHeader::PeekPromise { is_completed } => { + check_workflow_type( + &header.as_entry_type(), + ¤t_invocation_target.service_ty(), + )?; + EnrichedEntryHeader::PeekPromise { is_completed } + } + PlainEntryHeader::CompletePromise { is_completed } => { + check_workflow_type( + &header.as_entry_type(), + ¤t_invocation_target.service_ty(), + )?; + EnrichedEntryHeader::CompletePromise { is_completed } + } PlainEntryHeader::Sleep { is_completed } => EnrichedEntryHeader::Sleep { is_completed }, PlainEntryHeader::Call { is_completed, .. } => { if !is_completed { @@ -225,7 +244,7 @@ where }, } } - EntryHeader::Run { .. } => EnrichedEntryHeader::Run {}, + PlainEntryHeader::Run { .. } => EnrichedEntryHeader::Run {}, PlainEntryHeader::Custom { code } => EnrichedEntryHeader::Custom { code }, }; @@ -233,6 +252,23 @@ where } } +#[inline] +fn check_workflow_type( + entry_type: &EntryType, + service_type: &ServiceType, +) -> Result<(), InvocationError> { + if *service_type != ServiceType::Workflow { + return Err(InvocationError::new( + codes::BAD_REQUEST, + format!( + "The service type {} does not support the entry type {}, only Workflow supports it", + service_type, entry_type + ), + )); + } + Ok(()) +} + #[inline] fn can_read_state( entry_type: &EntryType, diff --git a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs index 930cf5d16f..a380b4c338 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/mod.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/mod.rs @@ -25,15 +25,17 @@ use restate_storage_api::invocation_status_table::{ }; use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable}; use restate_storage_api::outbox_table::OutboxMessage; +use restate_storage_api::promise_table::{Promise, PromiseState, ReadOnlyPromiseTable}; use restate_storage_api::service_status_table::VirtualObjectStatus; use restate_storage_api::timer_table::Timer; use restate_storage_api::Result as StorageResult; use restate_types::errors::{ - InvocationError, InvocationErrorCode, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR, - KILLED_INVOCATION_ERROR, + InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, + CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, }; use restate_types::identifiers::{ - EntryIndex, IdempotencyId, InvocationId, PartitionKey, ServiceId, WithPartitionKey, + EntryIndex, IdempotencyId, InvocationId, JournalEntryId, PartitionKey, ServiceId, + WithInvocationId, WithPartitionKey, }; use restate_types::ingress::IngressResponse; use restate_types::invocation::{ @@ -148,7 +150,7 @@ where /// We use the returned service invocation id and span relation to log the effects (see [`Effects#log`]). #[instrument(level = "trace", skip_all, fields(command = ?command), err)] pub(crate) async fn on_apply< - State: StateReader + ReadOnlyJournalTable + ReadOnlyIdempotencyTable, + State: StateReader + ReadOnlyJournalTable + ReadOnlyIdempotencyTable + ReadOnlyPromiseTable, >( &mut self, command: Command, @@ -908,7 +910,7 @@ where invocation_id, ServiceInvocationSpanContext::empty(), ); - // TODO CLEANUP PROMISES + effects.clear_all_promises(service_id); } } InvocationStatus::Free => { @@ -924,7 +926,9 @@ where } } - async fn try_invoker_effect( + async fn try_invoker_effect< + State: StateReader + ReadOnlyJournalTable + ReadOnlyPromiseTable, + >( &mut self, effects: &mut Effects, state: &mut State, @@ -948,7 +952,7 @@ where Ok(()) } - async fn on_invoker_effect( + async fn on_invoker_effect( &mut self, effects: &mut Effects, state: &mut State, @@ -1223,7 +1227,7 @@ where } } - async fn handle_journal_entry( + async fn handle_journal_entry( &mut self, effects: &mut Effects, state: &mut State, @@ -1362,6 +1366,189 @@ where ); } } + EnrichedEntryHeader::GetPromise { is_completed, .. } => { + if !is_completed { + let_assert!( + Entry::GetPromise(GetPromiseEntry { key, .. }) = + journal_entry.deserialize_entry_ref::()? + ); + + if let Some(service_id) = + invocation_metadata.invocation_target.as_keyed_service_id() + { + // Load state and write completion + let promise_metadata = state.get_promise(&service_id, &key).await?; + + match promise_metadata { + Some(Promise { + state: PromiseState::Completed(result), + }) => { + // Result is already available + let completion_result: CompletionResult = result.into(); + Codec::write_completion( + &mut journal_entry, + completion_result.clone(), + )?; + + // Forward completion + effects.forward_completion( + invocation_id, + Completion::new(entry_index, completion_result), + ); + } + Some(Promise { + state: PromiseState::NotCompleted(mut v), + }) => { + v.push(JournalEntryId::from_parts(invocation_id, entry_index)); + effects.put_promise( + service_id, + key, + Promise { + state: PromiseState::NotCompleted(v), + }, + ) + } + None => effects.put_promise( + service_id, + key, + Promise { + state: PromiseState::NotCompleted(vec![ + JournalEntryId::from_parts(invocation_id, entry_index), + ]), + }, + ), + } + } else { + warn!( + "Trying to process entry {} for a target that has no promises", + journal_entry.header().as_entry_type() + ); + effects.forward_completion( + invocation_id, + Completion::new(entry_index, CompletionResult::Success(Bytes::new())), + ); + } + } + } + EnrichedEntryHeader::PeekPromise { is_completed, .. } => { + if !is_completed { + let_assert!( + Entry::PeekPromise(PeekPromiseEntry { key, .. }) = + journal_entry.deserialize_entry_ref::()? + ); + + if let Some(service_id) = + invocation_metadata.invocation_target.as_keyed_service_id() + { + // Load state and write completion + let promise_metadata = state.get_promise(&service_id, &key).await?; + + let completion_result = match promise_metadata { + Some(Promise { + state: PromiseState::Completed(result), + }) => result.into(), + _ => CompletionResult::Empty, + }; + + Codec::write_completion(&mut journal_entry, completion_result.clone())?; + + // Forward completion + effects.forward_completion( + invocation_id, + Completion::new(entry_index, completion_result), + ); + } else { + warn!( + "Trying to process entry {} for a target that has no promises", + journal_entry.header().as_entry_type() + ); + effects.forward_completion( + invocation_id, + Completion::new(entry_index, CompletionResult::Empty), + ); + } + } + } + EnrichedEntryHeader::CompletePromise { is_completed, .. } => { + if !is_completed { + let_assert!( + Entry::CompletePromise(CompletePromiseEntry { + key, + completion, + .. + }) = journal_entry.deserialize_entry_ref::()? + ); + + if let Some(service_id) = + invocation_metadata.invocation_target.as_keyed_service_id() + { + // Load state and write completion + let promise_metadata = state.get_promise(&service_id, &key).await?; + + let completion_result = match promise_metadata { + None => { + // Just register the promise completion + effects.put_promise( + service_id, + key, + Promise { + state: PromiseState::Completed(completion), + }, + ); + CompletionResult::Empty + } + Some(Promise { + state: PromiseState::NotCompleted(listeners), + }) => { + // Send response to listeners + for listener in listeners { + self.handle_outgoing_message( + OutboxMessage::ServiceResponse(InvocationResponse { + id: listener.invocation_id(), + entry_index: listener.journal_index(), + result: completion.clone().into(), + }), + effects, + ); + } + + // Now register the promise completion + effects.put_promise( + service_id, + key, + Promise { + state: PromiseState::Completed(completion), + }, + ); + CompletionResult::Empty + } + Some(Promise { + state: PromiseState::Completed(_), + }) => { + // Conflict! + (&ALREADY_COMPLETED_INVOCATION_ERROR).into() + } + }; + + Codec::write_completion(&mut journal_entry, completion_result.clone())?; + + // Forward completion + effects.forward_completion( + invocation_id, + Completion::new(entry_index, completion_result), + ); + } else { + warn!( + "Trying to process entry {} for a target that has no promises", + journal_entry.header().as_entry_type() + ); + effects.forward_completion( + invocation_id, + Completion::new(entry_index, CompletionResult::Empty), + ); + } + } + } EnrichedEntryHeader::Sleep { is_completed, .. } => { debug_assert!(!is_completed, "Sleep entry must not be completed."); let_assert!( diff --git a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs index a162a507f4..17172ca372 100644 --- a/crates/worker/src/partition/state_machine/command_interpreter/tests.rs +++ b/crates/worker/src/partition/state_machine/command_interpreter/tests.rs @@ -21,6 +21,7 @@ use restate_service_protocol::codec::ProtobufRawEntryCodec; use restate_storage_api::idempotency_table::IdempotencyMetadata; use restate_storage_api::inbox_table::SequenceNumberInboxEntry; use restate_storage_api::invocation_status_table::{JournalMetadata, StatusTimestamps}; +use restate_storage_api::promise_table::OwnedPromiseRow; use restate_storage_api::timer_table::{TimerKey, TimerKeyKind}; use restate_storage_api::{Result as StorageResult, StorageError}; use restate_test_util::matchers::*; @@ -270,6 +271,27 @@ impl ReadOnlyIdempotencyTable for StateReaderMock { } } +impl ReadOnlyPromiseTable for StateReaderMock { + async fn get_promise( + &mut self, + _service_id: &ServiceId, + _key: &ByteString, + ) -> StorageResult> { + unimplemented!(); + } + + fn all_promises( + &mut self, + _range: RangeInclusive, + ) -> impl Stream> + Send { + unimplemented!(); + + // I need this for type inference to work + #[allow(unreachable_code)] + futures::stream::iter(vec![]) + } +} + #[test(tokio::test)] async fn awakeable_with_success() { let mut state_machine: CommandInterpreter = diff --git a/crates/worker/src/partition/state_machine/effect_interpreter.rs b/crates/worker/src/partition/state_machine/effect_interpreter.rs index 1913b3f7ed..2b5d260b7a 100644 --- a/crates/worker/src/partition/state_machine/effect_interpreter.rs +++ b/crates/worker/src/partition/state_machine/effect_interpreter.rs @@ -173,7 +173,8 @@ impl EffectInterpreter { pub(crate) async fn interpret_effects< S: StateStorage + restate_storage_api::invocation_status_table::ReadOnlyInvocationStatusTable - + restate_storage_api::idempotency_table::IdempotencyTable, + + restate_storage_api::idempotency_table::IdempotencyTable + + restate_storage_api::promise_table::PromiseTable, >( effects: &mut Effects, state_storage: &mut S, @@ -189,7 +190,8 @@ impl EffectInterpreter { async fn interpret_effect< S: StateStorage + restate_storage_api::invocation_status_table::ReadOnlyInvocationStatusTable - + restate_storage_api::idempotency_table::IdempotencyTable, + + restate_storage_api::idempotency_table::IdempotencyTable + + restate_storage_api::promise_table::PromiseTable, >( effect: Effect, state_storage: &mut S, @@ -444,6 +446,16 @@ impl EffectInterpreter { Effect::IngressResponse(ingress_response) => { collector.push(Action::IngressResponse(ingress_response)); } + Effect::PutPromise { + service_id, + key, + metadata, + } => { + state_storage.put_promise(&service_id, &key, metadata).await; + } + Effect::ClearAllPromises { service_id } => { + state_storage.delete_all_promises(&service_id).await + } } Ok(()) diff --git a/crates/worker/src/partition/state_machine/effects.rs b/crates/worker/src/partition/state_machine/effects.rs index 41ad086438..bdacb87878 100644 --- a/crates/worker/src/partition/state_machine/effects.rs +++ b/crates/worker/src/partition/state_machine/effects.rs @@ -10,6 +10,7 @@ use crate::partition::types::InvocationIdAndTarget; use bytes::Bytes; +use bytestring::ByteString; use opentelemetry::trace::SpanId; use restate_storage_api::inbox_table::InboxEntry; use restate_storage_api::invocation_status_table::{ @@ -17,6 +18,8 @@ use restate_storage_api::invocation_status_table::{ }; use restate_storage_api::invocation_status_table::{InvocationStatus, JournalMetadata}; use restate_storage_api::outbox_table::OutboxMessage; +use restate_storage_api::promise_table::Promise; +use restate_storage_api::promise_table::PromiseState; use restate_storage_api::timer_table::{Timer, TimerKey}; use restate_types::deployment::PinnedDeployment; use restate_types::errors::InvocationErrorCode; @@ -99,6 +102,16 @@ pub(crate) enum Effect { span_context: ServiceInvocationSpanContext, }, + // Promises + PutPromise { + service_id: ServiceId, + key: ByteString, + metadata: Promise, + }, + ClearAllPromises { + service_id: ServiceId, + }, + // Timers RegisterTimer { timer_value: TimerKeyValue, @@ -420,6 +433,32 @@ impl Effect { debug_if_leader!(is_leader, "Effect: Clear all state") } + Effect::PutPromise { + service_id, + key, + metadata: + Promise { + state: PromiseState::Completed(_), + }, + } => { + debug_if_leader!(is_leader, rpc.service = %service_id.service_name, "Effect: Put promise {} in completed state", key) + } + Effect::PutPromise { + service_id, + key, + metadata: + Promise { + state: PromiseState::NotCompleted(_), + }, + } => { + debug_if_leader!(is_leader, rpc.service = %service_id.service_name, "Effect: Put promise {} in non completed state", key) + } + Effect::ClearAllPromises { service_id } => { + debug_if_leader!( + is_leader, + rpc.service = %service_id.service_name, + "Effect: Clear all promises") + } Effect::RegisterTimer { timer_value, span_context, @@ -991,6 +1030,23 @@ impl Effects { self.effects.push(Effect::MutateState(state_mutation)); } + pub(crate) fn put_promise( + &mut self, + service_id: ServiceId, + key: ByteString, + promise_metadata: Promise, + ) { + self.effects.push(Effect::PutPromise { + service_id, + key, + metadata: promise_metadata, + }); + } + + pub(crate) fn clear_all_promises(&mut self, service_id: ServiceId) { + self.effects.push(Effect::ClearAllPromises { service_id }); + } + /// We log only if the log level is TRACE, or if the log level is DEBUG and we're the leader, /// or if the span level is INFO and we're the leader. pub(crate) fn log(&self, is_leader: bool) { diff --git a/crates/worker/src/partition/storage/mod.rs b/crates/worker/src/partition/storage/mod.rs index c7b3afab26..e72d545d50 100644 --- a/crates/worker/src/partition/storage/mod.rs +++ b/crates/worker/src/partition/storage/mod.rs @@ -11,6 +11,7 @@ use crate::metric_definitions::{PARTITION_STORAGE_TX_COMMITTED, PARTITION_STORAGE_TX_CREATED}; use crate::partition::shuffle::{OutboxReader, OutboxReaderError}; use bytes::Bytes; +use bytestring::ByteString; use futures::{Stream, StreamExt, TryStreamExt}; use metrics::counter; use restate_storage_api::deduplication_table::{ @@ -24,6 +25,7 @@ use restate_storage_api::invocation_status_table::{ }; use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable}; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; +use restate_storage_api::promise_table::{OwnedPromiseRow, Promise}; use restate_storage_api::service_status_table::{ ReadOnlyVirtualObjectStatusTable, VirtualObjectStatus, }; @@ -560,6 +562,7 @@ where } } +// Workaround until https://github.com/restatedev/restate/issues/276 is sorted out impl ReadOnlyInvocationStatusTable for Transaction where TransactionType: restate_storage_api::Transaction + Send, @@ -623,6 +626,48 @@ where } } +// Workaround until https://github.com/restatedev/restate/issues/276 is sorted out +impl restate_storage_api::promise_table::ReadOnlyPromiseTable + for Transaction +where + TransactionType: restate_storage_api::Transaction + Send, +{ + fn get_promise( + &mut self, + service_id: &ServiceId, + key: &ByteString, + ) -> impl Future>> + Send { + self.inner.get_promise(service_id, key) + } + + fn all_promises( + &mut self, + range: RangeInclusive, + ) -> impl Stream> + Send { + self.inner.all_promises(range) + } +} + +// Workaround until https://github.com/restatedev/restate/issues/276 is sorted out +impl restate_storage_api::promise_table::PromiseTable + for Transaction +where + TransactionType: restate_storage_api::Transaction + Send, +{ + fn put_promise( + &mut self, + service_id: &ServiceId, + key: &ByteString, + metadata: Promise, + ) -> impl Future + Send { + self.inner.put_promise(service_id, key, metadata) + } + + fn delete_all_promises(&mut self, service_id: &ServiceId) -> impl Future + Send { + self.inner.delete_all_promises(service_id) + } +} + mod fsm_variable { pub(crate) const INBOX_SEQ_NUMBER: u64 = 0; pub(crate) const OUTBOX_SEQ_NUMBER: u64 = 1;