Skip to content

Commit

Permalink
Address comments
Browse files — browse the repository at this point in the history
Authored and committed by desmondcheongzx on Feb 11, 2025
1 parent ead90b3 commit 3d8c37d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 28 deletions.
2 changes: 0 additions & 2 deletions src/common/scan-info/src/scan_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ pub trait ScanTaskLike: Debug + DisplayAs + Send + Sync {
fn dyn_hash(&self, state: &mut dyn Hasher);
#[must_use]
fn materialized_schema(&self) -> SchemaRef;
// Avoid the builder pattern for update_num_rows so that ScanTaskLike can be a trait object.
fn update_num_rows(&mut self, num_rows: usize);
#[must_use]
fn num_rows(&self) -> Option<usize>;
#[must_use]
Expand Down
4 changes: 0 additions & 4 deletions src/common/scan-info/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ impl ScanTaskLike for DummyScanTask {
self.schema.clone()
}

fn update_num_rows(&mut self, num_rows: usize) {
self.num_rows = Some(num_rows);
}

fn num_rows(&self) -> Option<usize> {
self.num_rows
}
Expand Down
47 changes: 29 additions & 18 deletions src/daft-scan/src/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ pub struct GlobScanOperator {
hive_partitioning: bool,
partitioning_keys: Vec<PartitionField>,
generated_fields: SchemaRef,
first_metadata: Option<TableMetadata>,
// When creating the glob scan operator, we might collect file metadata for the first file during schema inference.
// Cache this metadata (along with the first filepath) so we can use it to populate the stats for the first scan task.
first_metadata: Option<(String, TableMetadata)>,
}

/// Wrapper struct that implements a sync Iterator for a BoxStream
Expand Down Expand Up @@ -222,9 +224,12 @@ impl GlobScanOperator {
field_id_mapping.clone(),
)
.await?;
let metadata = Some(TableMetadata {
length: metadata.num_rows,
});
let metadata = Some((
first_filepath,
TableMetadata {
length: metadata.num_rows,
},
));
(schema, metadata)
}
FileFormatConfig::Csv(CsvSourceConfig {
Expand Down Expand Up @@ -404,8 +409,15 @@ impl ScanOperator for GlobScanOperator {
.map(|partition_spec| partition_spec.clone_field())
.collect();
let partition_schema = Schema::new(partition_fields)?;
let (first_filepath, first_metadata) =
if let Some((first_filepath, first_metadata)) = &self.first_metadata {
(Some(first_filepath), Some(first_metadata))
} else {
(None, None)
};
let mut populate_stats_first_scan_task = first_filepath.is_some();
// Create one ScanTask per file.
let mut scan_tasks = files
files
.enumerate()
.filter_map(|(idx, f)| {
let scan_task_result = (|| {
Expand All @@ -414,6 +426,11 @@ impl ScanOperator for GlobScanOperator {
size: size_bytes,
..
} = f?;
debug_assert!(
!populate_stats_first_scan_task
|| path == *first_filepath.expect("If we populate stats for the first scan task, then the first filepath should be populated"),
"Currently the parallel globber should process files in the same order as during schema inference. If this assertion fails, then the ordering guarantees have changed and we should update how we populate stats for the first scan task."
);
// Create partition values from hive partitions, if any.
let mut partition_values = if hive_partitioning {
let hive_partitions = parse_hive_partitioning(&path)?;
Expand Down Expand Up @@ -460,7 +477,12 @@ impl ScanOperator for GlobScanOperator {
chunk_spec,
size_bytes,
iceberg_delete_files: None,
metadata: None,
metadata: if populate_stats_first_scan_task {
populate_stats_first_scan_task = false;
first_metadata.cloned()
} else {
None
},
partition_spec,
statistics: None,
parquet_metadata: None,
Expand All @@ -478,17 +500,6 @@ impl ScanOperator for GlobScanOperator {
Err(e) => Some(Err(e)),
}
})
.collect::<DaftResult<Vec<ScanTaskLikeRef>>>()?;
// If we collected file metadata and there is a single scan task, we can use this metadata
// to get the true cardinality of the scan task.
if let Some(first_metadata) = &self.first_metadata
&& scan_tasks.len() == 1
{
let task = Arc::get_mut(&mut scan_tasks[0]).expect(
"We should have exclusive access to the scan task during scan task materialization",
);
task.update_num_rows(first_metadata.length);
}
Ok(scan_tasks)
.collect()
}
}
4 changes: 0 additions & 4 deletions src/daft-scan/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,6 @@ impl ScanTaskLike for ScanTask {
self.materialized_schema()
}

fn update_num_rows(&mut self, num_rows: usize) {
self.metadata = Some(TableMetadata { length: num_rows });
}

fn num_rows(&self) -> Option<usize> {
self.num_rows()
}
Expand Down

0 comments on commit 3d8c37d

Please sign in to comment.