query_partitions (#287)
* factor out query_partitions from merge

* use query_partitions in fetch_partition_source_data, streamline empty partition

* PartitionCache::fetch_overlapping_insert_range_for_view
madesroches-ubi authored Jan 24, 2025
1 parent 806ce01 commit d79bb75
Showing 13 changed files with 148 additions and 51 deletions.
3 changes: 3 additions & 0 deletions rust/analytics/src/lakehouse/batch_update.rs
@@ -125,6 +125,9 @@ async fn materialize_partition(
)
.await
.with_context(|| "make_batch_partition_spec")?;
if partition_spec.is_empty() {
return Ok(());
}
let view_instance_id = view.get_view_instance_id();
let strategy = verify_overlapping_partitions(
&existing_partitions,
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/block_partition_spec.rs
@@ -37,6 +37,10 @@ pub struct BlockPartitionSpec {

#[async_trait]
impl PartitionSpec for BlockPartitionSpec {
fn is_empty(&self) -> bool {
self.source_data.blocks.is_empty()
}

fn get_source_data_hash(&self) -> Vec<u8> {
self.source_data.block_ids_hash.clone()
}
1 change: 0 additions & 1 deletion rust/analytics/src/lakehouse/blocks_view.rs
@@ -112,7 +112,6 @@ impl View for BlocksView {
anyhow::bail!("not supported");
}

// fetch_partition_source_data relies on this filter being on insert_time
fn make_time_filter(&self, begin: DateTime<Utc>, end: DateTime<Utc>) -> Result<Vec<Expr>> {
let utc: Arc<str> = Arc::from("+00:00");
Ok(vec![Expr::Between(Between::new(
28 changes: 10 additions & 18 deletions rust/analytics/src/lakehouse/merge.rs
@@ -2,14 +2,14 @@ use super::{
partition::Partition,
partition_cache::PartitionCache,
partition_source_data::hash_to_object_count,
partitioned_table_provider::PartitionedTableProvider,
query::query_partitions,
view::View,
write_partition::{write_partition_from_rows, PartitionRowSet},
};
use crate::{dfext::min_max_time_df::min_max_time_dataframe, response_writer::Logger};
use anyhow::Result;
use chrono::{DateTime, Utc};
use datafusion::{execution::object_store::ObjectStoreUrl, prelude::*, sql::TableReference};
use datafusion::prelude::*;
use futures::stream::StreamExt;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use std::sync::Arc;
@@ -78,25 +78,16 @@ pub async fn create_merged_partition(
filtered_partitions.len()
))
.await?;
let object_store = lake.blob_storage.inner();
let table = PartitionedTableProvider::new(
view.get_file_schema(),
object_store.clone(),
Arc::new(filtered_partitions),
);
let ctx = SessionContext::new();
let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
ctx.register_object_store(object_store_url.as_ref(), object_store.clone());
ctx.register_table(
TableReference::Bare {
table: "source".into(),
},
Arc::new(table),
)?;
let merge_query = view
.get_merge_partitions_query()
.replace("{source}", "source");
let merged_df = ctx.sql(&merge_query).await?;
let merged_df = query_partitions(
lake.clone(),
view.get_file_schema(),
filtered_partitions,
&merge_query,
)
.await?;
let (tx, rx) = tokio::sync::mpsc::channel(1);
let join_handle = tokio::spawn(write_partition_from_rows(
lake.clone(),
@@ -109,6 +100,7 @@
logger.clone(),
));
let mut stream = merged_df.execute_stream().await?;
let ctx = SessionContext::new();
while let Some(rb_res) = stream.next().await {
let rb = rb_res?;
let (mintime, maxtime) = min_max_time_dataframe(
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/metadata_partition_spec.rs
@@ -58,6 +58,10 @@ pub async fn fetch_metadata_partition_spec(

#[async_trait]
impl PartitionSpec for MetadataPartitionSpec {
fn is_empty(&self) -> bool {
self.record_count < 1
}

fn get_source_data_hash(&self) -> Vec<u8> {
self.record_count.to_le_bytes().to_vec()
}
62 changes: 62 additions & 0 deletions rust/analytics/src/lakehouse/partition_cache.rs
@@ -91,6 +91,68 @@ impl PartitionCache {
})
}

pub async fn fetch_overlapping_insert_range_for_view(
pool: &sqlx::PgPool,
view_set_name: Arc<String>,
view_instance_id: Arc<String>,
begin_insert: DateTime<Utc>,
end_insert: DateTime<Utc>,
) -> Result<Self> {
let rows = sqlx::query(
"SELECT begin_insert_time,
end_insert_time,
min_event_time,
max_event_time,
updated,
file_path,
file_size,
file_schema_hash,
source_data_hash,
file_metadata
FROM lakehouse_partitions
WHERE begin_insert_time < $1
AND end_insert_time > $2
AND view_set_name = $3
AND view_instance_id = $4
AND file_metadata IS NOT NULL
;",
)
.bind(end_insert)
.bind(begin_insert)
.bind(&*view_set_name)
.bind(&*view_instance_id)
.fetch_all(pool)
.await
.with_context(|| "fetching partitions")?;
let mut partitions = vec![];
for r in rows {
let view_metadata = ViewMetadata {
view_set_name: view_set_name.clone(),
view_instance_id: view_instance_id.clone(),
file_schema_hash: r.try_get("file_schema_hash")?,
};
let file_metadata_buffer: Vec<u8> = r.try_get("file_metadata")?;
let file_metadata = Arc::new(parse_parquet_metadata(&file_metadata_buffer.into())?);
partitions.push(Partition {
view_metadata,
begin_insert_time: r.try_get("begin_insert_time")?,
end_insert_time: r.try_get("end_insert_time")?,
min_event_time: r.try_get("min_event_time")?,
max_event_time: r.try_get("max_event_time")?,
updated: r.try_get("updated")?,
file_path: r.try_get("file_path")?,
file_size: r.try_get("file_size")?,
source_data_hash: r.try_get("source_data_hash")?,
file_metadata,
});
}
Ok(Self {
partitions,
begin_insert,
end_insert,
})
}

// overlap test for a specific view
pub fn filter(
&self,
38 changes: 16 additions & 22 deletions rust/analytics/src/lakehouse/partition_source_data.rs
@@ -1,8 +1,7 @@
use super::partition_cache::PartitionCache;
use crate::{
dfext::typed_column::typed_column_by_name,
lakehouse::{blocks_view::BlocksView, query::query_single_view},
time::TimeRange,
lakehouse::{blocks_view::blocks_view_schema, query::query_partitions},
};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
@@ -61,42 +60,37 @@ pub async fn fetch_partition_source_data(
end_insert: DateTime<Utc>,
source_stream_tag: &str,
) -> Result<PartitionSourceDataBlocks> {
let desc = format!(
"[{}, {}] {source_stream_tag}",
begin_insert.to_rfc3339(),
end_insert.to_rfc3339()
);

let blocks_view = Arc::new(BlocksView::new().with_context(|| "BlocksView::new")?);
let begin_rfc = begin_insert.to_rfc3339();
let end_rfc = end_insert.to_rfc3339();
let desc = format!("[{begin_rfc}, {end_rfc}] {source_stream_tag}",);
let sql = format!("
SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
object_offset, payload_size, insert_time as block_insert_time,
\"streams.dependencies_metadata\", \"streams.objects_metadata\", \"streams.tags\", \"streams.properties\",
\"processes.start_time\", \"processes.start_ticks\", \"processes.tsc_frequency\", \"processes.exe\",
\"processes.username\", \"processes.realname\", \"processes.computer\", \"processes.distro\", \"processes.cpu_brand\",
\"processes.parent_process_id\", \"processes.properties\"
FROM blocks
FROM source
WHERE array_has( \"streams.tags\", '{source_stream_tag}' )
AND insert_time >= '{begin_rfc}'
AND insert_time < '{end_rfc}'
ORDER BY insert_time, block_id
;");
let mut block_ids_hash: i64 = 0;
let mut partition_src_blocks = vec![];
// I really don't like the dependency on the blocks_view time filter being on insert_time.
// If we were to make the 'time relevance' of a block more generous, we would need to add a condition
// on the insert_time of the blocks in the previous SQL.

// todo: remove query_single_view, use PartitionCache::filter
// todo: add query_partitions to use PartitionedTableProvider
let blocks_answer = query_single_view(
lake,
existing_partitions,
Some(TimeRange::new(begin_insert, end_insert)),
let block_partitions = existing_partitions
.filter("blocks", "global", begin_insert, end_insert)
.partitions;
let df = query_partitions(
lake.clone(),
Arc::new(blocks_view_schema()),
block_partitions,
&sql,
blocks_view,
)
.await
.with_context(|| "blocks query")?;
for b in blocks_answer.record_batches {
let record_batches = df.collect().await?;
for b in record_batches {
let block_id_column: &StringArray = typed_column_by_name(&b, "block_id")?;
let stream_id_column: &StringArray = typed_column_by_name(&b, "stream_id")?;
let process_id_column: &StringArray = typed_column_by_name(&b, "process_id")?;
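A minimal sketch of the new call pattern used above, assuming only the signatures visible in this diff; the helper name query_blocks_partitions is hypothetical. The per-view PartitionCache is narrowed to the overlapping "blocks"/"global" partitions, and the resulting list is handed to query_partitions together with the blocks schema; inside the SQL, the partitions are exposed as a single table named "source".

use std::sync::Arc;

use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use datafusion::arrow::array::RecordBatch;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;

use crate::lakehouse::{
    blocks_view::blocks_view_schema, partition_cache::PartitionCache, query::query_partitions,
};

/// Hypothetical helper: run `sql` over the blocks partitions overlapping an insert range.
async fn query_blocks_partitions(
    lake: Arc<DataLakeConnection>,
    existing_partitions: &PartitionCache,
    begin_insert: DateTime<Utc>,
    end_insert: DateTime<Utc>,
    sql: &str,
) -> Result<Vec<RecordBatch>> {
    // Keep only the "blocks"/"global" partitions that overlap the insert range.
    let block_partitions = existing_partitions
        .filter("blocks", "global", begin_insert, end_insert)
        .partitions;
    // The partition files are registered as a single table named "source".
    let df = query_partitions(
        lake,
        Arc::new(blocks_view_schema()),
        block_partitions,
        sql,
    )
    .await
    .with_context(|| "query_partitions")?;
    // Materialize the whole result; callers that prefer streaming can use execute_stream() instead.
    Ok(df.collect().await?)
}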
28 changes: 26 additions & 2 deletions rust/analytics/src/lakehouse/query.rs
@@ -1,7 +1,8 @@
use super::{
answer::Answer, list_partitions_table_function::ListPartitionsTableFunction,
materialize_partitions_table_function::MaterializePartitionsTableFunction,
partition_cache::QueryPartitionProvider, property_get_function::PropertyGet,
partition::Partition, partition_cache::QueryPartitionProvider,
partitioned_table_provider::PartitionedTableProvider, property_get_function::PropertyGet,
retire_partitions_table_function::RetirePartitionsTableFunction, view::View,
view_factory::ViewFactory,
};
@@ -14,9 +15,11 @@ use crate::{
};
use anyhow::{Context, Result};
use datafusion::{
arrow::array::RecordBatch,
arrow::{array::RecordBatch, datatypes::SchemaRef},
execution::{context::SessionContext, object_store::ObjectStoreUrl},
logical_expr::ScalarUDF,
prelude::*,
sql::TableReference,
};
use micromegas_ingestion::data_lake_connection::DataLakeConnection;
use micromegas_tracing::prelude::*;
@@ -41,6 +44,27 @@ async fn register_table(
view.register_table(ctx, table).await
}

/// query_partitions returns a dataframe, leaving the option of streaming the results
pub async fn query_partitions(
lake: Arc<DataLakeConnection>,
schema: SchemaRef,
partitions: Vec<Partition>,
sql: &str,
) -> Result<DataFrame> {
let object_store = lake.blob_storage.inner();
let table = PartitionedTableProvider::new(schema, object_store.clone(), Arc::new(partitions));
let object_store_url = ObjectStoreUrl::parse("obj://lakehouse/").unwrap();
let ctx = SessionContext::new();
ctx.register_object_store(object_store_url.as_ref(), object_store.clone());
ctx.register_table(
TableReference::Bare {
table: "source".into(),
},
Arc::new(table),
)?;
Ok(ctx.sql(sql).await?)
}

pub async fn query_single_view(
lake: Arc<DataLakeConnection>,
part_provider: Arc<dyn QueryPartitionProvider>,
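For the streaming side that create_merged_partition relies on, a hedged sketch under the same assumptions; the function stream_query_results is illustrative, not part of the commit. Because query_partitions returns a DataFrame rather than collected batches, callers can pull record batches lazily instead of buffering the whole result.

use std::sync::Arc;

use anyhow::Result;
use datafusion::arrow::{array::RecordBatch, datatypes::SchemaRef};
use futures::stream::StreamExt;
use micromegas_ingestion::data_lake_connection::DataLakeConnection;

use crate::lakehouse::{partition::Partition, query::query_partitions};

/// Hypothetical consumer: forward each record batch of the query to a callback.
async fn stream_query_results(
    lake: Arc<DataLakeConnection>,
    schema: SchemaRef,
    partitions: Vec<Partition>,
    sql: &str,
    mut on_batch: impl FnMut(RecordBatch) -> Result<()>,
) -> Result<()> {
    let df = query_partitions(lake, schema, partitions, sql).await?;
    // execute_stream yields record batches one at a time, which is the pattern
    // merge.rs uses to forward rows to write_partition_from_rows through a channel.
    let mut stream = df.execute_stream().await?;
    while let Some(rb_res) = stream.next().await {
        on_batch(rb_res?)?;
    }
    Ok(())
}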
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/sql_partition_spec.rs
@@ -56,6 +56,10 @@ impl SqlPartitionSpec {

#[async_trait]
impl PartitionSpec for SqlPartitionSpec {
fn is_empty(&self) -> bool {
self.record_count < 1
}

fn get_source_data_hash(&self) -> Vec<u8> {
self.record_count.to_le_bytes().to_vec()
}
1 change: 1 addition & 0 deletions rust/analytics/src/lakehouse/view.rs
@@ -9,6 +9,7 @@ use std::sync::Arc;

#[async_trait]
pub trait PartitionSpec: Send + Sync {
fn is_empty(&self) -> bool;
fn get_source_data_hash(&self) -> Vec<u8>;
async fn write(&self, lake: Arc<DataLakeConnection>, logger: Arc<dyn Logger>) -> Result<()>;
}
2 changes: 1 addition & 1 deletion rust/analytics/src/lakehouse/write_partition.rs
@@ -91,7 +91,7 @@ pub async fn retire_partitions(
// where a bigger one existed
// its gets tricky in the jit case where a partition can have only one block and begin_insert == end_insert

//todo: use PartitionCache here, add filter_contained
//todo: use DELETE+RETURNING
let old_partitions = sqlx::query(
"SELECT file_path, file_size
FROM lakehouse_partitions
13 changes: 9 additions & 4 deletions rust/analytics/tests/sql_view_test.rs
@@ -88,12 +88,17 @@ pub async fn materialize_range(
partition_time_delta: TimeDelta,
logger: Arc<dyn Logger>,
) -> Result<()> {
let blocks_view = Arc::new(BlocksView::new()?);
let mut partitions = Arc::new(
// todo: query only the blocks partitions for this call
PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
.await?,
PartitionCache::fetch_overlapping_insert_range_for_view(
&lake.db_pool,
blocks_view.get_view_set_name(),
blocks_view.get_view_instance_id(),
begin_range,
end_range,
)
.await?,
);
let blocks_view = Arc::new(BlocksView::new()?);
materialize_partition_range(
partitions.clone(),
lake.clone(),
11 changes: 8 additions & 3 deletions rust/public/src/servers/maintenance.rs
@@ -86,9 +86,14 @@ pub async fn materialize_all_views(
let end_range = now.duration_trunc(partition_time_delta)?;
let begin_range = end_range - (partition_time_delta * nb_partitions);
let mut partitions = Arc::new(
// todo: query only the blocks partitions for this call
PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
.await?,
PartitionCache::fetch_overlapping_insert_range_for_view(
&lake.db_pool,
blocks_view.get_view_set_name(),
blocks_view.get_view_instance_id(),
begin_range,
end_range,
)
.await?,
);
let null_response_writer = Arc::new(ResponseWriter::new(None));
materialize_partition_range(
