diff --git a/rust/analytics/src/dfext/min_max_time_df.rs b/rust/analytics/src/dfext/min_max_time_df.rs new file mode 100644 index 00000000..a14361e8 --- /dev/null +++ b/rust/analytics/src/dfext/min_max_time_df.rs @@ -0,0 +1,37 @@ +use anyhow::Result; +use chrono::{DateTime, Utc}; +use datafusion::prelude::*; +use datafusion::{ + arrow::array::TimestampNanosecondArray, + functions_aggregate::min_max::{max, min}, +}; + +use super::typed_column::typed_column; + +pub async fn min_max_time_dataframe( + df: DataFrame, + min_time_column_name: &str, + max_time_column_name: &str, +) -> Result<(DateTime, DateTime)> { + let df = df.aggregate( + vec![], + vec![ + min(col(min_time_column_name)), + max(col(max_time_column_name)), + ], + )?; + let minmax = df.collect().await?; + if minmax.len() != 1 { + anyhow::bail!("expected minmax to be size 1"); + } + let minmax = &minmax[0]; + let min_column: &TimestampNanosecondArray = typed_column(minmax, 0)?; + let max_column: &TimestampNanosecondArray = typed_column(minmax, 1)?; + if min_column.is_empty() || max_column.is_empty() { + anyhow::bail!("expected minmax to be size 1"); + } + Ok(( + DateTime::from_timestamp_nanos(min_column.value(0)), + DateTime::from_timestamp_nanos(max_column.value(0)), + )) +} diff --git a/rust/analytics/src/dfext/mod.rs b/rust/analytics/src/dfext/mod.rs index 9022b53f..ac02f97a 100644 --- a/rust/analytics/src/dfext/mod.rs +++ b/rust/analytics/src/dfext/mod.rs @@ -4,5 +4,11 @@ pub mod async_log_stream; pub mod expressions; /// Stream a function's log as a table pub mod log_stream_table_provider; +/// Get min & max from the time column +pub mod min_max_time_df; +/// Convert a filtering expression to a physical predicate +pub mod predicate; /// Execution plan interface for an async task pub mod task_log_exec_plan; +/// Access to a RecordBatch's columns +pub mod typed_column; diff --git a/rust/analytics/src/dfext/predicate.rs b/rust/analytics/src/dfext/predicate.rs new file mode 100644 index 00000000..6578e338 --- /dev/null +++ b/rust/analytics/src/dfext/predicate.rs @@ -0,0 +1,27 @@ +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::Session, + common::DFSchema, + logical_expr::utils::conjunction, + physical_plan::{expressions, PhysicalExpr}, + prelude::*, +}; +use std::sync::Arc; + +// from datafusion/datafusion-examples/examples/advanced_parquet_index.rs +pub fn filters_to_predicate( + schema: SchemaRef, + state: &dyn Session, + filters: &[Expr], +) -> datafusion::error::Result> { + let df_schema = DFSchema::try_from(schema)?; + let predicate = conjunction(filters.to_vec()); + let predicate = predicate + .map(|predicate| state.create_physical_expr(predicate, &df_schema)) + .transpose()? 
+ // if there are no filters, use a literal true to have a predicate + // that always evaluates to true we can pass to the index + .unwrap_or_else(|| expressions::lit(true)); + + Ok(predicate) +} diff --git a/rust/analytics/src/dfext/typed_column.rs b/rust/analytics/src/dfext/typed_column.rs new file mode 100644 index 00000000..981911ec --- /dev/null +++ b/rust/analytics/src/dfext/typed_column.rs @@ -0,0 +1,28 @@ +use anyhow::{Context, Result}; + +pub fn typed_column_by_name<'a, T: core::any::Any>( + rc: &'a datafusion::arrow::array::RecordBatch, + column_name: &str, +) -> Result<&'a T> { + let column = rc + .column_by_name(column_name) + .with_context(|| format!("getting column {column_name}"))?; + column + .as_any() + .downcast_ref::() + .with_context(|| format!("casting {column_name}: {:?}", column.data_type())) +} + +pub fn typed_column( + rc: &datafusion::arrow::array::RecordBatch, + index: usize, +) -> Result<&T> { + let column = rc + .columns() + .get(index) + .with_context(|| format!("getting column {index}"))?; + column + .as_any() + .downcast_ref::() + .with_context(|| format!("casting {index}: {:?}", column.data_type())) +} diff --git a/rust/analytics/src/lakehouse/batch_update.rs b/rust/analytics/src/lakehouse/batch_update.rs index 34f60dfd..94371b9f 100644 --- a/rust/analytics/src/lakehouse/batch_update.rs +++ b/rust/analytics/src/lakehouse/batch_update.rs @@ -38,7 +38,6 @@ async fn verify_overlapping_partitions( let nb_source_events = hash_to_object_count(source_data_hash)?; let filtered = existing_partitions .filter(view_set_name, view_instance_id, begin_insert, end_insert) - .with_context(|| "filtering partition cache")? .partitions; if filtered.is_empty() { logger @@ -147,7 +146,15 @@ async fn materialize_partition( .with_context(|| "writing partition")?; } PartitionCreationStrategy::MergeExisting => { - create_merged_partition(lake, view, begin_insert, end_insert, logger).await?; + create_merged_partition( + existing_partitions, + lake, + view, + begin_insert, + end_insert, + logger, + ) + .await?; } PartitionCreationStrategy::Abort => {} } diff --git a/rust/analytics/src/lakehouse/blocks_view.rs b/rust/analytics/src/lakehouse/blocks_view.rs index 9ebc0c02..dc3b43fa 100644 --- a/rust/analytics/src/lakehouse/blocks_view.rs +++ b/rust/analytics/src/lakehouse/blocks_view.rs @@ -1,11 +1,9 @@ -use crate::time::TimeRange; - use super::{ metadata_partition_spec::fetch_metadata_partition_spec, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, - view_factory::ViewMaker, }; +use crate::time::TimeRange; use anyhow::{Context, Result}; use async_trait::async_trait; use chrono::{DateTime, Utc}; @@ -19,17 +17,8 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "blocks"; const VIEW_INSTANCE_ID: &str = "global"; - -#[derive(Debug)] -pub struct BlocksViewMaker {} - -impl ViewMaker for BlocksViewMaker { - fn make_view(&self, view_instance_id: &str) -> Result> { - if view_instance_id != "global" { - anyhow::bail!("only global view instance id is supported for metadata views"); - } - Ok(Arc::new(BlocksView::new()?)) - } +lazy_static::lazy_static! 
{ + static ref INSERT_TIME_COLUMN: Arc = Arc::new( String::from("insert_time")); } #[derive(Debug)] @@ -78,7 +67,7 @@ impl View for BlocksView { async fn make_batch_partition_spec( &self, lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { @@ -123,6 +112,7 @@ impl View for BlocksView { anyhow::bail!("not supported"); } + // fetch_partition_source_data relies on this filter being on insert_time fn make_time_filter(&self, begin: DateTime, end: DateTime) -> Result> { let utc: Arc = Arc::from("+00:00"); Ok(vec![Expr::Between(Between::new( @@ -140,6 +130,14 @@ impl View for BlocksView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } } pub fn blocks_view_schema() -> Schema { diff --git a/rust/analytics/src/lakehouse/log_view.rs b/rust/analytics/src/lakehouse/log_view.rs index 8752e51c..eaaafa5e 100644 --- a/rust/analytics/src/lakehouse/log_view.rs +++ b/rust/analytics/src/lakehouse/log_view.rs @@ -2,7 +2,7 @@ use super::{ block_partition_spec::BlockPartitionSpec, jit_partitions::write_partition_from_blocks, log_block_processor::LogBlockProcessor, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, partition_source_data::fetch_partition_source_data, view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewMaker, @@ -26,6 +26,9 @@ use std::sync::Arc; use uuid::Uuid; const VIEW_SET_NAME: &str = "log_entries"; +lazy_static::lazy_static! { + static ref TIME_COLUMN: Arc = Arc::new( String::from("time")); +} #[derive(Debug)] pub struct LogViewMaker {} @@ -72,7 +75,7 @@ impl View for LogView { async fn make_batch_partition_spec( &self, lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { @@ -80,7 +83,7 @@ impl View for LogView { anyhow::bail!("not supported for jit queries... 
should it?"); } let source_data = - fetch_partition_source_data(lake, part_provider, begin_insert, end_insert, "log") + fetch_partition_source_data(lake, existing_partitions, begin_insert, end_insert, "log") .await .with_context(|| "fetch_partition_source_data")?; Ok(Arc::new(BlockPartitionSpec { @@ -187,4 +190,12 @@ impl View for LogView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } } diff --git a/rust/analytics/src/lakehouse/table_provider.rs b/rust/analytics/src/lakehouse/materialized_view.rs similarity index 54% rename from rust/analytics/src/lakehouse/table_provider.rs rename to rust/analytics/src/lakehouse/materialized_view.rs index b02c9580..99f452ab 100644 --- a/rust/analytics/src/lakehouse/table_provider.rs +++ b/rust/analytics/src/lakehouse/materialized_view.rs @@ -1,19 +1,16 @@ -use super::{partition_cache::QueryPartitionProvider, view::View}; -use crate::{lakehouse::reader_factory::ReaderFactory, time::TimeRange}; +use super::{ + partition_cache::QueryPartitionProvider, + partitioned_execution_plan::make_partitioned_execution_plan, view::View, +}; +use crate::time::TimeRange; use async_trait::async_trait; use datafusion::{ arrow::datatypes::SchemaRef, catalog::{Session, TableProvider}, - common::DFSchema, - datasource::{ - listing::PartitionedFile, - physical_plan::{parquet::ParquetExecBuilder, FileScanConfig}, - TableType, - }, + datasource::TableType, error::DataFusionError, - execution::object_store::ObjectStoreUrl, - logical_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}, - physical_plan::{expressions, ExecutionPlan, PhysicalExpr}, + logical_expr::{Expr, TableProviderFilterPushDown}, + physical_plan::ExecutionPlan, }; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use object_store::ObjectStore; @@ -48,25 +45,6 @@ impl MaterializedView { pub fn get_view(&self) -> Arc { self.view.clone() } - - // from datafusion/datafusion-examples/examples/advanced_parquet_index.rs - fn filters_to_predicate( - &self, - schema: SchemaRef, - state: &dyn Session, - filters: &[Expr], - ) -> datafusion::error::Result> { - let df_schema = DFSchema::try_from(schema)?; - let predicate = conjunction(filters.to_vec()); - let predicate = predicate - .map(|predicate| state.create_physical_expr(predicate, &df_schema)) - .transpose()? 
- // if there are no filters, use a literal true to have a predicate - // that always evaluates to true we can pass to the index - .unwrap_or_else(|| expressions::lit(true)); - - Ok(predicate) - } } #[async_trait] @@ -90,13 +68,11 @@ impl TableProvider for MaterializedView { filters: &[Expr], limit: Option, ) -> datafusion::error::Result> { - let predicate = self.filters_to_predicate(self.view.get_file_schema(), state, filters)?; self.view .jit_update(self.lake.clone(), self.query_range.clone()) .await .map_err(|e| DataFusionError::External(e.into()))?; - let mut file_group = vec![]; let partitions = self .part_provider .fetch( @@ -108,22 +84,15 @@ impl TableProvider for MaterializedView { .await .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?; - for part in &partitions { - file_group.push(PartitionedFile::new(&part.file_path, part.file_size as u64)); - } - - let schema = self.schema(); - let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap(); - let file_scan_config = FileScanConfig::new(object_store_url, schema) - .with_limit(limit) - .with_projection(projection.cloned()) - .with_file_groups(vec![file_group]); - let reader_factory = - ReaderFactory::new(Arc::clone(&self.object_store), Arc::new(partitions)); - Ok(ParquetExecBuilder::new(file_scan_config) - .with_predicate(predicate.clone()) - .with_parquet_file_reader_factory(Arc::new(reader_factory)) - .build_arc()) + make_partitioned_execution_plan( + self.schema(), + self.object_store.clone(), + state, + projection, + filters, + limit, + Arc::new(partitions), + ) } /// Tell DataFusion to push filters down to the scan method diff --git a/rust/analytics/src/lakehouse/merge.rs b/rust/analytics/src/lakehouse/merge.rs index d5350a73..a8efad61 100644 --- a/rust/analytics/src/lakehouse/merge.rs +++ b/rust/analytics/src/lakehouse/merge.rs @@ -1,145 +1,130 @@ use super::{ + partition::Partition, + partition_cache::PartitionCache, partition_source_data::hash_to_object_count, + partitioned_table_provider::PartitionedTableProvider, view::View, write_partition::{write_partition_from_rows, PartitionRowSet}, }; -use crate::response_writer::Logger; -use anyhow::{Context, Result}; +use crate::{dfext::min_max_time_df::min_max_time_dataframe, response_writer::Logger}; +use anyhow::Result; use chrono::{DateTime, Utc}; -use datafusion::parquet::arrow::{ - async_reader::ParquetObjectReader, ParquetRecordBatchStreamBuilder, -}; +use datafusion::{execution::object_store::ObjectStoreUrl, prelude::*, sql::TableReference}; use futures::stream::StreamExt; use micromegas_ingestion::data_lake_connection::DataLakeConnection; -use object_store::path::Path; -use object_store::ObjectMeta; -use sqlx::Row; use std::sync::Arc; use xxhash_rust::xxh32::xxh32; +fn partition_set_stats( + view: Arc, + filtered_partitions: &[Partition], +) -> Result<(i64, i64)> { + let mut sum_size: i64 = 0; + let mut source_hash: i64 = 0; + let latest_file_schema_hash = view.get_file_schema_hash(); + for p in filtered_partitions { + // for some time all the hashes will actually be the number of events in the source data + // when views have different hash algos, we should delegate to the view the creation of the merged hash + source_hash = if p.source_data_hash.len() == std::mem::size_of::() { + source_hash + hash_to_object_count(&p.source_data_hash)? 
+ } else { + //previous hash algo + xxh32(&p.source_data_hash, source_hash as u32).into() + }; + + sum_size += p.file_size; + + if p.view_metadata.file_schema_hash != latest_file_schema_hash { + anyhow::bail!( + "incompatible file schema with [{},{}]", + p.begin_insert_time.to_rfc3339(), + p.end_insert_time.to_rfc3339() + ); + } + } + Ok((sum_size, source_hash)) +} + pub async fn create_merged_partition( + existing_partitions: Arc, lake: Arc, view: Arc, - begin: DateTime, - end: DateTime, + begin_insert: DateTime, + end_insert: DateTime, logger: Arc, ) -> Result<()> { - let view_set_name = view.get_view_set_name().to_string(); - let view_instance_id = view.get_view_instance_id().to_string(); + let view_set_name = &view.get_view_set_name(); + let view_instance_id = &view.get_view_instance_id(); let desc = format!( "[{}, {}] {view_set_name} {view_instance_id}", - begin.to_rfc3339(), - end.to_rfc3339() + begin_insert.to_rfc3339(), + end_insert.to_rfc3339() ); // we are not looking for intersecting partitions, but only those that fit completely in the range - let rows = sqlx::query( - "SELECT file_path, file_size, updated, file_schema_hash, source_data_hash, begin_insert_time, end_insert_time, min_event_time, max_event_time - FROM lakehouse_partitions - WHERE view_set_name = $1 - AND view_instance_id = $2 - AND begin_insert_time >= $3 - AND end_insert_time <= $4 - ;", - ) - .bind(&view_set_name) - .bind(&view_instance_id) - .bind(begin) - .bind(end) - .fetch_all(&lake.db_pool) - .await - .with_context(|| "fetching partitions to merge")?; - if rows.len() < 2 { + // otherwise we'd get duplicated records + let filtered_partitions = existing_partitions + .filter_inside_range(view_set_name, view_instance_id, begin_insert, end_insert) + .partitions; + if filtered_partitions.len() < 2 { logger .write_log_entry(format!("{desc}: not enough partitions to merge")) .await?; return Ok(()); } - let latest_file_schema_hash = view.get_file_schema_hash(); - let mut sum_size: i64 = 0; - let mut source_hash: i64 = 0; - for r in &rows { - // for some time all the hashes will actually be the number of events in the source data - // when views have different hash algos, we should delegate to the view the creation of the merged hash - let source_data_hash: Vec = r.try_get("source_data_hash")?; - source_hash = if source_data_hash.len() == std::mem::size_of::() { - source_hash + hash_to_object_count(&source_data_hash)? 
- } else { - //previous hash algo - xxh32(&source_data_hash, source_hash as u32).into() - }; - - let file_size: i64 = r.try_get("file_size")?; - sum_size += file_size; - - let file_schema_hash: Vec = r.try_get("file_schema_hash")?; - if file_schema_hash != latest_file_schema_hash { - let begin_insert_time: DateTime = r.try_get("begin_insert_time")?; - let end_insert_time: DateTime = r.try_get("end_insert_time")?; - logger - .write_log_entry(format!( - "{desc}: incompatible file schema with [{},{}]", - begin_insert_time.to_rfc3339(), - end_insert_time.to_rfc3339() - )) - .await?; - return Ok(()); - } - } + let (sum_size, source_hash) = partition_set_stats(view.clone(), &filtered_partitions)?; logger .write_log_entry(format!( "{desc}: merging {} partitions sum_size={sum_size}", - rows.len() + filtered_partitions.len() )) .await?; - + let object_store = lake.blob_storage.inner(); + let table = PartitionedTableProvider::new( + view.get_file_schema(), + object_store.clone(), + Arc::new(filtered_partitions), + ); + let ctx = SessionContext::new(); + let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap(); + ctx.register_object_store(object_store_url.as_ref(), object_store.clone()); + ctx.register_table( + TableReference::Bare { + table: "source".into(), + }, + Arc::new(table), + )?; + let merge_query = view + .get_merge_partitions_query() + .replace("{source}", "source"); + let merged_df = ctx.sql(&merge_query).await?; let (tx, rx) = tokio::sync::mpsc::channel(1); let join_handle = tokio::spawn(write_partition_from_rows( lake.clone(), view.get_meta(), view.get_file_schema(), - begin, - end, + begin_insert, + end_insert, source_hash.to_le_bytes().to_vec(), rx, logger.clone(), )); - for r in &rows { - let file_path: String = r.try_get("file_path")?; - let file_size: i64 = r.try_get("file_size")?; - logger - .write_log_entry(format!("reading path={file_path} size={file_size}")) - .await?; - let min_time_row: DateTime = r.try_get("min_event_time")?; - let max_time_row: DateTime = r.try_get("max_event_time")?; - - let updated: DateTime = r.try_get("updated")?; - let meta = ObjectMeta { - location: Path::from(file_path), - last_modified: updated, - size: file_size as usize, - e_tag: None, - version: None, - }; - let reader = ParquetObjectReader::new(lake.blob_storage.inner(), meta); - let builder = ParquetRecordBatchStreamBuilder::new(reader) - .await - .with_context(|| "ParquetRecordBatchStreamBuilder::new")?; - let mut rbstream = builder - .with_batch_size(100 * 1024) // the default is 1024, which seems low - .build() - .with_context(|| "builder.build()")?; - while let Some(rb_res) = rbstream.next().await { - let rows = rb_res?; - tx.send(PartitionRowSet { - min_time_row, - max_time_row, - rows, - }) - .await?; - } + let mut stream = merged_df.execute_stream().await?; + while let Some(rb_res) = stream.next().await { + let rb = rb_res?; + let (mintime, maxtime) = min_max_time_dataframe( + ctx.read_batch(rb.clone())?, + &view.get_min_event_time_column_name(), + &view.get_max_event_time_column_name(), + ) + .await?; + tx.send(PartitionRowSet { + min_time_row: mintime, + max_time_row: maxtime, + rows: rb, + }) + .await?; } drop(tx); join_handle.await??; - Ok(()) } diff --git a/rust/analytics/src/lakehouse/metrics_view.rs b/rust/analytics/src/lakehouse/metrics_view.rs index f327cadb..4e1bd4d2 100644 --- a/rust/analytics/src/lakehouse/metrics_view.rs +++ b/rust/analytics/src/lakehouse/metrics_view.rs @@ -10,7 +10,7 @@ use super::{ generate_jit_partitions, is_jit_partition_up_to_date, 
write_partition_from_blocks, }, metrics_block_processor::MetricsBlockProcessor, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, partition_source_data::fetch_partition_source_data, view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewMaker, @@ -28,6 +28,9 @@ use std::sync::Arc; use uuid::Uuid; const VIEW_SET_NAME: &str = "measures"; +lazy_static::lazy_static! { + static ref TIME_COLUMN: Arc = Arc::new( String::from("time")); +} #[derive(Debug)] pub struct MetricsViewMaker {} @@ -73,17 +76,22 @@ impl View for MetricsView { async fn make_batch_partition_spec( &self, lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { if *self.view_instance_id != "global" { anyhow::bail!("not supported for jit queries... should it?"); } - let source_data = - fetch_partition_source_data(lake, part_provider, begin_insert, end_insert, "metrics") - .await - .with_context(|| "fetch_partition_source_data")?; + let source_data = fetch_partition_source_data( + lake, + existing_partitions, + begin_insert, + end_insert, + "metrics", + ) + .await + .with_context(|| "fetch_partition_source_data")?; Ok(Arc::new(BlockPartitionSpec { view_metadata: ViewMetadata { view_set_name: self.view_set_name.clone(), @@ -189,4 +197,12 @@ impl View for MetricsView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + TIME_COLUMN.clone() + } } diff --git a/rust/analytics/src/lakehouse/mod.rs b/rust/analytics/src/lakehouse/mod.rs index 0864ee00..db3fed3c 100644 --- a/rust/analytics/src/lakehouse/mod.rs +++ b/rust/analytics/src/lakehouse/mod.rs @@ -18,6 +18,8 @@ pub mod log_block_processor; pub mod log_view; /// Exposes materialize_partitions as a table function pub mod materialize_partitions_table_function; +/// TableProvider implementation for the lakehouse +pub mod materialized_view; /// Merge consecutive parquet partitions into a single file pub mod merge; /// Specification for a view partition backed by a table in the postgresql metadata database. @@ -34,6 +36,10 @@ pub mod partition; pub mod partition_cache; /// Describes the event blocks backing a partition pub mod partition_source_data; +/// ExecutionPlan based on a set of parquet files +pub mod partitioned_execution_plan; +/// TableProvider based on a set of parquet files +pub mod partitioned_table_provider; /// Replicated view of the `processes` table of the postgresql metadata database. pub mod processes_view; /// property_get function support from SQL @@ -44,10 +50,12 @@ pub mod query; pub mod reader_factory; /// Exposes retire_partitions as a table function pub mod retire_partitions_table_function; +/// Sql-defined view updated in batch +pub mod sql_batch_view; +/// Specification for a view partition backed by a SQL query on the lakehouse. +pub mod sql_partition_spec; /// Replicated view of the `streams` table of the postgresql metadata database. 
pub mod streams_view; -/// TableProvider implementation for the lakehouse -pub mod table_provider; /// Rewrite table scans to take the query range into account pub mod table_scan_rewrite; /// Tracking of expired partitions diff --git a/rust/analytics/src/lakehouse/partition_cache.rs b/rust/analytics/src/lakehouse/partition_cache.rs index f0380529..7b41faaa 100644 --- a/rust/analytics/src/lakehouse/partition_cache.rs +++ b/rust/analytics/src/lakehouse/partition_cache.rs @@ -18,6 +18,7 @@ pub trait QueryPartitionProvider: std::fmt::Display + Send + Sync + std::fmt::De ) -> Result>; } +/// PartitionCache allows to query partitions based on the insert_time range #[derive(Debug)] pub struct PartitionCache { pub partitions: Vec, @@ -90,16 +91,14 @@ impl PartitionCache { }) } + // overlap test for a specific view pub fn filter( &self, view_set_name: &str, view_instance_id: &str, begin_insert: DateTime, end_insert: DateTime, - ) -> Result { - if begin_insert < self.begin_insert || end_insert > self.end_insert { - anyhow::bail!("filtering from a result set that's not large enough"); - } + ) -> Self { let mut partitions = vec![]; for part in &self.partitions { if *part.view_metadata.view_set_name == view_set_name @@ -110,16 +109,61 @@ impl PartitionCache { partitions.push(part.clone()); } } - Ok(Self { + Self { partitions, begin_insert, end_insert, - }) + } + } + + // overlap test for a all views + pub fn filter_insert_range( + &self, + begin_insert: DateTime, + end_insert: DateTime, + ) -> Self { + let mut partitions = vec![]; + for part in &self.partitions { + if part.begin_insert_time < end_insert && part.end_insert_time > begin_insert { + partitions.push(part.clone()); + } + } + Self { + partitions, + begin_insert, + end_insert, + } + } + + // single view that fits completely in the specified range + pub fn filter_inside_range( + &self, + view_set_name: &str, + view_instance_id: &str, + begin_insert: DateTime, + end_insert: DateTime, + ) -> Self { + let mut partitions = vec![]; + for part in &self.partitions { + if *part.view_metadata.view_set_name == view_set_name + && *part.view_metadata.view_instance_id == view_instance_id + && part.begin_insert_time >= begin_insert + && part.end_insert_time <= end_insert + { + partitions.push(part.clone()); + } + } + Self { + partitions, + begin_insert, + end_insert, + } } } #[async_trait] impl QueryPartitionProvider for PartitionCache { + /// unlike LivePartitionProvider, the query_range is tested against the insertion time, not the event time async fn fetch( &self, view_set_name: &str, @@ -129,11 +173,14 @@ impl QueryPartitionProvider for PartitionCache { ) -> Result> { let mut partitions = vec![]; if let Some(range) = query_range { + if range.begin < self.begin_insert || range.end > self.end_insert { + anyhow::bail!("filtering from a result set that's not large enough"); + } for part in &self.partitions { if *part.view_metadata.view_set_name == view_set_name && *part.view_metadata.view_instance_id == view_instance_id - && part.min_event_time < range.end - && part.max_event_time > range.begin + && part.begin_insert_time < range.end + && part.end_insert_time > range.begin && part.view_metadata.file_schema_hash == file_schema_hash { partitions.push(part.clone()); @@ -263,3 +310,25 @@ impl QueryPartitionProvider for LivePartitionProvider { Ok(partitions) } } + +#[derive(Debug)] +pub struct NullPartitionProvider {} + +impl fmt::Display for NullPartitionProvider { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} + 
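// A minimal, self-contained sketch of the two interval tests behind the
// PartitionCache helpers above (plain i64 values stand in for DateTime<Utc>;
// the function names are illustrative, not part of the patch):
// filter_insert_range keeps any partition overlapping the range, while
// filter_inside_range keeps only partitions fully contained in it, which is
// what merging needs in order to avoid duplicated records.

/// Overlap test, as used by filter and filter_insert_range.
fn overlaps(part_begin: i64, part_end: i64, begin: i64, end: i64) -> bool {
    part_begin < end && part_end > begin
}

/// Containment test, as used by filter_inside_range.
fn inside(part_begin: i64, part_end: i64, begin: i64, end: i64) -> bool {
    part_begin >= begin && part_end <= end
}

fn main() {
    // A partition spanning [5, 15) checked against the range [10, 20):
    assert!(overlaps(5, 15, 10, 20)); // candidate for materialization
    assert!(!inside(5, 15, 10, 20)); // excluded from a merge of [10, 20)
    // A partition spanning [10, 20) passes both tests.
    assert!(overlaps(10, 20, 10, 20) && inside(10, 20, 10, 20));
}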
+#[async_trait] +impl QueryPartitionProvider for NullPartitionProvider { + async fn fetch( + &self, + _view_set_name: &str, + _view_instance_id: &str, + _query_range: Option, + _file_schema_hash: Vec, + ) -> Result> { + Ok(vec![]) + } +} diff --git a/rust/analytics/src/lakehouse/partition_source_data.rs b/rust/analytics/src/lakehouse/partition_source_data.rs index e2ce19fc..b8885639 100644 --- a/rust/analytics/src/lakehouse/partition_source_data.rs +++ b/rust/analytics/src/lakehouse/partition_source_data.rs @@ -1,12 +1,14 @@ +use super::partition_cache::PartitionCache; use crate::{ + dfext::typed_column::typed_column_by_name, lakehouse::{blocks_view::BlocksView, query::query_single_view}, time::TimeRange, }; use anyhow::{Context, Result}; use chrono::{DateTime, Utc}; use datafusion::arrow::array::{ - Array, ArrayRef, AsArray, BinaryArray, GenericListArray, Int32Array, Int64Array, RecordBatch, - StringArray, StructArray, TimestampNanosecondArray, + Array, ArrayRef, AsArray, BinaryArray, GenericListArray, Int32Array, Int64Array, StringArray, + StructArray, TimestampNanosecondArray, }; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use micromegas_telemetry::property::Property; @@ -15,8 +17,6 @@ use micromegas_tracing::prelude::*; use std::sync::Arc; use uuid::Uuid; -use super::partition_cache::QueryPartitionProvider; - pub struct PartitionSourceBlock { pub block: BlockMetadata, pub stream: Arc, @@ -34,16 +34,6 @@ pub fn hash_to_object_count(hash: &[u8]) -> Result { )) } -pub fn get_column<'a, T: core::any::Any>(rc: &'a RecordBatch, column_name: &str) -> Result<&'a T> { - let column = rc - .column_by_name(column_name) - .with_context(|| format!("getting column {column_name}"))?; - column - .as_any() - .downcast_ref::() - .with_context(|| format!("casting {column_name}: {:?}", column.data_type())) -} - fn read_property_list(value: ArrayRef) -> Result> { let properties: &StructArray = value.as_struct(); let (key_index, _key_field) = properties @@ -66,7 +56,7 @@ fn read_property_list(value: ArrayRef) -> Result> { pub async fn fetch_partition_source_data( lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, source_stream_tag: &str, @@ -91,9 +81,15 @@ pub async fn fetch_partition_source_data( ;"); let mut block_ids_hash: i64 = 0; let mut partition_src_blocks = vec![]; + // I really don't like the dependency on the blocks_view time filter being on insert_time. + // If we were to make the 'time relevance' of a block more generous, we would need to add a condition + // on the insert_time of the blocks in the previous SQL. 
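// Note: each typed_column_by_name call below (see dfext/typed_column.rs) downcasts
// one Arrow column of the blocks query result to its concrete array type
// (StringArray, Int64Array, TimestampNanosecondArray, ...), returning a
// descriptive error instead of panicking when the schema does not match.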
+ + // todo: remove query_single_view, use PartitionCache::filter + // todo: add query_partitions to use PartitionedTableProvider let blocks_answer = query_single_view( lake, - part_provider, + existing_partitions, Some(TimeRange::new(begin_insert, end_insert)), &sql, blocks_view, @@ -101,38 +97,42 @@ pub async fn fetch_partition_source_data( .await .with_context(|| "blocks query")?; for b in blocks_answer.record_batches { - let block_id_column: &StringArray = get_column(&b, "block_id")?; - let stream_id_column: &StringArray = get_column(&b, "stream_id")?; - let process_id_column: &StringArray = get_column(&b, "process_id")?; - let begin_time_column: &TimestampNanosecondArray = get_column(&b, "begin_time")?; - let begin_ticks_column: &Int64Array = get_column(&b, "begin_ticks")?; - let end_time_column: &TimestampNanosecondArray = get_column(&b, "end_time")?; - let end_ticks_column: &Int64Array = get_column(&b, "end_ticks")?; - let nb_objects_column: &Int32Array = get_column(&b, "nb_objects")?; - let object_offset_column: &Int64Array = get_column(&b, "object_offset")?; - let payload_size_column: &Int64Array = get_column(&b, "payload_size")?; + let block_id_column: &StringArray = typed_column_by_name(&b, "block_id")?; + let stream_id_column: &StringArray = typed_column_by_name(&b, "stream_id")?; + let process_id_column: &StringArray = typed_column_by_name(&b, "process_id")?; + let begin_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "begin_time")?; + let begin_ticks_column: &Int64Array = typed_column_by_name(&b, "begin_ticks")?; + let end_time_column: &TimestampNanosecondArray = typed_column_by_name(&b, "end_time")?; + let end_ticks_column: &Int64Array = typed_column_by_name(&b, "end_ticks")?; + let nb_objects_column: &Int32Array = typed_column_by_name(&b, "nb_objects")?; + let object_offset_column: &Int64Array = typed_column_by_name(&b, "object_offset")?; + let payload_size_column: &Int64Array = typed_column_by_name(&b, "payload_size")?; let block_insert_time_column: &TimestampNanosecondArray = - get_column(&b, "block_insert_time")?; + typed_column_by_name(&b, "block_insert_time")?; let dependencies_metadata_column: &BinaryArray = - get_column(&b, "streams.dependencies_metadata")?; - let objects_metadata_column: &BinaryArray = get_column(&b, "streams.objects_metadata")?; - let stream_tags_column: &GenericListArray = get_column(&b, "streams.tags")?; + typed_column_by_name(&b, "streams.dependencies_metadata")?; + let objects_metadata_column: &BinaryArray = + typed_column_by_name(&b, "streams.objects_metadata")?; + let stream_tags_column: &GenericListArray = typed_column_by_name(&b, "streams.tags")?; let stream_properties_column: &GenericListArray = - get_column(&b, "streams.properties")?; + typed_column_by_name(&b, "streams.properties")?; let process_start_time_column: &TimestampNanosecondArray = - get_column(&b, "processes.start_time")?; - let process_start_ticks_column: &Int64Array = get_column(&b, "processes.start_ticks")?; - let process_tsc_freq_column: &Int64Array = get_column(&b, "processes.tsc_frequency")?; - let process_exe_column: &StringArray = get_column(&b, "processes.exe")?; - let process_username_column: &StringArray = get_column(&b, "processes.username")?; - let process_realname_column: &StringArray = get_column(&b, "processes.realname")?; - let process_computer_column: &StringArray = get_column(&b, "processes.computer")?; - let process_distro_column: &StringArray = get_column(&b, "processes.distro")?; - let process_cpu_column: &StringArray = get_column(&b, 
"processes.cpu_brand")?; - let process_parent_column: &StringArray = get_column(&b, "processes.parent_process_id")?; + typed_column_by_name(&b, "processes.start_time")?; + let process_start_ticks_column: &Int64Array = + typed_column_by_name(&b, "processes.start_ticks")?; + let process_tsc_freq_column: &Int64Array = + typed_column_by_name(&b, "processes.tsc_frequency")?; + let process_exe_column: &StringArray = typed_column_by_name(&b, "processes.exe")?; + let process_username_column: &StringArray = typed_column_by_name(&b, "processes.username")?; + let process_realname_column: &StringArray = typed_column_by_name(&b, "processes.realname")?; + let process_computer_column: &StringArray = typed_column_by_name(&b, "processes.computer")?; + let process_distro_column: &StringArray = typed_column_by_name(&b, "processes.distro")?; + let process_cpu_column: &StringArray = typed_column_by_name(&b, "processes.cpu_brand")?; + let process_parent_column: &StringArray = + typed_column_by_name(&b, "processes.parent_process_id")?; let process_properties_column: &GenericListArray = - get_column(&b, "processes.properties")?; + typed_column_by_name(&b, "processes.properties")?; for ir in 0..b.num_rows() { let block_insert_time = block_insert_time_column.value(ir); diff --git a/rust/analytics/src/lakehouse/partitioned_execution_plan.rs b/rust/analytics/src/lakehouse/partitioned_execution_plan.rs new file mode 100644 index 00000000..fd6aed55 --- /dev/null +++ b/rust/analytics/src/lakehouse/partitioned_execution_plan.rs @@ -0,0 +1,42 @@ +use super::{partition::Partition, reader_factory::ReaderFactory}; +use crate::dfext::predicate::filters_to_predicate; +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::Session, + datasource::{ + listing::PartitionedFile, + physical_plan::{parquet::ParquetExecBuilder, FileScanConfig}, + }, + execution::object_store::ObjectStoreUrl, + physical_plan::ExecutionPlan, + prelude::*, +}; +use object_store::ObjectStore; +use std::sync::Arc; + +pub fn make_partitioned_execution_plan( + schema: SchemaRef, + object_store: Arc, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + partitions: Arc>, +) -> datafusion::error::Result> { + let predicate = filters_to_predicate(schema.clone(), state, filters)?; + let mut file_group = vec![]; + for part in &*partitions { + file_group.push(PartitionedFile::new(&part.file_path, part.file_size as u64)); + } + + let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap(); + let file_scan_config = FileScanConfig::new(object_store_url, schema) + .with_limit(limit) + .with_projection(projection.cloned()) + .with_file_groups(vec![file_group]); + let reader_factory = ReaderFactory::new(object_store, partitions); + Ok(ParquetExecBuilder::new(file_scan_config) + .with_predicate(predicate.clone()) + .with_parquet_file_reader_factory(Arc::new(reader_factory)) + .build_arc()) +} diff --git a/rust/analytics/src/lakehouse/partitioned_table_provider.rs b/rust/analytics/src/lakehouse/partitioned_table_provider.rs new file mode 100644 index 00000000..55c5f964 --- /dev/null +++ b/rust/analytics/src/lakehouse/partitioned_table_provider.rs @@ -0,0 +1,78 @@ +use super::{partition::Partition, partitioned_execution_plan::make_partitioned_execution_plan}; +use async_trait::async_trait; +use datafusion::{ + arrow::datatypes::SchemaRef, + catalog::{Session, TableProvider}, + datasource::TableType, + logical_expr::TableProviderFilterPushDown, + physical_plan::ExecutionPlan, + prelude::*, +}; +use 
object_store::ObjectStore; +use std::{any::Any, sync::Arc}; + +// unlike MaterializedView, the partition list is fixed at construction +#[derive(Debug)] +pub struct PartitionedTableProvider { + schema: SchemaRef, + object_store: Arc, + partitions: Arc>, +} + +impl PartitionedTableProvider { + pub fn new( + schema: SchemaRef, + object_store: Arc, + partitions: Arc>, + ) -> Self { + Self { + schema, + object_store, + partitions, + } + } +} + +#[async_trait] +impl TableProvider for PartitionedTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> datafusion::error::Result> { + make_partitioned_execution_plan( + self.schema(), + self.object_store.clone(), + state, + projection, + filters, + limit, + self.partitions.clone(), + ) + } + + /// Tell DataFusion to push filters down to the scan method + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> datafusion::error::Result> { + // Inexact because the pruning can't handle all expressions and pruning + // is not done at the row level -- there may be rows in returned files + // that do not pass the filter + Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()]) + } +} diff --git a/rust/analytics/src/lakehouse/processes_view.rs b/rust/analytics/src/lakehouse/processes_view.rs index 9a48f1ab..e77a05f0 100644 --- a/rust/analytics/src/lakehouse/processes_view.rs +++ b/rust/analytics/src/lakehouse/processes_view.rs @@ -2,9 +2,8 @@ use crate::time::TimeRange; use super::{ metadata_partition_spec::fetch_metadata_partition_spec, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, - view_factory::ViewMaker, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -19,14 +18,8 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "processes"; const VIEW_INSTANCE_ID: &str = "global"; - -#[derive(Debug)] -pub struct ProcessesViewMaker {} - -impl ViewMaker for ProcessesViewMaker { - fn make_view(&self, view_instance_id: &str) -> Result> { - Ok(Arc::new(ProcessesView::new(view_instance_id)?)) - } +lazy_static::lazy_static! 
{ + static ref INSERT_TIME_COLUMN: Arc = Arc::new( String::from("insert_time")); } #[derive(Debug)] @@ -38,11 +31,7 @@ pub struct ProcessesView { } impl ProcessesView { - pub fn new(view_instance_id: &str) -> Result { - if view_instance_id != "global" { - anyhow::bail!("only global view instance id is supported for metadata views"); - } - + pub fn new() -> Result { let data_sql = Arc::new(String::from( "SELECT process_id, exe, @@ -86,7 +75,7 @@ impl View for ProcessesView { async fn make_batch_partition_spec( &self, lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { @@ -148,6 +137,14 @@ impl View for ProcessesView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } } pub fn processes_view_schema() -> Schema { diff --git a/rust/analytics/src/lakehouse/query.rs b/rust/analytics/src/lakehouse/query.rs index 5683209a..837b3a18 100644 --- a/rust/analytics/src/lakehouse/query.rs +++ b/rust/analytics/src/lakehouse/query.rs @@ -7,7 +7,7 @@ use super::{ }; use crate::{ lakehouse::{ - table_provider::MaterializedView, table_scan_rewrite::TableScanRewrite, + materialized_view::MaterializedView, table_scan_rewrite::TableScanRewrite, view_instance_table_function::ViewInstanceTableFunction, }, time::TimeRange, @@ -17,7 +17,6 @@ use datafusion::{ arrow::array::RecordBatch, execution::{context::SessionContext, object_store::ObjectStoreUrl}, logical_expr::ScalarUDF, - sql::TableReference, }; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use micromegas_tracing::prelude::*; @@ -39,14 +38,7 @@ async fn register_table( part_provider, query_range.clone(), ); - let view_set_name = view.get_view_set_name().to_string(); - ctx.register_table( - TableReference::Bare { - table: view_set_name.into(), - }, - Arc::new(table), - )?; - Ok(()) + view.register_table(ctx, table).await } pub async fn query_single_view( diff --git a/rust/analytics/src/lakehouse/sql_batch_view.rs b/rust/analytics/src/lakehouse/sql_batch_view.rs new file mode 100644 index 00000000..689a8f6e --- /dev/null +++ b/rust/analytics/src/lakehouse/sql_batch_view.rs @@ -0,0 +1,212 @@ +use super::{ + materialized_view::MaterializedView, + partition_cache::{NullPartitionProvider, PartitionCache}, + query::make_session_context, + sql_partition_spec::fetch_sql_partition_spec, + view::{PartitionSpec, View, ViewMetadata}, + view_factory::ViewFactory, +}; +use crate::time::TimeRange; +use anyhow::{Context, Result}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use datafusion::{ + arrow::datatypes::Schema, logical_expr::Between, prelude::*, scalar::ScalarValue, + sql::TableReference, +}; +use micromegas_ingestion::data_lake_connection::DataLakeConnection; +use std::hash::Hash; +use std::hash::Hasher; +use std::{hash::DefaultHasher, sync::Arc}; + +/// SQL-defined view updated in batch +#[derive(Debug)] +pub struct SqlBatchView { + view_set_name: Arc, + view_instance_id: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, + src_query: Arc, + transform_query: Arc, + merge_partitions_query: Arc, + schema: Arc, + view_factory: Arc, +} + +impl SqlBatchView { + #[allow(clippy::too_many_arguments)] + /// # Arguments + /// + /// * `view_set_name` - name of the table + /// * `min_event_time_column` - min(column) should result in the first timestamp in a dataframe + /// * `max_event_time_column` - max(column) 
should result in the last timestamp in a dataframe + /// * `src_query` - used to count the rows of the underlying data to know if a cached partition is up to date + /// * `transform_query` - used to transform the source data into a cached partition + /// * `merge_partitions_query` - used to merge multiple partitions into a single one (and user queries which are one multiple partitions by default) + /// * `lake` - data lake + /// * `view_factory` - all views accessible to the `src_query` + pub async fn new( + view_set_name: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, + src_query: Arc, + transform_query: Arc, + merge_partitions_query: Arc, + lake: Arc, + view_factory: Arc, + ) -> Result { + let null_part_provider = Arc::new(NullPartitionProvider {}); + let ctx = make_session_context(lake, null_part_provider, None, view_factory.clone()) + .await + .with_context(|| "make_session_context")?; + let src_df = ctx.sql(&src_query).await?; + let src_view = src_df.into_view(); + ctx.register_table( + TableReference::Bare { + table: "source".into(), + }, + src_view, + )?; + + let transformed_df = ctx.sql(&transform_query).await?; + let schema = transformed_df.schema().inner().clone(); + + Ok(Self { + view_set_name, + view_instance_id: Arc::new(String::from("global")), + min_event_time_column, + max_event_time_column, + src_query, + transform_query, + merge_partitions_query, + schema, + view_factory, + }) + } +} + +#[async_trait] +impl View for SqlBatchView { + fn get_view_set_name(&self) -> Arc { + self.view_set_name.clone() + } + + fn get_view_instance_id(&self) -> Arc { + self.view_instance_id.clone() + } + + async fn make_batch_partition_spec( + &self, + lake: Arc, + existing_partitions: Arc, + begin_insert: DateTime, + end_insert: DateTime, + ) -> Result> { + let view_meta = ViewMetadata { + view_set_name: self.get_view_set_name(), + view_instance_id: self.get_view_instance_id(), + file_schema_hash: self.get_file_schema_hash(), + }; + let partitions_in_range = + Arc::new(existing_partitions.filter_insert_range(begin_insert, end_insert)); + let ctx = make_session_context( + lake.clone(), + partitions_in_range.clone(), + None, + self.view_factory.clone(), + ) + .await + .with_context(|| "make_session_context")?; + let src_df = ctx.sql(&self.src_query).await?; + let src_view = src_df.into_view(); + ctx.register_table( + TableReference::Bare { + table: "source".into(), + }, + src_view, + )?; + + Ok(Arc::new( + fetch_sql_partition_spec( + ctx, + self.transform_query.clone(), + self.min_event_time_column.clone(), + self.max_event_time_column.clone(), + view_meta, + begin_insert, + end_insert, + ) + .await + .with_context(|| "fetch_sql_partition_spec")?, + )) + } + fn get_file_schema_hash(&self) -> Vec { + let mut hasher = DefaultHasher::new(); + self.schema.hash(&mut hasher); + hasher.finish().to_le_bytes().to_vec() + } + fn get_file_schema(&self) -> Arc { + self.schema.clone() + } + async fn jit_update( + &self, + _lake: Arc, + _query_range: Option, + ) -> Result<()> { + Ok(()) + } + fn make_time_filter(&self, begin: DateTime, end: DateTime) -> Result> { + let utc: Arc = Arc::from("+00:00"); + Ok(vec![Expr::Between(Between::new( + col("time_bin").into(), + false, + Expr::Literal(ScalarValue::TimestampNanosecond( + begin.timestamp_nanos_opt(), + Some(utc.clone()), + )) + .into(), + Expr::Literal(ScalarValue::TimestampNanosecond( + end.timestamp_nanos_opt(), + Some(utc.clone()), + )) + .into(), + ))]) + } + + fn get_min_event_time_column_name(&self) -> Arc { + 
self.min_event_time_column.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + self.max_event_time_column.clone() + } + + async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> { + let view_name = self.get_view_set_name().to_string(); + let partitions_table_name = format!("__{view_name}__partitions"); + ctx.register_table( + TableReference::Bare { + table: partitions_table_name.clone().into(), + }, + Arc::new(table), + )?; + let df = ctx + .sql( + &self + .merge_partitions_query + .replace("{source}", &partitions_table_name), + ) + .await?; + ctx.register_table( + TableReference::Bare { + table: view_name.into(), + }, + df.into_view(), + )?; + Ok(()) + } + + fn get_merge_partitions_query(&self) -> Arc { + self.merge_partitions_query.clone() + } +} diff --git a/rust/analytics/src/lakehouse/sql_partition_spec.rs b/rust/analytics/src/lakehouse/sql_partition_spec.rs new file mode 100644 index 00000000..cadb700d --- /dev/null +++ b/rust/analytics/src/lakehouse/sql_partition_spec.rs @@ -0,0 +1,142 @@ +use super::{ + view::{PartitionSpec, ViewMetadata}, + write_partition::write_partition_from_rows, +}; +use crate::{ + dfext::{min_max_time_df::min_max_time_dataframe, typed_column::typed_column_by_name}, + lakehouse::write_partition::PartitionRowSet, + response_writer::Logger, +}; +use anyhow::Result; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use datafusion::{ + arrow::array::{Int64Array, RecordBatch}, + prelude::*, +}; +use futures::StreamExt; +use micromegas_ingestion::data_lake_connection::DataLakeConnection; +use std::sync::Arc; + +pub struct SqlPartitionSpec { + ctx: SessionContext, + transform_query: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, + view_metadata: ViewMetadata, + begin_insert: DateTime, + end_insert: DateTime, + record_count: i64, +} + +impl SqlPartitionSpec { + #[allow(clippy::too_many_arguments)] + pub fn new( + ctx: SessionContext, + transform_query: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, + view_metadata: ViewMetadata, + begin_insert: DateTime, + end_insert: DateTime, + record_count: i64, + ) -> Self { + Self { + ctx, + transform_query, + min_event_time_column, + max_event_time_column, + view_metadata, + begin_insert, + end_insert, + record_count, + } + } +} + +#[async_trait] +impl PartitionSpec for SqlPartitionSpec { + fn get_source_data_hash(&self) -> Vec { + self.record_count.to_le_bytes().to_vec() + } + + async fn write(&self, lake: Arc, logger: Arc) -> Result<()> { + if self.record_count == 0 { + return Ok(()); + } + let desc = format!( + "[{}, {}] {} {}", + self.view_metadata.view_set_name, + self.view_metadata.view_instance_id, + self.begin_insert.to_rfc3339(), + self.end_insert.to_rfc3339() + ); + logger.write_log_entry(format!("writing {desc}")).await?; + let df = self.ctx.sql(&self.transform_query).await?; + let schema = df.schema().inner().clone(); + let mut stream = df.execute_stream().await?; + + let (tx, rx) = tokio::sync::mpsc::channel(1); + let join_handle = tokio::spawn(write_partition_from_rows( + lake.clone(), + self.view_metadata.clone(), + schema, + self.begin_insert, + self.end_insert, + self.get_source_data_hash(), + rx, + logger.clone(), + )); + + while let Some(rb_res) = stream.next().await { + let rb = rb_res?; + let (mintime, maxtime) = min_max_time_dataframe( + self.ctx.read_batch(rb.clone())?, + &self.min_event_time_column, + &self.max_event_time_column, + ) + .await?; + tx.send(PartitionRowSet { + min_time_row: mintime, + 
max_time_row: maxtime, + rows: rb, + }) + .await?; + } + drop(tx); + join_handle.await??; + Ok(()) + } +} + +pub async fn fetch_sql_partition_spec( + ctx: SessionContext, + transform_query: Arc, + min_event_time_column: Arc, + max_event_time_column: Arc, + view_metadata: ViewMetadata, + begin_insert: DateTime, + end_insert: DateTime, +) -> Result { + let df = ctx.sql("SELECT COUNT(*) as count FROM source;").await?; + let batches: Vec = df.collect().await?; + if batches.len() != 1 { + anyhow::bail!("fetch_sql_partition_spec: query should return a single batch"); + } + let rb = &batches[0]; + let count_column: &Int64Array = typed_column_by_name(rb, "count")?; + if count_column.len() != 1 { + anyhow::bail!("fetch_sql_partition_spec: query should return a single row"); + } + let count = count_column.value(0); + Ok(SqlPartitionSpec::new( + ctx, + transform_query, + min_event_time_column, + max_event_time_column, + view_metadata, + begin_insert, + end_insert, + count, + )) +} diff --git a/rust/analytics/src/lakehouse/streams_view.rs b/rust/analytics/src/lakehouse/streams_view.rs index 242215a6..37dce8a8 100644 --- a/rust/analytics/src/lakehouse/streams_view.rs +++ b/rust/analytics/src/lakehouse/streams_view.rs @@ -2,9 +2,8 @@ use crate::time::TimeRange; use super::{ metadata_partition_spec::fetch_metadata_partition_spec, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, view::{PartitionSpec, View, ViewMetadata}, - view_factory::ViewMaker, }; use anyhow::{Context, Result}; use async_trait::async_trait; @@ -19,14 +18,8 @@ use std::sync::Arc; const VIEW_SET_NAME: &str = "streams"; const VIEW_INSTANCE_ID: &str = "global"; - -#[derive(Debug)] -pub struct StreamsViewMaker {} - -impl ViewMaker for StreamsViewMaker { - fn make_view(&self, view_instance_id: &str) -> Result> { - Ok(Arc::new(StreamsView::new(view_instance_id)?)) - } +lazy_static::lazy_static! 
{ + static ref INSERT_TIME_COLUMN: Arc = Arc::new( String::from("insert_time")); } #[derive(Debug)] @@ -38,11 +31,7 @@ pub struct StreamsView { } impl StreamsView { - pub fn new(view_instance_id: &str) -> Result { - if view_instance_id != "global" { - anyhow::bail!("only global view instance id is supported for metadata views"); - } - + pub fn new() -> Result { let data_sql = Arc::new(String::from( "SELECT stream_id, process_id, @@ -80,7 +69,7 @@ impl View for StreamsView { async fn make_batch_partition_spec( &self, lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result> { @@ -142,6 +131,14 @@ impl View for StreamsView { .into(), ))]) } + + fn get_min_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + INSERT_TIME_COLUMN.clone() + } } pub fn streams_view_schema() -> Schema { diff --git a/rust/analytics/src/lakehouse/table_scan_rewrite.rs b/rust/analytics/src/lakehouse/table_scan_rewrite.rs index f6d29bf3..6f495ad1 100644 --- a/rust/analytics/src/lakehouse/table_scan_rewrite.rs +++ b/rust/analytics/src/lakehouse/table_scan_rewrite.rs @@ -1,4 +1,4 @@ -use crate::{lakehouse::table_provider::MaterializedView, time::TimeRange}; +use crate::{lakehouse::materialized_view::MaterializedView, time::TimeRange}; use datafusion::error::DataFusionError; use datafusion::logical_expr::utils::conjunction; use datafusion::logical_expr::Filter; diff --git a/rust/analytics/src/lakehouse/thread_spans_view.rs b/rust/analytics/src/lakehouse/thread_spans_view.rs index e20e214e..209c8f2c 100644 --- a/rust/analytics/src/lakehouse/thread_spans_view.rs +++ b/rust/analytics/src/lakehouse/thread_spans_view.rs @@ -1,6 +1,6 @@ use super::{ jit_partitions::{generate_jit_partitions, is_jit_partition_up_to_date}, - partition_cache::QueryPartitionProvider, + partition_cache::PartitionCache, partition_source_data::{hash_to_object_count, PartitionSourceDataBlocks}, view::{PartitionSpec, View, ViewMetadata}, view_factory::ViewMaker, @@ -28,6 +28,10 @@ use std::sync::Arc; use uuid::Uuid; const VIEW_SET_NAME: &str = "thread_spans"; +lazy_static::lazy_static! 
{ + static ref MIN_TIME_COLUMN: Arc = Arc::new( String::from("begin")); + static ref MAX_TIME_COLUMN: Arc = Arc::new( String::from("end")); +} #[derive(Debug)] pub struct ThreadSpansViewMaker {} @@ -191,7 +195,7 @@ impl View for ThreadSpansView { async fn make_batch_partition_spec( &self, _lake: Arc, - _part_provider: Arc, + _existing_partitions: Arc, _begin_insert: DateTime, _end_insert: DateTime, ) -> Result> { @@ -278,4 +282,12 @@ impl View for ThreadSpansView { )), ]) } + + fn get_min_event_time_column_name(&self) -> Arc { + MIN_TIME_COLUMN.clone() + } + + fn get_max_event_time_column_name(&self) -> Arc { + MAX_TIME_COLUMN.clone() + } } diff --git a/rust/analytics/src/lakehouse/view.rs b/rust/analytics/src/lakehouse/view.rs index 8c364789..53f6e863 100644 --- a/rust/analytics/src/lakehouse/view.rs +++ b/rust/analytics/src/lakehouse/view.rs @@ -1,13 +1,12 @@ +use super::{materialized_view::MaterializedView, partition_cache::PartitionCache}; use crate::{response_writer::Logger, time::TimeRange}; use anyhow::Result; use async_trait::async_trait; use chrono::{DateTime, Utc}; -use datafusion::{arrow::datatypes::Schema, logical_expr::Expr}; +use datafusion::{arrow::datatypes::Schema, logical_expr::Expr, prelude::*, sql::TableReference}; use micromegas_ingestion::data_lake_connection::DataLakeConnection; use std::sync::Arc; -use super::partition_cache::QueryPartitionProvider; - #[async_trait] pub trait PartitionSpec: Send + Sync { fn get_source_data_hash(&self) -> Vec; @@ -34,7 +33,7 @@ pub trait View: std::fmt::Debug + Send + Sync { async fn make_batch_partition_spec( &self, lake: Arc, - part_provider: Arc, + existing_partitions: Arc, begin_insert: DateTime, end_insert: DateTime, ) -> Result>; @@ -56,6 +55,28 @@ pub trait View: std::fmt::Debug + Send + Sync { /// make_time_filter returns a set of expressions that will filter out the rows of the partition /// outside the time range requested. fn make_time_filter(&self, _begin: DateTime, _end: DateTime) -> Result>; + + /// name of the column to take the min() of to get the first event timestamp in a dataframe + fn get_min_event_time_column_name(&self) -> Arc; + + /// name of the column to take the max() of to get the last event timestamp in a dataframe + fn get_max_event_time_column_name(&self) -> Arc; + + /// register the table in the SessionContext + async fn register_table(&self, ctx: &SessionContext, table: MaterializedView) -> Result<()> { + let view_set_name = self.get_view_set_name().to_string(); + ctx.register_table( + TableReference::Bare { + table: view_set_name.into(), + }, + Arc::new(table), + )?; + Ok(()) + } + + fn get_merge_partitions_query(&self) -> Arc { + Arc::new(String::from("SELECT * FROM {source};")) + } } impl dyn View { diff --git a/rust/analytics/src/lakehouse/view_factory.rs b/rust/analytics/src/lakehouse/view_factory.rs index 26499ecf..70455046 100644 --- a/rust/analytics/src/lakehouse/view_factory.rs +++ b/rust/analytics/src/lakehouse/view_factory.rs @@ -136,9 +136,11 @@ //! //! //! 
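// A small sketch of how the new View defaults above are meant to be consumed
// (MyView and the free-standing method are illustrative; only the query string
// and the {source} placeholder come from the patch): create_merged_partition
// takes the view's merge query and substitutes the name of the table it
// registered for the {source} placeholder before running it.

use std::sync::Arc;

struct MyView;

impl MyView {
    // Default provided by the View trait: concatenate partitions unchanged.
    fn get_merge_partitions_query(&self) -> Arc<String> {
        Arc::new(String::from("SELECT * FROM {source};"))
    }
}

fn main() {
    let view = MyView;
    // merge.rs registers the partition set as "source"; SqlBatchView::register_table
    // uses "__<view_name>__partitions". Either way the placeholder is replaced.
    let merge_query = view
        .get_merge_partitions_query()
        .replace("{source}", "source");
    assert_eq!(merge_query, "SELECT * FROM source;");
}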
+use super::blocks_view::BlocksView; +use super::processes_view::ProcessesView; +use super::streams_view::StreamsView; use super::{ - blocks_view::BlocksViewMaker, log_view::LogViewMaker, metrics_view::MetricsViewMaker, - processes_view::ProcessesViewMaker, streams_view::StreamsViewMaker, + log_view::LogViewMaker, metrics_view::MetricsViewMaker, thread_spans_view::ThreadSpansViewMaker, view::View, }; use anyhow::Result; @@ -149,7 +151,7 @@ pub trait ViewMaker: Send + Sync + Debug { fn make_view(&self, view_instance_id: &str) -> Result>; } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ViewFactory { view_sets: HashMap>, global_views: Vec>, @@ -167,6 +169,10 @@ impl ViewFactory { &self.global_views } + pub fn add_global_view(&mut self, view: Arc) { + self.global_views.push(view); + } + pub fn add_view_set(&mut self, view_set_name: String, maker: Arc) { self.view_sets.insert(view_set_name, maker); } @@ -183,15 +189,15 @@ impl ViewFactory { pub fn default_view_factory() -> Result { let log_view_maker = Arc::new(LogViewMaker {}); let metrics_view_maker = Arc::new(MetricsViewMaker {}); - let processes_view_maker = Arc::new(ProcessesViewMaker {}); - let streams_view_maker = Arc::new(StreamsViewMaker {}); - let blocks_view_maker = Arc::new(BlocksViewMaker {}); + let processes_view = Arc::new(ProcessesView::new()?); + let streams_view = Arc::new(StreamsView::new()?); + let blocks_view = Arc::new(BlocksView::new()?); let global_views = vec![ log_view_maker.make_view("global")?, metrics_view_maker.make_view("global")?, - processes_view_maker.make_view("global")?, - streams_view_maker.make_view("global")?, - blocks_view_maker.make_view("global")?, + processes_view, + streams_view, + blocks_view, ]; let mut factory = ViewFactory::new(global_views); factory.add_view_set(String::from("log_entries"), log_view_maker); @@ -200,8 +206,5 @@ pub fn default_view_factory() -> Result { String::from("thread_spans"), Arc::new(ThreadSpansViewMaker {}), ); - factory.add_view_set(String::from("processes"), processes_view_maker); - factory.add_view_set(String::from("streams"), streams_view_maker); - factory.add_view_set(String::from("blocks"), blocks_view_maker); Ok(factory) } diff --git a/rust/analytics/src/lakehouse/view_instance_table_function.rs b/rust/analytics/src/lakehouse/view_instance_table_function.rs index 5522a03a..9d3f812f 100644 --- a/rust/analytics/src/lakehouse/view_instance_table_function.rs +++ b/rust/analytics/src/lakehouse/view_instance_table_function.rs @@ -1,5 +1,5 @@ use super::{ - partition_cache::QueryPartitionProvider, table_provider::MaterializedView, + materialized_view::MaterializedView, partition_cache::QueryPartitionProvider, view_factory::ViewFactory, }; use crate::time::TimeRange; diff --git a/rust/analytics/src/lakehouse/write_partition.rs b/rust/analytics/src/lakehouse/write_partition.rs index 65d0d4c8..b01cad2c 100644 --- a/rust/analytics/src/lakehouse/write_partition.rs +++ b/rust/analytics/src/lakehouse/write_partition.rs @@ -197,7 +197,7 @@ pub async fn write_partition_from_rows( lake: Arc, view_metadata: ViewMetadata, file_schema: Arc, - begin_insert_time: DateTime, + begin_insert_time: DateTime, //todo:change to min_insert_time & max_insert_time end_insert_time: DateTime, source_data_hash: Vec, mut rb_stream: Receiver, diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs new file mode 100644 index 00000000..a0ea2077 --- /dev/null +++ b/rust/analytics/tests/sql_view_test.rs @@ -0,0 +1,302 @@ +use anyhow::{Context, Result}; +use 
diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs
new file mode 100644
index 00000000..a0ea2077
--- /dev/null
+++ b/rust/analytics/tests/sql_view_test.rs
@@ -0,0 +1,302 @@
+use anyhow::{Context, Result};
+use chrono::{DateTime, DurationRound};
+use chrono::{TimeDelta, Utc};
+use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
+use micromegas_analytics::lakehouse::blocks_view::BlocksView;
+use micromegas_analytics::lakehouse::partition_cache::{LivePartitionProvider, PartitionCache};
+use micromegas_analytics::lakehouse::query::query;
+use micromegas_analytics::lakehouse::view::View;
+use micromegas_analytics::lakehouse::view_factory::default_view_factory;
+use micromegas_analytics::lakehouse::write_partition::retire_partitions;
+use micromegas_analytics::lakehouse::{sql_batch_view::SqlBatchView, view_factory::ViewFactory};
+use micromegas_analytics::response_writer::{Logger, ResponseWriter};
+use micromegas_analytics::time::TimeRange;
+use micromegas_ingestion::data_lake_connection::{connect_to_data_lake, DataLakeConnection};
+use micromegas_telemetry_sink::TelemetryGuardBuilder;
+use micromegas_tracing::prelude::*;
+use std::sync::Arc;
+
+async fn make_log_entries_levels_per_process_minute_view(
+    lake: Arc<DataLakeConnection>,
+    view_factory: Arc<ViewFactory>,
+) -> Result<SqlBatchView> {
+    let src_query = Arc::new(String::from(
+        "
+        SELECT process_id,
+               time,
+               CAST(level==1 as INT) as fatal,
+               CAST(level==2 as INT) as err,
+               CAST(level==3 as INT) as warn,
+               CAST(level==4 as INT) as info,
+               CAST(level==5 as INT) as debug,
+               CAST(level==6 as INT) as trace
+        FROM log_entries;",
+    ));
+    let transform_query = Arc::new(String::from(
+        "
+        SELECT date_bin('1 minute', time) as time_bin,
+               min(time) as min_time,
+               max(time) as max_time,
+               process_id,
+               sum(fatal) as nb_fatal,
+               sum(err) as nb_err,
+               sum(warn) as nb_warn,
+               sum(info) as nb_info,
+               sum(debug) as nb_debug,
+               sum(trace) as nb_trace
+        FROM source
+        GROUP BY process_id, time_bin
+        ORDER BY time_bin, process_id;",
+    ));
+    let merge_partitions_query = Arc::new(String::from(
+        "
+        SELECT time_bin,
+               min(min_time) as min_time,
+               max(max_time) as max_time,
+               process_id,
+               sum(nb_fatal) as nb_fatal,
+               sum(nb_err) as nb_err,
+               sum(nb_warn) as nb_warn,
+               sum(nb_info) as nb_info,
+               sum(nb_debug) as nb_debug,
+               sum(nb_trace) as nb_trace
+        FROM {source}
+        GROUP BY process_id, time_bin
+        ORDER BY time_bin, process_id;",
+    ));
+    let time_column = Arc::new(String::from("time_bin"));
+    SqlBatchView::new(
+        Arc::new("log_entries_per_process_per_minute".to_owned()),
+        time_column.clone(),
+        time_column,
+        src_query,
+        transform_query,
+        merge_partitions_query,
+        lake,
+        view_factory,
+    )
+    .await
+}
+
+pub async fn materialize_range(
+    lake: Arc<DataLakeConnection>,
+    view_factory: Arc<ViewFactory>,
+    log_summary_view: Arc<SqlBatchView>,
+    begin_range: DateTime<Utc>,
+    end_range: DateTime<Utc>,
+    partition_time_delta: TimeDelta,
+    logger: Arc<dyn Logger>,
+) -> Result<()> {
+    let mut partitions = Arc::new(
+        // todo: query only the blocks partitions for this call
+        PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
+            .await?,
+    );
+    let blocks_view = Arc::new(BlocksView::new()?);
+    materialize_partition_range(
+        partitions.clone(),
+        lake.clone(),
+        blocks_view,
+        begin_range,
+        end_range,
+        partition_time_delta,
+        logger.clone(),
+    )
+    .await?;
+    partitions = Arc::new(
+        PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
+            .await?,
+    );
+    let log_entries_view = view_factory.make_view("log_entries", "global")?;
+    materialize_partition_range(
+        partitions.clone(),
+        lake.clone(),
+        log_entries_view,
+        begin_range,
+        end_range,
+        partition_time_delta,
+        logger.clone(),
+    )
+    .await?;
+    partitions = Arc::new(
+        PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
+            .await?,
+    );
+    materialize_partition_range(
+        partitions.clone(),
+        lake.clone(),
+        log_summary_view.clone(),
+        begin_range,
+        end_range,
+        partition_time_delta,
+        logger.clone(),
+    )
+    .await?;
+    Ok(())
+}
+
+async fn retire_existing_partitions(
+    lake: Arc<DataLakeConnection>,
+    log_summary_view: Arc<SqlBatchView>,
+    logger: Arc<dyn Logger>,
+) -> Result<()> {
+    let mut tr = lake.db_pool.begin().await?;
+    let now = Utc::now();
+    let begin = now - TimeDelta::days(10);
+    retire_partitions(
+        &mut tr,
+        &log_summary_view.get_view_set_name(),
+        &log_summary_view.get_view_instance_id(),
+        begin,
+        now,
+        logger,
+    )
+    .await?;
+    tr.commit().await.with_context(|| "commit")?;
+    Ok(())
+}
+
+#[ignore]
+#[tokio::test]
+async fn sql_view_test() -> Result<()> {
+    let _telemetry_guard = TelemetryGuardBuilder::default()
+        .with_ctrlc_handling()
+        .with_local_sink_max_level(LevelFilter::Info)
+        .build();
+    let connection_string = std::env::var("MICROMEGAS_SQL_CONNECTION_STRING")
+        .with_context(|| "reading MICROMEGAS_SQL_CONNECTION_STRING")?;
+    let object_store_uri = std::env::var("MICROMEGAS_OBJECT_STORE_URI")
+        .with_context(|| "reading MICROMEGAS_OBJECT_STORE_URI")?;
+    let lake = Arc::new(connect_to_data_lake(&connection_string, &object_store_uri).await?);
+    let mut view_factory = default_view_factory()?;
+    let log_summary_view = Arc::new(
+        make_log_entries_levels_per_process_minute_view(
+            lake.clone(),
+            Arc::new(view_factory.clone()),
+        )
+        .await?,
+    );
+    view_factory.add_global_view(log_summary_view.clone());
+    let view_factory = Arc::new(view_factory);
+    let null_response_writer = Arc::new(ResponseWriter::new(None));
+    retire_existing_partitions(
+        lake.clone(),
+        log_summary_view.clone(),
+        null_response_writer.clone(),
+    )
+    .await?;
+    let ref_schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "time_bin",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
+            true,
+        ),
+        Field::new(
+            "min_time",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
+            true,
+        ),
+        Field::new(
+            "max_time",
+            DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into())),
+            true,
+        ),
+        Field::new(
+            "process_id",
+            DataType::Dictionary(DataType::Int16.into(), DataType::Utf8.into()),
+            false,
+        ),
+        Field::new("nb_fatal", DataType::Int64, true),
+        Field::new("nb_err", DataType::Int64, true),
+        Field::new("nb_warn", DataType::Int64, true),
+        Field::new("nb_info", DataType::Int64, true),
+        Field::new("nb_debug", DataType::Int64, true),
+        Field::new("nb_trace", DataType::Int64, true),
+    ]));
+    assert_eq!(log_summary_view.get_file_schema(), ref_schema);
+    let ref_schema_hash: Vec<u8> = vec![105, 221, 132, 185, 221, 232, 62, 136];
+    assert_eq!(log_summary_view.get_file_schema_hash(), ref_schema_hash);
+    let end_range = Utc::now().duration_trunc(TimeDelta::minutes(1))?;
+    let begin_range = end_range - (TimeDelta::minutes(3));
+    materialize_range(
+        lake.clone(),
+        view_factory.clone(),
+        log_summary_view.clone(),
+        begin_range,
+        end_range,
+        TimeDelta::seconds(10),
+        null_response_writer.clone(),
+    )
+    .await?;
+    materialize_range(
+        lake.clone(),
+        view_factory.clone(),
+        log_summary_view.clone(),
+        begin_range,
+        end_range,
+        TimeDelta::minutes(1),
+        null_response_writer.clone(),
+    )
+    .await?;
+    let answer = query(
+        lake.clone(),
+        Arc::new(LivePartitionProvider::new(lake.db_pool.clone())),
+        Some(TimeRange::new(begin_range, end_range)),
+        "
+        SELECT time_bin,
+               min_time,
+               max_time,
+               process_id,
+               nb_fatal,
+               nb_err,
+               nb_warn,
+               nb_info,
+               nb_debug,
+               nb_trace
+        FROM log_entries_per_process_per_minute
+        ORDER BY time_bin, process_id;",
+        view_factory.clone(),
+    )
+    .await?;
+    let pretty_results_view =
+        datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string();
+    eprintln!("{pretty_results_view}");
+
+    let answer = query(
+        lake.clone(),
+        Arc::new(LivePartitionProvider::new(lake.db_pool.clone())),
+        Some(TimeRange::new(begin_range, end_range)),
+        "
+        SELECT date_bin('1 minute', time) as time_bin,
+               min(time) as min_time,
+               max(time) as max_time,
+               process_id,
+               sum(nb_fatal) as nb_fatal,
+               sum(nb_err) as nb_err,
+               sum(nb_warn) as nb_warn,
+               sum(nb_info) as nb_info,
+               sum(nb_debug) as nb_debug,
+               sum(nb_trace) as nb_trace
+        FROM (
+            SELECT process_id,
+                   time,
+                   CAST(level==1 as INT) as nb_fatal,
+                   CAST(level==2 as INT) as nb_err,
+                   CAST(level==3 as INT) as nb_warn,
+                   CAST(level==4 as INT) as nb_info,
+                   CAST(level==5 as INT) as nb_debug,
+                   CAST(level==6 as INT) as nb_trace
+            FROM log_entries
+        )
+        GROUP BY process_id, time_bin
+        ORDER BY time_bin, process_id;",
+        view_factory,
+    )
+    .await?;
+    let pretty_results_ref =
+        datafusion::arrow::util::pretty::pretty_format_batches(&answer.record_batches)?.to_string();
+    eprintln!("{pretty_results_ref}");
+    assert_eq!(pretty_results_view, pretty_results_ref);
+    Ok(())
+}
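
Outside of the test, querying the materialized summary view follows the same call shape as above; a minimal sketch, with a made-up function name and a one-hour window that are not part of the patch:

use anyhow::Result;
use chrono::{TimeDelta, Utc};
use micromegas_analytics::lakehouse::partition_cache::LivePartitionProvider;
use micromegas_analytics::lakehouse::query::query;
use micromegas_analytics::lakehouse::view_factory::ViewFactory;
use micromegas_analytics::time::TimeRange;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use std::sync::Arc;

// Fetch the last hour of per-process log-level counts from the view
// registered as log_entries_per_process_per_minute.
async fn last_hour_summary(
    lake: Arc<DataLakeConnection>,
    view_factory: Arc<ViewFactory>,
) -> Result<()> {
    let end = Utc::now();
    let begin = end - TimeDelta::hours(1);
    let answer = query(
        lake.clone(),
        Arc::new(LivePartitionProvider::new(lake.db_pool.clone())),
        Some(TimeRange::new(begin, end)),
        "SELECT time_bin, process_id, nb_err, nb_warn
         FROM log_entries_per_process_per_minute
         ORDER BY time_bin;",
        view_factory,
    )
    .await?;
    for batch in &answer.record_batches {
        println!("{} rows", batch.num_rows());
    }
    Ok(())
}
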
diff --git a/rust/public/src/servers/maintenance.rs b/rust/public/src/servers/maintenance.rs
index 360981d3..ed6785c9 100644
--- a/rust/public/src/servers/maintenance.rs
+++ b/rust/public/src/servers/maintenance.rs
@@ -3,7 +3,10 @@ use chrono::{DateTime, DurationRound};
 use chrono::{TimeDelta, Utc};
 use micromegas_analytics::delete::delete_old_data;
 use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
+use micromegas_analytics::lakehouse::blocks_view::BlocksView;
 use micromegas_analytics::lakehouse::partition_cache::PartitionCache;
+use micromegas_analytics::lakehouse::processes_view::ProcessesView;
+use micromegas_analytics::lakehouse::streams_view::StreamsView;
 use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files;
 use micromegas_analytics::lakehouse::view::View;
 use micromegas_analytics::lakehouse::view_factory::ViewFactory;
@@ -83,6 +86,7 @@ pub async fn materialize_all_views(
     let end_range = now.duration_trunc(partition_time_delta)?;
     let begin_range = end_range - (partition_time_delta * nb_partitions);
     let mut partitions = Arc::new(
+        // todo: query only the blocks partitions for this call
         PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
             .await?,
     );
@@ -151,10 +155,10 @@ pub async fn every_second(
 }
 
 pub async fn daemon(lake: Arc<DataLakeConnection>, view_factory: Arc<ViewFactory>) -> Result<()> {
-    let blocks_view = view_factory.make_view("blocks", "global")?;
-    let views = Arc::new(vec![
-        view_factory.make_view("processes", "global")?,
-        view_factory.make_view("streams", "global")?,
+    let blocks_view = Arc::new(BlocksView::new()?);
+    let views: Arc<Vec<Arc<dyn View>>> = Arc::new(vec![
+        Arc::new(ProcessesView::new()?),
+        Arc::new(StreamsView::new()?),
         view_factory.make_view("log_entries", "global")?,
         view_factory.make_view("measures", "global")?,
     ]);
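
The materialization window in materialize_all_views is aligned with duration_trunc; a small standalone illustration of that truncation, using example timestamps that are not taken from the patch:

use chrono::{DurationRound, TimeDelta, TimeZone, Utc};

fn main() {
    let now = Utc.with_ymd_and_hms(2024, 1, 1, 10, 7, 42).unwrap();
    // Truncating to the partition delta drops the sub-minute part, giving the
    // end of the range; the beginning is a whole number of deltas earlier.
    let end_range = now
        .duration_trunc(TimeDelta::minutes(1))
        .expect("truncation");
    let begin_range = end_range - TimeDelta::minutes(1) * 3;
    assert_eq!(end_range.to_rfc3339(), "2024-01-01T10:07:00+00:00");
    assert_eq!(begin_range.to_rfc3339(), "2024-01-01T10:04:00+00:00");
}
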
diff --git a/rust/tracing/src/dispatch.rs b/rust/tracing/src/dispatch.rs
index 7ac8cf6f..d4af0fe5 100644
--- a/rust/tracing/src/dispatch.rs
+++ b/rust/tracing/src/dispatch.rs
@@ -370,14 +370,21 @@ impl Dispatch {
     }
 
     pub fn get_sink(&self) -> Arc<dyn EventSink> {
-        self.sink.read().unwrap().clone()
+        if let Ok(guard) = self.sink.try_read() {
+            (*guard).clone()
+        } else {
+            Arc::new(NullEventSink {})
+        }
     }
 
     fn shutdown(&self) {
         let old_sink = self.get_sink();
-        old_sink.on_shutdown();
         let null_sink = Arc::new(NullEventSink {});
-        *self.sink.write().unwrap() = null_sink;
+        if let Ok(mut guard) = self.sink.write() {
+            *guard = null_sink;
+            drop(guard);
+        }
+        old_sink.on_shutdown();
     }
 
     fn startup(&self) {
diff --git a/rust/tracing/src/flush_monitor.rs b/rust/tracing/src/flush_monitor.rs
index 810e87ad..54e48d25 100644
--- a/rust/tracing/src/flush_monitor.rs
+++ b/rust/tracing/src/flush_monitor.rs
@@ -39,7 +39,11 @@ impl FlushMonitor {
 
 impl Default for FlushMonitor {
     fn default() -> Self {
-        // Default is to flush every minute
-        Self::new(60)
+        // Default is to flush every minute unless overridden by the MICROMEGAS_FLUSH_PERIOD env variable
+        const DEFAULT_PERIOD: i64 = 60;
+        let nb_seconds = std::env::var("MICROMEGAS_FLUSH_PERIOD")
+            .map(|v| v.parse::<i64>().unwrap_or(DEFAULT_PERIOD))
+            .unwrap_or(DEFAULT_PERIOD);
+        Self::new(nb_seconds)
     }
 }
diff --git a/rust/tracing/src/logs/events.rs b/rust/tracing/src/logs/events.rs
index c47f47e7..0ca65b48 100644
--- a/rust/tracing/src/logs/events.rs
+++ b/rust/tracing/src/logs/events.rs
@@ -291,14 +291,14 @@ impl InProcSerialize for TaggedLogString {
     }
 }
 
+#[allow(unused_imports)]
 #[cfg(test)]
 mod test {
-    use std::thread;
-
     use crate::logs::{
         events::{Level, LevelFilter},
         FilterState, LogMetadata, FILTER_LEVEL_UNSET_VALUE,
     };
+    use std::thread;
 
     #[test]
     fn test_filter_levels() {
diff --git a/rust/tracing/src/time.rs b/rust/tracing/src/time.rs
index 94f9c30b..86e996a6 100644
--- a/rust/tracing/src/time.rs
+++ b/rust/tracing/src/time.rs
@@ -147,9 +147,10 @@ fn frequency_fallback() -> u64 {
     0
 }
 
+#[allow(unused_imports)]
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use crate::time::frequency;
 
     #[test]
     fn test_frequency() {
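
The flush-period override added above boils down to: a missing or unparsable MICROMEGAS_FLUSH_PERIOD falls back to 60 seconds. A standalone sketch of the same lookup; the helper name is ours, not part of the patch:

const DEFAULT_PERIOD: i64 = 60;

// Mirrors the lookup in FlushMonitor::default(): unset or unparsable values
// fall back to the 60-second default.
fn flush_period_seconds() -> i64 {
    std::env::var("MICROMEGAS_FLUSH_PERIOD")
        .map(|v| v.parse::<i64>().unwrap_or(DEFAULT_PERIOD))
        .unwrap_or(DEFAULT_PERIOD)
}

fn main() {
    // e.g. MICROMEGAS_FLUSH_PERIOD=5 ./my_service -> flush every 5 seconds
    println!("flush period: {}s", flush_period_seconds());
}
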