diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6ab4ed465..85cedabe5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -31,6 +31,21 @@ jobs: with: submodules: recursive + - uses: depot/setup-action@v1 + - name: Build dependency images + uses: depot/bake-action@v1 + with: + files: | + ./docker-bake.hcl + ./docker-bake.override.hcl + ./docker-bake.platforms.hcl + targets: | + rollups-node-devnet + rollups-node-snapshot + project: ${{ vars.DEPOT_PROJECT }} + workdir: build + load: true + - name: 📦 Install protoc run: sudo apt update && sudo apt install -y protobuf-compiler libprotobuf-dev @@ -83,20 +98,6 @@ jobs: with: go-version-file: 'go.mod' - - uses: depot/setup-action@v1 - - - name: Build and load devnet image - uses: depot/bake-action@v1 - with: - files: | - ./docker-bake.hcl - ./docker-bake.override.hcl - ./docker-bake.platforms.hcl - targets: rollups-node-devnet - project: ${{ vars.DEPOT_PROJECT }} - workdir: build - load: true - - name: Run Go tests working-directory: ${{ github.workspace }} run: go test ./... diff --git a/CHANGELOG.md b/CHANGELOG.md index d91c0e31a..c61c7755b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 instead of /opt/cartesi/bin. - Changed the base Docker image to debian-bookworm instead cartesi/server-manager. +### Removed + +- Removed snapshot-saving feature. Now, the node will always start from the beginning. + ## [1.2.0] ### Added diff --git a/build/Dockerfile b/build/Dockerfile index 905d72438..33cfa3f89 100644 --- a/build/Dockerfile +++ b/build/Dockerfile @@ -20,13 +20,14 @@ ARG LINUX_KERNEL_VERSION ARG ROM_VERSION # Build directories. -ARG MACHINE_SNAPSHOT_BUILD_PATH=/build/machine-snapshot +ARG SNAPSHOT_BUILD_PATH=/build/snapshot ARG DEVNET_BUILD_PATH=/build/devnet ARG RUST_BUILD_PATH=/build/rollups-node/rust ARG GO_BUILD_PATH=/build/rollups-node/go # Runtime dir for the cartesi-machine snapshot. -ARG MACHINE_SNAPSHOT_RUNTIME_PATH=/usr/share/cartesi/snapshots +ARG SNAPSHOT_RUNTIME_PATH=/usr/share/cartesi/snapshots +ARG SNAPSHOT_FILENAME=test-application #################################################################################################### # STAGE: emulator-base @@ -87,12 +88,13 @@ ADD https://github.com/cartesi/image-kernel/releases/download/v${LINUX_VERSION}/ ADD https://github.com/cartesi/machine-emulator-rom/releases/download/v${ROM_VERSION}/rom-v${ROM_VERSION}.bin rom.bin # Generate snapshot with echo and store it. -ARG MACHINE_SNAPSHOT_BUILD_PATH -WORKDIR ${MACHINE_SNAPSHOT_BUILD_PATH} +ARG SNAPSHOT_BUILD_PATH +ARG SNAPSHOT_FILENAME +WORKDIR ${SNAPSHOT_BUILD_PATH} RUN cartesi-machine \ --ram-length=128Mi \ --rollup \ - --store=$MACHINE_SNAPSHOT_BUILD_PATH/snapshot \ + --store=$SNAPSHOT_BUILD_PATH/$SNAPSHOT_FILENAME \ -- "ioctl-echo-loop --vouchers=1 --notices=1 --reports=1 --verbose=1" # STAGE: rollups-node-snapshot @@ -103,14 +105,13 @@ RUN cartesi-machine \ FROM emulator-base as rollups-node-snapshot # Copy image from the builder stage. -ARG MACHINE_SNAPSHOT_BUILD_PATH -ARG MACHINE_SNAPSHOT_RUNTIME_PATH -WORKDIR ${MACHINE_SNAPSHOT_RUNTIME_PATH} -COPY --from=snapshot-builder --chown=cartesi:cartesi ${MACHINE_SNAPSHOT_BUILD_PATH}/snapshot ./0_0 -RUN < = std::result::Result; @@ -34,6 +41,7 @@ pub struct BrokerFacade { outputs_stream: RollupsOutputsStream, claims_stream: RollupsClaimsStream, reader_mode: bool, + last_id: String, } impl BrokerFacade { @@ -54,63 +62,28 @@ impl BrokerFacade { outputs_stream, claims_stream, reader_mode, + last_id: INITIAL_ID.to_owned(), }) } - /// Search the input event stream for the finish epoch event of the previous epoch - #[tracing::instrument(level = "trace", skip_all)] - pub async fn find_previous_finish_epoch( - &mut self, - mut epoch: u64, - ) -> Result { - tracing::trace!(epoch, "getting previous finish epoch"); - - if epoch == 0 { - tracing::trace!("returning initial id for epoch 0"); - return Ok(INITIAL_ID.to_owned()); - } else { - // This won't underflow because we know the epoch is not 0 - epoch -= 1; - } - - tracing::trace!(epoch, "searching for finish epoch input event"); - let mut last_id = INITIAL_ID.to_owned(); - loop { - let event = self - .client - .consume_nonblocking(&self.inputs_stream, &last_id) - .await - .context(ConsumeSnafu)? - .ok_or(BrokerFacadeError::FindFinishEpochInputError { - epoch, - })?; - if matches!( - event.payload, - RollupsInput { - data: RollupsData::FinishEpoch {}, - epoch_index, - .. - } if epoch_index == epoch - ) { - tracing::trace!(event_id = last_id, "returning event id"); - return Ok(event.id); - } - last_id = event.id; - } - } - /// Consume rollups input event #[tracing::instrument(level = "trace", skip_all)] - pub async fn consume_input( - &mut self, - last_id: &str, - ) -> Result> { - tracing::trace!(last_id, "consuming rollups input event"); - let result = self + pub async fn consume_input(&mut self) -> Result { + tracing::trace!(self.last_id, "consuming rollups input event"); + let event = self .client - .consume_blocking(&self.inputs_stream, last_id) - .await; - result.context(BrokerInternalSnafu) + .consume_blocking(&self.inputs_stream, &self.last_id) + .await + .context(BrokerInternalSnafu)?; + if event.payload.parent_id != self.last_id { + Err(BrokerFacadeError::ParentIdMismatchError { + expected: self.last_id.to_owned(), + got: event.payload.parent_id, + }) + } else { + self.last_id = event.id; + Ok(event.payload) + } } /// Produce the rollups claim if it isn't in the stream yet @@ -180,7 +153,7 @@ mod tests { use backoff::ExponentialBackoff; use rollups_events::{ Address, DAppMetadata, Hash, InputMetadata, Payload, - RollupsAdvanceStateInput, ADDRESS_SIZE, HASH_SIZE, + RollupsAdvanceStateInput, RollupsData, ADDRESS_SIZE, HASH_SIZE, }; use test_fixtures::BrokerFixture; use testcontainers::clients::Cli; @@ -210,54 +183,6 @@ mod tests { } } - #[test_log::test(tokio::test)] - async fn test_it_finds_previous_finish_of_first_epoch() { - let docker = Cli::default(); - let mut state = TestState::setup(&docker).await; - let id = state.facade.find_previous_finish_epoch(0).await.unwrap(); - assert_eq!(id, INITIAL_ID); - } - - #[test_log::test(tokio::test)] - async fn test_it_finds_previous_finish_of_nth_epoch() { - let docker = Cli::default(); - let mut state = TestState::setup(&docker).await; - let inputs = - vec![RollupsData::FinishEpoch {}, RollupsData::FinishEpoch {}]; - let mut ids = Vec::new(); - for input in inputs { - ids.push(state.fixture.produce_input_event(input).await); - } - assert_eq!( - state.facade.find_previous_finish_epoch(1).await.unwrap(), - ids[0] - ); - assert_eq!( - state.facade.find_previous_finish_epoch(2).await.unwrap(), - ids[1] - ); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_find_previous_epoch_when_it_is_missing() { - let docker = Cli::default(); - let mut state = TestState::setup(&docker).await; - let inputs = - vec![RollupsData::FinishEpoch {}, RollupsData::FinishEpoch {}]; - let mut ids = Vec::new(); - for input in inputs { - ids.push(state.fixture.produce_input_event(input).await); - } - assert!(matches!( - state - .facade - .find_previous_finish_epoch(3) - .await - .unwrap_err(), - BrokerFacadeError::FindFinishEpochInputError { epoch: 2 } - )); - } - #[test_log::test(tokio::test)] async fn test_it_consumes_inputs() { let docker = Cli::default(); @@ -288,40 +213,31 @@ mod tests { ids.push(state.fixture.produce_input_event(input.clone()).await); } assert_eq!( - state.facade.consume_input(INITIAL_ID).await.unwrap(), - Event { - id: ids[0].clone(), - payload: RollupsInput { - parent_id: INITIAL_ID.to_owned(), - epoch_index: 0, - inputs_sent_count: 1, - data: inputs[0].clone(), - }, - } + state.facade.consume_input().await.unwrap(), + RollupsInput { + parent_id: INITIAL_ID.to_owned(), + epoch_index: 0, + inputs_sent_count: 1, + data: inputs[0].clone(), + }, ); assert_eq!( - state.facade.consume_input(&ids[0]).await.unwrap(), - Event { - id: ids[1].clone(), - payload: RollupsInput { - parent_id: ids[0].clone(), - epoch_index: 0, - inputs_sent_count: 1, - data: inputs[1].clone(), - }, - } + state.facade.consume_input().await.unwrap(), + RollupsInput { + parent_id: ids[0].clone(), + epoch_index: 0, + inputs_sent_count: 1, + data: inputs[1].clone(), + }, ); assert_eq!( - state.facade.consume_input(&ids[1]).await.unwrap(), - Event { - id: ids[2].clone(), - payload: RollupsInput { - parent_id: ids[1].clone(), - epoch_index: 1, - inputs_sent_count: 2, - data: inputs[2].clone(), - }, - } + state.facade.consume_input().await.unwrap(), + RollupsInput { + parent_id: ids[1].clone(), + epoch_index: 1, + inputs_sent_count: 2, + data: inputs[2].clone(), + }, ); } diff --git a/offchain/advance-runner/src/config.rs b/offchain/advance-runner/src/config.rs index e38c70eee..3d16e4f4f 100644 --- a/offchain/advance-runner/src/config.rs +++ b/offchain/advance-runner/src/config.rs @@ -2,13 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use clap::Parser; -use snafu::{ResultExt, Snafu}; use std::time::Duration; use crate::server_manager::ServerManagerCLIConfig; pub use crate::server_manager::ServerManagerConfig; -pub use crate::snapshot::config::{FSManagerConfig, SnapshotConfig}; -use crate::snapshot::config::{SnapshotCLIConfig, SnapshotConfigError}; use log::{LogConfig, LogEnvCliConfig}; pub use rollups_events::{ BrokerCLIConfig, BrokerConfig, DAppMetadata, DAppMetadataCLIConfig, @@ -19,7 +16,6 @@ pub struct AdvanceRunnerConfig { pub server_manager_config: ServerManagerConfig, pub broker_config: BrokerConfig, pub dapp_metadata: DAppMetadata, - pub snapshot_config: SnapshotConfig, pub log_config: LogConfig, pub backoff_max_elapsed_duration: Duration, pub healthcheck_port: u16, @@ -27,18 +23,13 @@ pub struct AdvanceRunnerConfig { } impl AdvanceRunnerConfig { - pub fn parse() -> Result { + pub fn parse() -> Self { let cli_config = CLIConfig::parse(); let broker_config = cli_config.broker_cli_config.into(); let dapp_metadata: DAppMetadata = cli_config.dapp_metadata_cli_config.into(); let server_manager_config = ServerManagerConfig::parse_from_cli(cli_config.sm_cli_config); - let snapshot_config = SnapshotConfig::new( - cli_config.snapshot_cli_config, - dapp_metadata.dapp_address.clone(), - ) - .context(SnapshotConfigSnafu)?; let log_config = LogConfig::initialize(cli_config.log_cli_config); @@ -49,25 +40,18 @@ impl AdvanceRunnerConfig { let reader_mode = cli_config.reader_mode; - Ok(Self { + Self { server_manager_config, broker_config, dapp_metadata, - snapshot_config, log_config, backoff_max_elapsed_duration, healthcheck_port, reader_mode, - }) + } } } -#[derive(Debug, Snafu)] -pub enum ConfigError { - #[snafu(display("error in snapshot configuration"))] - SnapshotConfigError { source: SnapshotConfigError }, -} - #[derive(Parser)] #[command(name = "advance_runner_config")] #[command(about = "Configuration for advance-runner")] @@ -81,9 +65,6 @@ struct CLIConfig { #[command(flatten)] dapp_metadata_cli_config: DAppMetadataCLIConfig, - #[command(flatten)] - snapshot_cli_config: SnapshotCLIConfig, - #[command(flatten)] pub log_cli_config: LogEnvCliConfig, diff --git a/offchain/advance-runner/src/dapp_contract.rs b/offchain/advance-runner/src/dapp_contract.rs deleted file mode 100644 index ad3b9c8b9..000000000 --- a/offchain/advance-runner/src/dapp_contract.rs +++ /dev/null @@ -1,44 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use contracts::cartesi_dapp::CartesiDApp; -use ethers::{ - prelude::ContractError, - providers::{Http, HttpRateLimitRetryPolicy, Provider, RetryClient}, -}; -use rollups_events::{Address, Hash}; -use snafu::{ResultExt, Snafu}; -use std::sync::Arc; -use url::Url; - -const MAX_RETRIES: u32 = 10; -const INITIAL_BACKOFF: u64 = 1000; - -#[derive(Debug, Snafu)] -#[snafu(display("failed to obtain hash from dapp contract"))] -pub struct DappContractError { - source: ContractError>>, -} - -pub async fn get_template_hash( - dapp_address: &Address, - provider_http_endpoint: Url, -) -> Result { - let provider = Provider::new(RetryClient::new( - Http::new(provider_http_endpoint), - Box::new(HttpRateLimitRetryPolicy), - MAX_RETRIES, - INITIAL_BACKOFF, - )); - - let cartesi_dapp = - CartesiDApp::new(dapp_address.inner(), Arc::new(provider)); - - let template_hash = cartesi_dapp - .get_template_hash() - .call() - .await - .context(DappContractSnafu)?; - - Ok(Hash::new(template_hash)) -} diff --git a/offchain/advance-runner/src/error.rs b/offchain/advance-runner/src/error.rs index 135b9e57d..e8d0cd4e6 100644 --- a/offchain/advance-runner/src/error.rs +++ b/offchain/advance-runner/src/error.rs @@ -5,9 +5,6 @@ use snafu::Snafu; use crate::{broker, runner, server_manager}; -use crate::snapshot::disabled::SnapshotDisabledError; -use crate::snapshot::fs_manager::FSSnapshotError; - #[derive(Debug, Snafu)] #[snafu(visibility(pub(crate)))] pub enum AdvanceRunnerError { @@ -24,13 +21,6 @@ pub enum AdvanceRunnerError { #[snafu(display("broker error"))] BrokerError { source: broker::BrokerFacadeError }, - #[snafu(display("advance runner error"))] - RunnerFSSnapshotError { - source: runner::RunnerError, - }, - - #[snafu(display("advance runner error"))] - RunnerSnapshotDisabledError { - source: runner::RunnerError, - }, + #[snafu(display("runner error"))] + RunnerError { source: runner::RunnerError }, } diff --git a/offchain/advance-runner/src/lib.rs b/offchain/advance-runner/src/lib.rs index 96c7913be..b1a2366d9 100644 --- a/offchain/advance-runner/src/lib.rs +++ b/offchain/advance-runner/src/lib.rs @@ -7,20 +7,16 @@ use config::AdvanceRunnerConfig; use runner::Runner; use server_manager::ServerManagerFacade; use snafu::ResultExt; -use snapshot::{ - config::SnapshotConfig, disabled::SnapshotDisabled, - fs_manager::FSSnapshotManager, -}; +pub use broker::BrokerFacadeError; pub use error::AdvanceRunnerError; +pub use runner::RunnerError; mod broker; pub mod config; -mod dapp_contract; mod error; pub mod runner; mod server_manager; -mod snapshot; #[tracing::instrument(level = "trace", skip_all)] pub async fn run( @@ -64,18 +60,7 @@ async fn start_advance_runner( .context(error::BrokerSnafu)?; tracing::trace!("connected the broker"); - match config.snapshot_config { - SnapshotConfig::FileSystem(fs_manager_config) => { - let snapshot_manager = FSSnapshotManager::new(fs_manager_config); - Runner::start(server_manager, broker, snapshot_manager) - .await - .context(error::RunnerFSSnapshotSnafu) - } - SnapshotConfig::Disabled => { - let snapshot_manager = SnapshotDisabled {}; - Runner::start(server_manager, broker, snapshot_manager) - .await - .context(error::RunnerSnapshotDisabledSnafu) - } - } + Runner::start(server_manager, broker) + .await + .context(error::RunnerSnafu) } diff --git a/offchain/advance-runner/src/main.rs b/offchain/advance-runner/src/main.rs index 4f57a2f66..60826dd3f 100644 --- a/offchain/advance-runner/src/main.rs +++ b/offchain/advance-runner/src/main.rs @@ -5,7 +5,7 @@ use advance_runner::config::AdvanceRunnerConfig; #[tokio::main] async fn main() -> Result<(), Box> { - let config = AdvanceRunnerConfig::parse()?; + let config = AdvanceRunnerConfig::parse(); log::configure(&config.log_config); diff --git a/offchain/advance-runner/src/runner.rs b/offchain/advance-runner/src/runner.rs index 032454ea2..ccd09479b 100644 --- a/offchain/advance-runner/src/runner.rs +++ b/offchain/advance-runner/src/runner.rs @@ -1,18 +1,14 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -use rollups_events::{Event, InputMetadata, RollupsData, RollupsInput}; +use rollups_events::{InputMetadata, RollupsData}; use snafu::{ResultExt, Snafu}; use crate::broker::{BrokerFacade, BrokerFacadeError}; use crate::server_manager::{ServerManagerError, ServerManagerFacade}; -use crate::snapshot::SnapshotManager; #[derive(Debug, Snafu)] -pub enum RunnerError { - #[snafu(display("failed to to create session in server-manager"))] - CreateSessionError { source: ServerManagerError }, - +pub enum RunnerError { #[snafu(display("failed to send advance-state input to server-manager"))] AdvanceError { source: ServerManagerError }, @@ -36,139 +32,51 @@ pub enum RunnerError { #[snafu(display("failed to produce outputs in broker"))] ProduceOutputsError { source: BrokerFacadeError }, - - #[snafu(display("failed to get storage directory"))] - GetStorageDirectoryError { source: SnapError }, - - #[snafu(display("failed to get latest snapshot"))] - GetLatestSnapshotError { source: SnapError }, - - #[snafu(display("failed to set latest snapshot"))] - SetLatestSnapshotError { source: SnapError }, - - #[snafu(display( - "parent id doesn't match expected={} got={}", - expected, - got - ))] - ParentIdMismatchError { expected: String, got: String }, - - #[snafu(display("failed to validate snapshot"))] - ValidateSnapshotError { source: SnapError }, } -type Result = std::result::Result>; +type Result = std::result::Result; -pub struct Runner { +pub struct Runner { server_manager: ServerManagerFacade, broker: BrokerFacade, - snapshot_manager: Snap, } -impl Runner { +impl Runner { #[tracing::instrument(level = "trace", skip_all)] pub async fn start( server_manager: ServerManagerFacade, broker: BrokerFacade, - snapshot_manager: Snap, - ) -> Result<(), Snap::Error> { + ) -> Result<()> { let mut runner = Self { server_manager, broker, - snapshot_manager, }; - let mut last_id = runner.setup().await?; - tracing::info!(last_id, "starting runner main loop"); + tracing::info!("starting runner main loop"); loop { - let event = runner.consume_next(&last_id).await?; + let event = runner + .broker + .consume_input() + .await + .context(ConsumeInputSnafu)?; tracing::info!(?event, "consumed input event"); - match event.payload.data { + match event.data { RollupsData::AdvanceStateInput(input) => { runner .handle_advance( - event.payload.epoch_index, - event.payload.inputs_sent_count, + event.epoch_index, + event.inputs_sent_count, input.metadata, input.payload.into_inner(), ) .await?; } RollupsData::FinishEpoch {} => { - runner - .handle_finish( - event.payload.epoch_index, - event.payload.inputs_sent_count, - ) - .await?; + runner.handle_finish(event.epoch_index).await?; } } - - last_id = event.id; - tracing::info!(last_id, "waiting for the next input event"); - } - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn setup(&mut self) -> Result { - tracing::trace!("setting up runner"); - - let snapshot = self - .snapshot_manager - .get_latest() - .await - .context(GetLatestSnapshotSnafu)?; - tracing::info!(?snapshot, "got latest snapshot"); - - if snapshot.is_template() { - self.snapshot_manager - .validate(&snapshot) - .await - .context(ValidateSnapshotSnafu)?; - tracing::info!("template snapshot is valid"); - } - - let event_id = self - .broker - .find_previous_finish_epoch(snapshot.epoch) - .await - .context(FindFinishEpochInputSnafu)?; - tracing::trace!(event_id, "found finish epoch input event"); - - self.server_manager - .start_session( - &snapshot.path, - snapshot.epoch, - snapshot.processed_input_count, - ) - .await - .context(CreateSessionSnafu)?; - - Ok(event_id) - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn consume_next( - &mut self, - last_id: &str, - ) -> Result, Snap::Error> { - tracing::trace!("consuming next event input"); - - let event = self - .broker - .consume_input(last_id) - .await - .context(ConsumeInputSnafu)?; - tracing::trace!("input event consumed from broker"); - - if event.payload.parent_id != last_id { - Err(RunnerError::ParentIdMismatchError { - expected: last_id.to_owned(), - got: event.payload.parent_id, - }) - } else { - Ok(event) + tracing::info!("waiting for the next input event"); } } @@ -179,7 +87,7 @@ impl Runner { inputs_sent_count: u64, input_metadata: InputMetadata, input_payload: Vec, - ) -> Result<(), Snap::Error> { + ) -> Result<()> { tracing::trace!("handling advance state"); let input_index = inputs_sent_count - 1; @@ -205,25 +113,10 @@ impl Runner { } #[tracing::instrument(level = "trace", skip_all)] - async fn handle_finish( - &mut self, - epoch_index: u64, - inputs_sent_count: u64, - ) -> Result<(), Snap::Error> { + async fn handle_finish(&mut self, epoch_index: u64) -> Result<()> { tracing::trace!("handling finish"); - // We add one to the epoch index because the snapshot is for the one after we are closing - let snapshot = self - .snapshot_manager - .get_storage_directory(epoch_index + 1, inputs_sent_count) - .await - .context(GetStorageDirectorySnafu)?; - tracing::trace!(?snapshot, "got storage directory"); - - let result = self - .server_manager - .finish_epoch(epoch_index, &snapshot.path) - .await; + let result = self.server_manager.finish_epoch(epoch_index).await; tracing::trace!("finished epoch in server-manager"); match result { @@ -248,13 +141,6 @@ impl Runner { } } } - - self.snapshot_manager - .set_latest(snapshot) - .await - .context(SetLatestSnapshotSnafu)?; - tracing::trace!("set latest snapshot"); - Ok(()) } } diff --git a/offchain/advance-runner/src/server_manager/config.rs b/offchain/advance-runner/src/server_manager/config.rs index 288f65b57..642e7daea 100644 --- a/offchain/advance-runner/src/server_manager/config.rs +++ b/offchain/advance-runner/src/server_manager/config.rs @@ -11,6 +11,7 @@ use grpc_interfaces::cartesi_server_manager::{CyclesConfig, DeadlineConfig}; #[derive(Debug, Clone)] pub struct ServerManagerConfig { pub server_manager_endpoint: String, + pub machine_snapshot_path: String, pub max_decoding_message_size: usize, pub session_id: String, pub pending_inputs_sleep_duration: u64, @@ -53,6 +54,7 @@ impl ServerManagerConfig { Self { server_manager_endpoint: cli_config.server_manager_endpoint, + machine_snapshot_path: cli_config.machine_snapshot_path, max_decoding_message_size: cli_config.max_decoding_message_size, session_id: cli_config.session_id, pending_inputs_sleep_duration: cli_config @@ -73,6 +75,10 @@ pub struct ServerManagerCLIConfig { #[arg(long, env, default_value = "http://127.0.0.1:5001")] pub server_manager_endpoint: String, + /// Path to the machine snapshot + #[arg(long, env, default_value = "")] + pub machine_snapshot_path: String, + /// Maximum size of a decoded message #[arg(long, env, default_value_t = 100 * 1024 * 1024)] pub max_decoding_message_size: usize, diff --git a/offchain/advance-runner/src/server_manager/facade.rs b/offchain/advance-runner/src/server_manager/facade.rs index 156e8a14b..c37375635 100644 --- a/offchain/advance-runner/src/server_manager/facade.rs +++ b/offchain/advance-runner/src/server_manager/facade.rs @@ -7,7 +7,6 @@ use rollups_events::{ RollupsClaim, RollupsNotice, RollupsOutput, RollupsReport, RollupsVoucher, }; use snafu::{OptionExt, ResultExt}; -use std::path::Path; use tonic::{transport::Channel, Request}; use uuid::Uuid; @@ -108,75 +107,67 @@ impl ServerManagerFacade { .context(ConnectionSnafu)? .max_decoding_message_size(config.max_decoding_message_size); - Ok(Self { + let mut sm_facade = Self { dapp_address, client, config, backoff, - }) - } - - #[tracing::instrument(level = "trace", skip_all)] - pub async fn start_session( - &mut self, - machine_directory: &Path, - active_epoch_index: u64, - processed_input_count: u64, - ) -> Result<()> { - tracing::trace!( - ?machine_directory, - active_epoch_index, - "starting server-manager session" - ); + }; // If session exists, delete it before creating new one - let response = grpc_call!(self, get_status, Void {})?; - if response.session_id.contains(&self.config.session_id) { + let response = grpc_call!(sm_facade, get_status, Void {})?; + if response.session_id.contains(&sm_facade.config.session_id) { tracing::warn!("deleting previous server-manager session"); let session_status = grpc_call!( - self, + sm_facade, get_session_status, GetSessionStatusRequest { - session_id: self.config.session_id.clone(), + session_id: sm_facade.config.session_id.clone(), } )?; let active_epoch_index = session_status.active_epoch_index; - let processed_input_count_within_epoch = - self.wait_for_pending_inputs(active_epoch_index) - .await? - .len() as u64; + let processed_input_count_within_epoch = sm_facade + .wait_for_pending_inputs(active_epoch_index) + .await? + .len() + as u64; grpc_call!( - self, + sm_facade, finish_epoch, FinishEpochRequest { - session_id: self.config.session_id.clone(), + session_id: sm_facade.config.session_id.clone(), active_epoch_index, processed_input_count_within_epoch, storage_directory: "".to_string(), } )?; grpc_call!( - self, + sm_facade, end_session, EndSessionRequest { - session_id: self.config.session_id.clone(), + session_id: sm_facade.config.session_id.clone(), } )?; } - grpc_call!(self, start_session, { + tracing::trace!("starting server-manager session"); + + grpc_call!(sm_facade, start_session, { StartSessionRequest { - session_id: self.config.session_id.clone(), - machine_directory: machine_directory.to_string_lossy().into(), - runtime: Some(self.config.runtime_config.clone()), - active_epoch_index, - processed_input_count, - server_cycles: Some(self.config.cycles_config.clone()), - server_deadline: Some(self.config.deadline_config.clone()), + session_id: sm_facade.config.session_id.clone(), + machine_directory: sm_facade + .config + .machine_snapshot_path + .clone(), + runtime: Some(sm_facade.config.runtime_config.clone()), + active_epoch_index: 0, + processed_input_count: 0, + server_cycles: Some(sm_facade.config.cycles_config.clone()), + server_deadline: Some(sm_facade.config.deadline_config.clone()), } })?; - Ok(()) + Ok(sm_facade) } #[tracing::instrument(level = "trace", skip_all)] @@ -286,7 +277,6 @@ impl ServerManagerFacade { pub async fn finish_epoch( &mut self, epoch_index: u64, - storage_directory: &Path, ) -> Result<(RollupsClaim, Vec)> { tracing::trace!(epoch_index, "sending finish epoch"); @@ -300,9 +290,7 @@ impl ServerManagerFacade { session_id: self.config.session_id.to_owned(), active_epoch_index: epoch_index, processed_input_count_within_epoch, - storage_directory: storage_directory - .to_string_lossy() - .to_string(), + storage_directory: "".to_owned(), } })?; diff --git a/offchain/advance-runner/src/snapshot/config.rs b/offchain/advance-runner/src/snapshot/config.rs deleted file mode 100644 index 01684e126..000000000 --- a/offchain/advance-runner/src/snapshot/config.rs +++ /dev/null @@ -1,96 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use clap::Parser; -use rollups_events::Address; -use snafu::{ensure, Snafu}; -use std::path::PathBuf; -use url::Url; - -#[derive(Debug, Clone)] -pub struct FSManagerConfig { - pub snapshot_dir: PathBuf, - pub snapshot_latest: PathBuf, - pub validation_enabled: bool, - pub provider_http_endpoint: Option, - pub dapp_address: Address, -} - -#[derive(Debug, Clone)] -pub enum SnapshotConfig { - FileSystem(FSManagerConfig), - Disabled, -} - -impl SnapshotConfig { - pub fn new( - cli_config: SnapshotCLIConfig, - dapp_address: Address, - ) -> Result { - if cli_config.snapshot_enabled { - let snapshot_dir = PathBuf::from(cli_config.snapshot_dir); - ensure!(snapshot_dir.is_dir(), DirSnafu); - - let snapshot_latest = PathBuf::from(cli_config.snapshot_latest); - ensure!(snapshot_latest.is_symlink(), SymlinkSnafu); - - let validation_enabled = cli_config.snapshot_validation_enabled; - if validation_enabled { - ensure!( - cli_config.provider_http_endpoint.is_some(), - NoProviderEndpointSnafu, - ); - } - - let provider_http_endpoint = cli_config.provider_http_endpoint; - - Ok(SnapshotConfig::FileSystem(FSManagerConfig { - snapshot_dir, - snapshot_latest, - validation_enabled, - provider_http_endpoint, - dapp_address, - })) - } else { - Ok(SnapshotConfig::Disabled) - } - } -} - -#[derive(Debug, Snafu)] -#[allow(clippy::enum_variant_names)] -pub enum SnapshotConfigError { - #[snafu(display("Snapshot dir isn't a directory"))] - DirError {}, - - #[snafu(display("Snapshot latest isn't a symlink"))] - SymlinkError {}, - - #[snafu(display("A provider http endpoint is required"))] - NoProviderEndpointError {}, -} - -#[derive(Clone, Parser, Debug)] -#[command(name = "snapshot")] -pub struct SnapshotCLIConfig { - /// If set to false, disables snapshots. Enabled by default - #[arg(long, env, default_value_t = true)] - snapshot_enabled: bool, - - /// Path to the directory with the snapshots - #[arg(long, env)] - snapshot_dir: String, - - /// Path to the symlink of the latest snapshot - #[arg(long, env)] - snapshot_latest: String, - - /// If set to false, disables snapshot validation. Enabled by default - #[arg(long, env, default_value_t = true)] - snapshot_validation_enabled: bool, - - /// The endpoint for a JSON-RPC provider. - /// Required if SNAPSHOT_VALIDATION_ENABLED is `true` - #[arg(long, env, value_parser = Url::parse)] - provider_http_endpoint: Option, -} diff --git a/offchain/advance-runner/src/snapshot/disabled.rs b/offchain/advance-runner/src/snapshot/disabled.rs deleted file mode 100644 index bc82f6bc1..000000000 --- a/offchain/advance-runner/src/snapshot/disabled.rs +++ /dev/null @@ -1,47 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use super::{Snapshot, SnapshotManager}; - -#[derive(Debug)] -pub struct SnapshotDisabled {} - -#[derive(snafu::Snafu, Debug)] -#[snafu(display("shapshot disabled"))] -pub struct SnapshotDisabledError {} - -#[async_trait::async_trait] -impl SnapshotManager for SnapshotDisabled { - type Error = SnapshotDisabledError; - - /// Get the most recent snapshot - #[tracing::instrument(level = "trace", skip_all)] - async fn get_latest(&self) -> Result { - tracing::trace!("snapshots disabled; returning default"); - Ok(Default::default()) - } - - /// Get the target storage directory for the snapshot - #[tracing::instrument(level = "trace", skip_all)] - async fn get_storage_directory( - &self, - _: u64, - _: u64, - ) -> Result { - tracing::trace!("snapshots disabled; returning default"); - Ok(Default::default()) - } - - /// Set the most recent snapshot - #[tracing::instrument(level = "trace", skip_all)] - async fn set_latest(&self, _: Snapshot) -> Result<(), Self::Error> { - tracing::trace!("snapshots disabled; ignoring"); - Ok(()) - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn validate(&self, _: &Snapshot) -> Result<(), Self::Error> { - tracing::trace!("snapshots disabled; ignoring"); - Ok(()) - } -} diff --git a/offchain/advance-runner/src/snapshot/fs_manager.rs b/offchain/advance-runner/src/snapshot/fs_manager.rs deleted file mode 100644 index d86ddd789..000000000 --- a/offchain/advance-runner/src/snapshot/fs_manager.rs +++ /dev/null @@ -1,637 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use rollups_events::{Hash, HASH_SIZE}; -use snafu::{ensure, OptionExt, ResultExt, Snafu}; -use std::collections::HashSet; -use std::fs::{self, File}; -use std::io::Read; -use std::path::{Path, PathBuf}; - -use crate::dapp_contract::DappContractError; - -use super::config::FSManagerConfig; -use super::{Snapshot, SnapshotManager}; - -const HASH_FILE: &str = "hash"; - -#[derive(Debug, Snafu)] -#[allow(clippy::enum_variant_names)] -pub enum FSSnapshotError { - #[snafu(display("failed to follow latest symlink"))] - ReadLinkError { source: std::io::Error }, - - #[snafu(display("failed to read symlink path ({})", path.display()))] - BrokenLinkError { path: PathBuf }, - - #[snafu(display("failed to get snapshot file name ({})", path.display()))] - DirNameError { path: PathBuf }, - - #[snafu(display( - "failed to parse the {} from snapshot file name ({})", - field, - path.display(), - ))] - ParsingError { - field: String, - path: PathBuf, - source: std::num::ParseIntError, - }, - - #[snafu(display("failed to remove snapshot {}", path.display()))] - RemoveError { - path: PathBuf, - source: std::io::Error, - }, - - #[snafu(display("snapshot on wrong dir {}", path.display()))] - WrongDirError { path: PathBuf }, - - #[snafu(display("snapshot path with invalid epoch {:?}", snapshot))] - InvalidEpochError { snapshot: Snapshot }, - - #[snafu(display( - "snapshot path with invalid processed_input_count {:?}", - snapshot - ))] - InvalidProcessedInputCountError { snapshot: Snapshot }, - - #[snafu(display("failed to read snapshot {}", path.display()))] - NotFoundError { path: PathBuf }, - - #[snafu(display("failed to list snapshots in dir ({})", path.display()))] - ListDirError { - path: PathBuf, - source: std::io::Error, - }, - - #[snafu(display("existing latest path exists but it is not symlink ({})", path.display()))] - LatestNotLinkError { path: PathBuf }, - - #[snafu(display("failed to set latest symlink ({})", path.display()))] - SetLatestError { - path: PathBuf, - source: std::io::Error, - }, - - #[snafu(display("failed to open hash file for snapshot ({})", path.display()))] - OpenHashError { - path: PathBuf, - source: std::io::Error, - }, - - #[snafu(display("failed to read hash file for snapshot ({})", path.display()))] - ReadHashError { - path: PathBuf, - source: std::io::Error, - }, - - #[snafu(display("failed to call the dapp contract"))] - OnchainError { source: DappContractError }, - - #[snafu(display("snapshot hash does not match its on-chain counterpart"))] - HashMismatchError, -} - -#[derive(Debug)] -pub struct FSSnapshotManager { - config: FSManagerConfig, -} - -impl FSSnapshotManager { - pub fn new(config: FSManagerConfig) -> Self { - Self { config } - } -} - -#[async_trait::async_trait] -impl SnapshotManager for FSSnapshotManager { - type Error = FSSnapshotError; - - #[tracing::instrument(level = "trace", skip_all)] - async fn get_latest(&self) -> Result { - tracing::trace!("getting latest snapshot"); - - let path = fs::read_link(&self.config.snapshot_latest) - .context(ReadLinkSnafu)?; - ensure!(path.is_dir(), BrokenLinkSnafu { path }); - tracing::trace!(?path, "followed latest link"); - - path.try_into() - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn get_storage_directory( - &self, - epoch: u64, - processed_input_count: u64, - ) -> Result { - tracing::trace!(epoch, "getting storage directory"); - - let mut path = self.config.snapshot_dir.clone(); - path.push(encode_filename(epoch, processed_input_count)); - tracing::trace!(?path, "computed the path"); - - // Make sure that the target directory for the snapshot doesn't exists - if path.exists() { - tracing::warn!(?path, "storage directory already exists"); - std::fs::remove_dir_all(&path) - .context(RemoveSnafu { path: path.clone() })?; - } - - Ok(Snapshot { - path, - epoch, - processed_input_count, - }) - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn set_latest(&self, snapshot: Snapshot) -> Result<(), Self::Error> { - tracing::trace!(?snapshot, "setting latest snapshot"); - - // basic verifications - let latest = &self.config.snapshot_latest; - let snapshot_dir = &self.config.snapshot_dir; - ensure!( - snapshot.path.parent() == Some(snapshot_dir), - WrongDirSnafu { - path: snapshot.path - } - ); - let (epoch, processed_input_count) = decode_filename(&snapshot.path)?; - ensure!(epoch == snapshot.epoch, InvalidEpochSnafu { snapshot }); - ensure!( - processed_input_count == snapshot.processed_input_count, - InvalidProcessedInputCountSnafu { snapshot } - ); - ensure!( - snapshot.path.is_dir(), - NotFoundSnafu { - path: snapshot.path - } - ); - - // list other snapshots - let mut snapshots = HashSet::new(); - let dir_iterator = - fs::read_dir(snapshot_dir).context(ListDirSnafu { - path: snapshot_dir.clone(), - })?; - for entry in dir_iterator { - let entry = entry.context(ListDirSnafu { - path: snapshot_dir.clone(), - })?; - let path = entry.path(); - if &path != latest && path != snapshot.path { - snapshots.insert(path.to_owned()); - } - } - tracing::trace!(?snapshots, "listed other existing snapshots"); - - // delete previous snapshot link - if latest.exists() { - ensure!( - latest.is_symlink(), - LatestNotLinkSnafu { - path: latest.clone() - } - ); - fs::remove_file(latest).context(SetLatestSnafu { - path: latest.clone(), - })?; - tracing::trace!("deleted previous latest symlink"); - } - - // set latest link - std::os::unix::fs::symlink(&snapshot.path, latest).context( - SetLatestSnafu { - path: latest.clone(), - }, - )?; - tracing::trace!("set latest snapshot"); - - // delete other snapshots - for path in snapshots.iter() { - fs::remove_dir_all(path) - .context(RemoveSnafu { path: path.clone() })?; - } - tracing::trace!("deleted previous snapshots"); - - Ok(()) - } - - #[tracing::instrument(level = "trace", skip_all)] - async fn validate(&self, snapshot: &Snapshot) -> Result<(), Self::Error> { - if self.config.validation_enabled { - let offchain_hash = snapshot.get_hash().await?; - - let onchain_hash = crate::dapp_contract::get_template_hash( - &self.config.dapp_address, - self.config.provider_http_endpoint.clone().unwrap(), - ) - .await - .context(OnchainSnafu)?; - - if offchain_hash == onchain_hash { - Ok(()) - } else { - Err(FSSnapshotError::HashMismatchError) - } - } else { - tracing::trace!("snapshot validation disabled, accepting snapshot"); - Ok(()) - } - } -} - -fn encode_filename(epoch: u64, processed_input_count: u64) -> String { - format!("{}_{}", epoch, processed_input_count) -} - -fn decode_filename(path: &Path) -> Result<(u64, u64), FSSnapshotError> { - let file_name = path - .file_name() - .and_then(|file_name| file_name.to_str()) - .context(DirNameSnafu { - path: path.to_owned(), - })?; - tracing::trace!(file_name, "got snapshot file name"); - - let parts: Vec<_> = file_name.split('_').collect(); - ensure!( - parts.len() == 2, - DirNameSnafu { - path: path.to_owned() - } - ); - let epoch = parts[0].parse::().context(ParsingSnafu { - field: "epoch".to_owned(), - path: path.to_owned(), - })?; - let processed_input_count = - parts[1].parse::().context(ParsingSnafu { - field: "processed_input_count".to_owned(), - path: path.to_owned(), - })?; - - Ok((epoch, processed_input_count)) -} - -impl TryFrom for Snapshot { - type Error = FSSnapshotError; - - fn try_from(path: PathBuf) -> Result { - let (epoch, processed_input_count) = decode_filename(&path)?; - Ok(Snapshot { - path, - epoch, - processed_input_count, - }) - } -} - -impl Snapshot { - /// Reads the binary contents of the hash file in snapshot's directory - /// and converts them to a `Hash`. It assumes the file was created correctly - /// and makes no checks in this regard. - #[tracing::instrument(level = "trace", skip_all)] - pub async fn get_hash(&self) -> Result { - let path = self.path.join(HASH_FILE); - tracing::trace!(?path, "opening hash file at"); - let file = File::open(path.clone()) - .context(OpenHashSnafu { path: path.clone() })?; - - let mut buffer = [0_u8; HASH_SIZE]; - let bytes = file - .take(HASH_SIZE as u64) - .read(&mut buffer) - .context(ReadHashSnafu { path: path.clone() })?; - tracing::trace!("read {bytes} bytes from file"); - Ok(Hash::new(buffer)) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tempfile::TempDir; - - struct TestState { - tempdir: TempDir, - manager: FSSnapshotManager, - } - - impl TestState { - fn setup() -> Self { - let tempdir = - tempfile::tempdir().expect("failed to create temp dir"); - let snapshot_dir = tempdir.path().to_owned(); - let mut snapshot_latest = snapshot_dir.clone(); - snapshot_latest.push("latest"); - let config = FSManagerConfig { - snapshot_dir, - snapshot_latest, - validation_enabled: false, - provider_http_endpoint: None, - dapp_address: Default::default(), - }; - let manager = FSSnapshotManager::new(config); - Self { tempdir, manager } - } - - fn create_snapshot(&self, name: &str) -> PathBuf { - let path = self.tempdir.path().join(name); - fs::create_dir(&path).expect("failed to create dir"); - path - } - - fn list_snapshots_dir(&self) -> Vec { - let mut files = vec![]; - let dir_iterator = fs::read_dir(&self.tempdir.path()).unwrap(); - for entry in dir_iterator { - let entry = entry.unwrap(); - files.push(entry.path()); - } - files.sort(); - files - } - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_read_latest_link() { - let state = TestState::setup(); - let err = state - .manager - .get_latest() - .await - .expect_err("get latest should fail"); - assert!(matches!(err, FSSnapshotError::ReadLinkError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_get_latest_when_link_is_broken() { - let state = TestState::setup(); - std::os::unix::fs::symlink( - state.tempdir.path().join("0_0"), - state.tempdir.path().join("latest"), - ) - .expect("failed to create link"); - let err = state - .manager - .get_latest() - .await - .expect_err("get latest should fail"); - assert!(matches!(err, FSSnapshotError::BrokenLinkError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_get_latest_when_dirname_is_wrong() { - let state = TestState::setup(); - state.create_snapshot("invalid-name"); - std::os::unix::fs::symlink( - state.tempdir.path().join("invalid-name"), - state.tempdir.path().join("latest"), - ) - .expect("failed to create link"); - let err = state - .manager - .get_latest() - .await - .expect_err("get latest should fail"); - assert!(matches!(err, FSSnapshotError::DirNameError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_get_latest_snapshot() { - let state = TestState::setup(); - state.create_snapshot("0_0"); - state.create_snapshot("1_0"); - state.create_snapshot("2_0"); - std::os::unix::fs::symlink( - state.tempdir.path().join("1_0"), - state.tempdir.path().join("latest"), - ) - .expect("failed to create link"); - let snapshot = state - .manager - .get_latest() - .await - .expect("failed to get latest"); - assert_eq!( - snapshot, - Snapshot { - path: state.tempdir.path().join("1_0"), - epoch: 1, - processed_input_count: 0, - } - ); - } - - #[test_log::test(tokio::test)] - async fn test_it_gets_storage_when_snapshot_does_not_exist() { - let state = TestState::setup(); - let storage_directory = state - .manager - .get_storage_directory(0, 0) - .await - .expect("get storage directory should not fail"); - assert_eq!( - storage_directory, - Snapshot { - path: state.tempdir.path().join("0_0"), - epoch: 0, - processed_input_count: 0, - } - ); - assert!(state.list_snapshots_dir().is_empty()); - } - - #[test_log::test(tokio::test)] - async fn test_it_gets_storage_when_snapshot_already_exists() { - let state = TestState::setup(); - state.create_snapshot("0_0"); - state.create_snapshot("1_0"); - state.create_snapshot("2_0"); - let storage_directory = state - .manager - .get_storage_directory(2, 0) - .await - .expect("get storage directory should not fail"); - assert_eq!( - storage_directory, - Snapshot { - path: state.tempdir.path().join("2_0"), - epoch: 2, - processed_input_count: 0, - } - ); - assert_eq!( - state.list_snapshots_dir(), - vec![ - state.tempdir.path().join("0_0"), - state.tempdir.path().join("1_0"), - ] - ); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_set_latest_when_path_is_not_on_snapshots_dir() { - let state = TestState::setup(); - let path = state.tempdir.path().parent().unwrap().join("0_0"); - let err = state - .manager - .set_latest(Snapshot { - path, - epoch: 0, - processed_input_count: 0, - }) - .await - .expect_err("set latest should fail"); - assert!(matches!(err, FSSnapshotError::WrongDirError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_set_latest_when_epoch_mismatches() { - let state = TestState::setup(); - let err = state - .manager - .set_latest(Snapshot { - path: state.tempdir.path().join("0_0"), - epoch: 1, - processed_input_count: 0, - }) - .await - .expect_err("set latest should fail"); - assert!(matches!(err, FSSnapshotError::InvalidEpochError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_set_latest_when_dir_does_not_exist() { - let state = TestState::setup(); - let err = state - .manager - .set_latest(Snapshot { - path: state.tempdir.path().join("0_0"), - epoch: 0, - processed_input_count: 0, - }) - .await - .expect_err("set latest should fail"); - assert!(matches!(err, FSSnapshotError::NotFoundError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_set_latest_when_latest_is_not_symlink() { - let state = TestState::setup(); - state.create_snapshot("0_0"); - state.create_snapshot("latest"); - let err = state - .manager - .set_latest(Snapshot { - path: state.tempdir.path().join("0_0"), - epoch: 0, - processed_input_count: 0, - }) - .await - .expect_err("set latest should fail"); - assert!(matches!(err, FSSnapshotError::LatestNotLinkError { .. })); - } - - #[test_log::test(tokio::test)] - async fn test_it_sets_latest_snapshot() { - let state = TestState::setup(); - state.create_snapshot("0_0"); - state - .manager - .set_latest(Snapshot { - path: state.tempdir.path().join("0_0"), - epoch: 0, - processed_input_count: 0, - }) - .await - .expect("set latest should work"); - assert_eq!( - state.list_snapshots_dir(), - vec![ - state.tempdir.path().join("0_0"), - state.tempdir.path().join("latest"), - ] - ); - assert_eq!( - fs::read_link(&state.tempdir.path().join("latest")).unwrap(), - state.tempdir.path().join("0_0"), - ); - } - - #[test_log::test(tokio::test)] - async fn test_it_deletes_previous_snapshots_after_setting_latest() { - let state = TestState::setup(); - state.create_snapshot("0_0"); - state.create_snapshot("1_1"); - state.create_snapshot("2_2"); - std::os::unix::fs::symlink( - state.tempdir.path().join("1_1"), - state.tempdir.path().join("latest"), - ) - .expect("failed to create link"); - state - .manager - .set_latest(Snapshot { - path: state.tempdir.path().join("2_2"), - epoch: 2, - processed_input_count: 2, - }) - .await - .expect("set latest should work"); - assert_eq!( - state.list_snapshots_dir(), - vec![ - state.tempdir.path().join("2_2"), - state.tempdir.path().join("latest"), - ] - ); - assert_eq!( - fs::read_link(&state.tempdir.path().join("latest")).unwrap(), - state.tempdir.path().join("2_2"), - ); - } - - #[test_log::test(tokio::test)] - async fn test_it_gets_snapshot_hash() { - let state = TestState::setup(); - let path = state.create_snapshot("0_0"); - let hash_path = path.join(HASH_FILE); - let hash = [ - 160, 170, 75, 88, 113, 141, 144, 31, 252, 78, 159, 6, 79, 114, 6, - 16, 196, 49, 44, 208, 62, 83, 66, 97, 4, 151, 159, 105, 124, 85, - 51, 87, - ]; - fs::write(hash_path, hash).expect("should write hash to file"); - - let snap = Snapshot { - epoch: 0, - processed_input_count: 0, - path, - }; - - assert_eq!( - snap.get_hash().await.expect("get hash should work"), - Hash::new(hash) - ); - } - - #[test_log::test(tokio::test)] - async fn test_it_fails_to_get_hash_when_hash_file_does_not_exist() { - let state = TestState::setup(); - let path = state.create_snapshot("0_0"); - let snap = Snapshot { - epoch: 0, - processed_input_count: 0, - path, - }; - - let err = snap.get_hash().await.expect_err("get hash should fail"); - assert!(matches!(err, FSSnapshotError::OpenHashError { .. })) - } -} diff --git a/offchain/advance-runner/src/snapshot/mod.rs b/offchain/advance-runner/src/snapshot/mod.rs deleted file mode 100644 index 9b8c35acd..000000000 --- a/offchain/advance-runner/src/snapshot/mod.rs +++ /dev/null @@ -1,47 +0,0 @@ -// (c) Cartesi and individual authors (see AUTHORS) -// SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -use std::path::PathBuf; - -pub mod config; -pub mod disabled; -pub mod fs_manager; - -/// A path to a Cartesi Machine snapshot and its metadata -#[derive(Debug, Default, Clone, PartialEq, Eq)] -pub struct Snapshot { - pub path: PathBuf, - pub epoch: u64, - pub processed_input_count: u64, -} - -impl Snapshot { - /// Verifies if this is the template snapshot. The template snapshot is a - /// Cartesi Machine snapshot taken right after its creation and before it - /// processes any inputs - pub fn is_template(&self) -> bool { - self.epoch == 0 && self.processed_input_count == 0 - } -} - -#[async_trait::async_trait] -pub trait SnapshotManager { - type Error: snafu::Error; - - /// Get the most recent snapshot - async fn get_latest(&self) -> Result; - - /// Get the target storage directory for the snapshot - async fn get_storage_directory( - &self, - epoch: u64, - processed_input_count: u64, - ) -> Result; - - /// Set the most recent snapshot - async fn set_latest(&self, snapshot: Snapshot) -> Result<(), Self::Error>; - - /// Compares `Snapshot`'s hash with the template hash stored on-chain, - /// failing if they don't match - async fn validate(&self, snapshot: &Snapshot) -> Result<(), Self::Error>; -} diff --git a/offchain/advance-runner/tests/fixtures/mod.rs b/offchain/advance-runner/tests/fixtures/mod.rs index 83523f7c8..a5f68af10 100644 --- a/offchain/advance-runner/tests/fixtures/mod.rs +++ b/offchain/advance-runner/tests/fixtures/mod.rs @@ -2,8 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 (see LICENSE) use advance_runner::config::{ - AdvanceRunnerConfig, BrokerConfig, DAppMetadata, FSManagerConfig, - ServerManagerConfig, SnapshotConfig, + AdvanceRunnerConfig, BrokerConfig, DAppMetadata, ServerManagerConfig, }; use advance_runner::AdvanceRunnerError; use grpc_interfaces::cartesi_machine::{ @@ -13,7 +12,6 @@ use grpc_interfaces::cartesi_server_manager::{CyclesConfig, DeadlineConfig}; use log::LogConfig; use rollups_events::{Address, BrokerEndpoint}; use std::cell::RefCell; -use std::path::Path; use std::time::Duration; use tokio::task::JoinHandle; @@ -29,7 +27,7 @@ impl AdvanceRunnerFixture { redis_endpoint: BrokerEndpoint, chain_id: u64, dapp_address: Address, - snapshot_dir: Option<&Path>, + snapshot_dir: Option, ) -> Self { let runtime_config = MachineRuntimeConfig { concurrency: Some(ConcurrencyConfig { @@ -57,6 +55,7 @@ impl AdvanceRunnerFixture { let server_manager_config = ServerManagerConfig { server_manager_endpoint, + machine_snapshot_path: snapshot_dir.unwrap_or("".to_owned()), max_decoding_message_size: 100 * 1024 * 1024, session_id, pending_inputs_sleep_duration: 1000, @@ -77,29 +76,12 @@ impl AdvanceRunnerFixture { backoff: Default::default(), }; - let snapshot_config = if snapshot_dir.is_some() { - SnapshotConfig::FileSystem(FSManagerConfig { - snapshot_dir: snapshot_dir - .expect("Should have a Path") - .to_owned(), - snapshot_latest: snapshot_dir - .expect("Should have a Path") - .join("latest"), - validation_enabled: false, - provider_http_endpoint: None, - dapp_address, - }) - } else { - SnapshotConfig::Disabled - }; - let backoff_max_elapsed_duration = Duration::from_millis(1); let config = AdvanceRunnerConfig { server_manager_config, broker_config, dapp_metadata, - snapshot_config, backoff_max_elapsed_duration, healthcheck_port: 0, log_config: LogConfig::default(), diff --git a/offchain/advance-runner/tests/host_integration.rs b/offchain/advance-runner/tests/host_integration.rs index 727a5e0a4..2604c3078 100644 --- a/offchain/advance-runner/tests/host_integration.rs +++ b/offchain/advance-runner/tests/host_integration.rs @@ -142,16 +142,17 @@ async fn advance_runner_fails_when_inputs_has_wrong_parent_id() { state.broker.produce_raw_input_event(input).await; tracing::info!("waiting for the advance_runner to exit with error"); - let _err = state.advance_runner.wait_err().await; + let err = state.advance_runner.wait_err().await; assert!(matches!( - advance_runner::AdvanceRunnerError::RunnerSnapshotDisabledError { - source: - advance_runner::runner::RunnerError::ParentIdMismatchError { - expected: "0".to_owned(), - got: "invalid".to_owned() + err, + advance_runner::AdvanceRunnerError::RunnerError { + source: advance_runner::RunnerError::ConsumeInputError { + source: advance_runner::BrokerFacadeError::ParentIdMismatchError { + expected, + got, } - }, - _err + } + } if expected == "0".to_owned() && got == "invalid".to_owned() )); } diff --git a/offchain/advance-runner/tests/server_integration.rs b/offchain/advance-runner/tests/server_integration.rs index 28c985401..0ef18d97f 100644 --- a/offchain/advance-runner/tests/server_integration.rs +++ b/offchain/advance-runner/tests/server_integration.rs @@ -15,7 +15,7 @@ use testcontainers::clients::Cli; mod fixtures; struct TestState<'d> { - snapshots: MachineSnapshotsFixture, + _snapshots: MachineSnapshotsFixture, broker: BrokerFixture<'d>, server_manager: ServerManagerFixture<'d>, advance_runner: AdvanceRunnerFixture, @@ -26,18 +26,18 @@ impl TestState<'_> { let snapshots = MachineSnapshotsFixture::setup(); let broker = BrokerFixture::setup(docker).await; let server_manager = - ServerManagerFixture::setup(docker, snapshots.path()).await; + ServerManagerFixture::setup(docker, &snapshots.path()).await; let advance_runner = AdvanceRunnerFixture::setup( server_manager.endpoint().to_owned(), server_manager.session_id().to_owned(), broker.redis_endpoint().to_owned(), broker.chain_id(), broker.dapp_address().to_owned(), - Some(snapshots.path()), + Some(snapshots.path().to_string_lossy().to_string()), ) .await; TestState { - snapshots, + _snapshots: snapshots, broker, server_manager, advance_runner, @@ -138,16 +138,17 @@ async fn test_advance_runner_fails_when_inputs_has_wrong_parent_id() { state.broker.produce_raw_input_event(input).await; tracing::info!("waiting for the advance_runner to exit with error"); - let _err = state.advance_runner.wait_err().await; + let err = state.advance_runner.wait_err().await; assert!(matches!( - advance_runner::AdvanceRunnerError::RunnerSnapshotDisabledError { - source: - advance_runner::runner::RunnerError::ParentIdMismatchError { - expected: "0".to_owned(), - got: "invalid".to_owned() + err, + advance_runner::AdvanceRunnerError::RunnerError { + source: advance_runner::RunnerError::ConsumeInputError { + source: advance_runner::BrokerFacadeError::ParentIdMismatchError { + expected, + got, } - }, - _err + } + } if expected == "0".to_owned() && got == "invalid".to_owned() )); } @@ -271,17 +272,6 @@ async fn test_advance_runner_does_not_generate_duplicate_claim() { assert_eq!(produced_claims[0].epoch_hash, rollups_claim.epoch_hash); } -#[test_log::test(tokio::test)] -async fn test_advance_runner_stores_snapshot_after_finishing_epoch() { - let docker = Cli::default(); - let state = TestState::setup(&docker).await; - - finish_epoch_and_wait_for_next_input(&state).await; - - tracing::info!("checking the snapshots dir"); - state.snapshots.assert_latest_snapshot(1, 1); -} - #[test_log::test(tokio::test)] async fn test_advance_runner_restore_session_after_restart() { let docker = Cli::default(); diff --git a/offchain/test-fixtures/docker/machine_snapshot.Dockerfile b/offchain/test-fixtures/docker/machine_snapshot.Dockerfile deleted file mode 100644 index 5a2f77b77..000000000 --- a/offchain/test-fixtures/docker/machine_snapshot.Dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -# (c) Cartesi and individual authors (see AUTHORS) -# SPDX-License-Identifier: Apache-2.0 (see LICENSE) - -FROM cartesi/server-manager:0.8.2 - -USER root - -# Install system dependencies -RUN apt update && \ - apt install -y wget - -# Download rootfs, linux and rom -ENV IMAGES_PATH /usr/share/cartesi-machine/images -RUN wget -O ${IMAGES_PATH}/rootfs.ext2 https://github.com/cartesi/image-rootfs/releases/download/v0.18.0/rootfs-v0.18.0.ext2 && \ - wget -O ${IMAGES_PATH}/linux.bin https://github.com/cartesi/image-kernel/releases/download/v0.17.0/linux-5.15.63-ctsi-2-v0.17.0.bin && \ - wget -O ${IMAGES_PATH}/rom.bin https://github.com/cartesi/machine-emulator-rom/releases/download/v0.17.0/rom-v0.17.0.bin - -# Generate machine with echo and store it -ENV SNAPSHOT_DIR=/tmp/dapp-bin -RUN cartesi-machine \ - --ram-length=128Mi \ - --rollup \ - --store=$SNAPSHOT_DIR \ - -- "ioctl-echo-loop --vouchers=1 --notices=1 --reports=1 --verbose=1" diff --git a/offchain/test-fixtures/src/machine_snapshots.rs b/offchain/test-fixtures/src/machine_snapshots.rs index 9016774cd..679732d15 100644 --- a/offchain/test-fixtures/src/machine_snapshots.rs +++ b/offchain/test-fixtures/src/machine_snapshots.rs @@ -1,17 +1,15 @@ // (c) Cartesi and individual authors (see AUTHORS) // SPDX-License-Identifier: Apache-2.0 (see LICENSE) -use std::fs; -use std::os::unix; -use std::path::Path; +use std::path::PathBuf; use tempfile::TempDir; use crate::docker_cli; -const TAG: &str = "cartesi/test-machine-snapshot"; -const DOCKERFILE: &str = "../test-fixtures/docker/machine_snapshot.Dockerfile"; -const CONTAINER_SNAPSHOT_DIR: &str = "/tmp/dapp-bin"; -const SNAPSHOT_NAME: &str = "0_0"; +const TAG: &str = "cartesi/rollups-node-snapshot:devel"; +const CONTAINER_SNAPSHOT_DIR: &str = + "/usr/share/cartesi/snapshots/test-application"; +const SNAPSHOT_NAME: &str = "test-application"; pub struct MachineSnapshotsFixture { dir: TempDir, @@ -23,57 +21,15 @@ impl MachineSnapshotsFixture { tracing::info!("setting up machine snapshots fixture"); let dir = tempfile::tempdir().expect("failed to create temp dir"); - docker_cli::build(DOCKERFILE, TAG, &[]); let id = docker_cli::create(TAG); let from_container = format!("{}:{}", id, CONTAINER_SNAPSHOT_DIR); - let to_host = dir.path().join(SNAPSHOT_NAME); - docker_cli::cp(&from_container, to_host.to_str().unwrap()); + docker_cli::cp(&from_container, dir.path().to_str().unwrap()); docker_cli::rm(&id); - unix::fs::symlink( - dir.path().join(SNAPSHOT_NAME), - dir.path().join("latest"), - ) - .expect("failed to create latest link"); Self { dir } } - /// Return the path of directory that contains the snapshots - pub fn path(&self) -> &Path { - self.dir.path() - } - - /// Check whether the given snapshot is the latest - #[tracing::instrument(level = "trace", skip_all)] - pub fn assert_latest_snapshot( - &self, - epoch_index: u64, - processed_input_count: u64, - ) { - tracing::trace!( - epoch_index, - processed_input_count, - "checking the latest snapshot" - ); - let snapshot_name = - format!("{}_{}", epoch_index, processed_input_count); - let snapshot = self.path().join(snapshot_name); - assert!(snapshot.is_dir(), "snapshot not found"); - let latest = self.path().join("latest"); - assert!(latest.is_symlink(), "latest link not found"); - assert_eq!( - fs::read_link(&latest).unwrap(), - snapshot, - "invalid latest link" - ); - - tracing::trace!("checking whether the other snapshots were deleted"); - let dir_iterator = fs::read_dir(self.path()).unwrap(); - for entry in dir_iterator { - let path = entry.unwrap().path(); - assert!( - path == latest || path == snapshot, - "previous snapshots not deleted" - ); - } + /// Return the path of directory that contains the snapshot + pub fn path(&self) -> PathBuf { + self.dir.path().join(SNAPSHOT_NAME) } }