From d79bb75efec42b2e1309861f1e901a4582575238 Mon Sep 17 00:00:00 2001
From: Marc-Antoine Desroches
Date: Fri, 24 Jan 2025 16:18:19 -0500
Subject: [PATCH] query_partitions (#287)

* factor out query_partitions from merge

* use query_partitions in fetch_partition_source_data, streamline empty partition

* PartitionCache::fetch_overlapping_insert_range_for_view
---
 rust/analytics/src/lakehouse/batch_update.rs  |  3 +
 .../src/lakehouse/block_partition_spec.rs     |  4 ++
 rust/analytics/src/lakehouse/blocks_view.rs   |  1 -
 rust/analytics/src/lakehouse/merge.rs         | 28 +++------
 .../src/lakehouse/metadata_partition_spec.rs  |  4 ++
 .../src/lakehouse/partition_cache.rs          | 62 +++++++++++++++++++
 .../src/lakehouse/partition_source_data.rs    | 38 +++++-------
 rust/analytics/src/lakehouse/query.rs         | 28 ++++++++-
 .../src/lakehouse/sql_partition_spec.rs       |  4 ++
 rust/analytics/src/lakehouse/view.rs          |  1 +
 .../src/lakehouse/write_partition.rs          |  2 +-
 rust/analytics/tests/sql_view_test.rs         | 13 ++--
 rust/public/src/servers/maintenance.rs        | 11 +++-
 13 files changed, 148 insertions(+), 51 deletions(-)

diff --git a/rust/analytics/src/lakehouse/batch_update.rs b/rust/analytics/src/lakehouse/batch_update.rs
index 94371b9..4d80a7d 100644
--- a/rust/analytics/src/lakehouse/batch_update.rs
+++ b/rust/analytics/src/lakehouse/batch_update.rs
@@ -125,6 +125,9 @@ async fn materialize_partition(
     )
     .await
     .with_context(|| "make_batch_partition_spec")?;
+    if partition_spec.is_empty() {
+        return Ok(());
+    }
     let view_instance_id = view.get_view_instance_id();
     let strategy = verify_overlapping_partitions(
         &existing_partitions,
diff --git a/rust/analytics/src/lakehouse/block_partition_spec.rs b/rust/analytics/src/lakehouse/block_partition_spec.rs
index b467f9f..c95d0fb 100644
--- a/rust/analytics/src/lakehouse/block_partition_spec.rs
+++ b/rust/analytics/src/lakehouse/block_partition_spec.rs
@@ -37,6 +37,10 @@ pub struct BlockPartitionSpec {
 
 #[async_trait]
 impl PartitionSpec for BlockPartitionSpec {
+    fn is_empty(&self) -> bool {
+        self.source_data.blocks.is_empty()
+    }
+
     fn get_source_data_hash(&self) -> Vec<u8> {
         self.source_data.block_ids_hash.clone()
     }
diff --git a/rust/analytics/src/lakehouse/blocks_view.rs b/rust/analytics/src/lakehouse/blocks_view.rs
index dc3b43f..6e941b4 100644
--- a/rust/analytics/src/lakehouse/blocks_view.rs
+++ b/rust/analytics/src/lakehouse/blocks_view.rs
@@ -112,7 +112,6 @@ impl View for BlocksView {
         anyhow::bail!("not supported");
     }
 
-    // fetch_partition_source_data relies on this filter being on insert_time
    fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
         let utc: Arc<str> = Arc::from("+00:00");
         Ok(vec![Expr::Between(Between::new(
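Note: the early return added to materialize_partition above is driven by the is_empty hook this patch adds to PartitionSpec (see view.rs further down). A minimal sketch of a spec implementing the extended trait, assuming the crate-internal items (PartitionSpec, DataLakeConnection, Logger) are in scope; CountedSpec itself is hypothetical, not part of this patch:

    use anyhow::Result;
    use async_trait::async_trait;
    use std::sync::Arc;

    // Hypothetical spec that knows how many records it would materialize.
    struct CountedSpec {
        record_count: i64,
    }

    #[async_trait]
    impl PartitionSpec for CountedSpec {
        // When this returns true, materialize_partition now returns Ok(())
        // before any partition is written.
        fn is_empty(&self) -> bool {
            self.record_count < 1
        }

        fn get_source_data_hash(&self) -> Vec<u8> {
            self.record_count.to_le_bytes().to_vec()
        }

        async fn write(&self, _lake: Arc<DataLakeConnection>, _logger: Arc<dyn Logger>) -> Result<()> {
            Ok(()) // nothing to persist in this sketch
        }
    }
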
diff --git a/rust/analytics/src/lakehouse/merge.rs b/rust/analytics/src/lakehouse/merge.rs
index a8efad6..ca3b142 100644
--- a/rust/analytics/src/lakehouse/merge.rs
+++ b/rust/analytics/src/lakehouse/merge.rs
@@ -2,14 +2,14 @@ use super::{
     partition::Partition,
     partition_cache::PartitionCache,
     partition_source_data::hash_to_object_count,
-    partitioned_table_provider::PartitionedTableProvider,
+    query::query_partitions,
     view::View,
     write_partition::{write_partition_from_rows, PartitionRowSet},
 };
 use crate::{dfext::min_max_time_df::min_max_time_dataframe, response_writer::Logger};
 use anyhow::Result;
 use chrono::{DateTime, Utc};
-use datafusion::{execution::object_store::ObjectStoreUrl, prelude::*, sql::TableReference};
+use datafusion::prelude::*;
 use futures::stream::StreamExt;
 use micromegas_ingestion::data_lake_connection::DataLakeConnection;
 use std::sync::Arc;
@@ -78,25 +78,16 @@ pub async fn create_merged_partition(
         filtered_partitions.len()
     ))
     .await?;
-    let object_store = lake.blob_storage.inner();
-    let table = PartitionedTableProvider::new(
-        view.get_file_schema(),
-        object_store.clone(),
-        Arc::new(filtered_partitions),
-    );
-    let ctx = SessionContext::new();
-    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
-    ctx.register_object_store(object_store_url.as_ref(), object_store.clone());
-    ctx.register_table(
-        TableReference::Bare {
-            table: "source".into(),
-        },
-        Arc::new(table),
-    )?;
     let merge_query = view
         .get_merge_partitions_query()
         .replace("{source}", "source");
-    let merged_df = ctx.sql(&merge_query).await?;
+    let merged_df = query_partitions(
+        lake.clone(),
+        view.get_file_schema(),
+        filtered_partitions,
+        &merge_query,
+    )
+    .await?;
     let (tx, rx) = tokio::sync::mpsc::channel(1);
     let join_handle = tokio::spawn(write_partition_from_rows(
         lake.clone(),
@@ -109,6 +100,7 @@ pub async fn create_merged_partition(
         logger.clone(),
     ));
     let mut stream = merged_df.execute_stream().await?;
+    let ctx = SessionContext::new();
     while let Some(rb_res) = stream.next().await {
         let rb = rb_res?;
         let (mintime, maxtime) = min_max_time_dataframe(
diff --git a/rust/analytics/src/lakehouse/metadata_partition_spec.rs b/rust/analytics/src/lakehouse/metadata_partition_spec.rs
index 410eca8..9e541a2 100644
--- a/rust/analytics/src/lakehouse/metadata_partition_spec.rs
+++ b/rust/analytics/src/lakehouse/metadata_partition_spec.rs
@@ -58,6 +58,10 @@ pub async fn fetch_metadata_partition_spec(
 
 #[async_trait]
 impl PartitionSpec for MetadataPartitionSpec {
+    fn is_empty(&self) -> bool {
+        self.record_count < 1
+    }
+
     fn get_source_data_hash(&self) -> Vec<u8> {
         self.record_count.to_le_bytes().to_vec()
     }
diff --git a/rust/analytics/src/lakehouse/partition_cache.rs b/rust/analytics/src/lakehouse/partition_cache.rs
index 7b41faa..3dde765 100644
--- a/rust/analytics/src/lakehouse/partition_cache.rs
+++ b/rust/analytics/src/lakehouse/partition_cache.rs
@@ -91,6 +91,68 @@ impl PartitionCache {
         })
     }
 
+    pub async fn fetch_overlapping_insert_range_for_view(
+        pool: &sqlx::PgPool,
+        view_set_name: Arc<String>,
+        view_instance_id: Arc<String>,
+        begin_insert: DateTime<Utc>,
+        end_insert: DateTime<Utc>,
+    ) -> Result<Self> {
+        let rows = sqlx::query(
+            "SELECT begin_insert_time,
+                    end_insert_time,
+                    min_event_time,
+                    max_event_time,
+                    updated,
+                    file_path,
+                    file_size,
+                    file_schema_hash,
+                    source_data_hash,
+                    file_metadata
+             FROM lakehouse_partitions
+             WHERE begin_insert_time < $1
+             AND end_insert_time > $2
+             AND view_set_name = $3
+             AND view_instance_id = $4
+             AND file_metadata IS NOT NULL
+             ;",
+        )
+        .bind(end_insert)
+        .bind(begin_insert)
+        .bind(&*view_set_name)
+        .bind(&*view_instance_id)
+        .fetch_all(pool)
+        .await
+        .with_context(|| "fetching partitions")?;
+        let mut partitions = vec![];
+        for r in rows {
+            let view_metadata = ViewMetadata {
+                view_set_name: view_set_name.clone(),
+                view_instance_id: view_instance_id.clone(),
+                file_schema_hash: r.try_get("file_schema_hash")?,
+            };
+            let file_metadata_buffer: Vec<u8> = r.try_get("file_metadata")?;
+            let file_metadata = Arc::new(parse_parquet_metadata(&file_metadata_buffer.into())?);
+            partitions.push(Partition {
+                view_metadata,
+                begin_insert_time: r.try_get("begin_insert_time")?,
+                end_insert_time: r.try_get("end_insert_time")?,
+                min_event_time: r.try_get("min_event_time")?,
+                max_event_time: r.try_get("max_event_time")?,
+                updated: r.try_get("updated")?,
+                file_path: r.try_get("file_path")?,
+                file_size: r.try_get("file_size")?,
+                source_data_hash: r.try_get("source_data_hash")?,
+                file_metadata,
+            });
+        }
+        Ok(Self {
+            partitions,
+            begin_insert,
+            end_insert,
+        })
+    }
+
     // overlap test for a specific view
     pub fn filter(
         &self,
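Note: unlike fetch_overlapping_insert_range, which loads the partitions of every view, this constructor restricts the query to one (view_set_name, view_instance_id) pair; the begin_insert_time < $1 AND end_insert_time > $2 predicate is a strict interval-overlap test. A sketch of the call shape used by the test and maintenance code below, assuming PartitionCache is in scope; the blocks_partition_cache helper is hypothetical:

    use anyhow::Result;
    use chrono::{DateTime, Utc};
    use std::sync::Arc;

    // Hypothetical helper: cache only the blocks view's partitions that
    // overlap [begin_insert, end_insert), instead of every view's partitions.
    async fn blocks_partition_cache(
        pool: &sqlx::PgPool,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Result<PartitionCache> {
        PartitionCache::fetch_overlapping_insert_range_for_view(
            pool,
            Arc::new("blocks".to_string()),
            Arc::new("global".to_string()),
            begin_insert,
            end_insert,
        )
        .await
    }
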
diff --git a/rust/analytics/src/lakehouse/partition_source_data.rs b/rust/analytics/src/lakehouse/partition_source_data.rs
index b888563..deef795 100644
--- a/rust/analytics/src/lakehouse/partition_source_data.rs
+++ b/rust/analytics/src/lakehouse/partition_source_data.rs
@@ -1,8 +1,7 @@
 use super::partition_cache::PartitionCache;
 use crate::{
     dfext::typed_column::typed_column_by_name,
-    lakehouse::{blocks_view::BlocksView, query::query_single_view},
-    time::TimeRange,
+    lakehouse::{blocks_view::blocks_view_schema, query::query_partitions},
 };
 use anyhow::{Context, Result};
 use chrono::{DateTime, Utc};
@@ -61,13 +60,9 @@ pub async fn fetch_partition_source_data(
     end_insert: DateTime<Utc>,
     source_stream_tag: &str,
 ) -> Result<PartitionSourceDataBlocks> {
-    let desc = format!(
-        "[{}, {}] {source_stream_tag}",
-        begin_insert.to_rfc3339(),
-        end_insert.to_rfc3339()
-    );
-
-    let blocks_view = Arc::new(BlocksView::new().with_context(|| "BlocksView::new")?);
+    let begin_rfc = begin_insert.to_rfc3339();
+    let end_rfc = end_insert.to_rfc3339();
+    let desc = format!("[{begin_rfc}, {end_rfc}] {source_stream_tag}",);
     let sql = format!("
       SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
              object_offset, payload_size, insert_time as block_insert_time,
@@ -75,28 +70,27 @@ pub async fn fetch_partition_source_data(
              \"processes.start_time\", \"processes.start_ticks\", \"processes.tsc_frequency\", \"processes.exe\",
             \"processes.username\", \"processes.realname\", \"processes.computer\", \"processes.distro\",
             \"processes.cpu_brand\", \"processes.parent_process_id\", \"processes.properties\"
-      FROM blocks
+      FROM source
       WHERE array_has( \"streams.tags\", '{source_stream_tag}' )
+      AND insert_time >= '{begin_rfc}'
+      AND insert_time < '{end_rfc}'
       ORDER BY insert_time, block_id
       ;");
     let mut block_ids_hash: i64 = 0;
     let mut partition_src_blocks = vec![];
-    // I really don't like the dependency on the blocks_view time filter being on insert_time.
-    // If we were to make the 'time relevance' of a block more generous, we would need to add a condition
-    // on the insert_time of the blocks in the previous SQL.
-
-    // todo: remove query_single_view, use PartitionCache::filter
-    // todo: add query_partitions to use PartitionedTableProvider
-    let blocks_answer = query_single_view(
-        lake,
-        existing_partitions,
-        Some(TimeRange::new(begin_insert, end_insert)),
+    let block_partitions = existing_partitions
+        .filter("blocks", "global", begin_insert, end_insert)
+        .partitions;
+    let df = query_partitions(
+        lake.clone(),
+        Arc::new(blocks_view_schema()),
+        block_partitions,
         &sql,
-        blocks_view,
     )
     .await
     .with_context(|| "blocks query")?;
-    for b in blocks_answer.record_batches {
+    let record_batches = df.collect().await?;
+    for b in record_batches {
         let block_id_column: &StringArray = typed_column_by_name(&b, "block_id")?;
         let stream_id_column: &StringArray = typed_column_by_name(&b, "stream_id")?;
         let process_id_column: &StringArray = typed_column_by_name(&b, "process_id")?;
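Note: with query_single_view out of the loop, no view time filter is applied implicitly, which is why the SQL above now repeats the insert_time bounds: a partition that overlaps [begin_insert, end_insert) can still contain rows outside that range. The same filter-then-query pattern works for ad-hoc scans over the blocks partitions; a sketch assuming the crate-internal items are in scope, with scan_blocks and its query purely illustrative:

    use anyhow::Result;
    use chrono::{DateTime, Utc};
    use datafusion::arrow::array::RecordBatch;
    use std::sync::Arc;

    // Hypothetical helper: run an arbitrary query over the blocks partitions
    // cached for an insert-time range.
    async fn scan_blocks(
        lake: Arc<DataLakeConnection>,
        existing_partitions: Arc<PartitionCache>,
        begin_insert: DateTime<Utc>,
        end_insert: DateTime<Utc>,
    ) -> Result<Vec<RecordBatch>> {
        // Repeat the bounds in SQL; partition overlap alone is too coarse.
        let sql = format!(
            "SELECT count(*) AS nb_blocks FROM source WHERE insert_time >= '{}' AND insert_time < '{}';",
            begin_insert.to_rfc3339(),
            end_insert.to_rfc3339()
        );
        let partitions = existing_partitions
            .filter("blocks", "global", begin_insert, end_insert)
            .partitions;
        let df = query_partitions(lake, Arc::new(blocks_view_schema()), partitions, &sql).await?;
        Ok(df.collect().await?)
    }
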
diff --git a/rust/analytics/src/lakehouse/query.rs b/rust/analytics/src/lakehouse/query.rs
index 837b3a1..728f9c1 100644
--- a/rust/analytics/src/lakehouse/query.rs
+++ b/rust/analytics/src/lakehouse/query.rs
@@ -1,7 +1,8 @@
 use super::{
     answer::Answer, list_partitions_table_function::ListPartitionsTableFunction,
     materialize_partitions_table_function::MaterializePartitionsTableFunction,
-    partition_cache::QueryPartitionProvider, property_get_function::PropertyGet,
+    partition::Partition, partition_cache::QueryPartitionProvider,
+    partitioned_table_provider::PartitionedTableProvider, property_get_function::PropertyGet,
     retire_partitions_table_function::RetirePartitionsTableFunction, view::View,
     view_factory::ViewFactory,
 };
@@ -14,9 +15,11 @@ use crate::{
 };
 use anyhow::{Context, Result};
 use datafusion::{
-    arrow::array::RecordBatch,
+    arrow::{array::RecordBatch, datatypes::SchemaRef},
     execution::{context::SessionContext, object_store::ObjectStoreUrl},
     logical_expr::ScalarUDF,
+    prelude::*,
+    sql::TableReference,
 };
 use micromegas_ingestion::data_lake_connection::DataLakeConnection;
 use micromegas_tracing::prelude::*;
@@ -41,6 +44,27 @@ async fn register_table(
     view.register_table(ctx, table).await
 }
 
+/// query_partitions returns a DataFrame, leaving the caller the option of streaming the results
+pub async fn query_partitions(
+    lake: Arc<DataLakeConnection>,
+    schema: SchemaRef,
+    partitions: Vec<Partition>,
+    sql: &str,
+) -> Result<DataFrame> {
+    let object_store = lake.blob_storage.inner();
+    let table = PartitionedTableProvider::new(schema, object_store.clone(), Arc::new(partitions));
+    let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
+    let ctx = SessionContext::new();
+    ctx.register_object_store(object_store_url.as_ref(), object_store.clone());
+    ctx.register_table(
+        TableReference::Bare {
+            table: "source".into(),
+        },
+        Arc::new(table),
+    )?;
+    Ok(ctx.sql(sql).await?)
+}
+
 pub async fn query_single_view(
     lake: Arc<DataLakeConnection>,
     part_provider: Arc<dyn QueryPartitionProvider>,
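Note: returning a DataFrame rather than collected batches is what lets create_merged_partition stream row sets through its channel, while fetch_partition_source_data simply calls collect(). A sketch of the streaming path, assuming the crate-internal items are in scope; stream_source and its query are illustrative:

    use anyhow::Result;
    use datafusion::arrow::datatypes::SchemaRef;
    use futures::stream::StreamExt;
    use std::sync::Arc;

    // Hypothetical: consume query_partitions batch by batch instead of
    // collecting everything in memory.
    async fn stream_source(
        lake: Arc<DataLakeConnection>,
        schema: SchemaRef,
        partitions: Vec<Partition>,
    ) -> Result<()> {
        let df = query_partitions(lake, schema, partitions, "SELECT * FROM source;").await?;
        let mut stream = df.execute_stream().await?;
        while let Some(batch) = stream.next().await {
            println!("read {} rows", batch?.num_rows());
        }
        Ok(())
    }
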
diff --git a/rust/analytics/src/lakehouse/sql_partition_spec.rs b/rust/analytics/src/lakehouse/sql_partition_spec.rs
index cadb700..9876306 100644
--- a/rust/analytics/src/lakehouse/sql_partition_spec.rs
+++ b/rust/analytics/src/lakehouse/sql_partition_spec.rs
@@ -56,6 +56,10 @@ impl SqlPartitionSpec {
 
 #[async_trait]
 impl PartitionSpec for SqlPartitionSpec {
+    fn is_empty(&self) -> bool {
+        self.record_count < 1
+    }
+
     fn get_source_data_hash(&self) -> Vec<u8> {
         self.record_count.to_le_bytes().to_vec()
     }
diff --git a/rust/analytics/src/lakehouse/view.rs b/rust/analytics/src/lakehouse/view.rs
index 53f6e86..34014f5 100644
--- a/rust/analytics/src/lakehouse/view.rs
+++ b/rust/analytics/src/lakehouse/view.rs
@@ -9,6 +9,7 @@ use std::sync::Arc;
 
 #[async_trait]
 pub trait PartitionSpec: Send + Sync {
+    fn is_empty(&self) -> bool;
     fn get_source_data_hash(&self) -> Vec<u8>;
     async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()>;
 }
diff --git a/rust/analytics/src/lakehouse/write_partition.rs b/rust/analytics/src/lakehouse/write_partition.rs
index b01cad2..4af1ce6 100644
--- a/rust/analytics/src/lakehouse/write_partition.rs
+++ b/rust/analytics/src/lakehouse/write_partition.rs
@@ -91,7 +91,7 @@ pub async fn retire_partitions(
     // where a bigger one existed
     // it gets tricky in the jit case where a partition can have only one block and begin_insert == end_insert
 
-    //todo: use PartitionCache here, add filter_contained
+    //todo: use DELETE+RETURNING
     let old_partitions = sqlx::query(
         "SELECT file_path, file_size
          FROM lakehouse_partitions
diff --git a/rust/analytics/tests/sql_view_test.rs b/rust/analytics/tests/sql_view_test.rs
index a0ea207..c659e49 100644
--- a/rust/analytics/tests/sql_view_test.rs
+++ b/rust/analytics/tests/sql_view_test.rs
@@ -88,12 +88,17 @@ pub async fn materialize_range(
     partition_time_delta: TimeDelta,
     logger: Arc<dyn Logger>,
 ) -> Result<()> {
+    let blocks_view = Arc::new(BlocksView::new()?);
     let mut partitions = Arc::new(
-        // todo: query only the blocks partitions for this call
-        PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
-            .await?,
+        PartitionCache::fetch_overlapping_insert_range_for_view(
+            &lake.db_pool,
+            blocks_view.get_view_set_name(),
+            blocks_view.get_view_instance_id(),
+            begin_range,
+            end_range,
+        )
+        .await?,
     );
-    let blocks_view = Arc::new(BlocksView::new()?);
     materialize_partition_range(
         partitions.clone(),
         lake.clone(),
diff --git a/rust/public/src/servers/maintenance.rs b/rust/public/src/servers/maintenance.rs
index ed6785c..ef572cb 100644
--- a/rust/public/src/servers/maintenance.rs
+++ b/rust/public/src/servers/maintenance.rs
@@ -86,9 +86,14 @@ pub async fn materialize_all_views(
     let end_range = now.duration_trunc(partition_time_delta)?;
     let begin_range = end_range - (partition_time_delta * nb_partitions);
     let mut partitions = Arc::new(
-        // todo: query only the blocks partitions for this call
-        PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
-            .await?,
+        PartitionCache::fetch_overlapping_insert_range_for_view(
+            &lake.db_pool,
+            blocks_view.get_view_set_name(),
+            blocks_view.get_view_instance_id(),
+            begin_range,
+            end_range,
+        )
+        .await?,
     );
     let null_response_writer = Arc::new(ResponseWriter::new(None));
     materialize_partition_range(