From 3d8c37d4eb5b0ea7a2a3f6128dacec85898e8226 Mon Sep 17 00:00:00 2001 From: desmondcheongzx Date: Tue, 11 Feb 2025 15:18:52 -0800 Subject: [PATCH] Address comments --- src/common/scan-info/src/scan_task.rs | 2 -- src/common/scan-info/src/test/mod.rs | 4 --- src/daft-scan/src/glob.rs | 47 +++++++++++++++++---------- src/daft-scan/src/lib.rs | 4 --- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/common/scan-info/src/scan_task.rs b/src/common/scan-info/src/scan_task.rs index b0b39eb6df..886fe42891 100644 --- a/src/common/scan-info/src/scan_task.rs +++ b/src/common/scan-info/src/scan_task.rs @@ -21,8 +21,6 @@ pub trait ScanTaskLike: Debug + DisplayAs + Send + Sync { fn dyn_hash(&self, state: &mut dyn Hasher); #[must_use] fn materialized_schema(&self) -> SchemaRef; - // Avoid the builder pattern for update_num_rows so that ScanTaskLike can be a trait object. - fn update_num_rows(&mut self, num_rows: usize); #[must_use] fn num_rows(&self) -> Option; #[must_use] diff --git a/src/common/scan-info/src/test/mod.rs b/src/common/scan-info/src/test/mod.rs index e3cf812134..07ac5a2a33 100644 --- a/src/common/scan-info/src/test/mod.rs +++ b/src/common/scan-info/src/test/mod.rs @@ -52,10 +52,6 @@ impl ScanTaskLike for DummyScanTask { self.schema.clone() } - fn update_num_rows(&mut self, num_rows: usize) { - self.num_rows = Some(num_rows); - } - fn num_rows(&self) -> Option { self.num_rows } diff --git a/src/daft-scan/src/glob.rs b/src/daft-scan/src/glob.rs index 2500bffa9e..d9686e78ed 100644 --- a/src/daft-scan/src/glob.rs +++ b/src/daft-scan/src/glob.rs @@ -33,7 +33,9 @@ pub struct GlobScanOperator { hive_partitioning: bool, partitioning_keys: Vec, generated_fields: SchemaRef, - first_metadata: Option, + // When creating the glob scan operator, we might collect file metadata for the first file during schema inference. + // Cache this metadata (along with the first filepath) so we can use it to populate the stats for the first scan task. + first_metadata: Option<(String, TableMetadata)>, } /// Wrapper struct that implements a sync Iterator for a BoxStream @@ -222,9 +224,12 @@ impl GlobScanOperator { field_id_mapping.clone(), ) .await?; - let metadata = Some(TableMetadata { - length: metadata.num_rows, - }); + let metadata = Some(( + first_filepath, + TableMetadata { + length: metadata.num_rows, + }, + )); (schema, metadata) } FileFormatConfig::Csv(CsvSourceConfig { @@ -404,8 +409,15 @@ impl ScanOperator for GlobScanOperator { .map(|partition_spec| partition_spec.clone_field()) .collect(); let partition_schema = Schema::new(partition_fields)?; + let (first_filepath, first_metadata) = + if let Some((first_filepath, first_metadata)) = &self.first_metadata { + (Some(first_filepath), Some(first_metadata)) + } else { + (None, None) + }; + let mut populate_stats_first_scan_task = first_filepath.is_some(); // Create one ScanTask per file. - let mut scan_tasks = files + files .enumerate() .filter_map(|(idx, f)| { let scan_task_result = (|| { @@ -414,6 +426,11 @@ impl ScanOperator for GlobScanOperator { size: size_bytes, .. } = f?; + debug_assert!( + !populate_stats_first_scan_task + || path == *first_filepath.expect("If we populate stats for the first scan task, then the first filepath should be populated"), + "Currently the parallel globber should process files in the same order as during schema inference. If this assertion fails, then the ordering guarantees have changed and we should update how we populate stats for the first scan task." + ); // Create partition values from hive partitions, if any. let mut partition_values = if hive_partitioning { let hive_partitions = parse_hive_partitioning(&path)?; @@ -460,7 +477,12 @@ impl ScanOperator for GlobScanOperator { chunk_spec, size_bytes, iceberg_delete_files: None, - metadata: None, + metadata: if populate_stats_first_scan_task { + populate_stats_first_scan_task = false; + first_metadata.cloned() + } else { + None + }, partition_spec, statistics: None, parquet_metadata: None, @@ -478,17 +500,6 @@ impl ScanOperator for GlobScanOperator { Err(e) => Some(Err(e)), } }) - .collect::>>()?; - // If we collected file metadata and there is a single scan task, we can use this metadata - // to get the true cardinality of the scan task. - if let Some(first_metadata) = &self.first_metadata - && scan_tasks.len() == 1 - { - let task = Arc::get_mut(&mut scan_tasks[0]).expect( - "We should have exclusive access to the scan task during scan task materialization", - ); - task.update_num_rows(first_metadata.length); - } - Ok(scan_tasks) + .collect() } } diff --git a/src/daft-scan/src/lib.rs b/src/daft-scan/src/lib.rs index b636a41c5f..6c901fd0fd 100644 --- a/src/daft-scan/src/lib.rs +++ b/src/daft-scan/src/lib.rs @@ -459,10 +459,6 @@ impl ScanTaskLike for ScanTask { self.materialized_schema() } - fn update_num_rows(&mut self, num_rows: usize) { - self.metadata = Some(TableMetadata { length: num_rows }); - } - fn num_rows(&self) -> Option { self.num_rows() }