materialize views according to their update group
madesroches-ubi committed Jan 28, 2025
1 parent ecd3bc3 commit 9e34c59
Showing 11 changed files with 84 additions and 82 deletions.
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/blocks_view.rs
@@ -137,6 +137,10 @@ impl View for BlocksView {
fn get_max_event_time_column_name(&self) -> Arc<String> {
INSERT_TIME_COLUMN.clone()
}

fn get_update_group(&self) -> Option<i32> {
Some(1000)
}
}

pub fn blocks_view_schema() -> Schema {
8 changes: 8 additions & 0 deletions rust/analytics/src/lakehouse/log_view.rs
@@ -198,4 +198,12 @@ impl View for LogView {
fn get_max_event_time_column_name(&self) -> Arc<String> {
TIME_COLUMN.clone()
}

fn get_update_group(&self) -> Option<i32> {
if *(self.get_view_instance_id()) == "global" {
Some(2000)
} else {
None
}
}
}
8 changes: 8 additions & 0 deletions rust/analytics/src/lakehouse/metrics_view.rs
@@ -205,4 +205,12 @@ impl View for MetricsView {
fn get_max_event_time_column_name(&self) -> Arc<String> {
TIME_COLUMN.clone()
}

fn get_update_group(&self) -> Option<i32> {
if *(self.get_view_instance_id()) == "global" {
Some(2000)
} else {
None
}
}
}
6 changes: 0 additions & 6 deletions rust/analytics/src/lakehouse/partition_source_data.rs
@@ -62,7 +62,6 @@ pub async fn fetch_partition_source_data(
) -> Result<PartitionSourceDataBlocks> {
let begin_rfc = begin_insert.to_rfc3339();
let end_rfc = end_insert.to_rfc3339();
let desc = format!("[{begin_rfc}, {end_rfc}] {source_stream_tag}",);
let sql = format!("
SELECT block_id, stream_id, process_id, begin_time, begin_ticks, end_time, end_ticks, nb_objects,
object_offset, payload_size, insert_time as block_insert_time,
@@ -197,11 +196,6 @@ pub async fn fetch_partition_source_data(
}));
}
}

info!(
"{desc} block_ids_hash={block_ids_hash} nb_source_blocks={}",
partition_src_blocks.len()
);
Ok(PartitionSourceDataBlocks {
blocks: partition_src_blocks,
block_ids_hash: block_ids_hash.to_le_bytes().to_vec(),
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/processes_view.rs
@@ -145,6 +145,10 @@ impl View for ProcessesView {
fn get_max_event_time_column_name(&self) -> Arc<String> {
INSERT_TIME_COLUMN.clone()
}

fn get_update_group(&self) -> Option<i32> {
Some(1000)
}
}

pub fn processes_view_schema() -> Schema {
8 changes: 8 additions & 0 deletions rust/analytics/src/lakehouse/sql_batch_view.rs
@@ -31,6 +31,7 @@ pub struct SqlBatchView {
merge_partitions_query: Arc<String>,
schema: Arc<Schema>,
view_factory: Arc<ViewFactory>,
update_group: Option<i32>,
}

impl SqlBatchView {
@@ -45,6 +46,7 @@ impl SqlBatchView {
/// * `merge_partitions_query` - used to merge multiple partitions into a single one (and for user queries, which span multiple partitions by default)
/// * `lake` - data lake
/// * `view_factory` - all views accessible to the `src_query`
/// * `update_group` - tells the daemon which views should be materialized and in what order
pub async fn new(
view_set_name: Arc<String>,
min_event_time_column: Arc<String>,
@@ -54,6 +56,7 @@
merge_partitions_query: Arc<String>,
lake: Arc<DataLakeConnection>,
view_factory: Arc<ViewFactory>,
update_group: Option<i32>,
) -> Result<Self> {
let null_part_provider = Arc::new(NullPartitionProvider {});
let ctx = make_session_context(lake, null_part_provider, None, view_factory.clone())
@@ -81,6 +84,7 @@
merge_partitions_query,
schema,
view_factory,
update_group,
})
}
}
@@ -209,4 +213,8 @@ impl View for SqlBatchView {
fn get_merge_partitions_query(&self) -> Arc<String> {
self.merge_partitions_query.clone()
}

fn get_update_group(&self) -> Option<i32> {
self.update_group
}
}
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/streams_view.rs
@@ -139,6 +139,10 @@ impl View for StreamsView {
fn get_max_event_time_column_name(&self) -> Arc<String> {
INSERT_TIME_COLUMN.clone()
}

fn get_update_group(&self) -> Option<i32> {
Some(1000)
}
}

pub fn streams_view_schema() -> Schema {
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/thread_spans_view.rs
@@ -290,4 +290,8 @@ impl View for ThreadSpansView {
fn get_max_event_time_column_name(&self) -> Arc<String> {
MAX_TIME_COLUMN.clone()
}

fn get_update_group(&self) -> Option<i32> {
None
}
}
4 changes: 4 additions & 0 deletions rust/analytics/src/lakehouse/view.rs
@@ -75,9 +75,13 @@ pub trait View: std::fmt::Debug + Send + Sync {
Ok(())
}

/// how to merge smaller partitions into a bigger one
fn get_merge_partitions_query(&self) -> Arc<String> {
Arc::new(String::from("SELECT * FROM {source};"))
}

/// tells the daemon which views should be materialized and in what order
fn get_update_group(&self) -> Option<i32>;
}

impl dyn View {
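The view.rs change above declares get_update_group with no default body, which is why every concrete view in this commit gains an implementation: return Some(group) to let the maintenance daemon materialize the view (lower groups first), or None to keep it out of the scheduled updates. The snippet below is a self-contained toy illustration of that contract, not code from the repository; the trait and type names (ToyView, ToyBlocks, ToyThreadSpans) are made up.

// Toy illustration of the new required trait method; names are hypothetical.
trait ToyView {
    fn name(&self) -> &'static str;
    // No default body, so the compiler forces every implementor to choose.
    fn get_update_group(&self) -> Option<i32>;
}

struct ToyBlocks; // metadata-style view, updated in an early group
struct ToyThreadSpans; // materialized on demand, not by the daemon

impl ToyView for ToyBlocks {
    fn name(&self) -> &'static str {
        "blocks"
    }
    fn get_update_group(&self) -> Option<i32> {
        Some(1000)
    }
}

impl ToyView for ToyThreadSpans {
    fn name(&self) -> &'static str {
        "thread_spans"
    }
    fn get_update_group(&self) -> Option<i32> {
        None
    }
}

fn main() {
    let views: Vec<Box<dyn ToyView>> = vec![Box::new(ToyBlocks), Box::new(ToyThreadSpans)];
    for view in &views {
        match view.get_update_group() {
            Some(group) => println!("{} is materialized by the daemon in group {group}", view.name()),
            None => println!("{} is skipped by the daemon", view.name()),
        }
    }
}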
1 change: 1 addition & 0 deletions rust/analytics/tests/sql_view_test.rs
@@ -75,6 +75,7 @@ async fn make_log_entries_levels_per_process_minute_view(
merge_partitions_query,
lake,
view_factory,
Some(4000),
)
.await
}
115 changes: 39 additions & 76 deletions rust/public/src/servers/maintenance.rs
@@ -3,10 +3,7 @@ use chrono::{DateTime, DurationRound};
use chrono::{TimeDelta, Utc};
use micromegas_analytics::delete::delete_old_data;
use micromegas_analytics::lakehouse::batch_update::materialize_partition_range;
use micromegas_analytics::lakehouse::blocks_view::BlocksView;
use micromegas_analytics::lakehouse::partition_cache::PartitionCache;
use micromegas_analytics::lakehouse::processes_view::ProcessesView;
use micromegas_analytics::lakehouse::streams_view::StreamsView;
use micromegas_analytics::lakehouse::temp::delete_expired_temporary_files;
use micromegas_analytics::lakehouse::view::View;
use micromegas_analytics::lakehouse::view_factory::ViewFactory;
@@ -19,11 +16,10 @@ use std::sync::Arc;

type Views = Arc<Vec<Arc<dyn View>>>;
type FutureTask = Pin<Box<dyn Future<Output = Result<()>>>>;
type Callback = Box<dyn Fn(Arc<DataLakeConnection>, Arc<dyn View>, Views) -> FutureTask>;
type Callback = Box<dyn Fn(Arc<DataLakeConnection>, Views) -> FutureTask>;

pub struct TaskDef {
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
pub name: String,
pub period: TimeDelta,
@@ -35,7 +31,6 @@ pub struct TaskDef {
impl TaskDef {
pub async fn start(
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
name: String,
period: TimeDelta,
@@ -44,13 +39,12 @@ impl TaskDef {
) -> Result<Self> {
let now = Utc::now();
info!("running scheduled task name={name}");
if let Err(e) = callback(lake.clone(), blocks_view.clone(), views.clone()).await {
if let Err(e) = callback(lake.clone(), views.clone()).await {
error!("{e:?}");
}
let next_run = now.duration_trunc(period)? + period + offset;
Ok(Self {
lake,
blocks_view,
views,
name,
period,
@@ -63,54 +57,42 @@ impl TaskDef {
pub async fn tick(&mut self) -> Result<()> {
let now = Utc::now();
info!("running scheduled task name={}", &self.name);
(self.callback)(
self.lake.clone(),
self.blocks_view.clone(),
self.views.clone(),
)
.await
.with_context(|| "TaskDef::tick")?;
(self.callback)(self.lake.clone(), self.views.clone())
.await
.with_context(|| "TaskDef::tick")?;
self.next_run = now.duration_trunc(self.period)? + self.period + self.offset;
Ok(())
}
}

pub async fn materialize_all_views(
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
partition_time_delta: TimeDelta,
nb_partitions: i32,
) -> Result<()> {
let now = Utc::now();
let end_range = now.duration_trunc(partition_time_delta)?;
let begin_range = end_range - (partition_time_delta * nb_partitions);
let mut last_group = views.first().unwrap().get_update_group();
let mut partitions = Arc::new(
PartitionCache::fetch_overlapping_insert_range_for_view(
&lake.db_pool,
blocks_view.get_view_set_name(),
blocks_view.get_view_instance_id(),
begin_range,
end_range,
)
.await?,
);
let null_response_writer = Arc::new(ResponseWriter::new(None));
materialize_partition_range(
partitions.clone(),
lake.clone(),
blocks_view,
begin_range,
end_range,
partition_time_delta,
null_response_writer.clone(),
)
.await?;
partitions = Arc::new(
PartitionCache::fetch_overlapping_insert_range(&lake.db_pool, begin_range, end_range)
.await?,
);
let null_response_writer = Arc::new(ResponseWriter::new(None));
for view in &*views {
if view.get_update_group() != last_group {
// views in the same group should have no inter-dependencies
last_group = view.get_update_group();
partitions = Arc::new(
PartitionCache::fetch_overlapping_insert_range(
&lake.db_pool,
begin_range,
end_range,
)
.await?,
);
}
materialize_partition_range(
partitions.clone(),
lake.clone(),
Expand All @@ -125,87 +107,68 @@ pub async fn materialize_all_views(
Ok(())
}

pub async fn every_day(
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
) -> Result<()> {
materialize_all_views(lake, blocks_view, views, TimeDelta::days(1), 3).await
pub async fn every_day(lake: Arc<DataLakeConnection>, views: Views) -> Result<()> {
materialize_all_views(lake, views, TimeDelta::days(1), 3).await
}

pub async fn every_hour(
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
) -> Result<()> {
pub async fn every_hour(lake: Arc<DataLakeConnection>, views: Views) -> Result<()> {
delete_old_data(&lake, 90).await?;
delete_expired_temporary_files(lake.clone()).await?;
materialize_all_views(lake, blocks_view, views, TimeDelta::hours(1), 3).await
materialize_all_views(lake, views, TimeDelta::hours(1), 3).await
}

pub async fn every_minute(
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
) -> Result<()> {
materialize_all_views(lake, blocks_view, views, TimeDelta::minutes(1), 3).await
pub async fn every_minute(lake: Arc<DataLakeConnection>, views: Views) -> Result<()> {
materialize_all_views(lake, views, TimeDelta::minutes(1), 3).await
}

pub async fn every_second(
lake: Arc<DataLakeConnection>,
blocks_view: Arc<dyn View>,
views: Views,
) -> Result<()> {
materialize_all_views(lake, blocks_view, views, TimeDelta::seconds(1), 5).await
pub async fn every_second(lake: Arc<DataLakeConnection>, views: Views) -> Result<()> {
materialize_all_views(lake, views, TimeDelta::seconds(1), 5).await
}

pub async fn daemon(lake: Arc<DataLakeConnection>, view_factory: Arc<ViewFactory>) -> Result<()> {
let blocks_view = Arc::new(BlocksView::new()?);
let views: Arc<Vec<Arc<dyn View>>> = Arc::new(vec![
Arc::new(ProcessesView::new()?),
Arc::new(StreamsView::new()?),
view_factory.make_view("log_entries", "global")?,
view_factory.make_view("measures", "global")?,
]);
let mut views_to_update: Vec<Arc<dyn View>> = view_factory
.get_global_views()
.iter()
.filter(|v| v.get_update_group().is_some())
.cloned()
.collect();
views_to_update.sort_by_key(|v| v.get_update_group().unwrap_or(i32::MAX));
let views = Arc::new(views_to_update);
let mut tasks = vec![
TaskDef::start(
lake.clone(),
blocks_view.clone(),
views.clone(),
String::from("every_day"),
TimeDelta::days(1),
TimeDelta::minutes(5),
Box::new(|lake, blocks_view, views| Box::pin(every_day(lake, blocks_view, views))),
Box::new(|lake, views| Box::pin(every_day(lake, views))),
)
.await?,
TaskDef::start(
lake.clone(),
blocks_view.clone(),
views.clone(),
String::from("every_hour"),
TimeDelta::hours(1),
TimeDelta::minutes(2),
Box::new(|lake, blocks_view, views| Box::pin(every_hour(lake, blocks_view, views))),
Box::new(|lake, views| Box::pin(every_hour(lake, views))),
)
.await?,
TaskDef::start(
lake.clone(),
blocks_view.clone(),
views.clone(),
String::from("every minute"),
TimeDelta::minutes(1),
TimeDelta::seconds(2),
Box::new(|lake, blocks_view, views| Box::pin(every_minute(lake, blocks_view, views))),
Box::new(|lake, views| Box::pin(every_minute(lake, views))),
)
.await?,
TaskDef::start(
lake.clone(),
blocks_view.clone(),
views.clone(),
String::from("every second"),
TimeDelta::seconds(1),
TimeDelta::milliseconds(100),
Box::new(|lake, blocks_view, views| Box::pin(every_second(lake, blocks_view, views))),
Box::new(|lake, views| Box::pin(every_second(lake, views))),
)
.await?,
];
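The reworked materialize_all_views and daemon above walk the views in ascending update-group order and re-fetch the partition cache only when crossing a group boundary: views within one group are expected to have no inter-dependencies, while a later group (e.g. log_entries at 2000) may read partitions written by an earlier one (e.g. blocks, processes, streams at 1000). Below is a standalone sketch of that scheduling order only; the view list and the refresh_partition_cache stub are illustrative placeholders, not the crate's actual API.

// Sketch of the group-ordered materialization pass; names and the cache
// refresh stub are placeholders, not the crate's real types.
fn refresh_partition_cache() {
    println!("-- refresh partition cache --");
}

fn main() {
    // (view name, update group) for views that opted into daemon updates.
    let mut views = vec![
        ("log_entries", 2000),
        ("blocks", 1000),
        ("processes", 1000),
        ("streams", 1000),
        ("log_stats_per_minute", 4000),
    ];
    // Same rule as the daemon: ascending update group.
    views.sort_by_key(|(_name, group)| *group);

    refresh_partition_cache(); // initial fetch, before the first group
    let mut last_group = views.first().map(|(_name, group)| *group);
    for (name, group) in &views {
        if Some(*group) != last_group {
            // Views within one group have no inter-dependencies, but a new
            // group may depend on partitions written by the previous one.
            last_group = Some(*group);
            refresh_partition_cache();
        }
        println!("materialize {name}");
    }
}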
