diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs index a2de30eb18..e57fb9e5d9 100644 --- a/crates/partition-store/src/partition_store_manager.rs +++ b/crates/partition-store/src/partition_store_manager.rs @@ -99,16 +99,16 @@ impl PartitionStoreManager { self.lookup.lock().await.live.get(&partition_id).cloned() } - #[allow(non_snake_case)] - pub fn get_legacy_storage_REMOVE_ME(&self) -> PartitionStore { - PartitionStore::new( - self.raw_db.clone(), - self.rocksdb.clone(), - CfName::new("data-unpartitioned"), - 0, - RangeInclusive::new(0, PartitionKey::MAX - 1), - ) - } + // #[allow(non_snake_case)] + // pub fn get_legacy_storage_REMOVE_ME(&self) -> PartitionStore { + // PartitionStore::new( + // self.raw_db.clone(), + // self.rocksdb.clone(), + // CfName::new("data-unpartitioned"), + // 0, + // RangeInclusive::new(0, PartitionKey::MAX - 1), + // ) + // } pub async fn open_partition_store( &self, diff --git a/crates/storage-query-datafusion/Cargo.toml b/crates/storage-query-datafusion/Cargo.toml index 0652ded4c1..cdf173e358 100644 --- a/crates/storage-query-datafusion/Cargo.toml +++ b/crates/storage-query-datafusion/Cargo.toml @@ -12,6 +12,7 @@ default = [] options_schema = ["dep:schemars"] [dependencies] +restate-core = { workspace = true } restate-invoker-api = { workspace = true } restate-partition-store = { workspace = true } restate-schema-api = { workspace = true, features = ["deployment"] } diff --git a/crates/storage-query-datafusion/src/context.rs b/crates/storage-query-datafusion/src/context.rs index 09e30a6526..aab50a3f5b 100644 --- a/crates/storage-query-datafusion/src/context.rs +++ b/crates/storage-query-datafusion/src/context.rs @@ -11,6 +11,7 @@ use std::fmt::Debug; use std::sync::Arc; +use async_trait::async_trait; use codederror::CodedError; use datafusion::error::DataFusionError; use datafusion::execution::context::SessionState; @@ -18,11 +19,14 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::prelude::{SessionConfig, SessionContext}; +use restate_core::worker_api::ProcessorsManagerHandle; use restate_invoker_api::StatusHandle; -use restate_partition_store::{PartitionStore, PartitionStoreManager}; +use restate_partition_store::PartitionStoreManager; use restate_schema_api::deployment::DeploymentResolver; use restate_schema_api::service::ServiceMetadataResolver; use restate_types::config::QueryEngineOptions; +use restate_types::errors::GenericError; +use restate_types::identifiers::PartitionId; use crate::{analyzer, physical_optimizer}; @@ -72,6 +76,11 @@ pub enum BuildError { Datafusion(#[from] DataFusionError), } +#[async_trait] +pub trait SelectPartitions: Send + Sync + Debug + 'static { + async fn get_live_partitions(&self) -> Result, GenericError>; +} + #[derive(Clone)] pub struct QueryContext { datafusion_context: SessionContext, @@ -80,8 +89,8 @@ pub struct QueryContext { impl QueryContext { pub async fn create( options: &QueryEngineOptions, - _partition_store_manager: PartitionStoreManager, - rocksdb: PartitionStore, + partition_selector: impl SelectPartitions + Clone, + partition_store_manager: PartitionStoreManager, status: impl StatusHandle + Send + Sync + Debug + Clone + 'static, schemas: impl DeploymentResolver + ServiceMetadataResolver @@ -96,15 +105,40 @@ impl QueryContext { options.tmp_dir.clone(), options.query_parallelism(), ); - crate::invocation_status::register_self(&ctx, rocksdb.clone())?; - crate::keyed_service_status::register_self(&ctx, rocksdb.clone())?; - crate::state::register_self(&ctx, rocksdb.clone())?; - crate::journal::register_self(&ctx, rocksdb.clone())?; - crate::invocation_state::register_self(&ctx, status)?; - crate::inbox::register_self(&ctx, rocksdb.clone())?; crate::deployment::register_self(&ctx, schemas.clone())?; crate::service::register_self(&ctx, schemas)?; - crate::idempotency::register_self(&ctx, rocksdb)?; + crate::invocation_state::register_self(&ctx, status)?; + // partition-key-based + crate::invocation_status::register_self( + &ctx, + partition_selector.clone(), + partition_store_manager.clone(), + )?; + crate::keyed_service_status::register_self( + &ctx, + partition_selector.clone(), + partition_store_manager.clone(), + )?; + crate::state::register_self( + &ctx, + partition_selector.clone(), + partition_store_manager.clone(), + )?; + crate::journal::register_self( + &ctx, + partition_selector.clone(), + partition_store_manager.clone(), + )?; + crate::inbox::register_self( + &ctx, + partition_selector.clone(), + partition_store_manager.clone(), + )?; + crate::idempotency::register_self( + &ctx, + partition_selector.clone(), + partition_store_manager, + )?; let ctx = ctx .datafusion_context @@ -173,3 +207,10 @@ impl AsRef for QueryContext { &self.datafusion_context } } + +#[async_trait] +impl SelectPartitions for ProcessorsManagerHandle { + async fn get_live_partitions(&self) -> Result, GenericError> { + Ok(self.get_live_partitions().await?) + } +} diff --git a/crates/storage-query-datafusion/src/deployment/table.rs b/crates/storage-query-datafusion/src/deployment/table.rs index 8115739f6c..68728d4971 100644 --- a/crates/storage-query-datafusion/src/deployment/table.rs +++ b/crates/storage-query-datafusion/src/deployment/table.rs @@ -8,23 +8,24 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::schema::DeploymentBuilder; +use std::fmt::Debug; +use std::sync::Arc; -use crate::context::QueryContext; -use crate::deployment::row::append_deployment_row; -use crate::generic_table::{GenericTableProvider, RangeScanner}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::logical_expr::Expr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; -use restate_schema_api::deployment::{Deployment, DeploymentResolver}; -use restate_types::identifiers::{PartitionKey, ServiceRevision}; -use std::fmt::Debug; -use std::ops::RangeInclusive; -use std::sync::Arc; use tokio::sync::mpsc::Sender; +use restate_schema_api::deployment::{Deployment, DeploymentResolver}; +use restate_types::identifiers::ServiceRevision; + +use super::schema::DeploymentBuilder; +use crate::context::QueryContext; +use crate::deployment::row::append_deployment_row; +use crate::table_providers::{GenericTableProvider, Scan}; + pub(crate) fn register_self( ctx: &QueryContext, resolver: impl DeploymentResolver + Send + Sync + Debug + 'static, @@ -42,15 +43,14 @@ pub(crate) fn register_self( #[derive(Debug, Clone)] struct DeploymentMetadataScanner(DMR); -/// TODO This trait makes little sense for sys_deployment, -/// but it's fine nevertheless as the caller always uses the full range -impl RangeScanner +impl Scan for DeploymentMetadataScanner { fn scan( &self, - _range: RangeInclusive, projection: SchemaRef, + _filters: &[Expr], + _limit: Option, ) -> SendableRecordBatchStream { let schema = projection.clone(); let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); diff --git a/crates/storage-query-datafusion/src/generic_table.rs b/crates/storage-query-datafusion/src/generic_table.rs deleted file mode 100644 index 2f075b2267..0000000000 --- a/crates/storage-query-datafusion/src/generic_table.rs +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::any::Any; -use std::fmt::{Debug, Formatter}; -use std::ops::RangeInclusive; -use std::sync::Arc; - -use async_trait::async_trait; - -use datafusion::arrow::datatypes::SchemaRef; - -use crate::table_util::compute_ordering; -use datafusion::common::DataFusionError; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::execution::context::{SessionState, TaskContext}; -use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; -use datafusion::physical_expr::PhysicalSortExpr; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, -}; -pub use datafusion_expr::UserDefinedLogicalNode; -use restate_types::identifiers::PartitionKey; - -// TODO This trait assumes every table's primary key contains a PartitionKey. -// This assumption is incorrect with some tables, such as sys_deployment. -pub(crate) trait RangeScanner: Send + Sync + Debug + 'static { - fn scan( - &self, - range: RangeInclusive, - projection: SchemaRef, - ) -> SendableRecordBatchStream; -} - -pub(crate) type RangeScannerRef = Arc; - -pub(crate) struct GenericTableProvider { - schema: SchemaRef, - scanner: RangeScannerRef, -} - -impl GenericTableProvider { - pub(crate) fn new(schema: SchemaRef, scanner: RangeScannerRef) -> Self { - Self { schema, scanner } - } -} - -#[async_trait] -impl TableProvider for GenericTableProvider { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.schema.clone() - } - - fn table_type(&self) -> TableType { - TableType::Base - } - - async fn scan( - &self, - _state: &SessionState, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> datafusion::common::Result> { - let projected_schema = match projection { - Some(p) => SchemaRef::new(self.schema.project(p)?), - None => self.schema.clone(), - }; - - Ok(Arc::new(GenericTableExecutionPlan { - output_ordering: compute_ordering(projected_schema.clone()), - projected_schema, - scanner: Arc::clone(&self.scanner), - })) - } - - fn supports_filters_pushdown( - &self, - filters: &[&Expr], - ) -> datafusion::common::Result> { - let res = filters - .iter() - .map(|_| TableProviderFilterPushDown::Inexact) - .collect(); - - Ok(res) - } -} - -#[derive(Debug, Clone)] -struct GenericTableExecutionPlan { - output_ordering: Option>, - projected_schema: SchemaRef, - scanner: RangeScannerRef, -} - -impl ExecutionPlan for GenericTableExecutionPlan { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.projected_schema.clone() - } - - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.output_ordering.as_deref() - } - - fn children(&self) -> Vec> { - vec![] - } - - fn with_new_children( - self: Arc, - new_children: Vec>, - ) -> Result, DataFusionError> { - if !new_children.is_empty() { - return Err(DataFusionError::Internal( - "GenericTableExecutionPlan does not support children".to_owned(), - )); - } - - Ok(self) - } - - fn execute( - &self, - _partition: usize, - _context: Arc, - ) -> datafusion::common::Result { - let range = 0..=PartitionKey::MAX; - let stream = self.scanner.scan(range, self.projected_schema.clone()); - Ok(stream) - } -} - -impl DisplayAs for GenericTableExecutionPlan { - fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - match t { - DisplayFormatType::Default | DisplayFormatType::Verbose => { - write!(f, "GenericTableExecutionPlan()",) - } - } - } -} diff --git a/crates/storage-query-datafusion/src/idempotency/table.rs b/crates/storage-query-datafusion/src/idempotency/table.rs index a70eb93903..3180f6c03d 100644 --- a/crates/storage-query-datafusion/src/idempotency/table.rs +++ b/crates/storage-query-datafusion/src/idempotency/table.rs @@ -8,31 +8,34 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::row::append_idempotency_row; -use super::schema::IdempotencyBuilder; +use std::fmt::Debug; +use std::ops::RangeInclusive; +use std::sync::Arc; -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::physical_plan::stream::RecordBatchReceiverStream; -use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; use futures::{Stream, StreamExt}; +use tokio::sync::mpsc::Sender; + +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_storage_api::idempotency_table::{IdempotencyMetadata, ReadOnlyIdempotencyTable}; use restate_types::identifiers::{IdempotencyId, PartitionKey}; -use std::fmt::Debug; -use std::ops::RangeInclusive; -use std::sync::Arc; -use tokio::sync::mpsc::Sender; -pub(crate) fn register_self( +use super::row::append_idempotency_row; +use super::schema::IdempotencyBuilder; +use crate::context::{QueryContext, SelectPartitions}; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::table_providers::PartitionedTableProvider; + +pub(crate) fn register_self( ctx: &QueryContext, - storage: I, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, ) -> datafusion::common::Result<()> { - let table = GenericTableProvider::new( + let table = PartitionedTableProvider::new( + partition_selector, IdempotencyBuilder::schema(), - Arc::new(IdempotencyScanner(storage)), + LocalPartitionsScanner::new(partition_store_manager, IdempotencyScanner), ); ctx.as_ref() @@ -41,26 +44,21 @@ pub(crate) fn register_self(I); +struct IdempotencyScanner; -impl RangeScanner - for IdempotencyScanner -{ - fn scan( - &self, +impl ScanLocalPartition for IdempotencyScanner { + async fn scan_partition_store( + mut partition_store: PartitionStore, + tx: Sender>, range: RangeInclusive, projection: SchemaRef, - ) -> SendableRecordBatchStream { - let mut db = self.0.clone(); - let schema = projection.clone(); - let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); - let tx = stream_builder.tx(); - let background_task = async move { - for_each_state(schema, tx, db.all_idempotency_metadata(range)).await; - Ok(()) - }; - stream_builder.spawn(background_task); - stream_builder.build() + ) { + for_each_state( + projection, + tx, + partition_store.all_idempotency_metadata(range), + ) + .await; } } diff --git a/crates/storage-query-datafusion/src/idempotency/tests.rs b/crates/storage-query-datafusion/src/idempotency/tests.rs index 6815f580b2..c6579d5623 100644 --- a/crates/storage-query-datafusion/src/idempotency/tests.rs +++ b/crates/storage-query-datafusion/src/idempotency/tests.rs @@ -31,7 +31,8 @@ async fn get_idempotency_key() { .run_in_scope("mock-query-engine", None, MockQueryEngine::create()) .await; - let mut tx = engine.rocksdb_mut().transaction(); + let mut partition_store = engine.partition_store().await; + let mut tx = partition_store.transaction(); let invocation_id_1 = InvocationId::mock_random(); tx.put_idempotency_metadata( &IdempotencyId::new( diff --git a/crates/storage-query-datafusion/src/inbox/table.rs b/crates/storage-query-datafusion/src/inbox/table.rs index 3b308abb1d..182a00772e 100644 --- a/crates/storage-query-datafusion/src/inbox/table.rs +++ b/crates/storage-query-datafusion/src/inbox/table.rs @@ -14,26 +14,30 @@ use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; - -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::inbox::row::append_inbox_row; -use crate::inbox::schema::InboxBuilder; -use datafusion::physical_plan::stream::RecordBatchReceiverStream; -use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; use futures::{Stream, StreamExt}; -use restate_partition_store::PartitionStore; +use tokio::sync::mpsc::Sender; + +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_storage_api::inbox_table::{InboxTable, SequenceNumberInboxEntry}; use restate_storage_api::StorageError; use restate_types::identifiers::PartitionKey; -use tokio::sync::mpsc::Sender; + +use crate::context::{QueryContext, SelectPartitions}; +use crate::inbox::row::append_inbox_row; +use crate::inbox::schema::InboxBuilder; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::table_providers::PartitionedTableProvider; pub(crate) fn register_self( ctx: &QueryContext, - storage: PartitionStore, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, ) -> datafusion::common::Result<()> { - let table = GenericTableProvider::new(InboxBuilder::schema(), Arc::new(InboxScanner(storage))); + let table = PartitionedTableProvider::new( + partition_selector, + InboxBuilder::schema(), + LocalPartitionsScanner::new(partition_store_manager, InboxScanner), + ); ctx.as_ref() .register_table("sys_inbox", Arc::new(table)) @@ -41,26 +45,18 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct InboxScanner(PartitionStore); +struct InboxScanner; -impl RangeScanner for InboxScanner { - fn scan( - &self, +impl ScanLocalPartition for InboxScanner { + async fn scan_partition_store( + mut partition_store: PartitionStore, + tx: Sender>, range: RangeInclusive, projection: SchemaRef, - ) -> SendableRecordBatchStream { - let mut db = self.0.clone(); - let schema = projection.clone(); - let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); - let tx = stream_builder.tx(); - let background_task = async move { - let mut transaction = db.transaction(); - let rows = transaction.all_inboxes(range); - for_each_state(schema, tx, rows).await; - Ok(()) - }; - stream_builder.spawn(background_task); - stream_builder.build() + ) { + let mut transaction = partition_store.transaction(); + let rows = transaction.all_inboxes(range); + for_each_state(projection, tx, rows).await; } } diff --git a/crates/storage-query-datafusion/src/invocation_state/table.rs b/crates/storage-query-datafusion/src/invocation_state/table.rs index edc9e94d30..85a21abfdb 100644 --- a/crates/storage-query-datafusion/src/invocation_state/table.rs +++ b/crates/storage-query-datafusion/src/invocation_state/table.rs @@ -9,22 +9,22 @@ // by the Apache License, Version 2.0. use std::fmt::Debug; -use std::ops::RangeInclusive; use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; - -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::invocation_state::row::append_state_row; -use crate::invocation_state::schema::StateBuilder; +use datafusion::logical_expr::Expr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; +use tokio::sync::mpsc::Sender; + use restate_invoker_api::{InvocationStatusReport, StatusHandle}; use restate_types::identifiers::{PartitionKey, WithPartitionKey}; -use tokio::sync::mpsc::Sender; + +use crate::context::QueryContext; +use crate::invocation_state::row::append_state_row; +use crate::invocation_state::schema::StateBuilder; +use crate::table_providers::{GenericTableProvider, Scan}; pub(crate) fn register_self( ctx: &QueryContext, @@ -41,12 +41,14 @@ pub(crate) fn register_self( #[derive(Debug, Clone)] struct StatusScanner(S); -impl RangeScanner for StatusScanner { +impl Scan for StatusScanner { fn scan( &self, - range: RangeInclusive, projection: SchemaRef, + _filters: &[Expr], + _limit: Option, ) -> SendableRecordBatchStream { + let range = PartitionKey::MIN..=PartitionKey::MAX; let status = self.0.clone(); let schema = projection.clone(); let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); diff --git a/crates/storage-query-datafusion/src/invocation_status/table.rs b/crates/storage-query-datafusion/src/invocation_status/table.rs index aa3b06fdfc..371fa57fcf 100644 --- a/crates/storage-query-datafusion/src/invocation_status/table.rs +++ b/crates/storage-query-datafusion/src/invocation_status/table.rs @@ -14,26 +14,27 @@ use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use tokio::sync::mpsc::Sender; -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::invocation_status::row::append_invocation_status_row; -use crate::invocation_status::schema::InvocationStatusBuilder; -use datafusion::physical_plan::stream::RecordBatchReceiverStream; -use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; use restate_partition_store::invocation_status_table::OwnedInvocationStatusRow; -use restate_partition_store::PartitionStore; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_types::identifiers::PartitionKey; -use tokio::sync::mpsc::Sender; + +use crate::context::{QueryContext, SelectPartitions}; +use crate::invocation_status::row::append_invocation_status_row; +use crate::invocation_status::schema::InvocationStatusBuilder; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::table_providers::PartitionedTableProvider; pub(crate) fn register_self( ctx: &QueryContext, - storage: PartitionStore, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, ) -> datafusion::common::Result<()> { - let status_table = GenericTableProvider::new( + let status_table = PartitionedTableProvider::new( + partition_selector, InvocationStatusBuilder::schema(), - Arc::new(StatusScanner(storage)), + LocalPartitionsScanner::new(partition_store_manager, StatusScanner), ); ctx.as_ref() @@ -42,29 +43,21 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct StatusScanner(PartitionStore); +struct StatusScanner; -impl RangeScanner for StatusScanner { - fn scan( - &self, +impl ScanLocalPartition for StatusScanner { + async fn scan_partition_store( + partition_store: PartitionStore, + tx: Sender>, range: RangeInclusive, projection: SchemaRef, - ) -> SendableRecordBatchStream { - let db = self.0.clone(); - let schema = projection.clone(); - let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); - let tx = stream_builder.tx(); - let background_task = move || { - let rows = db.all_invocation_status(range); - for_each_status(schema, tx, rows); - Ok(()) - }; - stream_builder.spawn_blocking(background_task); - stream_builder.build() + ) { + let rows = partition_store.all_invocation_status(range); + for_each_status(projection, tx, rows).await; } } -fn for_each_status<'a, I>( +async fn for_each_status<'a, I>( schema: SchemaRef, tx: Sender>, rows: I, @@ -77,7 +70,7 @@ fn for_each_status<'a, I>( append_invocation_status_row(&mut builder, &mut temp, row); if builder.full() { let batch = builder.finish(); - if tx.blocking_send(Ok(batch)).is_err() { + if tx.send(Ok(batch)).await.is_err() { // not sure what to do here? // the other side has hung up on us. // we probably don't want to panic, is it will cause the entire process to exit @@ -88,6 +81,6 @@ fn for_each_status<'a, I>( } if !builder.empty() { let result = builder.finish(); - let _ = tx.blocking_send(Ok(result)); + let _ = tx.send(Ok(result)).await; } } diff --git a/crates/storage-query-datafusion/src/journal/table.rs b/crates/storage-query-datafusion/src/journal/table.rs index d4ef38d510..1a0d17ca29 100644 --- a/crates/storage-query-datafusion/src/journal/table.rs +++ b/crates/storage-query-datafusion/src/journal/table.rs @@ -14,25 +14,28 @@ use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use tokio::sync::mpsc::Sender; -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::journal::row::append_journal_row; -use crate::journal::schema::JournalBuilder; -use datafusion::physical_plan::stream::RecordBatchReceiverStream; -use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; use restate_partition_store::journal_table::OwnedJournalRow; -use restate_partition_store::PartitionStore; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_types::identifiers::PartitionKey; -use tokio::sync::mpsc::Sender; + +use crate::context::{QueryContext, SelectPartitions}; +use crate::journal::row::append_journal_row; +use crate::journal::schema::JournalBuilder; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::table_providers::PartitionedTableProvider; pub(crate) fn register_self( ctx: &QueryContext, - storage: PartitionStore, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, ) -> datafusion::common::Result<()> { - let journal_table = - GenericTableProvider::new(JournalBuilder::schema(), Arc::new(JournalScanner(storage))); + let journal_table = PartitionedTableProvider::new( + partition_selector, + JournalBuilder::schema(), + LocalPartitionsScanner::new(partition_store_manager, JournalScanner), + ); ctx.as_ref() .register_table("sys_journal", Arc::new(journal_table)) @@ -40,29 +43,21 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct JournalScanner(PartitionStore); +struct JournalScanner; -impl RangeScanner for JournalScanner { - fn scan( - &self, +impl ScanLocalPartition for JournalScanner { + async fn scan_partition_store( + partition_store: PartitionStore, + tx: Sender>, range: RangeInclusive, projection: SchemaRef, - ) -> SendableRecordBatchStream { - let db = self.0.clone(); - let schema = projection.clone(); - let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); - let tx = stream_builder.tx(); - let background_task = move || { - let rows = db.all_journal(range); - for_each_journal(schema, tx, rows); - Ok(()) - }; - stream_builder.spawn_blocking(background_task); - stream_builder.build() + ) { + let rows = partition_store.all_journal(range); + for_each_journal(projection, tx, rows).await; } } -fn for_each_journal<'a, I>( +async fn for_each_journal<'a, I>( schema: SchemaRef, tx: Sender>, rows: I, @@ -75,7 +70,7 @@ fn for_each_journal<'a, I>( append_journal_row(&mut builder, &mut temp, row); if builder.full() { let batch = builder.finish(); - if tx.blocking_send(Ok(batch)).is_err() { + if tx.send(Ok(batch)).await.is_err() { // not sure what to do here? // the other side has hung up on us. // we probably don't want to panic, is it will cause the entire process to exit @@ -86,6 +81,6 @@ fn for_each_journal<'a, I>( } if !builder.empty() { let result = builder.finish(); - let _ = tx.blocking_send(Ok(result)); + let _ = tx.send(Ok(result)).await; } } diff --git a/crates/storage-query-datafusion/src/journal/tests.rs b/crates/storage-query-datafusion/src/journal/tests.rs index f970ebad97..940ecd7ad1 100644 --- a/crates/storage-query-datafusion/src/journal/tests.rs +++ b/crates/storage-query-datafusion/src/journal/tests.rs @@ -38,7 +38,8 @@ async fn get_entries() { .run_in_scope("mock-query-engine", None, MockQueryEngine::create()) .await; - let mut tx = engine.rocksdb_mut().transaction(); + let mut partition_store = engine.partition_store().await; + let mut tx = partition_store.transaction(); let journal_invocation_id = InvocationId::mock_random(); tx.put_journal_entry( &journal_invocation_id, diff --git a/crates/storage-query-datafusion/src/keyed_service_status/table.rs b/crates/storage-query-datafusion/src/keyed_service_status/table.rs index dc769c2f81..5e579b1ad3 100644 --- a/crates/storage-query-datafusion/src/keyed_service_status/table.rs +++ b/crates/storage-query-datafusion/src/keyed_service_status/table.rs @@ -14,26 +14,27 @@ use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use tokio::sync::mpsc::Sender; -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::keyed_service_status::row::append_virtual_object_status_row; -use crate::keyed_service_status::schema::KeyedServiceStatusBuilder; -use datafusion::physical_plan::stream::RecordBatchReceiverStream; -use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; use restate_partition_store::service_status_table::OwnedVirtualObjectStatusRow; -use restate_partition_store::PartitionStore; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_types::identifiers::PartitionKey; -use tokio::sync::mpsc::Sender; + +use crate::context::{QueryContext, SelectPartitions}; +use crate::keyed_service_status::row::append_virtual_object_status_row; +use crate::keyed_service_status::schema::KeyedServiceStatusBuilder; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::table_providers::PartitionedTableProvider; pub(crate) fn register_self( ctx: &QueryContext, - storage: PartitionStore, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, ) -> datafusion::common::Result<()> { - let status_table = GenericTableProvider::new( + let status_table = PartitionedTableProvider::new( + partition_selector, KeyedServiceStatusBuilder::schema(), - Arc::new(VirtualObjectStatusScanner(storage)), + LocalPartitionsScanner::new(partition_store_manager, VirtualObjectStatusScanner), ); ctx.as_ref() @@ -42,29 +43,21 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct VirtualObjectStatusScanner(PartitionStore); +struct VirtualObjectStatusScanner; -impl RangeScanner for VirtualObjectStatusScanner { - fn scan( - &self, +impl ScanLocalPartition for VirtualObjectStatusScanner { + async fn scan_partition_store( + partition_store: PartitionStore, + tx: Sender>, range: RangeInclusive, projection: SchemaRef, - ) -> SendableRecordBatchStream { - let db = self.0.clone(); - let schema = projection.clone(); - let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); - let tx = stream_builder.tx(); - let background_task = move || { - let rows = db.all_virtual_object_status(range); - for_each_status(schema, tx, rows); - Ok(()) - }; - stream_builder.spawn_blocking(background_task); - stream_builder.build() + ) { + let rows = partition_store.all_virtual_object_status(range); + for_each_status(projection, tx, rows).await; } } -fn for_each_status<'a, I>( +async fn for_each_status<'a, I>( schema: SchemaRef, tx: Sender>, rows: I, @@ -77,7 +70,7 @@ fn for_each_status<'a, I>( append_virtual_object_status_row(&mut builder, &mut temp, row); if builder.full() { let batch = builder.finish(); - if tx.blocking_send(Ok(batch)).is_err() { + if tx.send(Ok(batch)).await.is_err() { // not sure what to do here? // the other side has hung up on us. // we probably don't want to panic, is it will cause the entire process to exit @@ -88,6 +81,6 @@ fn for_each_status<'a, I>( } if !builder.empty() { let result = builder.finish(); - let _ = tx.blocking_send(Ok(result)); + let _ = tx.send(Ok(result)).await; } } diff --git a/crates/storage-query-datafusion/src/lib.rs b/crates/storage-query-datafusion/src/lib.rs index 81674dc756..05c89aeed0 100644 --- a/crates/storage-query-datafusion/src/lib.rs +++ b/crates/storage-query-datafusion/src/lib.rs @@ -11,17 +11,18 @@ mod analyzer; pub mod context; mod deployment; -mod generic_table; mod idempotency; mod inbox; mod invocation_state; mod invocation_status; mod journal; mod keyed_service_status; +mod partition_store_scanner; mod physical_optimizer; mod service; mod state; mod table_macro; +mod table_providers; mod table_util; pub use context::BuildError; diff --git a/crates/storage-query-datafusion/src/mocks.rs b/crates/storage-query-datafusion/src/mocks.rs index c34b9373d7..76288cb9eb 100644 --- a/crates/storage-query-datafusion/src/mocks.rs +++ b/crates/storage-query-datafusion/src/mocks.rs @@ -8,8 +8,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use crate::context::SelectPartitions; + use super::context::QueryContext; +use async_trait::async_trait; use datafusion::arrow::array::ArrayRef; use datafusion::arrow::record_batch::RecordBatch; use datafusion::execution::SendableRecordBatchStream; @@ -17,7 +20,7 @@ use googletest::matcher::{Matcher, MatcherResult}; use restate_core::task_center; use restate_invoker_api::status_handle::mocks::MockStatusHandle; use restate_invoker_api::StatusHandle; -use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager}; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_rocksdb::RocksDbManager; use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry; use restate_schema_api::deployment::{Deployment, DeploymentResolver}; @@ -25,7 +28,8 @@ use restate_schema_api::service::mocks::MockServiceMetadataResolver; use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver}; use restate_types::arc_util::Constant; use restate_types::config::{CommonOptions, QueryEngineOptions, WorkerOptions}; -use restate_types::identifiers::{DeploymentId, PartitionKey, ServiceRevision}; +use restate_types::errors::GenericError; +use restate_types::identifiers::{DeploymentId, PartitionId, PartitionKey, ServiceRevision}; use restate_types::invocation::ServiceType; use std::fmt::Debug; use std::marker::PhantomData; @@ -75,7 +79,17 @@ impl DeploymentResolver for MockSchemas { } } -pub(crate) struct MockQueryEngine(PartitionStore, QueryContext); +#[derive(Clone, Debug)] +struct MockPartitionSelector; + +#[async_trait] +impl SelectPartitions for MockPartitionSelector { + async fn get_live_partitions(&self) -> Result, GenericError> { + Ok(vec![0]) + } +} + +pub(crate) struct MockQueryEngine(PartitionStoreManager, QueryContext); impl MockQueryEngine { pub async fn create_with( @@ -100,22 +114,13 @@ impl MockQueryEngine { ) .await .expect("DB creation succeeds"); - let rocksdb = manager - .open_partition_store( - 0, - RangeInclusive::new(0, PartitionKey::MAX), - OpenMode::CreateIfMissing, - &worker_options.storage.rocksdb, - ) - .await - .expect("column family is open"); Self( - rocksdb.clone(), + manager.clone(), QueryContext::create( &QueryEngineOptions::default(), + MockPartitionSelector, manager, - rocksdb, status, schemas, ) @@ -128,8 +133,8 @@ impl MockQueryEngine { Self::create_with(MockStatusHandle::default(), MockSchemas::default()).await } - pub fn rocksdb_mut(&mut self) -> &mut PartitionStore { - &mut self.0 + pub async fn partition_store(&mut self) -> PartitionStore { + self.0.get_partition_store(0).await.unwrap() } pub async fn execute( diff --git a/crates/storage-query-datafusion/src/partition_store_scanner.rs b/crates/storage-query-datafusion/src/partition_store_scanner.rs new file mode 100644 index 0000000000..0c1a470e3e --- /dev/null +++ b/crates/storage-query-datafusion/src/partition_store_scanner.rs @@ -0,0 +1,80 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::ops::RangeInclusive; + +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::physical_plan::stream::RecordBatchReceiverStream; +use tokio::sync::mpsc::Sender; +use tracing::warn; + +use restate_partition_store::{PartitionStore, PartitionStoreManager}; +use restate_types::identifiers::{PartitionId, PartitionKey}; + +use crate::table_providers::ScanPartition; + +pub trait ScanLocalPartition: Send + Sync + std::fmt::Debug + 'static { + fn scan_partition_store( + partition_store: PartitionStore, + tx: Sender>, + range: RangeInclusive, + projection: SchemaRef, + ) -> impl std::future::Future + Send; +} + +#[derive(Clone, Debug)] +pub struct LocalPartitionsScanner { + partition_store_manager: PartitionStoreManager, + _marker: std::marker::PhantomData, +} + +impl LocalPartitionsScanner +where + S: ScanLocalPartition, +{ + pub fn new(partition_store_manager: PartitionStoreManager, _scanner: S) -> Self { + Self { + partition_store_manager, + _marker: std::marker::PhantomData, + } + } +} + +impl ScanPartition for LocalPartitionsScanner +where + S: ScanLocalPartition, +{ + fn scan_partition( + &self, + partition_id: PartitionId, + range: RangeInclusive, + projection: SchemaRef, + ) -> SendableRecordBatchStream { + let mut stream_builder = RecordBatchReceiverStream::builder(projection.clone(), 16); + let tx = stream_builder.tx(); + let partition_store_manager = self.partition_store_manager.clone(); + let background_task = async move { + let Some(partition_store) = partition_store_manager + .get_partition_store(partition_id) + .await + else { + warn!("partition {} doesn't exist, this is benign if the partition is being transferred out of this node", partition_id); + return Ok(()); + }; + S::scan_partition_store(partition_store, tx, range, projection).await; + + Ok(()) + }; + stream_builder.spawn(background_task); + stream_builder.build() + } +} diff --git a/crates/storage-query-datafusion/src/service/table.rs b/crates/storage-query-datafusion/src/service/table.rs index a60989d17f..e4cc924334 100644 --- a/crates/storage-query-datafusion/src/service/table.rs +++ b/crates/storage-query-datafusion/src/service/table.rs @@ -8,23 +8,23 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use super::schema::ServiceBuilder; +use std::fmt::Debug; +use std::sync::Arc; -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::service::row::append_service_row; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::logical_expr::Expr; use datafusion::physical_plan::stream::RecordBatchReceiverStream; use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; -use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver}; -use restate_types::identifiers::PartitionKey; -use std::fmt::Debug; -use std::ops::RangeInclusive; -use std::sync::Arc; use tokio::sync::mpsc::Sender; +use restate_schema_api::service::{ServiceMetadata, ServiceMetadataResolver}; + +use super::schema::ServiceBuilder; +use crate::context::QueryContext; +use crate::service::row::append_service_row; +use crate::table_providers::{GenericTableProvider, Scan}; + pub(crate) fn register_self( ctx: &QueryContext, resolver: impl ServiceMetadataResolver + Send + Sync + Debug + 'static, @@ -42,15 +42,14 @@ pub(crate) fn register_self( #[derive(Debug, Clone)] struct ServiceMetadataScanner(SMR); -/// TODO This trait makes little sense for sys_service, -/// but it's fine nevertheless as the caller always uses the full range -impl RangeScanner +impl Scan for ServiceMetadataScanner { fn scan( &self, - _range: RangeInclusive, projection: SchemaRef, + _filters: &[Expr], + _limit: Option, ) -> SendableRecordBatchStream { let schema = projection.clone(); let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); diff --git a/crates/storage-query-datafusion/src/state/table.rs b/crates/storage-query-datafusion/src/state/table.rs index 4bf549d138..0144079092 100644 --- a/crates/storage-query-datafusion/src/state/table.rs +++ b/crates/storage-query-datafusion/src/state/table.rs @@ -14,24 +14,28 @@ use std::sync::Arc; use datafusion::arrow::datatypes::SchemaRef; use datafusion::arrow::record_batch::RecordBatch; +use tokio::sync::mpsc::Sender; -use crate::context::QueryContext; -use crate::generic_table::{GenericTableProvider, RangeScanner}; -use crate::state::row::append_state_row; -use crate::state::schema::StateBuilder; -use datafusion::physical_plan::stream::RecordBatchReceiverStream; -use datafusion::physical_plan::SendableRecordBatchStream; -pub use datafusion_expr::UserDefinedLogicalNode; use restate_partition_store::state_table::OwnedStateRow; -use restate_partition_store::PartitionStore; +use restate_partition_store::{PartitionStore, PartitionStoreManager}; use restate_types::identifiers::PartitionKey; -use tokio::sync::mpsc::Sender; + +use crate::context::{QueryContext, SelectPartitions}; +use crate::partition_store_scanner::{LocalPartitionsScanner, ScanLocalPartition}; +use crate::state::row::append_state_row; +use crate::state::schema::StateBuilder; +use crate::table_providers::PartitionedTableProvider; pub(crate) fn register_self( ctx: &QueryContext, - storage: PartitionStore, + partition_selector: impl SelectPartitions, + partition_store_manager: PartitionStoreManager, ) -> datafusion::common::Result<()> { - let table = GenericTableProvider::new(StateBuilder::schema(), Arc::new(StateScanner(storage))); + let table = PartitionedTableProvider::new( + partition_selector, + StateBuilder::schema(), + LocalPartitionsScanner::new(partition_store_manager, StateScanner), + ); ctx.as_ref() .register_table("state", Arc::new(table)) @@ -39,29 +43,21 @@ pub(crate) fn register_self( } #[derive(Debug, Clone)] -struct StateScanner(PartitionStore); +struct StateScanner; -impl RangeScanner for StateScanner { - fn scan( - &self, +impl ScanLocalPartition for StateScanner { + async fn scan_partition_store( + partition_store: PartitionStore, + tx: Sender>, range: RangeInclusive, projection: SchemaRef, - ) -> SendableRecordBatchStream { - let db = self.0.clone(); - let schema = projection.clone(); - let mut stream_builder = RecordBatchReceiverStream::builder(projection, 16); - let tx = stream_builder.tx(); - let background_task = move || { - let rows = db.all_states(range); - for_each_state(schema, tx, rows); - Ok(()) - }; - stream_builder.spawn_blocking(background_task); - stream_builder.build() + ) { + let rows = partition_store.all_states(range); + for_each_state(projection, tx, rows).await; } } -fn for_each_state<'a, I>( +async fn for_each_state<'a, I>( schema: SchemaRef, tx: Sender>, rows: I, @@ -73,7 +69,7 @@ fn for_each_state<'a, I>( append_state_row(&mut builder, row); if builder.full() { let batch = builder.finish(); - if tx.blocking_send(Ok(batch)).is_err() { + if tx.send(Ok(batch)).await.is_err() { // not sure what to do here? // the other side has hung up on us. // we probably don't want to panic, is it will cause the entire process to exit @@ -84,6 +80,6 @@ fn for_each_state<'a, I>( } if !builder.empty() { let result = builder.finish(); - let _ = tx.blocking_send(Ok(result)); + let _ = tx.send(Ok(result)).await; } } diff --git a/crates/storage-query-datafusion/src/table_providers.rs b/crates/storage-query-datafusion/src/table_providers.rs new file mode 100644 index 0000000000..ede0eaeccd --- /dev/null +++ b/crates/storage-query-datafusion/src/table_providers.rs @@ -0,0 +1,338 @@ +// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::ops::RangeInclusive; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; +use datafusion::physical_expr::PhysicalSortExpr; +use datafusion::physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, +}; + +use restate_types::identifiers::{PartitionId, PartitionKey}; + +use crate::context::SelectPartitions; +use crate::table_util::compute_ordering; + +pub(crate) trait ScanPartition: Send + Sync + Debug + 'static { + fn scan_partition( + &self, + partition_id: PartitionId, + range: RangeInclusive, + projection: SchemaRef, + ) -> SendableRecordBatchStream; +} + +pub(crate) struct PartitionedTableProvider { + partition_selector: S, + schema: SchemaRef, + partition_scanner: T, +} + +impl PartitionedTableProvider { + pub(crate) fn new(processors_manager: S, schema: SchemaRef, partition_scanner: T) -> Self { + Self { + partition_selector: processors_manager, + schema, + partition_scanner, + } + } +} + +#[async_trait] +impl TableProvider for PartitionedTableProvider +where + T: ScanPartition + Clone, + S: SelectPartitions, +{ + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> datafusion::common::Result> { + let projected_schema = match projection { + Some(p) => SchemaRef::new(self.schema.project(p)?), + None => self.schema.clone(), + }; + let live_partitions = self + .partition_selector + .get_live_partitions() + .await + .map_err(|e| DataFusionError::External(e))?; + + Ok(Arc::new(PartitionedExecutionPlan { + live_partitions, + output_ordering: compute_ordering(projected_schema.clone()), + projected_schema, + scanner: self.partition_scanner.clone(), + })) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::common::Result> { + let res = filters + .iter() + .map(|_| TableProviderFilterPushDown::Inexact) + .collect(); + + Ok(res) + } +} + +#[derive(Debug, Clone)] +struct PartitionedExecutionPlan { + live_partitions: Vec, + output_ordering: Option>, + projected_schema: SchemaRef, + scanner: T, +} + +impl ExecutionPlan for PartitionedExecutionPlan +where + T: ScanPartition, +{ + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(self.live_partitions.len()) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + self.output_ordering.as_deref() + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + new_children: Vec>, + ) -> Result, DataFusionError> { + if !new_children.is_empty() { + return Err(DataFusionError::Internal( + "PartitionedExecutionPlan does not support children".to_owned(), + )); + } + + Ok(self) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> datafusion::common::Result { + // fake range until we can handle filters + let range = 0..=PartitionKey::MAX; + // map df partitions to our partition ids by index. + let partition_id = self + .live_partitions + .get(partition) + .expect("num_partitions within bounds"); + let stream = + self.scanner + .scan_partition(*partition_id, range, self.projected_schema.clone()); + Ok(stream) + } +} + +impl DisplayAs for PartitionedExecutionPlan +where + T: std::fmt::Debug, +{ + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "PartitionedExecutionPlan({:?})", self.scanner) + } + } + } +} + +// Generic-based table provider that provides node-level or global data rather than +// partition-keyed data. +pub(crate) trait Scan: Send + Sync + Debug + 'static { + fn scan( + &self, + projection: SchemaRef, + filters: &[Expr], + limit: Option, + ) -> SendableRecordBatchStream; +} + +pub(crate) type ScannerRef = Arc; +pub(crate) struct GenericTableProvider { + schema: SchemaRef, + scanner: ScannerRef, +} + +impl GenericTableProvider { + pub(crate) fn new(schema: SchemaRef, scanner: ScannerRef) -> Self { + Self { schema, scanner } + } +} + +#[async_trait] +impl TableProvider for GenericTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::common::Result> { + let projected_schema = match projection { + Some(p) => SchemaRef::new(self.schema.project(p)?), + None => self.schema.clone(), + }; + + Ok(Arc::new(GenericExecutionPlan::new( + projected_schema, + filters, + limit, + Arc::clone(&self.scanner), + ))) + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::common::Result> { + let res = filters + .iter() + .map(|_| TableProviderFilterPushDown::Inexact) + .collect(); + + Ok(res) + } +} + +#[derive(Debug, Clone)] +struct GenericExecutionPlan { + projected_schema: SchemaRef, + scanner: ScannerRef, + limit: Option, + filters: Vec, +} + +impl GenericExecutionPlan { + fn new( + projected_schema: SchemaRef, + filters: &[Expr], + limit: Option, + scanner: ScannerRef, + ) -> Self { + Self { + projected_schema, + scanner, + limit, + filters: filters.to_vec(), + } + } +} + +impl ExecutionPlan for GenericExecutionPlan { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.projected_schema.clone() + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn children(&self) -> Vec> { + vec![] + } + + fn with_new_children( + self: Arc, + new_children: Vec>, + ) -> Result, DataFusionError> { + if !new_children.is_empty() { + return Err(DataFusionError::Internal( + "GenericExecutionPlan does not support children".to_owned(), + )); + } + + Ok(self) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> datafusion::common::Result { + let stream = self + .scanner + .scan(self.projected_schema.clone(), &self.filters, self.limit); + Ok(stream) + } +} + +impl DisplayAs for GenericExecutionPlan { + fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "GenericExecutionPlan()",) + } + } + } +} diff --git a/crates/storage-query-datafusion/src/tests.rs b/crates/storage-query-datafusion/src/tests.rs index 5e879abfbc..cea45e2444 100644 --- a/crates/storage-query-datafusion/src/tests.rs +++ b/crates/storage-query-datafusion/src/tests.rs @@ -68,7 +68,8 @@ async fn query_sys_invocation() { ) .await; - let mut tx = engine.rocksdb_mut().transaction(); + let mut partition_store = engine.partition_store().await; + let mut tx = partition_store.transaction(); tx.put_invocation_status( &invocation_id, InvocationStatus::Invoked(InFlightInvocationMetadata { diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 40f55f9846..bc96ebd899 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -147,8 +147,6 @@ impl Worker { ) .await?; - let legacy_storage = partition_store_manager.get_legacy_storage_REMOVE_ME(); - let invoker = InvokerService::from_options( &config.common.service_client, &config.worker.invoker, @@ -168,8 +166,8 @@ impl Worker { let storage_query_context = QueryContext::create( &config.admin.query_engine, - partition_store_manager, - legacy_storage.clone(), + partition_processor_manager.handle(), + partition_store_manager.clone(), invoker.status_reader(), schema_view.clone(), ) diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index caa39558d0..0a55141eee 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -43,7 +43,7 @@ pub struct PartitionProcessorManager { bifrost: Bifrost, invoker_handle: InvokerHandle>, rx: mpsc::Receiver, - _tx: mpsc::Sender, + tx: mpsc::Sender, } impl PartitionProcessorManager { @@ -56,7 +56,7 @@ impl PartitionProcessorManager { bifrost: Bifrost, invoker_handle: InvokerHandle>, ) -> Self { - let (_tx, rx) = mpsc::channel(updateable_config.load().worker.internal_queue_length()); + let (tx, rx) = mpsc::channel(updateable_config.load().worker.internal_queue_length()); Self { updateable_config, running_partition_processors: HashMap::default(), @@ -67,12 +67,12 @@ impl PartitionProcessorManager { bifrost, invoker_handle, rx, - _tx, + tx, } } - pub fn _handle(&self) -> ProcessorsManagerHandle { - ProcessorsManagerHandle::new(self._tx.clone()) + pub fn handle(&self) -> ProcessorsManagerHandle { + ProcessorsManagerHandle::new(self.tx.clone()) } pub async fn run(mut self) -> anyhow::Result<()> {