Skip to content

Commit

Permalink
Handle promise entries in state machine.
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 14, 2024
1 parent a662c68 commit 3b7e8f2
Show file tree
Hide file tree
Showing 7 changed files with 376 additions and 13 deletions.
4 changes: 4 additions & 0 deletions crates/types/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub mod codes {
pub const GONE: InvocationErrorCode = InvocationErrorCode(410);
pub const JOURNAL_MISMATCH: InvocationErrorCode = InvocationErrorCode(570);
pub const PROTOCOL_VIOLATION: InvocationErrorCode = InvocationErrorCode(571);
pub const CONFLICT: InvocationErrorCode = InvocationErrorCode(409);
}

/// This struct represents errors arisen when processing a service invocation.
Expand Down Expand Up @@ -204,6 +205,9 @@ pub const CANCELED_INVOCATION_ERROR: InvocationError =

pub const GONE_INVOCATION_ERROR: InvocationError = InvocationError::new_static(codes::GONE, "gone");

pub const CONFLICT_INVOCATION_ERROR: InvocationError =
InvocationError::new_static(codes::CONFLICT, "conflict");

/// Error parsing/decoding a resource ID.
#[derive(Debug, thiserror::Error, Clone, Eq, PartialEq)]
pub enum IdDecodeError {
Expand Down
45 changes: 41 additions & 4 deletions crates/worker/src/invoker_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ use restate_types::invocation::{
use restate_types::journal::enriched::{
AwakeableEnrichmentResult, CallEnrichmentResult, EnrichedEntryHeader, EnrichedRawEntry,
};
use restate_types::journal::raw::{
EntryHeader, PlainEntryHeader, PlainRawEntry, RawEntry, RawEntryCodec,
};
use restate_types::journal::raw::{PlainEntryHeader, PlainRawEntry, RawEntry, RawEntryCodec};
use restate_types::journal::{CompleteAwakeableEntry, Entry, InvokeEntry, OneWayCallEntry};
use restate_types::journal::{EntryType, InvokeRequest};
use std::marker::PhantomData;
Expand Down Expand Up @@ -155,6 +153,27 @@ where
)?;
EnrichedEntryHeader::ClearAllState {}
}
PlainEntryHeader::GetPromise { is_completed } => {
check_workflow_type(
&header.as_entry_type(),
&current_invocation_target.service_ty(),
)?;
EnrichedEntryHeader::GetPromise { is_completed }
}
PlainEntryHeader::PeekPromise { is_completed } => {
check_workflow_type(
&header.as_entry_type(),
&current_invocation_target.service_ty(),
)?;
EnrichedEntryHeader::PeekPromise { is_completed }
}
PlainEntryHeader::CompletePromise { is_completed } => {
check_workflow_type(
&header.as_entry_type(),
&current_invocation_target.service_ty(),
)?;
EnrichedEntryHeader::CompletePromise { is_completed }
}
PlainEntryHeader::Sleep { is_completed } => EnrichedEntryHeader::Sleep { is_completed },
PlainEntryHeader::Call { is_completed, .. } => {
if !is_completed {
Expand Down Expand Up @@ -218,14 +237,32 @@ where
},
}
}
EntryHeader::Run { .. } => EnrichedEntryHeader::Run {},
PlainEntryHeader::Run { .. } => EnrichedEntryHeader::Run {},
PlainEntryHeader::Custom { code } => EnrichedEntryHeader::Custom { code },
};

Ok(RawEntry::new(enriched_header, serialized_entry))
}
}

#[inline]
fn check_workflow_type(
entry_type: &EntryType,
service_type: &ServiceType,
) -> Result<(), InvocationError> {
// TODO FIX THIS WITH WORKFLOW TYPE
if !service_type.has_state() {
return Err(InvocationError::new(
codes::BAD_REQUEST,
format!(
"The service type {} does not support the entry type {}, only Workflow supports it",
service_type, entry_type
),
));
}
Ok(())
}

#[inline]
fn can_read_state(
entry_type: &EntryType,
Expand Down
201 changes: 194 additions & 7 deletions crates/worker/src/partition/state_machine/command_interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ use restate_storage_api::invocation_status_table::{
};
use restate_storage_api::journal_table::{JournalEntry, ReadOnlyJournalTable};
use restate_storage_api::outbox_table::OutboxMessage;
use restate_storage_api::promise_table::{PromiseMetadata, PromiseState, ReadOnlyPromiseTable};
use restate_storage_api::service_status_table::VirtualObjectStatus;
use restate_storage_api::timer_table::Timer;
use restate_storage_api::Result as StorageResult;
use restate_types::errors::{
InvocationError, InvocationErrorCode, CANCELED_INVOCATION_ERROR, GONE_INVOCATION_ERROR,
KILLED_INVOCATION_ERROR,
InvocationError, InvocationErrorCode, CANCELED_INVOCATION_ERROR, CONFLICT_INVOCATION_ERROR,
GONE_INVOCATION_ERROR, KILLED_INVOCATION_ERROR,
};
use restate_types::identifiers::{
EntryIndex, IdempotencyId, InvocationId, PartitionKey, ServiceId, WithPartitionKey,
EntryIndex, IdempotencyId, InvocationId, JournalEntryId, PartitionKey, ServiceId,
WithInvocationId, WithPartitionKey,
};
use restate_types::ingress::IngressResponse;
use restate_types::invocation::{
Expand Down Expand Up @@ -147,7 +149,7 @@ where
/// We use the returned service invocation id and span relation to log the effects (see [`Effects#log`]).
#[instrument(level = "trace", skip_all, fields(command = ?command), err)]
pub(crate) async fn on_apply<
State: StateReader + ReadOnlyJournalTable + ReadOnlyIdempotencyTable,
State: StateReader + ReadOnlyJournalTable + ReadOnlyIdempotencyTable + ReadOnlyPromiseTable,
>(
&mut self,
command: Command,
Expand Down Expand Up @@ -858,7 +860,9 @@ where
}
}

async fn try_invoker_effect<State: StateReader + ReadOnlyJournalTable>(
async fn try_invoker_effect<
State: StateReader + ReadOnlyJournalTable + ReadOnlyPromiseTable,
>(
&mut self,
effects: &mut Effects,
state: &mut State,
Expand All @@ -882,7 +886,7 @@ where
Ok(())
}

async fn on_invoker_effect<State: StateReader + ReadOnlyJournalTable>(
async fn on_invoker_effect<State: StateReader + ReadOnlyJournalTable + ReadOnlyPromiseTable>(
&mut self,
effects: &mut Effects,
state: &mut State,
Expand Down Expand Up @@ -1151,7 +1155,7 @@ where
}
}

async fn handle_journal_entry<State: StateReader>(
async fn handle_journal_entry<State: StateReader + ReadOnlyPromiseTable>(
&mut self,
effects: &mut Effects,
state: &mut State,
Expand Down Expand Up @@ -1290,6 +1294,189 @@ where
);
}
}
EnrichedEntryHeader::GetPromise { is_completed, .. } => {
if !is_completed {
let_assert!(
Entry::GetPromise(GetPromiseEntry { key, .. }) =
journal_entry.deserialize_entry_ref::<Codec>()?
);

if let Some(service_id) =
invocation_metadata.invocation_target.as_keyed_service_id()
{
// Load state and write completion
let promise_metadata = state.get_promise(&service_id, &key).await?;

match promise_metadata {
Some(PromiseMetadata {
state: PromiseState::Completed(result),
}) => {
// Result is already available
let completion_result: CompletionResult = result.into();
Codec::write_completion(
&mut journal_entry,
completion_result.clone(),
)?;

// Forward completion
effects.forward_completion(
invocation_id,
Completion::new(entry_index, completion_result),
);
}
Some(PromiseMetadata {
state: PromiseState::NotCompleted(mut v),
}) => {
v.push(JournalEntryId::from_parts(invocation_id, entry_index));
effects.put_promise(
service_id,
key,
PromiseMetadata {
state: PromiseState::NotCompleted(v),
},
)
}
None => effects.put_promise(
service_id,
key,
PromiseMetadata {
state: PromiseState::NotCompleted(vec![
JournalEntryId::from_parts(invocation_id, entry_index),
]),
},
),
}
} else {
warn!(
"Trying to process entry {} for a target that has no promises",
journal_entry.header().as_entry_type()
);
effects.forward_completion(
invocation_id,
Completion::new(entry_index, CompletionResult::Success(Bytes::new())),
);
}
}
}
EnrichedEntryHeader::PeekPromise { is_completed, .. } => {
if !is_completed {
let_assert!(
Entry::PeekPromise(PeekPromiseEntry { key, .. }) =
journal_entry.deserialize_entry_ref::<Codec>()?
);

if let Some(service_id) =
invocation_metadata.invocation_target.as_keyed_service_id()
{
// Load state and write completion
let promise_metadata = state.get_promise(&service_id, &key).await?;

let completion_result = match promise_metadata {
Some(PromiseMetadata {
state: PromiseState::Completed(result),
}) => result.into(),
_ => CompletionResult::Empty,
};

Codec::write_completion(&mut journal_entry, completion_result.clone())?;

// Forward completion
effects.forward_completion(
invocation_id,
Completion::new(entry_index, completion_result),
);
} else {
warn!(
"Trying to process entry {} for a target that has no promises",
journal_entry.header().as_entry_type()
);
effects.forward_completion(
invocation_id,
Completion::new(entry_index, CompletionResult::Empty),
);
}
}
}
EnrichedEntryHeader::CompletePromise { is_completed, .. } => {
if !is_completed {
let_assert!(
Entry::CompletePromise(CompletePromiseEntry {
key,
completion,
..
}) = journal_entry.deserialize_entry_ref::<Codec>()?
);

if let Some(service_id) =
invocation_metadata.invocation_target.as_keyed_service_id()
{
// Load state and write completion
let promise_metadata = state.get_promise(&service_id, &key).await?;

let completion_result = match promise_metadata {
None => {
// Just register the promise completion
effects.put_promise(
service_id,
key,
PromiseMetadata {
state: PromiseState::Completed(completion),
},
);
CompletionResult::Empty
}
Some(PromiseMetadata {
state: PromiseState::NotCompleted(listeners),
}) => {
// Send response to listeners
for listener in listeners {
self.handle_outgoing_message(
OutboxMessage::ServiceResponse(InvocationResponse {
id: listener.invocation_id(),
entry_index: listener.journal_index(),
result: completion.clone().into(),
}),
effects,
);
}

// Now register the promise completion
effects.put_promise(
service_id,
key,
PromiseMetadata {
state: PromiseState::Completed(completion),
},
);
CompletionResult::Empty
}
Some(PromiseMetadata {
state: PromiseState::Completed(_),
}) => {
// Conflict!
(&CONFLICT_INVOCATION_ERROR).into()
}
};

Codec::write_completion(&mut journal_entry, completion_result.clone())?;

// Forward completion
effects.forward_completion(
invocation_id,
Completion::new(entry_index, completion_result),
);
} else {
warn!(
"Trying to process entry {} for a target that has no promises",
journal_entry.header().as_entry_type()
);
effects.forward_completion(
invocation_id,
Completion::new(entry_index, CompletionResult::Empty),
);
}
}
}
EnrichedEntryHeader::Sleep { is_completed, .. } => {
debug_assert!(!is_completed, "Sleep entry must not be completed.");
let_assert!(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use restate_service_protocol::pb::protocol::SleepEntryMessage;
use restate_storage_api::idempotency_table::IdempotencyMetadata;
use restate_storage_api::inbox_table::SequenceNumberInboxEntry;
use restate_storage_api::invocation_status_table::{JournalMetadata, StatusTimestamps};
use restate_storage_api::promise_table::OwnedPromiseRow;
use restate_storage_api::timer_table::{TimerKey, TimerKeyKind};
use restate_storage_api::{Result as StorageResult, StorageError};
use restate_test_util::matchers::*;
Expand Down Expand Up @@ -260,6 +261,27 @@ impl ReadOnlyIdempotencyTable for StateReaderMock {
}
}

impl ReadOnlyPromiseTable for StateReaderMock {
async fn get_promise(
&mut self,
_service_id: &ServiceId,
_key: &ByteString,
) -> StorageResult<Option<PromiseMetadata>> {
unimplemented!();
}

fn all_promises(
&mut self,
_range: RangeInclusive<PartitionKey>,
) -> impl Stream<Item = StorageResult<OwnedPromiseRow>> + Send {
unimplemented!();

// I need this for type inference to work
#[allow(unreachable_code)]
futures::stream::iter(vec![])
}
}

#[test(tokio::test)]
async fn awakeable_with_success() {
let mut state_machine: CommandInterpreter<ProtobufRawEntryCodec> =
Expand Down
Loading

0 comments on commit 3b7e8f2

Please sign in to comment.