SqlBatchView #286

Merged · 19 commits · Jan 24, 2025
37 changes: 37 additions & 0 deletions rust/analytics/src/dfext/min_max_time_df.rs
@@ -0,0 +1,37 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use datafusion::prelude::*;
use datafusion::{
arrow::array::TimestampNanosecondArray,
functions_aggregate::min_max::{max, min},
};

use super::typed_column::typed_column;

pub async fn min_max_time_dataframe(
df: DataFrame,
min_time_column_name: &str,
max_time_column_name: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
let df = df.aggregate(
vec![],
vec![
min(col(min_time_column_name)),
max(col(max_time_column_name)),
],
)?;
let minmax = df.collect().await?;
if minmax.len() != 1 {
anyhow::bail!("expected minmax to be size 1");
}
let minmax = &minmax[0];
let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?;
let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?;
if min_column.is_empty() || max_column.is_empty() {
anyhow::bail!("expected minmax to be size 1");
}
Ok((
DateTime::from_timestamp_nanos(min_column.value(0)),
DateTime::from_timestamp_nanos(max_column.value(0)),
))
}
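
For reference, a minimal usage sketch of the new helper. The `blocks` table, the `insert_time` column, and the `micromegas_analytics` import path are illustrative assumptions; any DataFrame whose min/max columns are nanosecond timestamps should work.

```rust
use anyhow::Result;
use chrono::{DateTime, Utc};
use datafusion::prelude::*;
use micromegas_analytics::dfext::min_max_time_df::min_max_time_dataframe;

// Compute the insert-time bounds of an illustrative "blocks" table; the same
// column is used for both the min and the max side here.
async fn blocks_insert_time_bounds(
    ctx: &SessionContext,
) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
    let df = ctx.table("blocks").await?;
    min_max_time_dataframe(df, "insert_time", "insert_time").await
}
```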
6 changes: 6 additions & 0 deletions rust/analytics/src/dfext/mod.rs
@@ -4,5 +4,11 @@ pub mod async_log_stream;
pub mod expressions;
/// Stream a function's log as a table
pub mod log_stream_table_provider;
/// Get min & max from the time column
pub mod min_max_time_df;
/// Convert a filtering expression to a physical predicate
pub mod predicate;
/// Execution plan interface for an async task
pub mod task_log_exec_plan;
/// Access to a RecordBatch's columns
pub mod typed_column;
27 changes: 27 additions & 0 deletions rust/analytics/src/dfext/predicate.rs
@@ -0,0 +1,27 @@
use datafusion::{
arrow::datatypes::SchemaRef,
catalog::Session,
common::DFSchema,
logical_expr::utils::conjunction,
physical_plan::{expressions, PhysicalExpr},
prelude::*,
};
use std::sync::Arc;

// from datafusion/datafusion-examples/examples/advanced_parquet_index.rs
pub fn filters_to_predicate(
schema: SchemaRef,
state: &dyn Session,
filters: &[Expr],
) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
let df_schema = DFSchema::try_from(schema)?;
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
.transpose()?
// if there are no filters, use a literal true to have a predicate
// that always evaluates to true we can pass to the index
.unwrap_or_else(|| expressions::lit(true));

Ok(predicate)
}
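
This extraction lets other table providers reuse the predicate construction from DataFusion's advanced_parquet_index example. Below is a self-contained sketch of calling it outside a `TableProvider`; the schema, column name, filter, and crate path are assumptions for illustration only.

```rust
use std::sync::Arc;
use datafusion::{
    arrow::datatypes::{DataType, Field, Schema},
    prelude::*,
};
use micromegas_analytics::dfext::predicate::filters_to_predicate;

fn main() -> datafusion::error::Result<()> {
    // Illustrative schema and filter; the SessionState stands in for the
    // `&dyn Session` that DataFusion hands to TableProvider::scan.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "nb_objects",
        DataType::Int64,
        false,
    )]));
    let state = SessionContext::new().state();
    let filters = vec![col("nb_objects").gt(lit(10_i64))];
    // With no filters this would return `lit(true)`, so callers always get a
    // usable predicate.
    let predicate = filters_to_predicate(schema, &state, &filters)?;
    println!("physical predicate: {predicate}");
    Ok(())
}
```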
28 changes: 28 additions & 0 deletions rust/analytics/src/dfext/typed_column.rs
@@ -0,0 +1,28 @@
use anyhow::{Context, Result};

pub fn typed_column_by_name<'a, T: core::any::Any>(
rc: &'a datafusion::arrow::array::RecordBatch,
column_name: &str,
) -> Result<&'a T> {
let column = rc
.column_by_name(column_name)
.with_context(|| format!("getting column {column_name}"))?;
column
.as_any()
.downcast_ref::<T>()
.with_context(|| format!("casting {column_name}: {:?}", column.data_type()))
}

pub fn typed_column<T: core::any::Any>(
rc: &datafusion::arrow::array::RecordBatch,
index: usize,
) -> Result<&T> {
let column = rc
.columns()
.get(index)
.with_context(|| format!("getting column {index}"))?;
column
.as_any()
.downcast_ref::<T>()
.with_context(|| format!("casting {index}: {:?}", column.data_type()))
}
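
A short usage sketch of the by-name accessor; the `process_id` column, the `StringArray` type, and the crate path are illustrative assumptions.

```rust
use anyhow::Result;
use datafusion::arrow::array::{RecordBatch, StringArray};
use micromegas_analytics::dfext::typed_column::typed_column_by_name;

// Print every value of an illustrative "process_id" string column, with
// context attached if the column is missing or is not a StringArray.
fn print_process_ids(batch: &RecordBatch) -> Result<()> {
    let process_ids: &StringArray = typed_column_by_name(batch, "process_id")?;
    for i in 0..process_ids.len() {
        println!("{}", process_ids.value(i));
    }
    Ok(())
}
```

The by-index variant `typed_column` works the same way; the new `min_max_time_dataframe` helper uses it to read columns 0 and 1 of the aggregated batch.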
11 changes: 9 additions & 2 deletions rust/analytics/src/lakehouse/batch_update.rs
@@ -38,7 +38,6 @@ async fn verify_overlapping_partitions(
let nb_source_events = hash_to_object_count(source_data_hash)?;
let filtered = existing_partitions
.filter(view_set_name, view_instance_id, begin_insert, end_insert)
.with_context(|| "filtering partition cache")?
.partitions;
if filtered.is_empty() {
logger
@@ -147,7 +146,15 @@ async fn materialize_partition(
.with_context(|| "writing partition")?;
}
PartitionCreationStrategy::MergeExisting => {
create_merged_partition(lake, view, begin_insert, end_insert, logger).await?;
create_merged_partition(
existing_partitions,
lake,
view,
begin_insert,
end_insert,
logger,
)
.await?;
}
PartitionCreationStrategy::Abort => {}
}
30 changes: 14 additions & 16 deletions rust/analytics/src/lakehouse/blocks_view.rs
@@ -1,11 +1,9 @@
use crate::time::TimeRange;

use super::{
metadata_partition_spec::fetch_metadata_partition_spec,
partition_cache::QueryPartitionProvider,
partition_cache::PartitionCache,
view::{PartitionSpec, View, ViewMetadata},
view_factory::ViewMaker,
};
use crate::time::TimeRange;
use anyhow::{Context, Result};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
@@ -19,17 +17,8 @@ use std::sync::Arc;

const VIEW_SET_NAME: &str = "blocks";
const VIEW_INSTANCE_ID: &str = "global";

#[derive(Debug)]
pub struct BlocksViewMaker {}

impl ViewMaker for BlocksViewMaker {
fn make_view(&self, view_instance_id: &str) -> Result<Arc<dyn View>> {
if view_instance_id != "global" {
anyhow::bail!("only global view instance id is supported for metadata views");
}
Ok(Arc::new(BlocksView::new()?))
}
lazy_static::lazy_static! {
static ref INSERT_TIME_COLUMN: Arc<String> = Arc::new( String::from("insert_time"));
}

#[derive(Debug)]
@@ -78,7 +67,7 @@ impl View for BlocksView {
async fn make_batch_partition_spec(
&self,
lake: Arc<DataLakeConnection>,
_part_provider: Arc<dyn QueryPartitionProvider>,
_existing_partitions: Arc<PartitionCache>,
begin_insert: DateTime<Utc>,
end_insert: DateTime<Utc>,
) -> Result<Arc<dyn PartitionSpec>> {
@@ -123,6 +112,7 @@ impl View for BlocksView {
anyhow::bail!("not supported");
}

// fetch_partition_source_data relies on this filter being on insert_time
fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
let utc: Arc<str> = Arc::from("+00:00");
Ok(vec![Expr::Between(Between::new(
@@ -140,6 +130,14 @@
.into(),
))])
}

fn get_min_event_time_column_name(&self) -> Arc<String> {
INSERT_TIME_COLUMN.clone()
}

fn get_max_event_time_column_name(&self) -> Arc<String> {
INSERT_TIME_COLUMN.clone()
}
}

pub fn blocks_view_schema() -> Schema {
17 changes: 14 additions & 3 deletions rust/analytics/src/lakehouse/log_view.rs
@@ -2,7 +2,7 @@ use super::{
block_partition_spec::BlockPartitionSpec,
jit_partitions::write_partition_from_blocks,
log_block_processor::LogBlockProcessor,
partition_cache::QueryPartitionProvider,
partition_cache::PartitionCache,
partition_source_data::fetch_partition_source_data,
view::{PartitionSpec, View, ViewMetadata},
view_factory::ViewMaker,
@@ -26,6 +26,9 @@ use std::sync::Arc;
use uuid::Uuid;

const VIEW_SET_NAME: &str = "log_entries";
lazy_static::lazy_static! {
static ref TIME_COLUMN: Arc<String> = Arc::new( String::from("time"));
}

#[derive(Debug)]
pub struct LogViewMaker {}
@@ -72,15 +75,15 @@ impl View for LogView {
async fn make_batch_partition_spec(
&self,
lake: Arc<DataLakeConnection>,
part_provider: Arc<dyn QueryPartitionProvider>,
existing_partitions: Arc<PartitionCache>,
begin_insert: DateTime<Utc>,
end_insert: DateTime<Utc>,
) -> Result<Arc<dyn PartitionSpec>> {
if *self.view_instance_id != "global" {
anyhow::bail!("not supported for jit queries... should it?");
}
let source_data =
fetch_partition_source_data(lake, part_provider, begin_insert, end_insert, "log")
fetch_partition_source_data(lake, existing_partitions, begin_insert, end_insert, "log")
.await
.with_context(|| "fetch_partition_source_data")?;
Ok(Arc::new(BlockPartitionSpec {
@@ -187,4 +190,12 @@
.into(),
))])
}

fn get_min_event_time_column_name(&self) -> Arc<String> {
TIME_COLUMN.clone()
}

fn get_max_event_time_column_name(&self) -> Arc<String> {
TIME_COLUMN.clone()
}
}
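
Like `BlocksView`, `LogView` now advertises which column bounds its events. A hedged sketch of how generic partition-maintenance code might combine these getters with the new `min_max_time_dataframe` helper; the module paths and the `event_time_range` function are assumptions, not code from this PR.

```rust
use std::sync::Arc;
use anyhow::Result;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use micromegas_analytics::dfext::min_max_time_df::min_max_time_dataframe;
use micromegas_analytics::lakehouse::view::View;

// Hypothetical helper: compute the event-time range of some rows belonging to
// `view`, using the column names the view now advertises.
async fn event_time_range(
    view: Arc<dyn View>,
    df: DataFrame,
) -> Result<(DateTime<Utc>, DateTime<Utc>)> {
    let min_col = view.get_min_event_time_column_name();
    let max_col = view.get_max_event_time_column_name();
    min_max_time_dataframe(df, &min_col, &max_col).await
}
```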
rust/analytics/src/lakehouse/materialized_view.rs
@@ -1,19 +1,16 @@
use super::{partition_cache::QueryPartitionProvider, view::View};
use crate::{lakehouse::reader_factory::ReaderFactory, time::TimeRange};
use super::{
partition_cache::QueryPartitionProvider,
partitioned_execution_plan::make_partitioned_execution_plan, view::View,
};
use crate::time::TimeRange;
use async_trait::async_trait;
use datafusion::{
arrow::datatypes::SchemaRef,
catalog::{Session, TableProvider},
common::DFSchema,
datasource::{
listing::PartitionedFile,
physical_plan::{parquet::ParquetExecBuilder, FileScanConfig},
TableType,
},
datasource::TableType,
error::DataFusionError,
execution::object_store::ObjectStoreUrl,
logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown},
physical_plan::{expressions, ExecutionPlan, PhysicalExpr},
logical_expr::{Expr, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use object_store::ObjectStore;
@@ -48,25 +45,6 @@ impl MaterializedView {
pub fn get_view(&self) -> Arc<dyn View> {
self.view.clone()
}

// from datafusion/datafusion-examples/examples/advanced_parquet_index.rs
fn filters_to_predicate(
&self,
schema: SchemaRef,
state: &dyn Session,
filters: &[Expr],
) -> datafusion::error::Result<Arc<dyn PhysicalExpr>> {
let df_schema = DFSchema::try_from(schema)?;
let predicate = conjunction(filters.to_vec());
let predicate = predicate
.map(|predicate| state.create_physical_expr(predicate, &df_schema))
.transpose()?
// if there are no filters, use a literal true to have a predicate
// that always evaluates to true we can pass to the index
.unwrap_or_else(|| expressions::lit(true));

Ok(predicate)
}
}

#[async_trait]
@@ -90,13 +68,11 @@ impl TableProvider for MaterializedView {
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let predicate = self.filters_to_predicate(self.view.get_file_schema(), state, filters)?;
self.view
.jit_update(self.lake.clone(), self.query_range.clone())
.await
.map_err(|e| DataFusionError::External(e.into()))?;

let mut file_group = vec![];
let partitions = self
.part_provider
.fetch(
@@ -108,22 +84,15 @@
.await
.map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;

for part in &partitions {
file_group.push(PartitionedFile::new(&part.file_path, part.file_size as u64));
}

let schema = self.schema();
let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
let file_scan_config = FileScanConfig::new(object_store_url, schema)
.with_limit(limit)
.with_projection(projection.cloned())
.with_file_groups(vec![file_group]);
let reader_factory =
ReaderFactory::new(Arc::clone(&self.object_store), Arc::new(partitions));
Ok(ParquetExecBuilder::new(file_scan_config)
.with_predicate(predicate.clone())
.with_parquet_file_reader_factory(Arc::new(reader_factory))
.build_arc())
make_partitioned_execution_plan(
self.schema(),
self.object_store.clone(),
state,
projection,
filters,
limit,
Arc::new(partitions),
)
}

/// Tell DataFusion to push filters down to the scan method