From 80601c1a582b8354b96e9b13309f040fe5af396f Mon Sep 17 00:00:00 2001
From: Marc-Antoine Desroches
Date: Mon, 20 Jan 2025 09:46:01 -0500
Subject: [PATCH 01/19] [wip] SqlBatchView

---
 rust/analytics/src/lakehouse/mod.rs           |   2 +
 .../src/lakehouse/partition_cache.rs          |  22 ++++
 .../analytics/src/lakehouse/sql_batch_view.rs | 104 +++++++++++++++++
 rust/analytics/tests/sql_view_test.rs         | 110 ++++++++++++++++++
 rust/tracing/src/logs/events.rs               |   4 +-
 rust/tracing/src/time.rs                      |   3 +-
 6 files changed, 242 insertions(+), 3 deletions(-)
 create mode 100644 rust/analytics/src/lakehouse/sql_batch_view.rs
 create mode 100644 rust/analytics/tests/sql_view_test.rs

diff --git a/rust/analytics/src/lakehouse/mod.rs b/rust/analytics/src/lakehouse/mod.rs
index 0864ee00..351f3eda 100644
--- a/rust/analytics/src/lakehouse/mod.rs
+++ b/rust/analytics/src/lakehouse/mod.rs
@@ -44,6 +44,8 @@ pub mod query;
 pub mod reader_factory;
 /// Exposes retire_partitions as a table function
 pub mod retire_partitions_table_function;
+/// Sql-defined view updated at regular intervals
+pub mod sql_batch_view;
 /// Replicated view of the `streams` table of the postgresql metadata database.
 pub mod streams_view;
 /// TableProvider implementation for the lakehouse
diff --git a/rust/analytics/src/lakehouse/partition_cache.rs b/rust/analytics/src/lakehouse/partition_cache.rs
index f0380529..084d1290 100644
--- a/rust/analytics/src/lakehouse/partition_cache.rs
+++ b/rust/analytics/src/lakehouse/partition_cache.rs
@@ -263,3 +263,25 @@ impl QueryPartitionProvider for LivePartitionProvider {
         Ok(partitions)
     }
 }
+
+#[derive(Debug)]
+pub struct NullPartitionProvider {}
+
+impl fmt::Display for NullPartitionProvider {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+#[async_trait]
+impl QueryPartitionProvider for NullPartitionProvider {
+    async fn fetch(
+        &self,
+        _view_set_name: &str,
+        _view_instance_id: &str,
+        _query_range: Option<TimeRange>,
+        _file_schema_hash: Vec<u8>,
+    ) -> Result<Vec<Partition>> {
+        Ok(vec![])
+    }
+}
diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs
new file mode 100644
index 00000000..a2d29c03
--- /dev/null
+++ b/rust/analytics/src/lakehouse/sql_batch_view.rs
@@ -0,0 +1,104 @@
+use super::{
+    partition_cache::{NullPartitionProvider, QueryPartitionProvider},
+    query::make_session_context,
+    view::{PartitionSpec, View},
+    view_factory::ViewFactory,
+};
+use crate::time::TimeRange;
+use anyhow::{Context, Result};
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use datafusion::{arrow::datatypes::Schema, prelude::*, sql::TableReference};
+use micromegas_ingestion::data_lake_connection::DataLakeConnection;
+use std::hash::Hash;
+use std::hash::Hasher;
+use std::{hash::DefaultHasher, sync::Arc};
+
+#[derive(Debug)]
+pub struct SqlBatchView {
+    view_set_name: Arc<String>,
+    view_instance_id: Arc<String>,
+    event_time_column: Arc<String>, // could be more general - for filtering
+    src_query: Arc<String>,
+    transform_query: Arc<String>,
+    merge_partitions_query: Arc<String>,
+    schema: Arc<Schema>,
+}
+
+impl SqlBatchView {
+    pub async fn new(
+        view_set_name: Arc<String>,
+        view_instance_id: Arc<String>,
+        event_time_column: Arc<String>,
+        src_query: Arc<String>,
+        transform_query: Arc<String>,
+        merge_partitions_query: Arc<String>,
+        lake: Arc<DataLakeConnection>,
+        view_factory: Arc<ViewFactory>,
+    ) -> Result<Self> {
+        let null_part_provider = Arc::new(NullPartitionProvider {});
+        let ctx = make_session_context(lake, null_part_provider, None, view_factory)
+            .await
+            .with_context(|| "make_session_context")?;
+        let src_df = ctx.sql(&src_query).await?;
+        let src_view = src_df.into_view();
+        ctx.register_table(
+            TableReference::Bare {
+                table: "source".into(),
+            },
+            src_view,
+        )?;
+
+        let transformed_df = ctx.sql(&transform_query).await?;
+        let schema = transformed_df.schema().inner().clone();
+
+        Ok(Self {
+            view_set_name,
+            view_instance_id,
+            event_time_column,
+            src_query,
+            transform_query,
+            merge_partitions_query,
+            schema,
+        })
+    }
+}
+
+#[async_trait]
+impl View for SqlBatchView {
+    fn get_view_set_name(&self) -> Arc<String> {
+        self.view_set_name.clone()
+    }
+
+    fn get_view_instance_id(&self) -> Arc<String> {
+        self.view_instance_id.clone()
+    }
+
+    async fn make_batch_partition_spec(
+        &self,
+        _lake: Arc<DataLakeConnection>,
+        _part_provider: Arc<dyn QueryPartitionProvider>,
+        _begin_insert: DateTime<Utc>,
+        _end_insert: DateTime<Utc>,
+    ) -> Result<Arc<dyn PartitionSpec>> {
+        todo!();
+    }
+    fn get_file_schema_hash(&self) -> Vec<u8> {
+        let mut hasher = DefaultHasher::new();
+        self.schema.hash(&mut hasher);
+        hasher.finish().to_le_bytes().to_vec()
+    }
+    fn get_file_schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+    async fn jit_update(
+        &self,
+        _lake: Arc<DataLakeConnection>,
+        _query_range: Option<TimeRange>,
+    ) -> Result<()> {
+        anyhow::bail!("jit_update not supported on SqlBatchView");
+    }
+    fn make_time_filter(&self, _begin: DateTime<Utc>, _end: DateTime<Utc>) -> Result<Vec<Expr>> {
+        todo!();
+    }
+}
diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs
new file mode 100644
index 00000000..f6b54fc0
--- /dev/null
+++ b/rust/analytics/tests/sql_view_test.rs
@@ -0,0 +1,110 @@
+use anyhow::{Context, Result};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use micromegas_analytics::lakehouse::view::View;
+use micromegas_analytics::lakehouse::view_factory::default_view_factory;
+use micromegas_analytics::lakehouse::{sql_batch_view::SqlBatchView, view_factory::ViewFactory};
+use micromegas_ingestion::data_lake_connection::{connect_to_data_lake, DataLakeConnection};
+use micromegas_telemetry_sink::TelemetryGuardBuilder;
+use micromegas_tracing::prelude::*;
+use std::sync::Arc;
+
+async fn make_log_entries_levels_per_process_view(
+    lake: Arc<DataLakeConnection>,
+    view_factory: Arc<ViewFactory>,
+) -> Result<SqlBatchView> {
+    let src_query = Arc::new(String::from(
+        "
+        SELECT process_id,
+               date_bin('1 minute', time) as time_bin,
+               CAST(level==1 as INT) as fatal,
+               CAST(level==2 as INT) as err,
+               CAST(level==3 as INT) as warn,
+               CAST(level==4 as INT) as info,
+               CAST(level==5 as INT) as debug,
+               CAST(level==6 as INT) as trace
+        FROM log_entries;",
+    ));
+
+    let transform_query = Arc::new(String::from(
+        "
+        SELECT time_bin,
+               process_id,
+               sum(fatal) as nb_fatal,
+               sum(err) as nb_err,
+               sum(warn) as nb_warn,
+               sum(info) as nb_info,
+               sum(debug) as nb_debug,
+               sum(trace) as nb_trace
+        FROM source
+        GROUP BY process_id, time_bin
+        ORDER BY time_bin, process_id;",
+    ));
+
+    let merge_partitions_query = Arc::new(String::from(
+        "
+        SELECT time_bin,
+               process_id,
+               sum(nb_fatal) as nb_fatal,
+               sum(nb_err) as nb_err,
+               sum(nb_warn) as nb_warn,
+               sum(nb_info) as nb_info,
+               sum(nb_debug) as nb_debug,
+               sum(nb_trace) as nb_trace
+        FROM src
+        GROUP BY process_id, time_bin
+        ORDER BY time_bin, process_id;",
+    ));
+
+    SqlBatchView::new(
+        Arc::new("log_entries_per_process".to_owned()),
+        Arc::new("global".to_owned()),
+        Arc::new("time_bin".to_owned()),
+        src_query,
+        transform_query,
+        merge_partitions_query,
+        lake,
+        view_factory,
+    )
+    .await
+}
+
+#[ignore]
+#[tokio::test]
+async fn sql_view_test() -> Result<()> {
+    let _telemetry_guard = TelemetryGuardBuilder::default()
+        .with_ctrlc_handling()
+        .with_local_sink_max_level(LevelFilter::Info)
+        .build();
+    let connection_string =
std::env::var("MICROMEGAS_SQL_CONNECTION_STRING") + .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?; + let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI") + .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?; + let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); + let view_factory = Arc::new(default_view_factory()?); + let log_summary = make_log_entries_levels_per_process_view(lake, view_factory).await?; + let ref_schema = Arc::new(Schema::new(vec![ + Field::new( + "time_bin", + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + true, + ), + Field::new( + "process_id", + DataType::Dictionary(DataType::Int16.into(), DataType::Utf8.into()), + false, + ), + Field::new("nb_fatal", DataType::Int64, true), + Field::new("nb_err", DataType::Int64, true), + Field::new("nb_warn", DataType::Int64, true), + Field::new("nb_info", DataType::Int64, true), + Field::new("nb_debug", DataType::Int64, true), + Field::new("nb_trace", DataType::Int64, true), + ])); + + assert_eq!(log_summary.get_file_schema(), ref_schema); + + let ref_schema_hash: Vec = vec![219, 37, 165, 158, 123, 73, 39, 204]; + assert_eq!(log_summary.get_file_schema_hash(), ref_schema_hash); + + Ok(()) +} diff --git a/rust/tracing/src/logs/events.rs b/rust/tracing/src/logs/events.rs index c47f47e7..0ca65b48 100644 --- a/rust/tracing/src/logs/events.rs +++ b/rust/tracing/src/logs/events.rs @@ -291,14 +291,14 @@ impl InProcSerialize for TaggedLogString { } } +#[allow(unused_imports)] #[cfg(test)] mod test { - use std::thread; - use crate::logs::{ events::{Level, LevelFilter}, FilterState, LogMetadata, FILTER_LEVEL_UNSET_VALUE, }; + use std::thread; #[test] fn test_filter_levels() { diff --git a/rust/tracing/src/time.rs b/rust/tracing/src/time.rs index 94f9c30b..86e996a6 100644 --- a/rust/tracing/src/time.rs +++ b/rust/tracing/src/time.rs @@ -147,9 +147,10 @@ fn frequency_fallback() -> u64 { 0 } +#[allow(unused_imports)] #[cfg(test)] mod tests { - use super::*; + use crate::time::frequency; #[test] fn test_frequency() { From 2fc6606eb634ac70cc90a730df4a7a496c8fa49a Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Mon, 20 Jan 2025 16:16:11 -0500 Subject: [PATCH 02/19] [wip] sql_partition_spec --- rust/analytics/src/dfext/get_column.rs | 14 ++++ rust/analytics/src/dfext/mod.rs | 2 + rust/analytics/src/lakehouse/batch_update.rs | 1 - rust/analytics/src/lakehouse/blocks_view.rs | 7 +- rust/analytics/src/lakehouse/log_view.rs | 6 +- rust/analytics/src/lakehouse/metrics_view.rs | 17 +++-- rust/analytics/src/lakehouse/mod.rs | 4 +- .../src/lakehouse/partition_cache.rs | 36 +++++++--- .../src/lakehouse/partition_source_data.rs | 22 ++---- .../analytics/src/lakehouse/processes_view.rs | 4 +- .../analytics/src/lakehouse/sql_batch_view.rs | 41 ++++++++++-- .../src/lakehouse/sql_partition_spec.rs | 56 ++++++++++++++++ rust/analytics/src/lakehouse/streams_view.rs | 4 +- .../src/lakehouse/thread_spans_view.rs | 4 +- rust/analytics/src/lakehouse/view.rs | 5 +- rust/analytics/tests/sql_view_test.rs | 67 +++++++++++++++++-- rust/public/src/servers/maintenance.rs | 1 + 17 files changed, 232 insertions(+), 59 deletions(-) create mode 100644 rust/analytics/src/dfext/get_column.rs create mode 100644 rust/analytics/src/lakehouse/sql_partition_spec.rs diff --git a/rust/analytics/src/dfext/get_column.rs b/rust/analytics/src/dfext/get_column.rs new file mode 100644 index 00000000..0787da84 --- /dev/null +++ b/rust/analytics/src/dfext/get_column.rs @@ -0,0 +1,14 @@ +use 
anyhow::{Context, Result};
+
+pub fn get_column<'a, T: core::any::Any>(
+    rc: &'a datafusion::arrow::array::RecordBatch,
+    column_name: &str,
+) -> Result<&'a T> {
+    let column = rc
+        .column_by_name(column_name)
+        .with_context(|| format!("getting column {column_name}"))?;
+    column
+        .as_any()
+        .downcast_ref::<T>()
+        .with_context(|| format!("casting {column_name}: {:?}", column.data_type()))
+}
diff --git a/rust/analytics/src/dfext/mod.rs b/rust/analytics/src/dfext/mod.rs
index 9022b53f..daa5262d 100644
--- a/rust/analytics/src/dfext/mod.rs
+++ b/rust/analytics/src/dfext/mod.rs
@@ -2,6 +2,8 @@
 pub mod async_log_stream;
 /// Utilities to help deal with df expressions
 pub mod expressions;
+/// Access to a RecordBatch's columns
+pub mod get_column;
 /// Stream a function's log as a table
 pub mod log_stream_table_provider;
 /// Execution plan interface for an async task
diff --git a/rust/analytics/src/lakehouse/batch_update.rs b/rust/analytics/src/lakehouse/batch_update.rs
index 34f60dfd..37231d84 100644
--- a/rust/analytics/src/lakehouse/batch_update.rs
+++ b/rust/analytics/src/lakehouse/batch_update.rs
@@ -38,7 +38,6 @@ async fn verify_overlapping_partitions(
     let nb_source_events = hash_to_object_count(source_data_hash)?;
     let filtered = existing_partitions
         .filter(view_set_name, view_instance_id, begin_insert, end_insert)
-        .with_context(|| "filtering partition cache")?
         .partitions;
     if filtered.is_empty() {
         logger
diff --git a/rust/analytics/src/lakehouse/blocks_view.rs b/rust/analytics/src/lakehouse/blocks_view.rs
index 9ebc0c02..9f7aa6be 100644
--- a/rust/analytics/src/lakehouse/blocks_view.rs
+++ b/rust/analytics/src/lakehouse/blocks_view.rs
@@ -1,11 +1,10 @@
-use crate::time::TimeRange;
-
 use super::{
     metadata_partition_spec::fetch_metadata_partition_spec,
-    partition_cache::QueryPartitionProvider,
+    partition_cache::PartitionCache,
     view::{PartitionSpec, View, ViewMetadata},
     view_factory::ViewMaker,
 };
+use crate::time::TimeRange;
 use anyhow::{Context, Result};
 use async_trait::async_trait;
 use chrono::{DateTime, Utc};
@@ -78,7 +77,7 @@ impl View for BlocksView {
     async fn make_batch_partition_spec(
         &self,
         lake: Arc<DataLakeConnection>,
-        _part_provider: Arc<dyn QueryPartitionProvider>,
+        _existing_partitions: Arc<PartitionCache>,
         begin_insert: DateTime<Utc>,
         end_insert: DateTime<Utc>,
     ) -> Result<Arc<dyn PartitionSpec>> {
diff --git a/rust/analytics/src/lakehouse/log_view.rs b/rust/analytics/src/lakehouse/log_view.rs
index 8752e51c..2e0826c0 100644
--- a/rust/analytics/src/lakehouse/log_view.rs
+++ b/rust/analytics/src/lakehouse/log_view.rs
@@ -2,7 +2,7 @@ use super::{
     block_partition_spec::BlockPartitionSpec,
     jit_partitions::write_partition_from_blocks,
     log_block_processor::LogBlockProcessor,
-    partition_cache::QueryPartitionProvider,
+    partition_cache::PartitionCache,
     partition_source_data::fetch_partition_source_data,
     view::{PartitionSpec, View, ViewMetadata},
     view_factory::ViewMaker,
@@ -72,7 +72,7 @@ impl View for LogView {
     async fn make_batch_partition_spec(
         &self,
         lake: Arc<DataLakeConnection>,
-        part_provider: Arc<dyn QueryPartitionProvider>,
+        existing_partitions: Arc<PartitionCache>,
         begin_insert: DateTime<Utc>,
         end_insert: DateTime<Utc>,
     ) -> Result<Arc<dyn PartitionSpec>> {
@@ -80,7 +80,7 @@ impl View for LogView {
             anyhow::bail!("not supported for jit queries... 
should it?"); } let source_data = - fetch_partition_source_data(lake, part_provider, begin_insert, end_insert, "log") + fetch_partition_source_data(lake, existing_partitions, begin_insert, end_insert, "log") .await .with_context(|| "fetch_partition_source_data")?; Ok(Arc::new(BlockPartitionSpec { diff --git a/rust/analytics/src/lakehouse/metrics_view.rs b/rust/analytics/src/lakehouse/metrics_view.rs index f327cadb..0c6d59d1 100644 --- a/rust/analytics/src/lakehouse/metrics_view.rs +++ b/rust/analytics/src/lakehouse/metrics_view.rs @@ -10,7 +10,7 @@ use super::{ generate_jit_partitions, is_jit_partition_up_to_date, write_partition_from_blocks, }, metrics_block_processor::MetricsBlockProcessor, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, partition_source_data::fetch_partition_source_data, view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewMaker, @@ -73,17 +73,22 @@ impl View for MetricsView { async fn make_batch_partition_spec( &self, lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { if *self.view_instance_id != "global" { anyhow::bail!("not supported for jit queries... should it?"); } - let source_data = - fetch_partition_source_data(lake, part_provider, begin_insert, end_insert, "metrics") - .await - .with_context(|| "fetch_partition_source_data")?; + let source_data = fetch_partition_source_data( + lake, + existing_partitions, + begin_insert, + end_insert, + "metrics", + ) + .await + .with_context(|| "fetch_partition_source_data")?; Ok(Arc::new(BlockPartitionSpec { view_metadata: ViewMetadata { view_set_name: self.view_set_name.clone(), diff --git a/rust/analytics/src/lakehouse/mod.rs b/rust/analytics/src/lakehouse/mod.rs index 351f3eda..a67b6df7 100644 --- a/rust/analytics/src/lakehouse/mod.rs +++ b/rust/analytics/src/lakehouse/mod.rs @@ -1,4 +1,4 @@ -/// Record batches + schema +// Record batches + schema pub mod answer; /// Write parquet in object store pub mod async_parquet_writer; @@ -46,6 +46,8 @@ pub mod reader_factory; pub mod retire_partitions_table_function; /// Sql-defined view updated at regular intervals pub mod sql_batch_view; +/// Specification for a view partition backed by a SQL query on the lakehouse. +pub mod sql_partition_spec; /// Replicated view of the `streams` table of the postgresql metadata database. 
pub mod streams_view; /// TableProvider implementation for the lakehouse diff --git a/rust/analytics/src/lakehouse/partition_cache.rs b/rust/analytics/src/lakehouse/partition_cache.rs index 084d1290..0c018ac2 100644 --- a/rust/analytics/src/lakehouse/partition_cache.rs +++ b/rust/analytics/src/lakehouse/partition_cache.rs @@ -18,6 +18,7 @@ pub trait QueryPartitionProvider: std::fmt::Display + Send + Sync + std::fmt::De ) -> Result>; } +/// PartitionCache allows to query partitions based on the insert_time range #[derive(Debug)] pub struct PartitionCache { pub partitions: Vec, @@ -96,10 +97,7 @@ impl PartitionCache { view_instance_id: &str, begin_insert: DateTime, end_insert: DateTime, - ) -> Result { - if begin_insert < self.begin_insert || end_insert > self.end_insert { - anyhow::bail!("filtering from a result set that's not large enough"); - } + ) -> Self { let mut partitions = vec![]; for part in &self.partitions { if *part.view_metadata.view_set_name == view_set_name @@ -110,16 +108,35 @@ impl PartitionCache { partitions.push(part.clone()); } } - Ok(Self { + Self { partitions, begin_insert, end_insert, - }) + } + } + + pub fn filter_insert_range( + &self, + begin_insert: DateTime, + end_insert: DateTime, + ) -> Self { + let mut partitions = vec![]; + for part in &self.partitions { + if part.begin_insert_time < end_insert && part.end_insert_time > begin_insert { + partitions.push(part.clone()); + } + } + Self { + partitions, + begin_insert, + end_insert, + } } } #[async_trait] impl QueryPartitionProvider for PartitionCache { + /// unlike LivePartitionProvider, the query_range is tested against the insertion time, not the event time async fn fetch( &self, view_set_name: &str, @@ -129,11 +146,14 @@ impl QueryPartitionProvider for PartitionCache { ) -> Result> { let mut partitions = vec![]; if let Some(range) = query_range { + if range.begin < self.begin_insert || range.end > self.end_insert { + anyhow::bail!("filtering from a result set that's not large enough"); + } for part in &self.partitions { if *part.view_metadata.view_set_name == view_set_name && *part.view_metadata.view_instance_id == view_instance_id - && part.min_event_time < range.end - && part.max_event_time > range.begin + && part.begin_insert_time < range.end + && part.end_insert_time > range.begin && part.view_metadata.file_schema_hash == file_schema_hash { partitions.push(part.clone()); diff --git a/rust/analytics/src/lakehouse/partition_source_data.rs b/rust/analytics/src/lakehouse/partition_source_data.rs index e2ce19fc..ab509590 100644 --- a/rust/analytics/src/lakehouse/partition_source_data.rs +++ b/rust/analytics/src/lakehouse/partition_source_data.rs @@ -1,12 +1,14 @@ +use super::partition_cache::PartitionCache; use crate::{ + dfext::get_column::get_column, lakehouse::{blocks_view::BlocksView, query::query_single_view}, time::TimeRange, }; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use datafusion::arrow::array::{ - Array, ArrayRef, AsArray, BinaryArray, GenericListArray, Int32Array, Int64Array, RecordBatch, - StringArray, StructArray, TimestampNanosecondArray, + Array, ArrayRef, AsArray, BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray, + StructArray, TimestampNanosecondArray, }; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use micromegas_telemetry::property::Property; @@ -15,8 +17,6 @@ use micromegas_tracing::prelude::*; use std::sync::Arc; use uuid::Uuid; -use super::partition_cache::QueryPartitionProvider; - pub struct PartitionSourceBlock { pub block: 
BlockMetadata, pub stream: Arc, @@ -34,16 +34,6 @@ pub fn hash_to_object_count(hash: &[u8]) -> Result { )) } -pub fn get_column<'a, T: core::any::Any>(rc: &'a RecordBatch, column_name: &str) -> Result<&'a T> { - let column = rc - .column_by_name(column_name) - .with_context(|| format!("getting column {column_name}"))?; - column - .as_any() - .downcast_ref::() - .with_context(|| format!("casting {column_name}: {:?}", column.data_type())) -} - fn read_property_list(value: ArrayRef) -> Result> { let properties: &StructArray = value.as_struct(); let (key_index, _key_field) = properties @@ -66,7 +56,7 @@ fn read_property_list(value: ArrayRef) -> Result> { pub async fn fetch_partition_source_data( lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, source_stream_tag: &str, @@ -93,7 +83,7 @@ pub async fn fetch_partition_source_data( let mut partition_src_blocks = vec![]; let blocks_answer = query_single_view( lake, - part_provider, + existing_partitions, Some(TimeRange::new(begin_insert, end_insert)), &sql, blocks_view, diff --git a/rust/analytics/src/lakehouse/processes_view.rs b/rust/analytics/src/lakehouse/processes_view.rs index 9a48f1ab..2dc9c2b6 100644 --- a/rust/analytics/src/lakehouse/processes_view.rs +++ b/rust/analytics/src/lakehouse/processes_view.rs @@ -2,7 +2,7 @@ use crate::time::TimeRange; use super::{ metadata_partition_spec::fetch_metadata_partition_spec, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewMaker, }; @@ -86,7 +86,7 @@ impl View for ProcessesView { async fn make_batch_partition_spec( &self, lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs index a2d29c03..3e1af64b 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -1,6 +1,7 @@ use super::{ - partition_cache::{NullPartitionProvider, QueryPartitionProvider}, + partition_cache::{NullPartitionProvider, PartitionCache}, query::make_session_context, + sql_partition_spec::fetch_sql_partition_spec, view::{PartitionSpec, View}, view_factory::ViewFactory, }; @@ -23,9 +24,11 @@ pub struct SqlBatchView { transform_query: Arc, merge_partitions_query: Arc, schema: Arc, + view_factory: Arc, } impl SqlBatchView { + #[allow(clippy::too_many_arguments)] pub async fn new( view_set_name: Arc, view_instance_id: Arc, @@ -37,7 +40,7 @@ impl SqlBatchView { view_factory: Arc, ) -> Result { let null_part_provider = Arc::new(NullPartitionProvider {}); - let ctx = make_session_context(lake, null_part_provider, None, view_factory) + let ctx = make_session_context(lake, null_part_provider, None, view_factory.clone()) .await .with_context(|| "make_session_context")?; let src_df = ctx.sql(&src_query).await?; @@ -60,6 +63,7 @@ impl SqlBatchView { transform_query, merge_partitions_query, schema, + view_factory, }) } } @@ -76,12 +80,35 @@ impl View for SqlBatchView { async fn make_batch_partition_spec( &self, - _lake: Arc, - _part_provider: Arc, - _begin_insert: DateTime, - _end_insert: DateTime, + lake: Arc, + existing_partitions: Arc, + begin_insert: DateTime, + end_insert: DateTime, ) -> Result> { - todo!(); + let partitions_in_range = + Arc::new(existing_partitions.filter_insert_range(begin_insert, end_insert)); + let ctx = make_session_context( + lake.clone(), 
+ partitions_in_range.clone(), + None, + self.view_factory.clone(), + ) + .await + .with_context(|| "make_session_context")?; + let src_df = ctx.sql(&self.src_query).await?; + let src_view = src_df.into_view(); + ctx.register_table( + TableReference::Bare { + table: "source".into(), + }, + src_view, + )?; + + Ok(Arc::new( + fetch_sql_partition_spec(ctx, begin_insert, end_insert) + .await + .with_context(|| "fetch_sql_partition_spec")?, + )) } fn get_file_schema_hash(&self) -> Vec { let mut hasher = DefaultHasher::new(); diff --git a/rust/analytics/src/lakehouse/sql_partition_spec.rs b/rust/analytics/src/lakehouse/sql_partition_spec.rs new file mode 100644 index 00000000..e11efb8c --- /dev/null +++ b/rust/analytics/src/lakehouse/sql_partition_spec.rs @@ -0,0 +1,56 @@ +use super::view::PartitionSpec; +use crate::{dfext::get_column::get_column, response_writer::Logger}; +use anyhow::Result; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use datafusion::{ + arrow::array::{Int64Array, RecordBatch}, + prelude::*, +}; +use micromegas_ingestion::data_lake_connection::DataLakeConnection; +use std::sync::Arc; + +pub struct SqlPartitionSpec { + begin_insert: DateTime, + end_insert: DateTime, + record_count: i64, +} + +impl SqlPartitionSpec { + pub fn new(begin_insert: DateTime, end_insert: DateTime, record_count: i64) -> Self { + Self { + begin_insert, + end_insert, + record_count, + } + } +} + +#[async_trait] +impl PartitionSpec for SqlPartitionSpec { + fn get_source_data_hash(&self) -> Vec { + self.record_count.to_le_bytes().to_vec() + } + async fn write(&self, _lake: Arc, _logger: Arc) -> Result<()> { + todo!(); + } +} + +pub async fn fetch_sql_partition_spec( + ctx: SessionContext, + begin_insert: DateTime, + end_insert: DateTime, +) -> Result { + let df = ctx.sql("SELECT COUNT(*) as count FROM source;").await?; + let batches: Vec = df.collect().await?; + if batches.len() != 1 { + anyhow::bail!("fetch_sql_partition_spec: query should return a single batch"); + } + let rb = &batches[0]; + let count_column: &Int64Array = get_column(rb, "count")?; + if count_column.len() != 1 { + anyhow::bail!("fetch_sql_partition_spec: query should return a single row"); + } + let count = count_column.value(0); + Ok(SqlPartitionSpec::new(begin_insert, end_insert, count)) +} diff --git a/rust/analytics/src/lakehouse/streams_view.rs b/rust/analytics/src/lakehouse/streams_view.rs index 242215a6..57c08f36 100644 --- a/rust/analytics/src/lakehouse/streams_view.rs +++ b/rust/analytics/src/lakehouse/streams_view.rs @@ -2,7 +2,7 @@ use crate::time::TimeRange; use super::{ metadata_partition_spec::fetch_metadata_partition_spec, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewMaker, }; @@ -80,7 +80,7 @@ impl View for StreamsView { async fn make_batch_partition_spec( &self, lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { diff --git a/rust/analytics/src/lakehouse/thread_spans_view.rs b/rust/analytics/src/lakehouse/thread_spans_view.rs index e20e214e..c980de9e 100644 --- a/rust/analytics/src/lakehouse/thread_spans_view.rs +++ b/rust/analytics/src/lakehouse/thread_spans_view.rs @@ -1,6 +1,6 @@ use super::{ jit_partitions::{generate_jit_partitions, is_jit_partition_up_to_date}, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, partition_source_data::{hash_to_object_count, PartitionSourceDataBlocks}, view::{PartitionSpec, 
View, ViewMetadata}, view_factory::ViewMaker, @@ -191,7 +191,7 @@ impl View for ThreadSpansView { async fn make_batch_partition_spec( &self, _lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, _begin_insert: DateTime, _end_insert: DateTime, ) -> Result> { diff --git a/rust/analytics/src/lakehouse/view.rs b/rust/analytics/src/lakehouse/view.rs index 8c364789..4f513488 100644 --- a/rust/analytics/src/lakehouse/view.rs +++ b/rust/analytics/src/lakehouse/view.rs @@ -1,3 +1,4 @@ +use super::partition_cache::PartitionCache; use crate::{response_writer::Logger, time::TimeRange}; use anyhow::Result; use async_trait::async_trait; @@ -6,8 +7,6 @@ use datafusion::{arrow::datatypes::Schema, logical_expr::Expr}; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use std::sync::Arc; -use super::partition_cache::QueryPartitionProvider; - #[async_trait] pub trait PartitionSpec: Send + Sync { fn get_source_data_hash(&self) -> Vec; @@ -34,7 +33,7 @@ pub trait View: std::fmt::Debug + Send + Sync { async fn make_batch_partition_spec( &self, lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result>; diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index f6b54fc0..3be0c27d 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -1,8 +1,13 @@ use anyhow::{Context, Result}; +use chrono::DurationRound; +use chrono::{TimeDelta, Utc}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use micromegas_analytics::lakehouse::batch_update::materialize_partition_range; +use micromegas_analytics::lakehouse::partition_cache::PartitionCache; use micromegas_analytics::lakehouse::view::View; use micromegas_analytics::lakehouse::view_factory::default_view_factory; use micromegas_analytics::lakehouse::{sql_batch_view::SqlBatchView, view_factory::ViewFactory}; +use micromegas_analytics::response_writer::ResponseWriter; use micromegas_ingestion::data_lake_connection::{connect_to_data_lake, DataLakeConnection}; use micromegas_telemetry_sink::TelemetryGuardBuilder; use micromegas_tracing::prelude::*; @@ -22,7 +27,7 @@ async fn make_log_entries_levels_per_process_view( CAST(level==4 as INT) as info, CAST(level==5 as INT) as debug, CAST(level==6 as INT) as trace - FROM log_entries;", + FROM log_entries;", )); let transform_query = Arc::new(String::from( @@ -81,7 +86,8 @@ async fn sql_view_test() -> Result<()> { .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?; let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); let view_factory = Arc::new(default_view_factory()?); - let log_summary = make_log_entries_levels_per_process_view(lake, view_factory).await?; + let log_summary_view = + make_log_entries_levels_per_process_view(lake.clone(), view_factory.clone()).await?; let ref_schema = Arc::new(Schema::new(vec![ Field::new( "time_bin", @@ -101,10 +107,63 @@ async fn sql_view_test() -> Result<()> { Field::new("nb_trace", DataType::Int64, true), ])); - assert_eq!(log_summary.get_file_schema(), ref_schema); + assert_eq!(log_summary_view.get_file_schema(), ref_schema); let ref_schema_hash: Vec = vec![219, 37, 165, 158, 123, 73, 39, 204]; - assert_eq!(log_summary.get_file_schema_hash(), ref_schema_hash); + assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash); + + let nb_partitions = 2; + let partition_time_delta = TimeDelta::minutes(1); + let now = Utc::now(); + let end_range = 
now.duration_trunc(partition_time_delta)?; + let begin_range = end_range - (partition_time_delta * nb_partitions); + + let mut partitions = Arc::new( + // we only need the blocks partitions for this call + PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) + .await?, + ); + let null_response_writer = Arc::new(ResponseWriter::new(None)); + let blocks_view = view_factory.make_view("blocks", "global")?; + materialize_partition_range( + partitions.clone(), + lake.clone(), + blocks_view, + begin_range, + end_range, + partition_time_delta, + null_response_writer.clone(), + ) + .await?; + partitions = Arc::new( + PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) + .await?, + ); + let log_entries_view = view_factory.make_view("log_entries", "global")?; + materialize_partition_range( + partitions.clone(), + lake.clone(), + log_entries_view, + begin_range, + end_range, + partition_time_delta, + null_response_writer.clone(), + ) + .await?; + partitions = Arc::new( + PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) + .await?, + ); + materialize_partition_range( + partitions.clone(), + lake.clone(), + Arc::new(log_summary_view), + begin_range, + end_range, + partition_time_delta, + null_response_writer.clone(), + ) + .await?; Ok(()) } diff --git a/rust/public/src/servers/maintenance.rs b/rust/public/src/servers/maintenance.rs index 360981d3..f8afb9d1 100644 --- a/rust/public/src/servers/maintenance.rs +++ b/rust/public/src/servers/maintenance.rs @@ -83,6 +83,7 @@ pub async fn materialize_all_views( let end_range = now.duration_trunc(partition_time_delta)?; let begin_range = end_range - (partition_time_delta * nb_partitions); let mut partitions = Arc::new( + // we only need the blocks partitions for this call PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) .await?, ); From b31844bf291d22e578a976dcc0842097adaf474a Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Tue, 21 Jan 2025 10:03:56 -0500 Subject: [PATCH 03/19] materialize sql-defined view --- rust/analytics/src/dfext/get_column.rs | 16 ++- .../src/lakehouse/partition_source_data.rs | 58 +++++----- .../analytics/src/lakehouse/sql_batch_view.rs | 24 +++- .../src/lakehouse/sql_partition_spec.rs | 109 ++++++++++++++++-- rust/public/src/servers/maintenance.rs | 2 +- 5 files changed, 166 insertions(+), 43 deletions(-) diff --git a/rust/analytics/src/dfext/get_column.rs b/rust/analytics/src/dfext/get_column.rs index 0787da84..981911ec 100644 --- a/rust/analytics/src/dfext/get_column.rs +++ b/rust/analytics/src/dfext/get_column.rs @@ -1,6 +1,6 @@ use anyhow::{Context, Result}; -pub fn get_column<'a, T: core::any::Any>( +pub fn typed_column_by_name<'a, T: core::any::Any>( rc: &'a datafusion::arrow::array::RecordBatch, column_name: &str, ) -> Result<&'a T> { @@ -12,3 +12,17 @@ pub fn get_column<'a, T: core::any::Any>( .downcast_ref::() .with_context(|| format!("casting {column_name}: {:?}", column.data_type())) } + +pub fn typed_column( + rc: &datafusion::arrow::array::RecordBatch, + index: usize, +) -> Result<&T> { + let column = rc + .columns() + .get(index) + .with_context(|| format!("getting column {index}"))?; + column + .as_any() + .downcast_ref::() + .with_context(|| format!("casting {index}: {:?}", column.data_type())) +} diff --git a/rust/analytics/src/lakehouse/partition_source_data.rs b/rust/analytics/src/lakehouse/partition_source_data.rs index ab509590..618d3dd7 100644 --- 
a/rust/analytics/src/lakehouse/partition_source_data.rs +++ b/rust/analytics/src/lakehouse/partition_source_data.rs @@ -1,6 +1,6 @@ use super::partition_cache::PartitionCache; use crate::{ - dfext::get_column::get_column, + dfext::get_column::typed_column_by_name, lakehouse::{blocks_view::BlocksView, query::query_single_view}, time::TimeRange, }; @@ -91,38 +91,42 @@ pub async fn fetch_partition_source_data( .await .with_context(|| "blocks query")?; for b in blocks_answer.record_batches { - let block_id_column: &StringArray = get_column(&b, "block_id")?; - let stream_id_column: &StringArray = get_column(&b, "stream_id")?; - let process_id_column: &StringArray = get_column(&b, "process_id")?; - let begin_time_column: &TimestampNanosecondArray = get_column(&b, "begin_time")?; - let begin_ticks_column: &Int64Array = get_column(&b, "begin_ticks")?; - let end_time_column: &TimestampNanosecondArray = get_column(&b, "end_time")?; - let end_ticks_column: &Int64Array = get_column(&b, "end_ticks")?; - let nb_objects_column: &Int32Array = get_column(&b, "nb_objects")?; - let object_offset_column: &Int64Array = get_column(&b, "object_offset")?; - let payload_size_column: &Int64Array = get_column(&b, "payload_size")?; + let block_id_column: &StringArray = typed_column_by_name(&b, "block_id")?; + let stream_id_column: &StringArray = typed_column_by_name(&b, "stream_id")?; + let process_id_column: &StringArray = typed_column_by_name(&b, "process_id")?; + let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?; + let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?; + let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?; + let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?; + let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?; + let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?; + let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?; let block_insert_time_column: &TimestampNanosecondArray = - get_column(&b, "block_insert_time")?; + typed_column_by_name(&b, "block_insert_time")?; let dependencies_metadata_column: &BinaryArray = - get_column(&b, "streams.dependencies_metadata")?; - let objects_metadata_column: &BinaryArray = get_column(&b, "streams.objects_metadata")?; - let stream_tags_column: &GenericListArray = get_column(&b, "streams.tags")?; + typed_column_by_name(&b, "streams.dependencies_metadata")?; + let objects_metadata_column: &BinaryArray = + typed_column_by_name(&b, "streams.objects_metadata")?; + let stream_tags_column: &GenericListArray = typed_column_by_name(&b, "streams.tags")?; let stream_properties_column: &GenericListArray = - get_column(&b, "streams.properties")?; + typed_column_by_name(&b, "streams.properties")?; let process_start_time_column: &TimestampNanosecondArray = - get_column(&b, "processes.start_time")?; - let process_start_ticks_column: &Int64Array = get_column(&b, "processes.start_ticks")?; - let process_tsc_freq_column: &Int64Array = get_column(&b, "processes.tsc_frequency")?; - let process_exe_column: &StringArray = get_column(&b, "processes.exe")?; - let process_username_column: &StringArray = get_column(&b, "processes.username")?; - let process_realname_column: &StringArray = get_column(&b, "processes.realname")?; - let process_computer_column: &StringArray = get_column(&b, "processes.computer")?; - let process_distro_column: &StringArray = get_column(&b, 
"processes.distro")?; - let process_cpu_column: &StringArray = get_column(&b, "processes.cpu_brand")?; - let process_parent_column: &StringArray = get_column(&b, "processes.parent_process_id")?; + typed_column_by_name(&b, "processes.start_time")?; + let process_start_ticks_column: &Int64Array = + typed_column_by_name(&b, "processes.start_ticks")?; + let process_tsc_freq_column: &Int64Array = + typed_column_by_name(&b, "processes.tsc_frequency")?; + let process_exe_column: &StringArray = typed_column_by_name(&b, "processes.exe")?; + let process_username_column: &StringArray = typed_column_by_name(&b, "processes.username")?; + let process_realname_column: &StringArray = typed_column_by_name(&b, "processes.realname")?; + let process_computer_column: &StringArray = typed_column_by_name(&b, "processes.computer")?; + let process_distro_column: &StringArray = typed_column_by_name(&b, "processes.distro")?; + let process_cpu_column: &StringArray = typed_column_by_name(&b, "processes.cpu_brand")?; + let process_parent_column: &StringArray = + typed_column_by_name(&b, "processes.parent_process_id")?; let process_properties_column: &GenericListArray = - get_column(&b, "processes.properties")?; + typed_column_by_name(&b, "processes.properties")?; for ir in 0..b.num_rows() { let block_insert_time = block_insert_time_column.value(ir); diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs index 3e1af64b..c0baaf33 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -2,7 +2,7 @@ use super::{ partition_cache::{NullPartitionProvider, PartitionCache}, query::make_session_context, sql_partition_spec::fetch_sql_partition_spec, - view::{PartitionSpec, View}, + view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewFactory, }; use crate::time::TimeRange; @@ -22,7 +22,7 @@ pub struct SqlBatchView { event_time_column: Arc, // could be more general - for filtering src_query: Arc, transform_query: Arc, - merge_partitions_query: Arc, + _merge_partitions_query: Arc, schema: Arc, view_factory: Arc, } @@ -61,7 +61,7 @@ impl SqlBatchView { event_time_column, src_query, transform_query, - merge_partitions_query, + _merge_partitions_query: merge_partitions_query, schema, view_factory, }) @@ -85,6 +85,11 @@ impl View for SqlBatchView { begin_insert: DateTime, end_insert: DateTime, ) -> Result> { + let view_meta = ViewMetadata { + view_set_name: self.get_view_set_name(), + view_instance_id: self.get_view_instance_id(), + file_schema_hash: self.get_file_schema_hash(), + }; let partitions_in_range = Arc::new(existing_partitions.filter_insert_range(begin_insert, end_insert)); let ctx = make_session_context( @@ -105,9 +110,16 @@ impl View for SqlBatchView { )?; Ok(Arc::new( - fetch_sql_partition_spec(ctx, begin_insert, end_insert) - .await - .with_context(|| "fetch_sql_partition_spec")?, + fetch_sql_partition_spec( + ctx, + self.transform_query.clone(), + self.event_time_column.clone(), + view_meta, + begin_insert, + end_insert, + ) + .await + .with_context(|| "fetch_sql_partition_spec")?, )) } fn get_file_schema_hash(&self) -> Vec { diff --git a/rust/analytics/src/lakehouse/sql_partition_spec.rs b/rust/analytics/src/lakehouse/sql_partition_spec.rs index e11efb8c..b5766d74 100644 --- a/rust/analytics/src/lakehouse/sql_partition_spec.rs +++ b/rust/analytics/src/lakehouse/sql_partition_spec.rs @@ -1,24 +1,49 @@ -use super::view::PartitionSpec; -use crate::{dfext::get_column::get_column, 
response_writer::Logger}; +use super::{ + view::{PartitionSpec, ViewMetadata}, + write_partition::write_partition_from_rows, +}; +use crate::{ + dfext::get_column::{typed_column, typed_column_by_name}, + lakehouse::write_partition::PartitionRowSet, + response_writer::Logger, +}; use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::{ - arrow::array::{Int64Array, RecordBatch}, + arrow::array::{Int64Array, RecordBatch, TimestampNanosecondArray}, + functions_aggregate::min_max::{max, min}, prelude::*, }; +use futures::StreamExt; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use std::sync::Arc; pub struct SqlPartitionSpec { + ctx: SessionContext, + transform_query: Arc, + event_time_column: Arc, + view_metadata: ViewMetadata, begin_insert: DateTime, end_insert: DateTime, record_count: i64, } impl SqlPartitionSpec { - pub fn new(begin_insert: DateTime, end_insert: DateTime, record_count: i64) -> Self { + pub fn new( + ctx: SessionContext, + transform_query: Arc, + event_time_column: Arc, + view_metadata: ViewMetadata, + begin_insert: DateTime, + end_insert: DateTime, + record_count: i64, + ) -> Self { Self { + ctx, + transform_query, + event_time_column, + view_metadata, begin_insert, end_insert, record_count, @@ -31,13 +56,73 @@ impl PartitionSpec for SqlPartitionSpec { fn get_source_data_hash(&self) -> Vec { self.record_count.to_le_bytes().to_vec() } - async fn write(&self, _lake: Arc, _logger: Arc) -> Result<()> { - todo!(); + + async fn write(&self, lake: Arc, logger: Arc) -> Result<()> { + if self.record_count == 0 { + return Ok(()); + } + let desc = format!( + "[{}, {}] {} {}", + self.view_metadata.view_set_name, + self.view_metadata.view_instance_id, + self.begin_insert.to_rfc3339(), + self.end_insert.to_rfc3339() + ); + logger.write_log_entry(format!("writing {desc}")).await?; + let df = self.ctx.sql(&self.transform_query).await?; + let schema = df.schema().inner().clone(); + let mut stream = df.execute_stream().await?; + + let (tx, rx) = tokio::sync::mpsc::channel(1); + let join_handle = tokio::spawn(write_partition_from_rows( + lake.clone(), + self.view_metadata.clone(), + schema, + self.begin_insert, + self.end_insert, + self.get_source_data_hash(), + rx, + logger.clone(), + )); + + while let Some(rb_res) = stream.next().await { + let rb = rb_res?; + let df = self.ctx.read_batch(rb.clone())?; + let df = df.aggregate( + vec![], + vec![ + min(col(&*self.event_time_column)), + max(col(&*self.event_time_column)), + ], + )?; + let minmax = df.collect().await?; + if minmax.len() != 1 { + anyhow::bail!("expected minmax to be size 1"); + } + let minmax = &minmax[0]; + let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?; + let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?; + if min_column.is_empty() || max_column.is_empty() { + anyhow::bail!("expected minmax to be size 1"); + } + tx.send(PartitionRowSet { + min_time_row: DateTime::from_timestamp_nanos(min_column.value(0)), + max_time_row: DateTime::from_timestamp_nanos(max_column.value(0)), + rows: rb, + }) + .await?; + } + drop(tx); + join_handle.await??; + Ok(()) } } pub async fn fetch_sql_partition_spec( ctx: SessionContext, + transform_query: Arc, + event_time_column: Arc, + view_metadata: ViewMetadata, begin_insert: DateTime, end_insert: DateTime, ) -> Result { @@ -47,10 +132,18 @@ pub async fn fetch_sql_partition_spec( anyhow::bail!("fetch_sql_partition_spec: query should return a single batch"); } let rb = &batches[0]; - let count_column: 
&Int64Array = get_column(rb, "count")?; + let count_column: &Int64Array = typed_column_by_name(rb, "count")?; if count_column.len() != 1 { anyhow::bail!("fetch_sql_partition_spec: query should return a single row"); } let count = count_column.value(0); - Ok(SqlPartitionSpec::new(begin_insert, end_insert, count)) + Ok(SqlPartitionSpec::new( + ctx, + transform_query, + event_time_column, + view_metadata, + begin_insert, + end_insert, + count, + )) } diff --git a/rust/public/src/servers/maintenance.rs b/rust/public/src/servers/maintenance.rs index f8afb9d1..1a39b9d1 100644 --- a/rust/public/src/servers/maintenance.rs +++ b/rust/public/src/servers/maintenance.rs @@ -83,7 +83,7 @@ pub async fn materialize_all_views( let end_range = now.duration_trunc(partition_time_delta)?; let begin_range = end_range - (partition_time_delta * nb_partitions); let mut partitions = Arc::new( - // we only need the blocks partitions for this call + // we only need the blocks partitions for this call PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) .await?, ); From fd201836fb455f7091df8bf6f3553560b969bb09 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Tue, 21 Jan 2025 10:32:52 -0500 Subject: [PATCH 04/19] removed maker for views that can only by global --- rust/analytics/src/lakehouse/blocks_view.rs | 13 ------------ .../analytics/src/lakehouse/processes_view.rs | 16 +------------- rust/analytics/src/lakehouse/streams_view.rs | 16 +------------- rust/analytics/src/lakehouse/view_factory.rs | 21 +++++++++---------- rust/analytics/tests/sql_view_test.rs | 3 ++- rust/public/src/servers/maintenance.rs | 11 ++++++---- 6 files changed, 21 insertions(+), 59 deletions(-) diff --git a/rust/analytics/src/lakehouse/blocks_view.rs b/rust/analytics/src/lakehouse/blocks_view.rs index 9f7aa6be..30540873 100644 --- a/rust/analytics/src/lakehouse/blocks_view.rs +++ b/rust/analytics/src/lakehouse/blocks_view.rs @@ -2,7 +2,6 @@ use super::{ metadata_partition_spec::fetch_metadata_partition_spec, partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, - view_factory::ViewMaker, }; use crate::time::TimeRange; use anyhow::{Context, Result}; @@ -19,18 +18,6 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "blocks"; const VIEW_INSTANCE_ID: &str = "global"; -#[derive(Debug)] -pub struct BlocksViewMaker {} - -impl ViewMaker for BlocksViewMaker { - fn make_view(&self, view_instance_id: &str) -> Result> { - if view_instance_id != "global" { - anyhow::bail!("only global view instance id is supported for metadata views"); - } - Ok(Arc::new(BlocksView::new()?)) - } -} - #[derive(Debug)] pub struct BlocksView { view_set_name: Arc, diff --git a/rust/analytics/src/lakehouse/processes_view.rs b/rust/analytics/src/lakehouse/processes_view.rs index 2dc9c2b6..b8acc554 100644 --- a/rust/analytics/src/lakehouse/processes_view.rs +++ b/rust/analytics/src/lakehouse/processes_view.rs @@ -4,7 +4,6 @@ use super::{ metadata_partition_spec::fetch_metadata_partition_spec, partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, - view_factory::ViewMaker, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -20,15 +19,6 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "processes"; const VIEW_INSTANCE_ID: &str = "global"; -#[derive(Debug)] -pub struct ProcessesViewMaker {} - -impl ViewMaker for ProcessesViewMaker { - fn make_view(&self, view_instance_id: &str) -> Result> { - Ok(Arc::new(ProcessesView::new(view_instance_id)?)) - } -} - #[derive(Debug)] pub 
struct ProcessesView { view_set_name: Arc, @@ -38,11 +28,7 @@ pub struct ProcessesView { } impl ProcessesView { - pub fn new(view_instance_id: &str) -> Result { - if view_instance_id != "global" { - anyhow::bail!("only global view instance id is supported for metadata views"); - } - + pub fn new() -> Result { let data_sql = Arc::new(String::from( "SELECT process_id, exe, diff --git a/rust/analytics/src/lakehouse/streams_view.rs b/rust/analytics/src/lakehouse/streams_view.rs index 57c08f36..1cc7802a 100644 --- a/rust/analytics/src/lakehouse/streams_view.rs +++ b/rust/analytics/src/lakehouse/streams_view.rs @@ -4,7 +4,6 @@ use super::{ metadata_partition_spec::fetch_metadata_partition_spec, partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, - view_factory::ViewMaker, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -20,15 +19,6 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "streams"; const VIEW_INSTANCE_ID: &str = "global"; -#[derive(Debug)] -pub struct StreamsViewMaker {} - -impl ViewMaker for StreamsViewMaker { - fn make_view(&self, view_instance_id: &str) -> Result> { - Ok(Arc::new(StreamsView::new(view_instance_id)?)) - } -} - #[derive(Debug)] pub struct StreamsView { view_set_name: Arc, @@ -38,11 +28,7 @@ pub struct StreamsView { } impl StreamsView { - pub fn new(view_instance_id: &str) -> Result { - if view_instance_id != "global" { - anyhow::bail!("only global view instance id is supported for metadata views"); - } - + pub fn new() -> Result { let data_sql = Arc::new(String::from( "SELECT stream_id, process_id, diff --git a/rust/analytics/src/lakehouse/view_factory.rs b/rust/analytics/src/lakehouse/view_factory.rs index 26499ecf..cdf19660 100644 --- a/rust/analytics/src/lakehouse/view_factory.rs +++ b/rust/analytics/src/lakehouse/view_factory.rs @@ -136,9 +136,11 @@ //! //! //! 
+use super::blocks_view::BlocksView; +use super::processes_view::ProcessesView; +use super::streams_view::StreamsView; use super::{ - blocks_view::BlocksViewMaker, log_view::LogViewMaker, metrics_view::MetricsViewMaker, - processes_view::ProcessesViewMaker, streams_view::StreamsViewMaker, + log_view::LogViewMaker, metrics_view::MetricsViewMaker, thread_spans_view::ThreadSpansViewMaker, view::View, }; use anyhow::Result; @@ -183,15 +185,15 @@ impl ViewFactory { pub fn default_view_factory() -> Result { let log_view_maker = Arc::new(LogViewMaker {}); let metrics_view_maker = Arc::new(MetricsViewMaker {}); - let processes_view_maker = Arc::new(ProcessesViewMaker {}); - let streams_view_maker = Arc::new(StreamsViewMaker {}); - let blocks_view_maker = Arc::new(BlocksViewMaker {}); + let processes_view = Arc::new(ProcessesView::new()?); + let streams_view = Arc::new(StreamsView::new()?); + let blocks_view = Arc::new(BlocksView::new()?); let global_views = vec![ log_view_maker.make_view("global")?, metrics_view_maker.make_view("global")?, - processes_view_maker.make_view("global")?, - streams_view_maker.make_view("global")?, - blocks_view_maker.make_view("global")?, + processes_view, + streams_view, + blocks_view, ]; let mut factory = ViewFactory::new(global_views); factory.add_view_set(String::from("log_entries"), log_view_maker); @@ -200,8 +202,5 @@ pub fn default_view_factory() -> Result { String::from("thread_spans"), Arc::new(ThreadSpansViewMaker {}), ); - factory.add_view_set(String::from("processes"), processes_view_maker); - factory.add_view_set(String::from("streams"), streams_view_maker); - factory.add_view_set(String::from("blocks"), blocks_view_maker); Ok(factory) } diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index 3be0c27d..d18aef4a 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -3,6 +3,7 @@ use chrono::DurationRound; use chrono::{TimeDelta, Utc}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use micromegas_analytics::lakehouse::batch_update::materialize_partition_range; +use micromegas_analytics::lakehouse::blocks_view::BlocksView; use micromegas_analytics::lakehouse::partition_cache::PartitionCache; use micromegas_analytics::lakehouse::view::View; use micromegas_analytics::lakehouse::view_factory::default_view_factory; @@ -124,7 +125,7 @@ async fn sql_view_test() -> Result<()> { .await?, ); let null_response_writer = Arc::new(ResponseWriter::new(None)); - let blocks_view = view_factory.make_view("blocks", "global")?; + let blocks_view = Arc::new(BlocksView::new()?); materialize_partition_range( partitions.clone(), lake.clone(), diff --git a/rust/public/src/servers/maintenance.rs b/rust/public/src/servers/maintenance.rs index 1a39b9d1..4c4938bf 100644 --- a/rust/public/src/servers/maintenance.rs +++ b/rust/public/src/servers/maintenance.rs @@ -3,7 +3,10 @@ use chrono::{DateTime, DurationRound}; use chrono::{TimeDelta, Utc}; use micromegas_analytics::delete::delete_old_data; use micromegas_analytics::lakehouse::batch_update::materialize_partition_range; +use micromegas_analytics::lakehouse::blocks_view::BlocksView; use micromegas_analytics::lakehouse::partition_cache::PartitionCache; +use micromegas_analytics::lakehouse::processes_view::ProcessesView; +use micromegas_analytics::lakehouse::streams_view::StreamsView; use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files; use micromegas_analytics::lakehouse::view::View; use 
micromegas_analytics::lakehouse::view_factory::ViewFactory; @@ -152,10 +155,10 @@ pub async fn every_second( } pub async fn daemon(lake: Arc, view_factory: Arc) -> Result<()> { - let blocks_view = view_factory.make_view("blocks", "global")?; - let views = Arc::new(vec![ - view_factory.make_view("processes", "global")?, - view_factory.make_view("streams", "global")?, + let blocks_view = Arc::new(BlocksView::new()?); + let views: Arc>> = Arc::new(vec![ + Arc::new(ProcessesView::new()?), + Arc::new(StreamsView::new()?), view_factory.make_view("log_entries", "global")?, view_factory.make_view("measures", "global")?, ]); From 3958f00240b6f6d84341bd121edb58216d01298e Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Tue, 21 Jan 2025 11:08:07 -0500 Subject: [PATCH 05/19] query custom view --- .../analytics/src/lakehouse/sql_batch_view.rs | 2 +- rust/analytics/tests/sql_view_test.rs | 26 +++++++++++++++---- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs index c0baaf33..6c68b146 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -135,7 +135,7 @@ impl View for SqlBatchView { _lake: Arc, _query_range: Option, ) -> Result<()> { - anyhow::bail!("jit_update not supported on SqlBatchView"); + Ok(()) } fn make_time_filter(&self, _begin: DateTime, _end: DateTime) -> Result> { todo!(); diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index d18aef4a..77ca262b 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -4,7 +4,8 @@ use chrono::{TimeDelta, Utc}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use micromegas_analytics::lakehouse::batch_update::materialize_partition_range; use micromegas_analytics::lakehouse::blocks_view::BlocksView; -use micromegas_analytics::lakehouse::partition_cache::PartitionCache; +use micromegas_analytics::lakehouse::partition_cache::{LivePartitionProvider, PartitionCache}; +use micromegas_analytics::lakehouse::query::query_single_view; use micromegas_analytics::lakehouse::view::View; use micromegas_analytics::lakehouse::view_factory::default_view_factory; use micromegas_analytics::lakehouse::{sql_batch_view::SqlBatchView, view_factory::ViewFactory}; @@ -87,8 +88,9 @@ async fn sql_view_test() -> Result<()> { .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?; let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); let view_factory = Arc::new(default_view_factory()?); - let log_summary_view = - make_log_entries_levels_per_process_view(lake.clone(), view_factory.clone()).await?; + let log_summary_view = Arc::new( + make_log_entries_levels_per_process_view(lake.clone(), view_factory.clone()).await?, + ); let ref_schema = Arc::new(Schema::new(vec![ Field::new( "time_bin", @@ -113,7 +115,7 @@ async fn sql_view_test() -> Result<()> { let ref_schema_hash: Vec = vec![219, 37, 165, 158, 123, 73, 39, 204]; assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash); - let nb_partitions = 2; + let nb_partitions = 3; let partition_time_delta = TimeDelta::minutes(1); let now = Utc::now(); let end_range = now.duration_trunc(partition_time_delta)?; @@ -158,7 +160,7 @@ async fn sql_view_test() -> Result<()> { materialize_partition_range( partitions.clone(), lake.clone(), - Arc::new(log_summary_view), + log_summary_view.clone(), begin_range, end_range, 
partition_time_delta, @@ -166,5 +168,19 @@ async fn sql_view_test() -> Result<()> { ) .await?; + let answer = query_single_view( + lake.clone(), + Arc::new(LivePartitionProvider::new(lake.db_pool.clone())), + None, + "SELECT * FROM log_entries_per_process;", + log_summary_view, + ) + .await?; + let pretty_results = + datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string(); + eprintln!("{pretty_results}"); + + info!("bye"); + Ok(()) } From ba206c5ebb823b7430ffb9b39932138ad6e44658 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Tue, 21 Jan 2025 11:21:57 -0500 Subject: [PATCH 06/19] deadlock on shutdown --- rust/tracing/src/dispatch.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rust/tracing/src/dispatch.rs b/rust/tracing/src/dispatch.rs index 7ac8cf6f..ac96124f 100644 --- a/rust/tracing/src/dispatch.rs +++ b/rust/tracing/src/dispatch.rs @@ -370,7 +370,11 @@ impl Dispatch { } pub fn get_sink(&self) -> Arc { - self.sink.read().unwrap().clone() + if let Ok(guard) = self.sink.read() { + guard.clone() + } else { + Arc::new(NullEventSink {}) + } } fn shutdown(&self) { From 09e4b8c8a8a8d2b652440208def79075398d5ee3 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Tue, 21 Jan 2025 11:45:47 -0500 Subject: [PATCH 07/19] logging may fail while shutting down --- rust/tracing/src/dispatch.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/tracing/src/dispatch.rs b/rust/tracing/src/dispatch.rs index ac96124f..da35a47d 100644 --- a/rust/tracing/src/dispatch.rs +++ b/rust/tracing/src/dispatch.rs @@ -370,7 +370,7 @@ impl Dispatch { } pub fn get_sink(&self) -> Arc { - if let Ok(guard) = self.sink.read() { + if let Ok(guard) = self.sink.try_read() { guard.clone() } else { Arc::new(NullEventSink {}) From 078d83a7c9a4e5806f6ee0d56367bdfbb402b812 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 09:36:37 -0500 Subject: [PATCH 08/19] make flush monitor configurable by env variable (for tests) --- .../lakehouse/{table_provider.rs => materialized_view.rs} | 0 rust/tracing/src/flush_monitor.rs | 8 ++++++-- 2 files changed, 6 insertions(+), 2 deletions(-) rename rust/analytics/src/lakehouse/{table_provider.rs => materialized_view.rs} (100%) diff --git a/rust/analytics/src/lakehouse/table_provider.rs b/rust/analytics/src/lakehouse/materialized_view.rs similarity index 100% rename from rust/analytics/src/lakehouse/table_provider.rs rename to rust/analytics/src/lakehouse/materialized_view.rs diff --git a/rust/tracing/src/flush_monitor.rs b/rust/tracing/src/flush_monitor.rs index 810e87ad..54e48d25 100644 --- a/rust/tracing/src/flush_monitor.rs +++ b/rust/tracing/src/flush_monitor.rs @@ -39,7 +39,11 @@ impl FlushMonitor { impl Default for FlushMonitor { fn default() -> Self { - // Default is to flush every minute - Self::new(60) + // Default is to flush every minute unless specified by the env variable + const DEFAULT_PERIOD: i64 = 60; + let nb_seconds = std::env::var("MICROMEGAS_FLUSH_PERIOD") + .map(|v| v.parse::().unwrap_or(DEFAULT_PERIOD)) + .unwrap_or(DEFAULT_PERIOD); + Self::new(nb_seconds) } } From 8c47e9a12a1585b8a6c204c519561110e09e08d3 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 09:37:18 -0500 Subject: [PATCH 09/19] shutdown race condition --- rust/tracing/src/dispatch.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/rust/tracing/src/dispatch.rs b/rust/tracing/src/dispatch.rs index da35a47d..d4af0fe5 100644 --- 
a/rust/tracing/src/dispatch.rs +++ b/rust/tracing/src/dispatch.rs @@ -371,7 +371,7 @@ impl Dispatch { pub fn get_sink(&self) -> Arc { if let Ok(guard) = self.sink.try_read() { - guard.clone() + (*guard).clone() } else { Arc::new(NullEventSink {}) } @@ -379,9 +379,12 @@ impl Dispatch { fn shutdown(&self) { let old_sink = self.get_sink(); - old_sink.on_shutdown(); let null_sink = Arc::new(NullEventSink {}); - *self.sink.write().unwrap() = null_sink; + if let Ok(mut guard) = self.sink.write() { + *guard = null_sink; + drop(guard) + } + old_sink.on_shutdown(); } fn startup(&self) { From 294acff8c6cb4676afcb618bc63db08901882a8c Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 09:38:09 -0500 Subject: [PATCH 10/19] partitioned table provider, execution plan & integration into partition merge --- rust/analytics/src/dfext/min_max_time_df.rs | 37 ++++ rust/analytics/src/dfext/mod.rs | 4 + rust/analytics/src/dfext/predicate.rs | 27 +++ rust/analytics/src/lakehouse/batch_update.rs | 10 +- rust/analytics/src/lakehouse/blocks_view.rs | 12 ++ rust/analytics/src/lakehouse/log_view.rs | 11 ++ .../src/lakehouse/materialized_view.rs | 65 ++----- rust/analytics/src/lakehouse/merge.rs | 184 ++++++++---------- rust/analytics/src/lakehouse/metrics_view.rs | 11 ++ rust/analytics/src/lakehouse/mod.rs | 8 +- .../src/lakehouse/partition_cache.rs | 27 +++ .../src/lakehouse/partition_source_data.rs | 6 + .../lakehouse/partitioned_execution_plan.rs | 42 ++++ .../lakehouse/partitioned_table_provider.rs | 78 ++++++++ .../analytics/src/lakehouse/processes_view.rs | 11 ++ rust/analytics/src/lakehouse/query.rs | 2 +- .../analytics/src/lakehouse/sql_batch_view.rs | 20 +- .../src/lakehouse/sql_partition_spec.rs | 49 ++--- rust/analytics/src/lakehouse/streams_view.rs | 11 ++ .../src/lakehouse/table_scan_rewrite.rs | 2 +- .../src/lakehouse/thread_spans_view.rs | 12 ++ rust/analytics/src/lakehouse/view.rs | 6 + .../lakehouse/view_instance_table_function.rs | 2 +- .../src/lakehouse/write_partition.rs | 2 +- rust/analytics/tests/sql_view_test.rs | 179 +++++++++++------ 25 files changed, 573 insertions(+), 245 deletions(-) create mode 100644 rust/analytics/src/dfext/min_max_time_df.rs create mode 100644 rust/analytics/src/dfext/predicate.rs create mode 100644 rust/analytics/src/lakehouse/partitioned_execution_plan.rs create mode 100644 rust/analytics/src/lakehouse/partitioned_table_provider.rs diff --git a/rust/analytics/src/dfext/min_max_time_df.rs b/rust/analytics/src/dfext/min_max_time_df.rs new file mode 100644 index 00000000..b1f54ae3 --- /dev/null +++ b/rust/analytics/src/dfext/min_max_time_df.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use datafusion::prelude::*; +use datafusion::{ + arrow::array::TimestampNanosecondArray, + functions_aggregate::min_max::{max, min}, +}; + +use super::get_column::typed_column; + +pub async fn min_max_time_dataframe( + df: DataFrame, + min_time_column_name: &str, + max_time_column_name: &str, +) -> Result<(DateTime, DateTime)> { + let df = df.aggregate( + vec![], + vec![ + min(col(min_time_column_name)), + max(col(max_time_column_name)), + ], + )?; + let minmax = df.collect().await?; + if minmax.len() != 1 { + anyhow::bail!("expected minmax to be size 1"); + } + let minmax = &minmax[0]; + let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?; + let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?; + if min_column.is_empty() || max_column.is_empty() { + anyhow::bail!("expected minmax to be size 1"); + } + 
Ok(( + DateTime::from_timestamp_nanos(min_column.value(0)), + DateTime::from_timestamp_nanos(max_column.value(0)), + )) +} diff --git a/rust/analytics/src/dfext/mod.rs b/rust/analytics/src/dfext/mod.rs index daa5262d..6229b1b0 100644 --- a/rust/analytics/src/dfext/mod.rs +++ b/rust/analytics/src/dfext/mod.rs @@ -6,5 +6,9 @@ pub mod expressions; pub mod get_column; /// Stream a function's log as a table pub mod log_stream_table_provider; +/// Get min & max from the time column +pub mod min_max_time_df; +/// Convert a filtering expression to a physical predicate +pub mod predicate; /// Execution plan interface for an async task pub mod task_log_exec_plan; diff --git a/rust/analytics/src/dfext/predicate.rs b/rust/analytics/src/dfext/predicate.rs new file mode 100644 index 00000000..6578e338 --- /dev/null +++ b/rust/analytics/src/dfext/predicate.rs @@ -0,0 +1,27 @@ +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::Session, + common::DFSchema, + logical_expr::utils::conjunction, + physical_plan::{expressions, PhysicalExpr}, + prelude::*, +}; +use std::sync::Arc; + +// from datafusion/datafusion-examples/examples/advanced_parquet_index.rs +pub fn filters_to_predicate( + schema: SchemaRef, + state: &dyn Session, + filters: &[Expr], +) -> datafusion::error::Result> { + let df_schema = DFSchema::try_from(schema)?; + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()? + // if there are no filters, use a literal true to have a predicate + // that always evaluates to true we can pass to the index + .unwrap_or_else(|| expressions::lit(true)); + + Ok(predicate) +} diff --git a/rust/analytics/src/lakehouse/batch_update.rs b/rust/analytics/src/lakehouse/batch_update.rs index 37231d84..94371b9f 100644 --- a/rust/analytics/src/lakehouse/batch_update.rs +++ b/rust/analytics/src/lakehouse/batch_update.rs @@ -146,7 +146,15 @@ async fn materialize_partition( .with_context(|| "writing partition")?; } PartitionCreationStrategy::MergeExisting => { - create_merged_partition(lake, view, begin_insert, end_insert, logger).await?; + create_merged_partition( + existing_partitions, + lake, + view, + begin_insert, + end_insert, + logger, + ) + .await?; } PartitionCreationStrategy::Abort => {} } diff --git a/rust/analytics/src/lakehouse/blocks_view.rs b/rust/analytics/src/lakehouse/blocks_view.rs index 30540873..dc3b43fa 100644 --- a/rust/analytics/src/lakehouse/blocks_view.rs +++ b/rust/analytics/src/lakehouse/blocks_view.rs @@ -17,6 +17,9 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "blocks"; const VIEW_INSTANCE_ID: &str = "global"; +lazy_static::lazy_static! 
{ + static ref INSERT_TIME_COLUMN: Arc = Arc::new( String::from("insert_time")); +} #[derive(Debug)] pub struct BlocksView { @@ -109,6 +112,7 @@ impl View for BlocksView { anyhow::bail!("not supported"); } + // fetch_partition_source_data relies on this filter being on insert_time fn make_time_filter(&self, begin: DateTime, end: DateTime) -> Result> { let utc: Arc = Arc::from("+00:00"); Ok(vec![Expr::Between(Between::new( @@ -126,6 +130,14 @@ impl View for BlocksView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } } pub fn blocks_view_schema() -> Schema { diff --git a/rust/analytics/src/lakehouse/log_view.rs b/rust/analytics/src/lakehouse/log_view.rs index 2e0826c0..eaaafa5e 100644 --- a/rust/analytics/src/lakehouse/log_view.rs +++ b/rust/analytics/src/lakehouse/log_view.rs @@ -26,6 +26,9 @@ use std::sync::Arc; use uuid::Uuid; const VIEW_SET_NAME: &str = "log_entries"; +lazy_static::lazy_static! { + static ref TIME_COLUMN: Arc = Arc::new( String::from("time")); +} #[derive(Debug)] pub struct LogViewMaker {} @@ -187,4 +190,12 @@ impl View for LogView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } } diff --git a/rust/analytics/src/lakehouse/materialized_view.rs b/rust/analytics/src/lakehouse/materialized_view.rs index b02c9580..99f452ab 100644 --- a/rust/analytics/src/lakehouse/materialized_view.rs +++ b/rust/analytics/src/lakehouse/materialized_view.rs @@ -1,19 +1,16 @@ -use super::{partition_cache::QueryPartitionProvider, view::View}; -use crate::{lakehouse::reader_factory::ReaderFactory, time::TimeRange}; +use super::{ + partition_cache::QueryPartitionProvider, + partitioned_execution_plan::make_partitioned_execution_plan, view::View, +}; +use crate::time::TimeRange; use async_trait::async_trait; use datafusion::{ arrow::datatypes::SchemaRef, catalog::{Session, TableProvider}, - common::DFSchema, - datasource::{ - listing::PartitionedFile, - physical_plan::{parquet::ParquetExecBuilder, FileScanConfig}, - TableType, - }, + datasource::TableType, error::DataFusionError, - execution::object_store::ObjectStoreUrl, - logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}, - physical_plan::{expressions, ExecutionPlan, PhysicalExpr}, + logical_expr::{Expr, TableProviderFilterPushDown}, + physical_plan::ExecutionPlan, }; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use object_store::ObjectStore; @@ -48,25 +45,6 @@ impl MaterializedView { pub fn get_view(&self) -> Arc { self.view.clone() } - - // from datafusion/datafusion-examples/examples/advanced_parquet_index.rs - fn filters_to_predicate( - &self, - schema: SchemaRef, - state: &dyn Session, - filters: &[Expr], - ) -> datafusion::error::Result> { - let df_schema = DFSchema::try_from(schema)?; - let predicate = conjunction(filters.to_vec()); - let predicate = predicate - .map(|predicate| state.create_physical_expr(predicate, &df_schema)) - .transpose()? 
- // if there are no filters, use a literal true to have a predicate - // that always evaluates to true we can pass to the index - .unwrap_or_else(|| expressions::lit(true)); - - Ok(predicate) - } } #[async_trait] @@ -90,13 +68,11 @@ impl TableProvider for MaterializedView { filters: &[Expr], limit: Option, ) -> datafusion::error::Result> { - let predicate = self.filters_to_predicate(self.view.get_file_schema(), state, filters)?; self.view .jit_update(self.lake.clone(), self.query_range.clone()) .await .map_err(|e| DataFusionError::External(e.into()))?; - let mut file_group = vec![]; let partitions = self .part_provider .fetch( @@ -108,22 +84,15 @@ impl TableProvider for MaterializedView { .await .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?; - for part in &partitions { - file_group.push(PartitionedFile::new(&part.file_path, part.file_size as u64)); - } - - let schema = self.schema(); - let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap(); - let file_scan_config = FileScanConfig::new(object_store_url, schema) - .with_limit(limit) - .with_projection(projection.cloned()) - .with_file_groups(vec![file_group]); - let reader_factory = - ReaderFactory::new(Arc::clone(&self.object_store), Arc::new(partitions)); - Ok(ParquetExecBuilder::new(file_scan_config) - .with_predicate(predicate.clone()) - .with_parquet_file_reader_factory(Arc::new(reader_factory)) - .build_arc()) + make_partitioned_execution_plan( + self.schema(), + self.object_store.clone(), + state, + projection, + filters, + limit, + Arc::new(partitions), + ) } /// Tell DataFusion to push filters down to the scan method diff --git a/rust/analytics/src/lakehouse/merge.rs b/rust/analytics/src/lakehouse/merge.rs index d5350a73..0941bde4 100644 --- a/rust/analytics/src/lakehouse/merge.rs +++ b/rust/analytics/src/lakehouse/merge.rs @@ -1,145 +1,127 @@ use super::{ + partition::Partition, + partition_cache::PartitionCache, partition_source_data::hash_to_object_count, + partitioned_table_provider::PartitionedTableProvider, view::View, write_partition::{write_partition_from_rows, PartitionRowSet}, }; -use crate::response_writer::Logger; -use anyhow::{Context, Result}; +use crate::{dfext::min_max_time_df::min_max_time_dataframe, response_writer::Logger}; +use anyhow::Result; use chrono::{DateTime, Utc}; -use datafusion::parquet::arrow::{ - async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder, -}; +use datafusion::{execution::object_store::ObjectStoreUrl, prelude::*, sql::TableReference}; use futures::stream::StreamExt; use micromegas_ingestion::data_lake_connection::DataLakeConnection; -use object_store::path::Path; -use object_store::ObjectMeta; -use sqlx::Row; use std::sync::Arc; use xxhash_rust::xxh32::xxh32; +fn partition_set_stats( + view: Arc, + filtered_partitions: &[Partition], +) -> Result<(i64, i64)> { + let mut sum_size: i64 = 0; + let mut source_hash: i64 = 0; + let latest_file_schema_hash = view.get_file_schema_hash(); + for p in filtered_partitions { + // for some time all the hashes will actually be the number of events in the source data + // when views have different hash algos, we should delegate to the view the creation of the merged hash + source_hash = if p.source_data_hash.len() == std::mem::size_of::() { + source_hash + hash_to_object_count(&p.source_data_hash)? 
+ } else { + //previous hash algo + xxh32(&p.source_data_hash, source_hash as u32).into() + }; + + sum_size += p.file_size; + + if p.view_metadata.file_schema_hash != latest_file_schema_hash { + anyhow::bail!( + "incompatible file schema with [{},{}]", + p.begin_insert_time.to_rfc3339(), + p.end_insert_time.to_rfc3339() + ); + } + } + Ok((sum_size, source_hash)) +} + pub async fn create_merged_partition( + existing_partitions: Arc, lake: Arc, view: Arc, - begin: DateTime, - end: DateTime, + begin_insert: DateTime, + end_insert: DateTime, logger: Arc, ) -> Result<()> { - let view_set_name = view.get_view_set_name().to_string(); - let view_instance_id = view.get_view_instance_id().to_string(); + let view_set_name = &view.get_view_set_name(); + let view_instance_id = &view.get_view_instance_id(); let desc = format!( "[{}, {}] {view_set_name} {view_instance_id}", - begin.to_rfc3339(), - end.to_rfc3339() + begin_insert.to_rfc3339(), + end_insert.to_rfc3339() ); // we are not looking for intersecting partitions, but only those that fit completely in the range - let rows = sqlx::query( - "SELECT file_path, file_size, updated, file_schema_hash, source_data_hash, begin_insert_time, end_insert_time, min_event_time, max_event_time - FROM lakehouse_partitions - WHERE view_set_name = $1 - AND view_instance_id = $2 - AND begin_insert_time >= $3 - AND end_insert_time <= $4 - ;", - ) - .bind(&view_set_name) - .bind(&view_instance_id) - .bind(begin) - .bind(end) - .fetch_all(&lake.db_pool) - .await - .with_context(|| "fetching partitions to merge")?; - if rows.len() < 2 { + // otherwise we'd get duplicated records + let filtered_partitions = existing_partitions + .filter_inside_range(view_set_name, view_instance_id, begin_insert, end_insert) + .partitions; + if filtered_partitions.len() < 2 { logger .write_log_entry(format!("{desc}: not enough partitions to merge")) .await?; return Ok(()); } - let latest_file_schema_hash = view.get_file_schema_hash(); - let mut sum_size: i64 = 0; - let mut source_hash: i64 = 0; - for r in &rows { - // for some time all the hashes will actually be the number of events in the source data - // when views have different hash algos, we should delegate to the view the creation of the merged hash - let source_data_hash: Vec = r.try_get("source_data_hash")?; - source_hash = if source_data_hash.len() == std::mem::size_of::() { - source_hash + hash_to_object_count(&source_data_hash)? 
- } else { - //previous hash algo - xxh32(&source_data_hash, source_hash as u32).into() - }; - - let file_size: i64 = r.try_get("file_size")?; - sum_size += file_size; - - let file_schema_hash: Vec = r.try_get("file_schema_hash")?; - if file_schema_hash != latest_file_schema_hash { - let begin_insert_time: DateTime = r.try_get("begin_insert_time")?; - let end_insert_time: DateTime = r.try_get("end_insert_time")?; - logger - .write_log_entry(format!( - "{desc}: incompatible file schema with [{},{}]", - begin_insert_time.to_rfc3339(), - end_insert_time.to_rfc3339() - )) - .await?; - return Ok(()); - } - } + let (sum_size, source_hash) = partition_set_stats(view.clone(), &filtered_partitions)?; logger .write_log_entry(format!( "{desc}: merging {} partitions sum_size={sum_size}", - rows.len() + filtered_partitions.len() )) .await?; - + let object_store = lake.blob_storage.inner(); + let table = PartitionedTableProvider::new( + view.get_file_schema(), + object_store.clone(), + Arc::new(filtered_partitions), + ); + let ctx = SessionContext::new(); + let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap(); + ctx.register_object_store(object_store_url.as_ref(), object_store.clone()); + ctx.register_table( + TableReference::Bare { + table: "source".into(), + }, + Arc::new(table), + )?; + let merged_df = ctx.sql("SELECT * FROM source;").await?; let (tx, rx) = tokio::sync::mpsc::channel(1); let join_handle = tokio::spawn(write_partition_from_rows( lake.clone(), view.get_meta(), view.get_file_schema(), - begin, - end, + begin_insert, + end_insert, source_hash.to_le_bytes().to_vec(), rx, logger.clone(), )); - for r in &rows { - let file_path: String = r.try_get("file_path")?; - let file_size: i64 = r.try_get("file_size")?; - logger - .write_log_entry(format!("reading path={file_path} size={file_size}")) - .await?; - let min_time_row: DateTime = r.try_get("min_event_time")?; - let max_time_row: DateTime = r.try_get("max_event_time")?; - - let updated: DateTime = r.try_get("updated")?; - let meta = ObjectMeta { - location: Path::from(file_path), - last_modified: updated, - size: file_size as usize, - e_tag: None, - version: None, - }; - let reader = ParquetObjectReader::new(lake.blob_storage.inner(), meta); - let builder = ParquetRecordBatchStreamBuilder::new(reader) - .await - .with_context(|| "ParquetRecordBatchStreamBuilder::new")?; - let mut rbstream = builder - .with_batch_size(100 * 1024) // the default is 1024, which seems low - .build() - .with_context(|| "builder.build()")?; - while let Some(rb_res) = rbstream.next().await { - let rows = rb_res?; - tx.send(PartitionRowSet { - min_time_row, - max_time_row, - rows, - }) - .await?; - } + let mut stream = merged_df.execute_stream().await?; + while let Some(rb_res) = stream.next().await { + let rb = rb_res?; + let (mintime, maxtime) = min_max_time_dataframe( + ctx.read_batch(rb.clone())?, + &view.get_min_event_time_column_name(), + &view.get_max_event_time_column_name(), + ) + .await?; + tx.send(PartitionRowSet { + min_time_row: mintime, + max_time_row: maxtime, + rows: rb, + }) + .await?; } drop(tx); join_handle.await??; - Ok(()) } diff --git a/rust/analytics/src/lakehouse/metrics_view.rs b/rust/analytics/src/lakehouse/metrics_view.rs index 0c6d59d1..4e1bd4d2 100644 --- a/rust/analytics/src/lakehouse/metrics_view.rs +++ b/rust/analytics/src/lakehouse/metrics_view.rs @@ -28,6 +28,9 @@ use std::sync::Arc; use uuid::Uuid; const VIEW_SET_NAME: &str = "measures"; +lazy_static::lazy_static! 
{ + static ref TIME_COLUMN: Arc = Arc::new( String::from("time")); +} #[derive(Debug)] pub struct MetricsViewMaker {} @@ -194,4 +197,12 @@ impl View for MetricsView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } } diff --git a/rust/analytics/src/lakehouse/mod.rs b/rust/analytics/src/lakehouse/mod.rs index a67b6df7..b02d3cb1 100644 --- a/rust/analytics/src/lakehouse/mod.rs +++ b/rust/analytics/src/lakehouse/mod.rs @@ -18,6 +18,8 @@ pub mod log_block_processor; pub mod log_view; /// Exposes materialize_partitions as a table function pub mod materialize_partitions_table_function; +/// TableProvider implementation for the lakehouse +pub mod materialized_view; /// Merge consecutive parquet partitions into a single file pub mod merge; /// Specification for a view partition backed by a table in the postgresql metadata database. @@ -34,6 +36,10 @@ pub mod partition; pub mod partition_cache; /// Describes the event blocks backing a partition pub mod partition_source_data; +/// ExecutionPlan based on a set of parquet files +pub mod partitioned_execution_plan; +/// TableProvider based on a set of parquet files +pub mod partitioned_table_provider; /// Replicated view of the `processes` table of the postgresql metadata database. pub mod processes_view; /// property_get function support from SQL @@ -50,8 +56,6 @@ pub mod sql_batch_view; pub mod sql_partition_spec; /// Replicated view of the `streams` table of the postgresql metadata database. pub mod streams_view; -/// TableProvider implementation for the lakehouse -pub mod table_provider; /// Rewrite table scans to take the query range into account pub mod table_scan_rewrite; /// Tracking of expired partitions diff --git a/rust/analytics/src/lakehouse/partition_cache.rs b/rust/analytics/src/lakehouse/partition_cache.rs index 0c018ac2..7b41faaa 100644 --- a/rust/analytics/src/lakehouse/partition_cache.rs +++ b/rust/analytics/src/lakehouse/partition_cache.rs @@ -91,6 +91,7 @@ impl PartitionCache { }) } + // overlap test for a specific view pub fn filter( &self, view_set_name: &str, @@ -115,6 +116,7 @@ impl PartitionCache { } } + // overlap test for a all views pub fn filter_insert_range( &self, begin_insert: DateTime, @@ -132,6 +134,31 @@ impl PartitionCache { end_insert, } } + + // single view that fits completely in the specified range + pub fn filter_inside_range( + &self, + view_set_name: &str, + view_instance_id: &str, + begin_insert: DateTime, + end_insert: DateTime, + ) -> Self { + let mut partitions = vec![]; + for part in &self.partitions { + if *part.view_metadata.view_set_name == view_set_name + && *part.view_metadata.view_instance_id == view_instance_id + && part.begin_insert_time >= begin_insert + && part.end_insert_time <= end_insert + { + partitions.push(part.clone()); + } + } + Self { + partitions, + begin_insert, + end_insert, + } + } } #[async_trait] diff --git a/rust/analytics/src/lakehouse/partition_source_data.rs b/rust/analytics/src/lakehouse/partition_source_data.rs index 618d3dd7..f1aaf8f3 100644 --- a/rust/analytics/src/lakehouse/partition_source_data.rs +++ b/rust/analytics/src/lakehouse/partition_source_data.rs @@ -81,6 +81,12 @@ pub async fn fetch_partition_source_data( ;"); let mut block_ids_hash: i64 = 0; let mut partition_src_blocks = vec![]; + // I really don't like the dependency on the blocks_view time filter being on insert_time. 
+ // If we were to make the 'time relevance' of a block more generous, we would need to add a condition + // on the insert_time of the blocks in the previous SQL. + + // todo: remove query_single_view, use PartitionCache::filter + // todo: add query_partitions to use PartitionedTableProvider let blocks_answer = query_single_view( lake, existing_partitions, diff --git a/rust/analytics/src/lakehouse/partitioned_execution_plan.rs b/rust/analytics/src/lakehouse/partitioned_execution_plan.rs new file mode 100644 index 00000000..fd6aed55 --- /dev/null +++ b/rust/analytics/src/lakehouse/partitioned_execution_plan.rs @@ -0,0 +1,42 @@ +use super::{partition::Partition, reader_factory::ReaderFactory}; +use crate::dfext::predicate::filters_to_predicate; +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::Session, + datasource::{ + listing::PartitionedFile, + physical_plan::{parquet::ParquetExecBuilder, FileScanConfig}, + }, + execution::object_store::ObjectStoreUrl, + physical_plan::ExecutionPlan, + prelude::*, +}; +use object_store::ObjectStore; +use std::sync::Arc; + +pub fn make_partitioned_execution_plan( + schema: SchemaRef, + object_store: Arc, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + partitions: Arc>, +) -> datafusion::error::Result> { + let predicate = filters_to_predicate(schema.clone(), state, filters)?; + let mut file_group = vec![]; + for part in &*partitions { + file_group.push(PartitionedFile::new(&part.file_path, part.file_size as u64)); + } + + let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap(); + let file_scan_config = FileScanConfig::new(object_store_url, schema) + .with_limit(limit) + .with_projection(projection.cloned()) + .with_file_groups(vec![file_group]); + let reader_factory = ReaderFactory::new(object_store, partitions); + Ok(ParquetExecBuilder::new(file_scan_config) + .with_predicate(predicate.clone()) + .with_parquet_file_reader_factory(Arc::new(reader_factory)) + .build_arc()) +} diff --git a/rust/analytics/src/lakehouse/partitioned_table_provider.rs b/rust/analytics/src/lakehouse/partitioned_table_provider.rs new file mode 100644 index 00000000..55c5f964 --- /dev/null +++ b/rust/analytics/src/lakehouse/partitioned_table_provider.rs @@ -0,0 +1,78 @@ +use super::{partition::Partition, partitioned_execution_plan::make_partitioned_execution_plan}; +use async_trait::async_trait; +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::{Session, TableProvider}, + datasource::TableType, + logical_expr::TableProviderFilterPushDown, + physical_plan::ExecutionPlan, + prelude::*, +}; +use object_store::ObjectStore; +use std::{any::Any, sync::Arc}; + +// unlike MaterializedView, the partition list is fixed at construction +#[derive(Debug)] +pub struct PartitionedTableProvider { + schema: SchemaRef, + object_store: Arc, + partitions: Arc>, +} + +impl PartitionedTableProvider { + pub fn new( + schema: SchemaRef, + object_store: Arc, + partitions: Arc>, + ) -> Self { + Self { + schema, + object_store, + partitions, + } + } +} + +#[async_trait] +impl TableProvider for PartitionedTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::error::Result> { + make_partitioned_execution_plan( + self.schema(), + self.object_store.clone(), + state, + 
projection, + filters, + limit, + self.partitions.clone(), + ) + } + + /// Tell DataFusion to push filters down to the scan method + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::error::Result> { + // Inexact because the pruning can't handle all expressions and pruning + // is not done at the row level -- there may be rows in returned files + // that do not pass the filter + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} diff --git a/rust/analytics/src/lakehouse/processes_view.rs b/rust/analytics/src/lakehouse/processes_view.rs index b8acc554..e77a05f0 100644 --- a/rust/analytics/src/lakehouse/processes_view.rs +++ b/rust/analytics/src/lakehouse/processes_view.rs @@ -18,6 +18,9 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "processes"; const VIEW_INSTANCE_ID: &str = "global"; +lazy_static::lazy_static! { + static ref INSERT_TIME_COLUMN: Arc = Arc::new( String::from("insert_time")); +} #[derive(Debug)] pub struct ProcessesView { @@ -134,6 +137,14 @@ impl View for ProcessesView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } } pub fn processes_view_schema() -> Schema { diff --git a/rust/analytics/src/lakehouse/query.rs b/rust/analytics/src/lakehouse/query.rs index 5683209a..62000504 100644 --- a/rust/analytics/src/lakehouse/query.rs +++ b/rust/analytics/src/lakehouse/query.rs @@ -7,7 +7,7 @@ use super::{ }; use crate::{ lakehouse::{ - table_provider::MaterializedView, table_scan_rewrite::TableScanRewrite, + materialized_view::MaterializedView, table_scan_rewrite::TableScanRewrite, view_instance_table_function::ViewInstanceTableFunction, }, time::TimeRange, diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs index 6c68b146..5262b34d 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -19,7 +19,8 @@ use std::{hash::DefaultHasher, sync::Arc}; pub struct SqlBatchView { view_set_name: Arc, view_instance_id: Arc, - event_time_column: Arc, // could be more general - for filtering + min_event_time_column: Arc, + max_event_time_column: Arc, src_query: Arc, transform_query: Arc, _merge_partitions_query: Arc, @@ -32,7 +33,8 @@ impl SqlBatchView { pub async fn new( view_set_name: Arc, view_instance_id: Arc, - event_time_column: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, src_query: Arc, transform_query: Arc, merge_partitions_query: Arc, @@ -58,7 +60,8 @@ impl SqlBatchView { Ok(Self { view_set_name, view_instance_id, - event_time_column, + min_event_time_column, + max_event_time_column, src_query, transform_query, _merge_partitions_query: merge_partitions_query, @@ -113,7 +116,8 @@ impl View for SqlBatchView { fetch_sql_partition_spec( ctx, self.transform_query.clone(), - self.event_time_column.clone(), + self.min_event_time_column.clone(), + self.max_event_time_column.clone(), view_meta, begin_insert, end_insert, @@ -140,4 +144,12 @@ impl View for SqlBatchView { fn make_time_filter(&self, _begin: DateTime, _end: DateTime) -> Result> { todo!(); } + + fn get_min_event_time_column_name(&self) -> Arc { + self.min_event_time_column.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + self.max_event_time_column.clone() + } } diff --git a/rust/analytics/src/lakehouse/sql_partition_spec.rs b/rust/analytics/src/lakehouse/sql_partition_spec.rs index 
b5766d74..b2b88f59 100644 --- a/rust/analytics/src/lakehouse/sql_partition_spec.rs +++ b/rust/analytics/src/lakehouse/sql_partition_spec.rs @@ -3,7 +3,7 @@ use super::{ write_partition::write_partition_from_rows, }; use crate::{ - dfext::get_column::{typed_column, typed_column_by_name}, + dfext::{get_column::typed_column_by_name, min_max_time_df::min_max_time_dataframe}, lakehouse::write_partition::PartitionRowSet, response_writer::Logger, }; @@ -11,8 +11,7 @@ use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; use datafusion::{ - arrow::array::{Int64Array, RecordBatch, TimestampNanosecondArray}, - functions_aggregate::min_max::{max, min}, + arrow::array::{Int64Array, RecordBatch}, prelude::*, }; use futures::StreamExt; @@ -22,7 +21,8 @@ use std::sync::Arc; pub struct SqlPartitionSpec { ctx: SessionContext, transform_query: Arc, - event_time_column: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, view_metadata: ViewMetadata, begin_insert: DateTime, end_insert: DateTime, @@ -30,10 +30,12 @@ pub struct SqlPartitionSpec { } impl SqlPartitionSpec { + #[allow(clippy::too_many_arguments)] pub fn new( ctx: SessionContext, transform_query: Arc, - event_time_column: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, view_metadata: ViewMetadata, begin_insert: DateTime, end_insert: DateTime, @@ -42,7 +44,8 @@ impl SqlPartitionSpec { Self { ctx, transform_query, - event_time_column, + min_event_time_column, + max_event_time_column, view_metadata, begin_insert, end_insert, @@ -87,27 +90,15 @@ impl PartitionSpec for SqlPartitionSpec { while let Some(rb_res) = stream.next().await { let rb = rb_res?; - let df = self.ctx.read_batch(rb.clone())?; - let df = df.aggregate( - vec![], - vec![ - min(col(&*self.event_time_column)), - max(col(&*self.event_time_column)), - ], - )?; - let minmax = df.collect().await?; - if minmax.len() != 1 { - anyhow::bail!("expected minmax to be size 1"); - } - let minmax = &minmax[0]; - let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?; - let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?; - if min_column.is_empty() || max_column.is_empty() { - anyhow::bail!("expected minmax to be size 1"); - } + let (mintime, maxtime) = min_max_time_dataframe( + self.ctx.read_batch(rb.clone())?, + &self.min_event_time_column, + &self.max_event_time_column, + ) + .await?; tx.send(PartitionRowSet { - min_time_row: DateTime::from_timestamp_nanos(min_column.value(0)), - max_time_row: DateTime::from_timestamp_nanos(max_column.value(0)), + min_time_row: mintime, + max_time_row: maxtime, rows: rb, }) .await?; @@ -121,7 +112,8 @@ impl PartitionSpec for SqlPartitionSpec { pub async fn fetch_sql_partition_spec( ctx: SessionContext, transform_query: Arc, - event_time_column: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, view_metadata: ViewMetadata, begin_insert: DateTime, end_insert: DateTime, @@ -140,7 +132,8 @@ pub async fn fetch_sql_partition_spec( Ok(SqlPartitionSpec::new( ctx, transform_query, - event_time_column, + min_event_time_column, + max_event_time_column, view_metadata, begin_insert, end_insert, diff --git a/rust/analytics/src/lakehouse/streams_view.rs b/rust/analytics/src/lakehouse/streams_view.rs index 1cc7802a..37dce8a8 100644 --- a/rust/analytics/src/lakehouse/streams_view.rs +++ b/rust/analytics/src/lakehouse/streams_view.rs @@ -18,6 +18,9 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "streams"; const VIEW_INSTANCE_ID: &str = "global"; +lazy_static::lazy_static! 
{ + static ref INSERT_TIME_COLUMN: Arc = Arc::new( String::from("insert_time")); +} #[derive(Debug)] pub struct StreamsView { @@ -128,6 +131,14 @@ impl View for StreamsView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } } pub fn streams_view_schema() -> Schema { diff --git a/rust/analytics/src/lakehouse/table_scan_rewrite.rs b/rust/analytics/src/lakehouse/table_scan_rewrite.rs index f6d29bf3..6f495ad1 100644 --- a/rust/analytics/src/lakehouse/table_scan_rewrite.rs +++ b/rust/analytics/src/lakehouse/table_scan_rewrite.rs @@ -1,4 +1,4 @@ -use crate::{lakehouse::table_provider::MaterializedView, time::TimeRange}; +use crate::{lakehouse::materialized_view::MaterializedView, time::TimeRange}; use datafusion::error::DataFusionError; use datafusion::logical_expr::utils::conjunction; use datafusion::logical_expr::Filter; diff --git a/rust/analytics/src/lakehouse/thread_spans_view.rs b/rust/analytics/src/lakehouse/thread_spans_view.rs index c980de9e..209c8f2c 100644 --- a/rust/analytics/src/lakehouse/thread_spans_view.rs +++ b/rust/analytics/src/lakehouse/thread_spans_view.rs @@ -28,6 +28,10 @@ use std::sync::Arc; use uuid::Uuid; const VIEW_SET_NAME: &str = "thread_spans"; +lazy_static::lazy_static! { + static ref MIN_TIME_COLUMN: Arc = Arc::new( String::from("begin")); + static ref MAX_TIME_COLUMN: Arc = Arc::new( String::from("end")); +} #[derive(Debug)] pub struct ThreadSpansViewMaker {} @@ -278,4 +282,12 @@ impl View for ThreadSpansView { )), ]) } + + fn get_min_event_time_column_name(&self) -> Arc { + MIN_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + MAX_TIME_COLUMN.clone() + } } diff --git a/rust/analytics/src/lakehouse/view.rs b/rust/analytics/src/lakehouse/view.rs index 4f513488..91da4496 100644 --- a/rust/analytics/src/lakehouse/view.rs +++ b/rust/analytics/src/lakehouse/view.rs @@ -55,6 +55,12 @@ pub trait View: std::fmt::Debug + Send + Sync { /// make_time_filter returns a set of expressions that will filter out the rows of the partition /// outside the time range requested. 
fn make_time_filter(&self, _begin: DateTime, _end: DateTime) -> Result>; + + /// name of the column to take the min() of to get the first event timestamp in a dataframe + fn get_min_event_time_column_name(&self) -> Arc; + + /// name of the column to take the max() of to get the last event timestamp in a dataframe + fn get_max_event_time_column_name(&self) -> Arc; } impl dyn View { diff --git a/rust/analytics/src/lakehouse/view_instance_table_function.rs b/rust/analytics/src/lakehouse/view_instance_table_function.rs index 5522a03a..9d3f812f 100644 --- a/rust/analytics/src/lakehouse/view_instance_table_function.rs +++ b/rust/analytics/src/lakehouse/view_instance_table_function.rs @@ -1,5 +1,5 @@ use super::{ - partition_cache::QueryPartitionProvider, table_provider::MaterializedView, + materialized_view::MaterializedView, partition_cache::QueryPartitionProvider, view_factory::ViewFactory, }; use crate::time::TimeRange; diff --git a/rust/analytics/src/lakehouse/write_partition.rs b/rust/analytics/src/lakehouse/write_partition.rs index 65d0d4c8..b01cad2c 100644 --- a/rust/analytics/src/lakehouse/write_partition.rs +++ b/rust/analytics/src/lakehouse/write_partition.rs @@ -197,7 +197,7 @@ pub async fn write_partition_from_rows( lake: Arc, view_metadata: ViewMetadata, file_schema: Arc, - begin_insert_time: DateTime, + begin_insert_time: DateTime, //todo:change to min_insert_time & max_insert_time end_insert_time: DateTime, source_data_hash: Vec, mut rb_stream: Receiver, diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index 77ca262b..19b22ac2 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -8,8 +8,9 @@ use micromegas_analytics::lakehouse::partition_cache::{LivePartitionProvider, Pa use micromegas_analytics::lakehouse::query::query_single_view; use micromegas_analytics::lakehouse::view::View; use micromegas_analytics::lakehouse::view_factory::default_view_factory; +use micromegas_analytics::lakehouse::write_partition::retire_partitions; use micromegas_analytics::lakehouse::{sql_batch_view::SqlBatchView, view_factory::ViewFactory}; -use micromegas_analytics::response_writer::ResponseWriter; +use micromegas_analytics::response_writer::{Logger, ResponseWriter}; use micromegas_ingestion::data_lake_connection::{connect_to_data_lake, DataLakeConnection}; use micromegas_telemetry_sink::TelemetryGuardBuilder; use micromegas_tracing::prelude::*; @@ -22,7 +23,7 @@ async fn make_log_entries_levels_per_process_view( let src_query = Arc::new(String::from( " SELECT process_id, - date_bin('1 minute', time) as time_bin, + time, CAST(level==1 as INT) as fatal, CAST(level==2 as INT) as err, CAST(level==3 as INT) as warn, @@ -31,10 +32,11 @@ async fn make_log_entries_levels_per_process_view( CAST(level==6 as INT) as trace FROM log_entries;", )); - let transform_query = Arc::new(String::from( " - SELECT time_bin, + SELECT date_bin('1 minute', time) as time_bin, + min(time) as min_time, + max(time) as max_time, process_id, sum(fatal) as nb_fatal, sum(err) as nb_err, @@ -46,10 +48,11 @@ async fn make_log_entries_levels_per_process_view( GROUP BY process_id, time_bin ORDER BY time_bin, process_id;", )); - let merge_partitions_query = Arc::new(String::from( " SELECT time_bin, + min(min_time) as min_time, + max(max_time) as max_time, process_id, sum(nb_fatal) as nb_fatal, sum(nb_err) as nb_err, @@ -57,15 +60,16 @@ async fn make_log_entries_levels_per_process_view( sum(nb_info) as nb_info, sum(nb_debug) as nb_debug, sum(nb_trace) as 
nb_trace - FROM src + FROM source GROUP BY process_id, time_bin ORDER BY time_bin, process_id;", )); - + let time_column = Arc::new(String::from("time_bin")); SqlBatchView::new( Arc::new("log_entries_per_process".to_owned()), Arc::new("global".to_owned()), - Arc::new("time_bin".to_owned()), + time_column.clone(), + time_column, src_query, transform_query, merge_partitions_query, @@ -75,58 +79,22 @@ async fn make_log_entries_levels_per_process_view( .await } -#[ignore] -#[tokio::test] -async fn sql_view_test() -> Result<()> { - let _telemetry_guard = TelemetryGuardBuilder::default() - .with_ctrlc_handling() - .with_local_sink_max_level(LevelFilter::Info) - .build(); - let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING") - .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?; - let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI") - .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?; - let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); - let view_factory = Arc::new(default_view_factory()?); - let log_summary_view = Arc::new( - make_log_entries_levels_per_process_view(lake.clone(), view_factory.clone()).await?, - ); - let ref_schema = Arc::new(Schema::new(vec![ - Field::new( - "time_bin", - DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), - true, - ), - Field::new( - "process_id", - DataType::Dictionary(DataType::Int16.into(), DataType::Utf8.into()), - false, - ), - Field::new("nb_fatal", DataType::Int64, true), - Field::new("nb_err", DataType::Int64, true), - Field::new("nb_warn", DataType::Int64, true), - Field::new("nb_info", DataType::Int64, true), - Field::new("nb_debug", DataType::Int64, true), - Field::new("nb_trace", DataType::Int64, true), - ])); - - assert_eq!(log_summary_view.get_file_schema(), ref_schema); - - let ref_schema_hash: Vec = vec![219, 37, 165, 158, 123, 73, 39, 204]; - assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash); - - let nb_partitions = 3; - let partition_time_delta = TimeDelta::minutes(1); +pub async fn materialize_range( + lake: Arc, + view_factory: Arc, + log_summary_view: Arc, + nb_partitions: i32, + partition_time_delta: TimeDelta, + logger: Arc, +) -> Result<()> { let now = Utc::now(); let end_range = now.duration_trunc(partition_time_delta)?; let begin_range = end_range - (partition_time_delta * nb_partitions); - let mut partitions = Arc::new( // we only need the blocks partitions for this call PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) .await?, ); - let null_response_writer = Arc::new(ResponseWriter::new(None)); let blocks_view = Arc::new(BlocksView::new()?); materialize_partition_range( partitions.clone(), @@ -135,7 +103,7 @@ async fn sql_view_test() -> Result<()> { begin_range, end_range, partition_time_delta, - null_response_writer.clone(), + logger.clone(), ) .await?; partitions = Arc::new( @@ -150,7 +118,7 @@ async fn sql_view_test() -> Result<()> { begin_range, end_range, partition_time_delta, - null_response_writer.clone(), + logger.clone(), ) .await?; partitions = Arc::new( @@ -164,15 +132,112 @@ async fn sql_view_test() -> Result<()> { begin_range, end_range, partition_time_delta, - null_response_writer.clone(), + logger.clone(), ) .await?; + Ok(()) +} +async fn retire_existing_partitions( + lake: Arc, + log_summary_view: Arc, + logger: Arc, +) -> Result<()> { + let mut tr = lake.db_pool.begin().await?; + let now = Utc::now(); + let begin = now - TimeDelta::days(10); + 
retire_partitions( + &mut tr, + &log_summary_view.get_view_set_name(), + &log_summary_view.get_view_instance_id(), + begin, + now, + logger, + ) + .await?; + tr.commit().await.with_context(|| "commit")?; + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn sql_view_test() -> Result<()> { + let _telemetry_guard = TelemetryGuardBuilder::default() + .with_ctrlc_handling() + .with_local_sink_max_level(LevelFilter::Info) + .build(); + let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING") + .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?; + let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI") + .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?; + let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); + let view_factory = Arc::new(default_view_factory()?); + let log_summary_view = Arc::new( + make_log_entries_levels_per_process_view(lake.clone(), view_factory.clone()).await?, + ); + let null_response_writer = Arc::new(ResponseWriter::new(None)); + retire_existing_partitions( + lake.clone(), + log_summary_view.clone(), + null_response_writer.clone(), + ) + .await?; + let ref_schema = Arc::new(Schema::new(vec![ + Field::new( + "time_bin", + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + true, + ), + Field::new( + "min_time", + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + true, + ), + Field::new( + "max_time", + DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())), + true, + ), + Field::new( + "process_id", + DataType::Dictionary(DataType::Int16.into(), DataType::Utf8.into()), + false, + ), + Field::new("nb_fatal", DataType::Int64, true), + Field::new("nb_err", DataType::Int64, true), + Field::new("nb_warn", DataType::Int64, true), + Field::new("nb_info", DataType::Int64, true), + Field::new("nb_debug", DataType::Int64, true), + Field::new("nb_trace", DataType::Int64, true), + ])); + assert_eq!(log_summary_view.get_file_schema(), ref_schema); + let ref_schema_hash: Vec = vec![105, 221, 132, 185, 221, 232, 62, 136]; + assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash); + materialize_range( + lake.clone(), + view_factory.clone(), + log_summary_view.clone(), + 12, + TimeDelta::seconds(10), + null_response_writer.clone(), + ) + .await?; + materialize_range( + lake.clone(), + view_factory.clone(), + log_summary_view.clone(), + 2, + TimeDelta::minutes(1), + null_response_writer.clone(), + ) + .await?; let answer = query_single_view( lake.clone(), Arc::new(LivePartitionProvider::new(lake.db_pool.clone())), None, - "SELECT * FROM log_entries_per_process;", + "SELECT time_bin, process_id, min_time, max_time + FROM log_entries_per_process + ORDER BY time_bin, process_id, min_time, max_time;", log_summary_view, ) .await?; @@ -180,7 +245,7 @@ async fn sql_view_test() -> Result<()> { datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string(); eprintln!("{pretty_results}"); + //todo: validate that the sums match the number of log entries info!("bye"); - Ok(()) } From 589f560a13c9351aa71f1ad756332a7a392fafe8 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 13:41:11 -0500 Subject: [PATCH 11/19] compare results with equivalent query on log_entries table --- .../analytics/src/lakehouse/sql_batch_view.rs | 23 +++++- rust/analytics/tests/sql_view_test.rs | 76 +++++++++++++++---- 2 files changed, 82 insertions(+), 17 deletions(-) diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs 
b/rust/analytics/src/lakehouse/sql_batch_view.rs index 5262b34d..c63237e1 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -9,7 +9,10 @@ use crate::time::TimeRange; use anyhow::{Context, Result}; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion::{arrow::datatypes::Schema, prelude::*, sql::TableReference}; +use datafusion::{ + arrow::datatypes::Schema, logical_expr::Between, prelude::*, scalar::ScalarValue, + sql::TableReference, +}; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use std::hash::Hash; use std::hash::Hasher; @@ -141,8 +144,22 @@ impl View for SqlBatchView { ) -> Result<()> { Ok(()) } - fn make_time_filter(&self, _begin: DateTime, _end: DateTime) -> Result> { - todo!(); + fn make_time_filter(&self, begin: DateTime, end: DateTime) -> Result> { + let utc: Arc = Arc::from("+00:00"); + Ok(vec![Expr::Between(Between::new( + col("time_bin").into(), + false, + Expr::Literal(ScalarValue::TimestampNanosecond( + begin.timestamp_nanos_opt(), + Some(utc.clone()), + )) + .into(), + Expr::Literal(ScalarValue::TimestampNanosecond( + end.timestamp_nanos_opt(), + Some(utc.clone()), + )) + .into(), + ))]) } fn get_min_event_time_column_name(&self) -> Arc { diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index 19b22ac2..2cc0f275 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -1,16 +1,17 @@ use anyhow::{Context, Result}; -use chrono::DurationRound; +use chrono::{DateTime, DurationRound}; use chrono::{TimeDelta, Utc}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use micromegas_analytics::lakehouse::batch_update::materialize_partition_range; use micromegas_analytics::lakehouse::blocks_view::BlocksView; use micromegas_analytics::lakehouse::partition_cache::{LivePartitionProvider, PartitionCache}; -use micromegas_analytics::lakehouse::query::query_single_view; +use micromegas_analytics::lakehouse::query::{query, query_single_view}; use micromegas_analytics::lakehouse::view::View; use micromegas_analytics::lakehouse::view_factory::default_view_factory; use micromegas_analytics::lakehouse::write_partition::retire_partitions; use micromegas_analytics::lakehouse::{sql_batch_view::SqlBatchView, view_factory::ViewFactory}; use micromegas_analytics::response_writer::{Logger, ResponseWriter}; +use micromegas_analytics::time::TimeRange; use micromegas_ingestion::data_lake_connection::{connect_to_data_lake, DataLakeConnection}; use micromegas_telemetry_sink::TelemetryGuardBuilder; use micromegas_tracing::prelude::*; @@ -83,13 +84,11 @@ pub async fn materialize_range( lake: Arc, view_factory: Arc, log_summary_view: Arc, - nb_partitions: i32, + begin_range: DateTime, + end_range: DateTime, partition_time_delta: TimeDelta, logger: Arc, ) -> Result<()> { - let now = Utc::now(); - let end_range = now.duration_trunc(partition_time_delta)?; - let begin_range = end_range - (partition_time_delta * nb_partitions); let mut partitions = Arc::new( // we only need the blocks partitions for this call PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) @@ -213,11 +212,16 @@ async fn sql_view_test() -> Result<()> { assert_eq!(log_summary_view.get_file_schema(), ref_schema); let ref_schema_hash: Vec = vec![105, 221, 132, 185, 221, 232, 62, 136]; assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash); + + let end_range = 
Utc::now().duration_trunc(TimeDelta::minutes(1))?; + let begin_range = end_range - (TimeDelta::minutes(3)); + materialize_range( lake.clone(), view_factory.clone(), log_summary_view.clone(), - 12, + begin_range, + end_range, TimeDelta::seconds(10), null_response_writer.clone(), ) @@ -226,7 +230,8 @@ async fn sql_view_test() -> Result<()> { lake.clone(), view_factory.clone(), log_summary_view.clone(), - 2, + begin_range, + end_range, TimeDelta::minutes(1), null_response_writer.clone(), ) @@ -234,10 +239,21 @@ async fn sql_view_test() -> Result<()> { let answer = query_single_view( lake.clone(), Arc::new(LivePartitionProvider::new(lake.db_pool.clone())), - None, - "SELECT time_bin, process_id, min_time, max_time - FROM log_entries_per_process - ORDER BY time_bin, process_id, min_time, max_time;", + Some(TimeRange::new(begin_range, end_range)), + " + SELECT time_bin, + min(min_time) as min_time, + max(max_time) as max_time, + process_id, + sum(nb_fatal) as nb_fatal, + sum(nb_err) as nb_err, + sum(nb_warn) as nb_warn, + sum(nb_info) as nb_info, + sum(nb_debug) as nb_debug, + sum(nb_trace) as nb_trace + FROM log_entries_per_process + GROUP BY process_id, time_bin + ORDER BY time_bin, process_id;", log_summary_view, ) .await?; @@ -245,7 +261,39 @@ async fn sql_view_test() -> Result<()> { datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string(); eprintln!("{pretty_results}"); - //todo: validate that the sums match the number of log entries - info!("bye"); + let answer = query( + lake.clone(), + Arc::new(LivePartitionProvider::new(lake.db_pool.clone())), + Some(TimeRange::new(begin_range, end_range)), + " + SELECT date_bin('1 minute', time) as time_bin, + min(time) as min_time, + max(time) as max_time, + process_id, + sum(nb_fatal) as nb_fatal, + sum(nb_err) as nb_err, + sum(nb_warn) as nb_warn, + sum(nb_info) as nb_info, + sum(nb_debug) as nb_debug, + sum(nb_trace) as nb_trace + FROM ( + SELECT process_id, + time, + CAST(level==1 as INT) as nb_fatal, + CAST(level==2 as INT) as nb_err, + CAST(level==3 as INT) as nb_warn, + CAST(level==4 as INT) as nb_info, + CAST(level==5 as INT) as nb_debug, + CAST(level==6 as INT) as nb_trace + FROM log_entries + ) + GROUP BY process_id, time_bin + ORDER BY time_bin, process_id;", + view_factory, + ) + .await?; + let pretty_results = + datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string(); + eprintln!("{pretty_results}"); Ok(()) } From f5269591bdfce8ddde6a5486c63ed2c21f2e8930 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 13:52:32 -0500 Subject: [PATCH 12/19] add global view to factory --- rust/analytics/src/lakehouse/view_factory.rs | 6 +++++- rust/analytics/tests/sql_view_test.rs | 13 ++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/rust/analytics/src/lakehouse/view_factory.rs b/rust/analytics/src/lakehouse/view_factory.rs index cdf19660..70455046 100644 --- a/rust/analytics/src/lakehouse/view_factory.rs +++ b/rust/analytics/src/lakehouse/view_factory.rs @@ -151,7 +151,7 @@ pub trait ViewMaker: Send + Sync + Debug { fn make_view(&self, view_instance_id: &str) -> Result>; } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ViewFactory { view_sets: HashMap>, global_views: Vec>, @@ -169,6 +169,10 @@ impl ViewFactory { &self.global_views } + pub fn add_global_view(&mut self, view: Arc) { + self.global_views.push(view); + } + pub fn add_view_set(&mut self, view_set_name: String, maker: Arc) { self.view_sets.insert(view_set_name, 
maker); } diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index 2cc0f275..c46fa555 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -5,7 +5,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use micromegas_analytics::lakehouse::batch_update::materialize_partition_range; use micromegas_analytics::lakehouse::blocks_view::BlocksView; use micromegas_analytics::lakehouse::partition_cache::{LivePartitionProvider, PartitionCache}; -use micromegas_analytics::lakehouse::query::{query, query_single_view}; +use micromegas_analytics::lakehouse::query::query; use micromegas_analytics::lakehouse::view::View; use micromegas_analytics::lakehouse::view_factory::default_view_factory; use micromegas_analytics::lakehouse::write_partition::retire_partitions; @@ -170,10 +170,13 @@ async fn sql_view_test() -> Result<()> { let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI") .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?; let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); - let view_factory = Arc::new(default_view_factory()?); + let mut view_factory = default_view_factory()?; let log_summary_view = Arc::new( - make_log_entries_levels_per_process_view(lake.clone(), view_factory.clone()).await?, + make_log_entries_levels_per_process_view(lake.clone(), Arc::new(view_factory.clone())) + .await?, ); + view_factory.add_global_view(log_summary_view.clone()); + let view_factory = Arc::new(view_factory); let null_response_writer = Arc::new(ResponseWriter::new(None)); retire_existing_partitions( lake.clone(), @@ -236,7 +239,7 @@ async fn sql_view_test() -> Result<()> { null_response_writer.clone(), ) .await?; - let answer = query_single_view( + let answer = query( lake.clone(), Arc::new(LivePartitionProvider::new(lake.db_pool.clone())), Some(TimeRange::new(begin_range, end_range)), @@ -254,7 +257,7 @@ async fn sql_view_test() -> Result<()> { FROM log_entries_per_process GROUP BY process_id, time_bin ORDER BY time_bin, process_id;", - log_summary_view, + view_factory.clone(), ) .await?; let pretty_results = From ceaf353e4d990cacf8f69bc28cc1f49bf91410c7 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 15:35:51 -0500 Subject: [PATCH 13/19] make the sql-defined view merge its partitions transparently --- rust/analytics/src/lakehouse/query.rs | 10 +------ .../analytics/src/lakehouse/sql_batch_view.rs | 30 +++++++++++++++++-- rust/analytics/src/lakehouse/view.rs | 16 ++++++++-- rust/analytics/tests/sql_view_test.rs | 23 +++++++------- 4 files changed, 54 insertions(+), 25 deletions(-) diff --git a/rust/analytics/src/lakehouse/query.rs b/rust/analytics/src/lakehouse/query.rs index 62000504..837b3a18 100644 --- a/rust/analytics/src/lakehouse/query.rs +++ b/rust/analytics/src/lakehouse/query.rs @@ -17,7 +17,6 @@ use datafusion::{ arrow::array::RecordBatch, execution::{context::SessionContext, object_store::ObjectStoreUrl}, logical_expr::ScalarUDF, - sql::TableReference, }; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use micromegas_tracing::prelude::*; @@ -39,14 +38,7 @@ async fn register_table( part_provider, query_range.clone(), ); - let view_set_name = view.get_view_set_name().to_string(); - ctx.register_table( - TableReference::Bare { - table: view_set_name.into(), - }, - Arc::new(table), - )?; - Ok(()) + view.register_table(ctx, table).await } pub async fn query_single_view( diff --git 
a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs index c63237e1..46d2b147 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -1,4 +1,5 @@ use super::{ + materialized_view::MaterializedView, partition_cache::{NullPartitionProvider, PartitionCache}, query::make_session_context, sql_partition_spec::fetch_sql_partition_spec, @@ -26,7 +27,7 @@ pub struct SqlBatchView { max_event_time_column: Arc, src_query: Arc, transform_query: Arc, - _merge_partitions_query: Arc, + merge_partitions_query: Arc, schema: Arc, view_factory: Arc, } @@ -67,7 +68,7 @@ impl SqlBatchView { max_event_time_column, src_query, transform_query, - _merge_partitions_query: merge_partitions_query, + merge_partitions_query, schema, view_factory, }) @@ -169,4 +170,29 @@ impl View for SqlBatchView { fn get_max_event_time_column_name(&self) -> Arc { self.max_event_time_column.clone() } + + async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> { + let view_name = self.get_view_set_name().to_string(); + let partitions_table_name = format!("__{view_name}__partitions"); + ctx.register_table( + TableReference::Bare { + table: partitions_table_name.clone().into(), + }, + Arc::new(table), + )?; + let df = ctx + .sql( + &self + .merge_partitions_query + .replace("{source}", &partitions_table_name), + ) + .await?; + ctx.register_table( + TableReference::Bare { + table: view_name.into(), + }, + df.into_view(), + )?; + Ok(()) + } } diff --git a/rust/analytics/src/lakehouse/view.rs b/rust/analytics/src/lakehouse/view.rs index 91da4496..74cf901b 100644 --- a/rust/analytics/src/lakehouse/view.rs +++ b/rust/analytics/src/lakehouse/view.rs @@ -1,9 +1,9 @@ -use super::partition_cache::PartitionCache; +use super::{materialized_view::MaterializedView, partition_cache::PartitionCache}; use crate::{response_writer::Logger, time::TimeRange}; use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion::{arrow::datatypes::Schema, logical_expr::Expr}; +use datafusion::{arrow::datatypes::Schema, logical_expr::Expr, prelude::*, sql::TableReference}; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use std::sync::Arc; @@ -61,6 +61,18 @@ pub trait View: std::fmt::Debug + Send + Sync { /// name of the column to take the max() of to get the last event timestamp in a dataframe fn get_max_event_time_column_name(&self) -> Arc; + + /// register the table in the SessionContext + async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> { + let view_set_name = self.get_view_set_name().to_string(); + ctx.register_table( + TableReference::Bare { + table: view_set_name.into(), + }, + Arc::new(table), + )?; + Ok(()) + } } impl dyn View { diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index c46fa555..b1619c6a 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -61,13 +61,13 @@ async fn make_log_entries_levels_per_process_view( sum(nb_info) as nb_info, sum(nb_debug) as nb_debug, sum(nb_trace) as nb_trace - FROM source + FROM {source} GROUP BY process_id, time_bin ORDER BY time_bin, process_id;", )); let time_column = Arc::new(String::from("time_bin")); SqlBatchView::new( - Arc::new("log_entries_per_process".to_owned()), + Arc::new("log_entries_per_process_per_minute".to_owned()), Arc::new("global".to_owned()), time_column.clone(), time_column, @@ 
-245,17 +245,16 @@ async fn sql_view_test() -> Result<()> { Some(TimeRange::new(begin_range, end_range)), " SELECT time_bin, - min(min_time) as min_time, - max(max_time) as max_time, + min_time, + max_time, process_id, - sum(nb_fatal) as nb_fatal, - sum(nb_err) as nb_err, - sum(nb_warn) as nb_warn, - sum(nb_info) as nb_info, - sum(nb_debug) as nb_debug, - sum(nb_trace) as nb_trace - FROM log_entries_per_process - GROUP BY process_id, time_bin + nb_fatal, + nb_err, + nb_warn, + nb_info, + nb_debug, + nb_trace + FROM log_entries_per_process_per_minute ORDER BY time_bin, process_id;", view_factory.clone(), ) From 1929f197c91a4b0aefbddddadf8f2dfc43f3c792 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Thu, 23 Jan 2025 16:24:05 -0500 Subject: [PATCH 14/19] custom merge query --- rust/analytics/src/lakehouse/merge.rs | 5 ++++- rust/analytics/src/lakehouse/sql_batch_view.rs | 4 ++++ rust/analytics/src/lakehouse/view.rs | 4 ++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/rust/analytics/src/lakehouse/merge.rs b/rust/analytics/src/lakehouse/merge.rs index 0941bde4..a8efad61 100644 --- a/rust/analytics/src/lakehouse/merge.rs +++ b/rust/analytics/src/lakehouse/merge.rs @@ -93,7 +93,10 @@ pub async fn create_merged_partition( }, Arc::new(table), )?; - let merged_df = ctx.sql("SELECT * FROM source;").await?; + let merge_query = view + .get_merge_partitions_query() + .replace("{source}", "source"); + let merged_df = ctx.sql(&merge_query).await?; let (tx, rx) = tokio::sync::mpsc::channel(1); let join_handle = tokio::spawn(write_partition_from_rows( lake.clone(), diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs index 46d2b147..02f75fc5 100644 --- a/rust/analytics/src/lakehouse/sql_batch_view.rs +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -195,4 +195,8 @@ impl View for SqlBatchView { )?; Ok(()) } + + fn get_merge_partitions_query(&self) -> Arc { + self.merge_partitions_query.clone() + } } diff --git a/rust/analytics/src/lakehouse/view.rs b/rust/analytics/src/lakehouse/view.rs index 74cf901b..53f6e863 100644 --- a/rust/analytics/src/lakehouse/view.rs +++ b/rust/analytics/src/lakehouse/view.rs @@ -73,6 +73,10 @@ pub trait View: std::fmt::Debug + Send + Sync { )?; Ok(()) } + + fn get_merge_partitions_query(&self) -> Arc { + Arc::new(String::from("SELECT * FROM {source};")) + } } impl dyn View { From ca49bfa101563bc6702a40695ab0d602bb4a69a5 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Fri, 24 Jan 2025 08:43:37 -0500 Subject: [PATCH 15/19] added assert_eq to make sure the results of the view match the reference --- rust/analytics/tests/sql_view_test.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs index b1619c6a..9587f125 100644 --- a/rust/analytics/tests/sql_view_test.rs +++ b/rust/analytics/tests/sql_view_test.rs @@ -17,7 +17,7 @@ use micromegas_telemetry_sink::TelemetryGuardBuilder; use micromegas_tracing::prelude::*; use std::sync::Arc; -async fn make_log_entries_levels_per_process_view( +async fn make_log_entries_levels_per_process_minute_view( lake: Arc, view_factory: Arc, ) -> Result { @@ -90,7 +90,7 @@ pub async fn materialize_range( logger: Arc, ) -> Result<()> { let mut partitions = Arc::new( - // we only need the blocks partitions for this call + // todo: query only the blocks partitions for this call PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, 
begin_range, end_range) .await?, ); @@ -172,8 +172,11 @@ async fn sql_view_test() -> Result<()> { let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?); let mut view_factory = default_view_factory()?; let log_summary_view = Arc::new( - make_log_entries_levels_per_process_view(lake.clone(), Arc::new(view_factory.clone())) - .await?, + make_log_entries_levels_per_process_minute_view( + lake.clone(), + Arc::new(view_factory.clone()), + ) + .await?, ); view_factory.add_global_view(log_summary_view.clone()); let view_factory = Arc::new(view_factory); @@ -215,10 +218,8 @@ async fn sql_view_test() -> Result<()> { assert_eq!(log_summary_view.get_file_schema(), ref_schema); let ref_schema_hash: Vec = vec![105, 221, 132, 185, 221, 232, 62, 136]; assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash); - let end_range = Utc::now().duration_trunc(TimeDelta::minutes(1))?; let begin_range = end_range - (TimeDelta::minutes(3)); - materialize_range( lake.clone(), view_factory.clone(), @@ -259,9 +260,9 @@ async fn sql_view_test() -> Result<()> { view_factory.clone(), ) .await?; - let pretty_results = + let pretty_results_view = datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string(); - eprintln!("{pretty_results}"); + eprintln!("{pretty_results_view}"); let answer = query( lake.clone(), @@ -294,8 +295,9 @@ async fn sql_view_test() -> Result<()> { view_factory, ) .await?; - let pretty_results = + let pretty_results_ref = datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string(); - eprintln!("{pretty_results}"); + eprintln!("{pretty_results_ref}"); + assert_eq!(pretty_results_view, pretty_results_ref); Ok(()) } From 6566cab3182668e7aa76e766000fc965887132b8 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Fri, 24 Jan 2025 08:46:37 -0500 Subject: [PATCH 16/19] todo --- rust/public/src/servers/maintenance.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/public/src/servers/maintenance.rs b/rust/public/src/servers/maintenance.rs index 4c4938bf..ed6785c9 100644 --- a/rust/public/src/servers/maintenance.rs +++ b/rust/public/src/servers/maintenance.rs @@ -86,7 +86,7 @@ pub async fn materialize_all_views( let end_range = now.duration_trunc(partition_time_delta)?; let begin_range = end_range - (partition_time_delta * nb_partitions); let mut partitions = Arc::new( - // we only need the blocks partitions for this call + // todo: query only the blocks partitions for this call PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range) .await?, ); From 0710ac0c70921ef0eccd9c99afdd6a7af4e2e2bb Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Fri, 24 Jan 2025 08:51:18 -0500 Subject: [PATCH 17/19] renamed get_column to typed_column --- rust/analytics/src/dfext/min_max_time_df.rs | 2 +- rust/analytics/src/dfext/mod.rs | 4 ++-- rust/analytics/src/dfext/{get_column.rs => typed_column.rs} | 0 rust/analytics/src/lakehouse/partition_source_data.rs | 2 +- rust/analytics/src/lakehouse/sql_partition_spec.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) rename rust/analytics/src/dfext/{get_column.rs => typed_column.rs} (100%) diff --git a/rust/analytics/src/dfext/min_max_time_df.rs b/rust/analytics/src/dfext/min_max_time_df.rs index b1f54ae3..a14361e8 100644 --- a/rust/analytics/src/dfext/min_max_time_df.rs +++ b/rust/analytics/src/dfext/min_max_time_df.rs @@ -6,7 +6,7 @@ use datafusion::{ functions_aggregate::min_max::{max, min}, }; -use 
super::get_column::typed_column; +use super::typed_column::typed_column; pub async fn min_max_time_dataframe( df: DataFrame, diff --git a/rust/analytics/src/dfext/mod.rs b/rust/analytics/src/dfext/mod.rs index 6229b1b0..ac02f97a 100644 --- a/rust/analytics/src/dfext/mod.rs +++ b/rust/analytics/src/dfext/mod.rs @@ -2,8 +2,6 @@ pub mod async_log_stream; /// Utilities to help deal with df expressions pub mod expressions; -/// Access to a RecordBatch's columns -pub mod get_column; /// Stream a function's log as a table pub mod log_stream_table_provider; /// Get min & max from the time column @@ -12,3 +10,5 @@ pub mod min_max_time_df; pub mod predicate; /// Execution plan interface for an async task pub mod task_log_exec_plan; +/// Access to a RecordBatch's columns +pub mod typed_column; diff --git a/rust/analytics/src/dfext/get_column.rs b/rust/analytics/src/dfext/typed_column.rs similarity index 100% rename from rust/analytics/src/dfext/get_column.rs rename to rust/analytics/src/dfext/typed_column.rs diff --git a/rust/analytics/src/lakehouse/partition_source_data.rs b/rust/analytics/src/lakehouse/partition_source_data.rs index f1aaf8f3..b8885639 100644 --- a/rust/analytics/src/lakehouse/partition_source_data.rs +++ b/rust/analytics/src/lakehouse/partition_source_data.rs @@ -1,6 +1,6 @@ use super::partition_cache::PartitionCache; use crate::{ - dfext::get_column::typed_column_by_name, + dfext::typed_column::typed_column_by_name, lakehouse::{blocks_view::BlocksView, query::query_single_view}, time::TimeRange, }; diff --git a/rust/analytics/src/lakehouse/sql_partition_spec.rs b/rust/analytics/src/lakehouse/sql_partition_spec.rs index b2b88f59..cadb700d 100644 --- a/rust/analytics/src/lakehouse/sql_partition_spec.rs +++ b/rust/analytics/src/lakehouse/sql_partition_spec.rs @@ -3,7 +3,7 @@ use super::{ write_partition::write_partition_from_rows, }; use crate::{ - dfext::{get_column::typed_column_by_name, min_max_time_df::min_max_time_dataframe}, + dfext::{min_max_time_df::min_max_time_dataframe, typed_column::typed_column_by_name}, lakehouse::write_partition::PartitionRowSet, response_writer::Logger, }; From a8848e6bb834791bfc14afb65c2475b0b6b7e28a Mon Sep 17 00:00:00 2001 From: Marc-Antoine Desroches Date: Fri, 24 Jan 2025 09:16:51 -0500 Subject: [PATCH 18/19] doc --- rust/analytics/src/lakehouse/mod.rs | 4 ++-- rust/analytics/src/lakehouse/sql_batch_view.rs | 14 ++++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/rust/analytics/src/lakehouse/mod.rs b/rust/analytics/src/lakehouse/mod.rs index b02d3cb1..db3fed3c 100644 --- a/rust/analytics/src/lakehouse/mod.rs +++ b/rust/analytics/src/lakehouse/mod.rs @@ -1,4 +1,4 @@ -// Record batches + schema +/// Record batches + schema pub mod answer; /// Write parquet in object store pub mod async_parquet_writer; @@ -50,7 +50,7 @@ pub mod query; pub mod reader_factory; /// Exposes retire_partitions as a table function pub mod retire_partitions_table_function; -/// Sql-defined view updated at regular intervals +/// Sql-defined view updated in batch pub mod sql_batch_view; /// Specification for a view partition backed by a SQL query on the lakehouse. 
 pub mod sql_partition_spec;
diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs
index 02f75fc5..689a8f6e 100644
--- a/rust/analytics/src/lakehouse/sql_batch_view.rs
+++ b/rust/analytics/src/lakehouse/sql_batch_view.rs
@@ -19,6 +19,7 @@ use std::hash::Hash;
 use std::hash::Hasher;
 use std::{hash::DefaultHasher, sync::Arc};
 
+/// SQL-defined view updated in batch
 #[derive(Debug)]
 pub struct SqlBatchView {
     view_set_name: Arc<String>,
@@ -34,9 +35,18 @@ pub struct SqlBatchView {
 
 impl SqlBatchView {
     #[allow(clippy::too_many_arguments)]
+    /// # Arguments
+    ///
+    /// * `view_set_name` - name of the table
+    /// * `min_event_time_column` - min() of this column yields the first event timestamp in a dataframe
+    /// * `max_event_time_column` - max() of this column yields the last event timestamp in a dataframe
+    /// * `src_query` - used to count the rows of the underlying data to know if a cached partition is up to date
+    /// * `transform_query` - used to transform the source data into a cached partition
+    /// * `merge_partitions_query` - used to merge multiple partitions into a single one (and to answer user queries, which span multiple partitions by default)
+    /// * `lake` - data lake connection
+    /// * `view_factory` - the views accessible to `src_query`
     pub async fn new(
         view_set_name: Arc<String>,
-        view_instance_id: Arc<String>,
         min_event_time_column: Arc<String>,
         max_event_time_column: Arc<String>,
         src_query: Arc<String>,
@@ -63,7 +73,7 @@ impl SqlBatchView {
 
         Ok(Self {
             view_set_name,
-            view_instance_id,
+            view_instance_id: Arc::new(String::from("global")),
             min_event_time_column,
             max_event_time_column,
             src_query,

From 113e362249727e772683bdba45f9a05a4acddf5c Mon Sep 17 00:00:00 2001
From: Marc-Antoine Desroches
Date: Fri, 24 Jan 2025 09:27:49 -0500
Subject: [PATCH 19/19] build fix

---
 rust/analytics/tests/sql_view_test.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs
index 9587f125..a0ea2077 100644
--- a/rust/analytics/tests/sql_view_test.rs
+++ b/rust/analytics/tests/sql_view_test.rs
@@ -68,7 +68,6 @@ async fn make_log_entries_levels_per_process_minute_view(
     let time_column = Arc::new(String::from("time_bin"));
     SqlBatchView::new(
         Arc::new("log_entries_per_process_per_minute".to_owned()),
-        Arc::new("global".to_owned()),
         time_column.clone(),
         time_column,
         src_query,
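
For reference, a minimal usage sketch of the API after this series: it shows a SqlBatchView built with the final constructor shape (post patch 19), registered as a global view (patch 12), with the `{source}` placeholder consumed by the merge step (patches 13-14). The helper name, the view name and the simplified SQL below are illustrative assumptions; the table/placeholder names (`source`, `{source}`) follow the test in this series, and only `SqlBatchView::new`, `default_view_factory` and `add_global_view` come from the patches.

// Sketch only: wiring a SQL-defined batch view into the view factory.
use std::sync::Arc;

use anyhow::Result;
use micromegas_analytics::lakehouse::{
    sql_batch_view::SqlBatchView,
    view_factory::{default_view_factory, ViewFactory},
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;

async fn make_factory_with_log_counts(lake: Arc<DataLakeConnection>) -> Result<Arc<ViewFactory>> {
    let mut view_factory = default_view_factory()?;
    let time_column = Arc::new(String::from("time_bin"));
    let view = Arc::new(
        SqlBatchView::new(
            Arc::new("log_entries_per_minute".to_owned()), // view_set_name (hypothetical)
            time_column.clone(),                           // min_event_time_column
            time_column,                                   // max_event_time_column
            // src_query: rows to aggregate; also used to detect stale partitions
            Arc::new(String::from(
                "SELECT process_id, date_bin('1 minute', time) as time_bin FROM log_entries;",
            )),
            // transform_query: aggregates the source data (exposed here as the `source` table)
            Arc::new(String::from(
                "SELECT time_bin, process_id, count(*) as nb_entries FROM source GROUP BY process_id, time_bin;",
            )),
            // merge_partitions_query: `{source}` is substituted with the partitions to merge
            Arc::new(String::from(
                "SELECT time_bin, process_id, sum(nb_entries) as nb_entries FROM {source} GROUP BY process_id, time_bin;",
            )),
            lake,
            Arc::new(view_factory.clone()),
        )
        .await?,
    );
    view_factory.add_global_view(view);
    Ok(Arc::new(view_factory))
}

The resulting factory can then be handed to `query()` or to the materialization paths exercised in sql_view_test.rs above, exactly like the stock factory.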