Skip to content

Commit da4208b

Browse files
authored Jan 8, 2025
Fix: ensure that compression type is also taken into consideration during ListingTableConfig infer_options (#14021)
* chore: add test to verify that schema is inferred as expected * chore: add comment to method as suggested * chore: restructure to avoid need to clone * chore: fix flaw in rewrite
1 parent c7e8858 commit da4208b

File tree

1 file changed

+50
-9
lines changed
  • datafusion/core/src/datasource/listing

1 file changed

+50
-9
lines changed
 

‎datafusion/core/src/datasource/listing/table.rs

+50-9
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,26 @@ impl ListingTableConfig {
114114
}
115115
}
116116

117-
fn infer_file_extension(path: &str) -> Result<String> {
117+
/// Returns a tuple of (file_extension, optional compression_extension)
118+
///
119+
/// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
120+
/// For example a path ending with blah.test.csv returns `("csv", None)`
121+
fn infer_file_extension_and_compression_type(
122+
path: &str,
123+
) -> Result<(String, Option<String>)> {
118124
let mut exts = path.rsplit('.');
119125

120-
let mut splitted = exts.next().unwrap_or("");
126+
let splitted = exts.next().unwrap_or("");
121127

122128
let file_compression_type = FileCompressionType::from_str(splitted)
123129
.unwrap_or(FileCompressionType::UNCOMPRESSED);
124130

125131
if file_compression_type.is_compressed() {
126-
splitted = exts.next().unwrap_or("");
132+
let splitted2 = exts.next().unwrap_or("");
133+
Ok((splitted2.to_string(), Some(splitted.to_string())))
134+
} else {
135+
Ok((splitted.to_string(), None))
127136
}
128-
129-
Ok(splitted.to_string())
130137
}
131138

132139
/// Infer `ListingOptions` based on `table_path` suffix.
@@ -147,18 +154,33 @@ impl ListingTableConfig {
147154
.await
148155
.ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
149156

150-
let file_extension =
151-
ListingTableConfig::infer_file_extension(file.location.as_ref())?;
157+
let (file_extension, maybe_compression_type) =
158+
ListingTableConfig::infer_file_extension_and_compression_type(
159+
file.location.as_ref(),
160+
)?;
161+
162+
let mut format_options = HashMap::new();
163+
if let Some(ref compression_type) = maybe_compression_type {
164+
format_options
165+
.insert("format.compression".to_string(), compression_type.clone());
166+
}
152167

153168
let file_format = state
154169
.get_file_format_factory(&file_extension)
155170
.ok_or(config_datafusion_err!(
156171
"No file_format found with extension {file_extension}"
157172
))?
158-
.create(state, &HashMap::new())?;
173+
.create(state, &format_options)?;
174+
175+
let listing_file_extension =
176+
if let Some(compression_type) = maybe_compression_type {
177+
format!("{}.{}", &file_extension, &compression_type)
178+
} else {
179+
file_extension
180+
};
159181

160182
let listing_options = ListingOptions::new(file_format)
161-
.with_file_extension(file_extension)
183+
.with_file_extension(listing_file_extension)
162184
.with_target_partitions(state.config().target_partitions());
163185

164186
Ok(Self {
@@ -2194,4 +2216,23 @@ mod tests {
21942216

21952217
Ok(())
21962218
}
2219+
2220+
#[tokio::test]
2221+
async fn test_infer_options_compressed_csv() -> Result<()> {
2222+
let testdata = crate::test_util::arrow_test_data();
2223+
let filename = format!("{}/csv/aggregate_test_100.csv.gz", testdata);
2224+
let table_path = ListingTableUrl::parse(filename).unwrap();
2225+
2226+
let ctx = SessionContext::new();
2227+
2228+
let config = ListingTableConfig::new(table_path);
2229+
let config_with_opts = config.infer_options(&ctx.state()).await?;
2230+
let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;
2231+
2232+
let schema = config_with_schema.file_schema.unwrap();
2233+
2234+
assert_eq!(schema.fields.len(), 13);
2235+
2236+
Ok(())
2237+
}
21972238
}

0 commit comments

Comments
 (0)