Skip to content

Commit

Permalink
Remove Built in service effects (#1582)
Browse files Browse the repository at this point in the history
This is a leftover from the removal of built in effects.
  • Loading branch information
slinkydeveloper authored Jun 3, 2024
1 parent 0d4a6ad commit ce1edbd
Show file tree
Hide file tree
Showing 3 changed files with 0 additions and 171 deletions.
56 changes: 0 additions & 56 deletions crates/wal-protocol/src/effects.rs

This file was deleted.

4 changes: 0 additions & 4 deletions crates/wal-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@ use restate_types::state_mut::ExternalStateMutation;
use restate_types::{flexbuffers_storage_encode_decode, Version};

use crate::control::AnnounceLeader;
use crate::effects::BuiltinServiceEffects;
use crate::timer::TimerKeyValue;
use restate_types::logs::{LogId, Lsn, Payload};
use restate_types::partition_table::{FindPartition, PartitionTableError};
use restate_types::storage::{StorageCodec, StorageDecodeError, StorageEncodeError};
use restate_types::{GenerationalNodeId, PlainNodeId};

pub mod control;
pub mod effects;
pub mod timer;

/// The primary envelope for all messages in the system.
Expand Down Expand Up @@ -146,8 +144,6 @@ pub enum Command {
ScheduleTimer(TimerKeyValue),
/// Another partition processor is reporting a response of an invocation we requested.
InvocationResponse(InvocationResponse),
/// A built-in invoker reporting effects from an invocation.
BuiltInInvokerEffect(BuiltinServiceEffects),
}

impl Command {
Expand Down
111 changes: 0 additions & 111 deletions crates/worker/src/partition/state_machine/command_interpreter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use restate_types::journal::*;
use restate_types::message::MessageIndex;
use restate_types::state_mut::ExternalStateMutation;
use restate_types::time::MillisSinceEpoch;
use restate_wal_protocol::effects::{BuiltinServiceEffect, BuiltinServiceEffects};
use restate_wal_protocol::timer::TimerKeyValue;
use restate_wal_protocol::Command;
use std::collections::HashSet;
Expand Down Expand Up @@ -205,10 +204,6 @@ where
self.try_purge_invocation(purge_invocation_request.invocation_id, state, effects)
.await
}
Command::BuiltInInvokerEffect(builtin_service_effects) => {
self.try_built_in_invoker_effect(effects, state, builtin_service_effects)
.await
}
Command::PatchState(mutation) => {
self.handle_external_state_mutation(mutation, state, effects)
.await
Expand Down Expand Up @@ -472,89 +467,6 @@ where
Ok(())
}

async fn try_built_in_invoker_effect<State: StateReader>(
&mut self,
effects: &mut Effects,
state: &mut State,
nbis_effects: BuiltinServiceEffects,
) -> Result<(), Error> {
let (invocation_id, nbis_effects) = nbis_effects.into_inner();
let invocation_status =
Self::get_invocation_status_and_trace(state, &invocation_id, effects).await?;

match invocation_status {
InvocationStatus::Invoked(invocation_metadata) => {
for nbis_effect in nbis_effects {
self.on_built_in_invoker_effect(
effects,
&invocation_id,
&invocation_metadata,
nbis_effect,
)
.await?
}
Ok(())
}
_ => {
trace!(
"Received built in invoker effect for unknown invocation {}. Ignoring it.",
invocation_id
);
Ok(())
}
}
}

async fn on_built_in_invoker_effect(
&mut self,
effects: &mut Effects,
invocation_id: &InvocationId,
invocation_metadata: &InFlightInvocationMetadata,
nbis_effect: BuiltinServiceEffect,
) -> Result<(), Error> {
match nbis_effect {
BuiltinServiceEffect::SetState { key, value } => {
effects.set_state(
invocation_metadata
.invocation_target
.as_keyed_service_id()
.expect("Non deterministic built in services clearing state MUST be keyed"),
*invocation_id,
invocation_metadata.journal_metadata.span_context.clone(),
Bytes::from(key.into_owned()),
value,
);
}
BuiltinServiceEffect::ClearState(key) => {
effects.clear_state(
invocation_metadata
.invocation_target
.as_keyed_service_id()
.expect("Non deterministic built in services clearing state MUST be keyed"),
*invocation_id,
invocation_metadata.journal_metadata.span_context.clone(),
Bytes::from(key.into_owned()),
);
}
BuiltinServiceEffect::OutboxMessage(msg) => {
self.handle_outgoing_message(msg, effects);
}
BuiltinServiceEffect::End(None) => {
self.end_built_in_invocation(effects, *invocation_id, invocation_metadata.clone())
.await?
}
BuiltinServiceEffect::End(Some(e)) => {
self.fail_invocation(effects, *invocation_id, invocation_metadata.clone(), e)
.await?
}
BuiltinServiceEffect::IngressResponse(ingress_response) => {
self.ingress_response(ingress_response, effects);
}
}

Ok(())
}

async fn try_terminate_invocation<State: StateReader>(
&mut self,
InvocationTermination {
Expand Down Expand Up @@ -1141,29 +1053,6 @@ where
Ok(())
}

// This needs a different method because for built-in invocations we send back the output as soon as we have it.
async fn end_built_in_invocation(
&mut self,
effects: &mut Effects,
invocation_id: InvocationId,
invocation_metadata: InFlightInvocationMetadata,
) -> Result<(), Error> {
self.notify_invocation_result(
invocation_id,
invocation_metadata.invocation_target.clone(),
invocation_metadata.journal_metadata.span_context,
invocation_metadata.timestamps.creation_time(),
Ok(()),
effects,
);

effects.free_invocation(invocation_id);
effects.drop_journal(invocation_id, invocation_metadata.journal_metadata.length);
Self::try_pop_inbox(effects, &invocation_metadata.invocation_target);

Ok(())
}

async fn fail_invocation(
&mut self,
effects: &mut Effects,
Expand Down

0 comments on commit ce1edbd

Please sign in to comment.