diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index fdc32886be..89f3d9aa36 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -210,16 +210,17 @@ def metadata(self) -> Metadata:
         """
         return self._metadata

-    def vacuum(self, retention_hours: int, dry_run: bool = True) -> List[str]:
+    def vacuum(self, retention_hours: Optional[int] = None, dry_run: bool = True) -> List[str]:
         """
         Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.

-        :param retention_hours: the retention threshold in hours
+        :param retention_hours: the retention threshold in hours; if None, the value from `configuration.deletedFileRetentionDuration` is used, or a default of 1 week otherwise
         :param dry_run: when activated, list only the files, delete otherwise
         :return: the list of files no longer referenced by the Delta Table and are older than the retention threshold.
         """
-        if retention_hours < 0:
-            raise ValueError("The retention periods should be positive.")
+        if retention_hours is not None:
+            if retention_hours < 0:
+                raise ValueError("The retention periods should be positive.")

         return self._table.vacuum(dry_run, retention_hours)
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 84b7f1c915..79d8a8c6c1 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -191,7 +191,7 @@ impl RawDeltaTable {
     }

     /// Run the Vacuum command on the Delta Table: list and delete files no longer referenced by the Delta table and are older than the retention threshold.
-    pub fn vacuum(&mut self, dry_run: bool, retention_hours: u64) -> PyResult<Vec<String>> {
+    pub fn vacuum(&mut self, dry_run: bool, retention_hours: Option<u64>) -> PyResult<Vec<String>> {
         rt()?
             .block_on(self._table.vacuum(retention_hours, dry_run))
             .map_err(PyDeltaTableError::from_raw)
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index e9e2c70673..146c124d41 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -78,4 +78,5 @@ glibc_version = "0"
 utime = "0.3"
 serial_test = "0"
 pretty_assertions = "0"
-tempdir = "0"
\ No newline at end of file
+tempdir = "0"
+maplit = { version = "1" }
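For orientation, the optional-retention call shape on the Rust side (which the binding above forwards to) looks roughly like the sketch below. The table path and the 200-hour value are illustrative, not part of this diff.

    use deltalake::DeltaTableError;

    // Sketch of the new vacuum API surface (illustrative path and values).
    async fn vacuum_example() -> Result<(), DeltaTableError> {
        let mut table = deltalake::open_table("./tests/data/delta-0.8.0").await?;

        // Explicit retention: dry run listing files older than 200 hours.
        let stale = table.vacuum(Some(200), true).await?;
        println!("{} files would be deleted", stale.len());

        // `None` falls back to `configuration.deletedFileRetentionDuration`,
        // or 1 week when the table does not set it.
        let deleted = table.vacuum(None, false).await?;
        println!("deleted {} files", deleted.len());
        Ok(())
    }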
diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs
index f5f3a408bb..33a6512725 100644
--- a/rust/src/checkpoints.rs
+++ b/rust/src/checkpoints.rs
@@ -167,6 +167,8 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, CheckpointError> {
     let mut stats_conversions: Vec<(SchemaPath, SchemaDataType)> = Vec::new();
     collect_stats_conversions(&mut stats_conversions, current_metadata.schema.get_fields());

+    let tombstones = state.unexpired_tombstones();
+
     // protocol
     let mut jsons = std::iter::once(action::Action::protocol(action::Protocol {
         min_reader_version: state.min_reader_version(),
@@ -191,10 +193,9 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, CheckpointError> {
     )
     // removes
     .chain(
-        state
-            .tombstones()
+        tombstones
             .iter()
-            .map(|f| action::Action::remove(f.clone())),
+            .map(|f| action::Action::remove((*f).clone())),
     )
     .map(|a| serde_json::to_value(a).map_err(ArrowError::from))
     // adds
@@ -213,7 +214,7 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<Vec<u8>, CheckpointError> {
     let writeable_cursor = InMemoryWriteableCursor::default();
     let mut writer = ArrowWriter::try_new(writeable_cursor.clone(), arrow_schema.clone(), None)?;
     let batch_size =
-        state.app_transaction_version().len() + state.tombstones().len() + state.files().len() + 2; // 1 (protocol) + 1 (metadata)
+        state.app_transaction_version().len() + tombstones.len() + state.files().len() + 2; // 1 (protocol) + 1 (metadata)
     let decoder = Decoder::new(arrow_schema, batch_size, None);
     while let Some(batch) = decoder.next_batch(&mut jsons)? {
         writer.write(&batch)?;
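Why `(*f).clone()` in the removes chain above: `unexpired_tombstones()` returns `Vec<&action::Remove>` rather than `&Vec<action::Remove>`, so iterating yields `&&Remove`, and one explicit deref is needed before cloning. The same pattern in isolation (a standalone sketch, not project code):

    // Iterating a Vec<&T> yields &&T; deref once, then clone the underlying value.
    fn clone_through_double_refs() {
        let owned = vec![String::from("a"), String::from("b")];
        let borrowed: Vec<&String> = owned.iter().collect();
        let cloned: Vec<String> = borrowed.iter().map(|s| (*s).clone()).collect();
        assert_eq!(owned, cloned);
    }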
diff --git a/rust/src/delta.rs b/rust/src/delta.rs
index cb6fa9d01f..a03913e78c 100644
--- a/rust/src/delta.rs
+++ b/rust/src/delta.rs
@@ -20,7 +20,7 @@ use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::fmt;
 use std::io::{BufRead, BufReader, Cursor};
-use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use std::time::{SystemTime, UNIX_EPOCH};
 use std::{cmp::Ordering, collections::HashSet};
 use uuid::Uuid;

@@ -28,10 +28,12 @@ use crate::action::Stats;

 use super::action;
 use super::action::{Action, DeltaOperation};
+use super::delta_config;
 use super::partitions::{DeltaTablePartition, PartitionFilter};
 use super::schema::*;
 use super::storage;
 use super::storage::{parse_uri, StorageBackend, StorageError, UriError};
+use crate::delta_config::DeltaConfigError;

 /// Metadata for a checkpoint file
 #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
@@ -311,6 +313,13 @@ pub enum ApplyLogError {
         /// Storage error details returned while reading the log content.
         source: StorageError,
     },
+    /// Error returned when reading the delta config failed.
+    #[error("Failed to read delta config: {}", .source)]
+    Config {
+        /// Delta config error returned when reading the delta config failed.
+        #[from]
+        source: DeltaConfigError,
+    },
     /// Error returned when a line from log record is invalid.
     #[error("Failed to read line from log record")]
     Io {
@@ -371,14 +380,25 @@ pub struct DeltaTableState {
     min_reader_version: i32,
     min_writer_version: i32,
     current_metadata: Option<DeltaTableMetaData>,
+    tombstone_retention_millis: DeltaDataTypeLong,
 }

 impl DeltaTableState {
     /// Full list of tombstones (remove actions) representing files removed from the table state.
-    pub fn tombstones(&self) -> &Vec<action::Remove> {
+    pub fn all_tombstones(&self) -> &Vec<action::Remove> {
         self.tombstones.as_ref()
     }

+    /// List of unexpired tombstones (remove actions) representing files removed from the table state.
+    /// The retention period is set by `deletedFileRetentionDuration` with a default value of 1 week.
+    pub fn unexpired_tombstones(&self) -> Vec<&action::Remove> {
+        let retention_timestamp = Utc::now().timestamp_millis() - self.tombstone_retention_millis;
+        self.tombstones
+            .iter()
+            .filter(|t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp)
+            .collect()
+    }
+
     /// Full list of add actions representing all parquet files that are part of the current
     /// delta table state.
     pub fn files(&self) -> &Vec<action::Add> {
@@ -867,9 +887,9 @@ impl DeltaTable {
             .ok_or(DeltaTableError::NoMetadata)
     }

-    /// Returns a vector of tombstones (i.e. `Remove` actions present in the current delta log).
-    pub fn get_tombstones(&self) -> &Vec<action::Remove> {
-        &self.state.tombstones
+    /// Returns a vector of active tombstones (i.e. `Remove` actions present in the current delta log).
+    pub fn get_tombstones(&self) -> Vec<&action::Remove> {
+        self.state.unexpired_tombstones()
     }

     /// Returns the current version of the DeltaTable based on the loaded metadata.
@@ -890,21 +910,29 @@ impl DeltaTable {
     }

     /// List files no longer referenced by a Delta table and are older than the retention threshold.
-    fn get_stale_files(&self, retention_hours: u64) -> Result<HashSet<&str>, DeltaTableError> {
-        if retention_hours < 168 {
+    fn get_stale_files(
+        &self,
+        retention_hours: Option<u64>,
+    ) -> Result<HashSet<&str>, DeltaTableError> {
+        let retention_millis = retention_hours
+            .map(|hours| 3600000 * hours as i64)
+            .unwrap_or(self.state.tombstone_retention_millis);
+
+        if retention_millis < self.state.tombstone_retention_millis {
             return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
         }
-        let before_duration = (SystemTime::now() - Duration::from_secs(3600 * retention_hours))
-            .duration_since(UNIX_EPOCH);
-        let delete_before_timestamp = match before_duration {
-            Ok(duration) => duration.as_millis() as i64,
-            Err(_) => return Err(DeltaTableError::InvalidVacuumRetentionPeriod),
-        };
+
+        let tombstone_retention_timestamp = Utc::now().timestamp_millis() - retention_millis;

         Ok(self
-            .get_tombstones()
+            .state
+            .all_tombstones()
             .iter()
-            .filter(|tombstone| tombstone.deletion_timestamp.unwrap_or(0) < delete_before_timestamp)
+            .filter(|tombstone| {
+                // if the file has a deletion time before the `tombstone_retention_timestamp`,
+                // then it's considered a stale file
+                tombstone.deletion_timestamp.unwrap_or(0) < tombstone_retention_timestamp
+            })
             .map(|tombstone| tombstone.path.as_str())
             .collect::<HashSet<_>>())
     }
@@ -928,10 +956,15 @@ impl DeltaTable {
     }

     /// Run the Vacuum command on the Delta Table: delete files no longer referenced by a Delta table and are older than the retention threshold.
-    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots and uncommitted files can still be in use by concurrent readers or writers to the table. If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be corrupted when vacuum deletes files that have not yet been committed.
+    /// We do not recommend that you set a retention interval shorter than 7 days, because old snapshots
+    /// and uncommitted files can still be in use by concurrent readers or writers to the table.
+    /// If vacuum cleans up active files, concurrent readers can fail or, worse, tables can be
+    /// corrupted when vacuum deletes files that have not yet been committed.
+    /// If `retention_hours` is not set, then the table's `configuration.deletedFileRetentionDuration`
+    /// is used, or a default of 7 days if that is missing too.
     pub async fn vacuum(
         &mut self,
-        retention_hours: u64,
+        retention_hours: Option<u64>,
         dry_run: bool,
     ) -> Result<Vec<String>, DeltaTableError> {
         let expired_tombstones = self.get_stale_files(retention_hours)?;
@@ -1024,9 +1057,7 @@ impl DeltaTable {
             .await
             .map_err(|e| DeltaTableError::from(DeltaTransactionError::from(e)))?;

-        // NOTE: since we have the log entry in memory already,
-        // we could optimize this further by merging the log entry instead of updating from storage.
-        self.update_incremental().await?;
+        self.update().await?;

         Ok(version)
     }
@@ -1418,7 +1449,7 @@ impl<'a> DeltaTransaction<'a> {
     ) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
         let mut attempt_number: u32 = 0;
         loop {
-            self.delta_table.update_incremental().await?;
+            self.delta_table.update().await?;

             let version = self.delta_table.version + 1;
@@ -1472,10 +1503,7 @@ fn log_entry_from_actions(actions: &[Action]) -> Result<String, serde_json::error::Error> {
-fn process_action(
-    state: &mut DeltaTableState,
-    action: Action,
-) -> Result<(), serde_json::error::Error> {
+fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), ApplyLogError> {
     match action {
         Action::add(v) => {
             state.files.push(v);
@@ -1489,7 +1517,11 @@ fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), ApplyLogError> {
             state.min_writer_version = v.min_writer_version;
         }
         Action::metaData(v) => {
-            state.current_metadata = Some(DeltaTableMetaData::try_from(v)?);
+            let md = DeltaTableMetaData::try_from(v)?;
+            state.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION
+                .get_interval_from_metadata(&md)?
+                .as_millis() as i64;
+            state.current_metadata = Some(md);
         }
         Action::txn(v) => {
             *state
@@ -1565,6 +1597,7 @@ mod tests {
             min_reader_version: 1,
             min_writer_version: 2,
             app_transaction_version,
+            tombstone_retention_millis: 0,
         };

         let txn_action = Action::txn(action::Txn {
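The retention check in `get_stale_files` reduces to a small piece of arithmetic. The standalone sketch below mirrors it with illustrative names (`check_retention` is not part of this diff):

    // Mirrors the validation in `get_stale_files`: an explicit argument wins,
    // but may never undercut the table's configured retention.
    fn check_retention(
        retention_hours: Option<u64>,
        table_retention_millis: i64,
    ) -> Result<i64, String> {
        let retention_millis = retention_hours
            .map(|hours| 3_600_000 * hours as i64)
            .unwrap_or(table_retention_millis);
        if retention_millis < table_retention_millis {
            return Err("retention shorter than deletedFileRetentionDuration".to_string());
        }
        Ok(retention_millis)
    }

With the 1-week default (604,800,000 ms), `Some(168)` hours is exactly one week and is accepted, `Some(167)` is rejected, and `None` falls back to the full week.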
diff --git a/rust/src/delta_config.rs b/rust/src/delta_config.rs
new file mode 100644
index 0000000000..7a0103949c
--- /dev/null
+++ b/rust/src/delta_config.rs
@@ -0,0 +1,275 @@
+//! Delta Table configuration
+
+use crate::{DeltaDataTypeInt, DeltaDataTypeLong, DeltaTableMetaData};
+use lazy_static::lazy_static;
+use std::time::Duration;
+
+lazy_static! {
+    /// How often to checkpoint the delta log.
+    pub static ref CHECKPOINT_INTERVAL: DeltaConfig = DeltaConfig::new("checkpointInterval", "10");
+
+    /// The shortest duration we have to keep logically deleted data files around before deleting
+    /// them physically.
+    /// Note: this value should be large enough:
+    /// - It should be larger than the longest possible duration of a job if you decide to run "VACUUM"
+    ///   when there are concurrent readers or writers accessing the table.
+    /// - If you are running a streaming query reading from the table, you should make sure the query
+    ///   doesn't stop longer than this value. Otherwise, the query may not be able to restart, as it
+    ///   still needs to read old files.
+    pub static ref TOMBSTONE_RETENTION: DeltaConfig =
+        DeltaConfig::new("deletedFileRetentionDuration", "interval 1 week");
+}
+
+/// Delta configuration error
+#[derive(thiserror::Error, Debug, PartialEq)]
+pub enum DeltaConfigError {
+    /// Error returned when configuration validation failed.
+    #[error("Validation failed - {0}")]
+    Validation(String),
+}
+
+/// Delta table's `metadata.configuration` entry.
+#[derive(Debug)]
+pub struct DeltaConfig {
+    /// The configuration name
+    pub key: String,
+    /// The default value if `key` is not set in `metadata.configuration`.
+    pub default: String,
+}
+
+impl DeltaConfig {
+    fn new(key: &str, default: &str) -> Self {
+        Self {
+            key: key.to_string(),
+            default: default.to_string(),
+        }
+    }
+
+    /// Returns the value from `metadata.configuration` for `self.key` as `DeltaDataTypeInt`.
+    /// If it's missing in metadata, then `self.default` is used.
+    #[allow(dead_code)]
+    pub fn get_int_from_metadata(
+        &self,
+        metadata: &DeltaTableMetaData,
+    ) -> Result<DeltaDataTypeInt, DeltaConfigError> {
+        Ok(parse_int(&self.get_raw_from_metadata(metadata))? as i32)
+    }
+
+    /// Returns the value from `metadata.configuration` for `self.key` as `DeltaDataTypeLong`.
+    /// If it's missing in metadata, then `self.default` is used.
+    #[allow(dead_code)]
+    pub fn get_long_from_metadata(
+        &self,
+        metadata: &DeltaTableMetaData,
+    ) -> Result<DeltaDataTypeLong, DeltaConfigError> {
+        parse_int(&self.get_raw_from_metadata(metadata))
+    }
+
+    /// Returns the value from `metadata.configuration` for `self.key` as a `Duration` for the interval.
+    /// The string value of this config has to have the following format: interval <number> <unit>,
+    /// where <unit> is either week, day, hour, minute, second, millisecond, microsecond or nanosecond.
+    /// If it's missing in metadata, then `self.default` is used.
+    pub fn get_interval_from_metadata(
+        &self,
+        metadata: &DeltaTableMetaData,
+    ) -> Result<Duration, DeltaConfigError> {
+        parse_interval(&self.get_raw_from_metadata(metadata))
+    }
+
+    fn get_raw_from_metadata(&self, metadata: &DeltaTableMetaData) -> String {
+        metadata
+            .configuration
+            .get(&self.key)
+            .and_then(|opt| opt.as_deref())
+            .unwrap_or_else(|| self.default.as_str())
+            .to_string()
+    }
+}
+
+const SECONDS_PER_MINUTE: u64 = 60;
+const SECONDS_PER_HOUR: u64 = 60 * SECONDS_PER_MINUTE;
+const SECONDS_PER_DAY: u64 = 24 * SECONDS_PER_HOUR;
+const SECONDS_PER_WEEK: u64 = 7 * SECONDS_PER_DAY;
+
+fn parse_interval(value: &str) -> Result<Duration, DeltaConfigError> {
+    let not_an_interval =
+        || DeltaConfigError::Validation(format!("'{}' is not an interval", value));
+
+    if !value.starts_with("interval ") {
+        return Err(not_an_interval());
+    }
+    let mut it = value.split_whitespace();
+    let _ = it.next(); // skip "interval"
+    let number = parse_int(it.next().ok_or_else(not_an_interval)?)?;
+    if number < 0 {
+        return Err(DeltaConfigError::Validation(format!(
+            "interval '{}' cannot be negative",
+            value
+        )));
+    }
+    let number = number as u64;
+
+    let duration = match it.next().ok_or_else(not_an_interval)? {
+        "nanosecond" => Duration::from_nanos(number),
+        "microsecond" => Duration::from_micros(number),
+        "millisecond" => Duration::from_millis(number),
+        "second" => Duration::from_secs(number),
+        "minute" => Duration::from_secs(number * SECONDS_PER_MINUTE),
+        "hour" => Duration::from_secs(number * SECONDS_PER_HOUR),
+        "day" => Duration::from_secs(number * SECONDS_PER_DAY),
+        "week" => Duration::from_secs(number * SECONDS_PER_WEEK),
+        unit => {
+            return Err(DeltaConfigError::Validation(format!(
+                "Unknown unit '{}'",
+                unit
+            )));
+        }
+    };
+
+    Ok(duration)
+}
+
+fn parse_int(value: &str) -> Result<i64, DeltaConfigError> {
+    value.parse().map_err(|e| {
+        DeltaConfigError::Validation(format!("Cannot parse '{}' as integer: {}", value, e))
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::Schema;
+    use std::collections::HashMap;
+
+    fn dummy_metadata() -> DeltaTableMetaData {
+        let schema = Schema::new(Vec::new());
+        DeltaTableMetaData::new(None, None, None, schema, Vec::new(), HashMap::new())
+    }
+
+    #[test]
+    fn get_interval_from_metadata_test() {
+        let mut md = dummy_metadata();
+
+        // default 1 week
+        assert_eq!(
+            TOMBSTONE_RETENTION
+                .get_interval_from_metadata(&md)
+                .unwrap()
+                .as_secs(),
+            SECONDS_PER_WEEK,
+        );
+
+        // change to 2 days
+        md.configuration.insert(
+            TOMBSTONE_RETENTION.key.to_string(),
+            Some("interval 2 day".to_string()),
+        );
+        assert_eq!(
+            TOMBSTONE_RETENTION
+                .get_interval_from_metadata(&md)
+                .unwrap()
+                .as_secs(),
+            2 * SECONDS_PER_DAY,
+        );
+    }
+
+    #[test]
+    fn get_long_from_metadata_test() {
+        assert_eq!(
+            CHECKPOINT_INTERVAL
+                .get_long_from_metadata(&dummy_metadata())
+                .unwrap(),
+            10,
+        )
+    }
+
+    #[test]
+    fn get_int_from_metadata_test() {
+        assert_eq!(
+            CHECKPOINT_INTERVAL
+                .get_int_from_metadata(&dummy_metadata())
+                .unwrap(),
+            10,
+        )
+    }
+
+    #[test]
+    fn parse_interval_test() {
+        assert_eq!(
+            parse_interval("interval 123 nanosecond").unwrap(),
+            Duration::from_nanos(123)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 microsecond").unwrap(),
+            Duration::from_micros(123)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 millisecond").unwrap(),
+            Duration::from_millis(123)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 second").unwrap(),
+            Duration::from_secs(123)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 minute").unwrap(),
+            Duration::from_secs(123 * 60)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 hour").unwrap(),
+            Duration::from_secs(123 * 3600)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 day").unwrap(),
+            Duration::from_secs(123 * 86400)
+        );
+
+        assert_eq!(
+            parse_interval("interval 123 week").unwrap(),
+            Duration::from_secs(123 * 604800)
+        );
+    }
+
+    #[test]
+    fn parse_interval_invalid_test() {
+        assert_eq!(
+            parse_interval("whatever").err().unwrap(),
+            DeltaConfigError::Validation("'whatever' is not an interval".to_string())
+        );
+
+        assert_eq!(
+            parse_interval("interval").err().unwrap(),
+            DeltaConfigError::Validation("'interval' is not an interval".to_string())
+        );
+
+        assert_eq!(
+            parse_interval("interval 2").err().unwrap(),
+            DeltaConfigError::Validation("'interval 2' is not an interval".to_string())
+        );
+
+        assert_eq!(
+            parse_interval("interval 2 years").err().unwrap(),
+            DeltaConfigError::Validation("Unknown unit 'years'".to_string())
+        );
+
+        assert_eq!(
+            parse_interval("interval two years").err().unwrap(),
+            DeltaConfigError::Validation(
+                "Cannot parse 'two' as integer: invalid digit found in string".to_string()
+            )
+        );
+
+        assert_eq!(
+            parse_interval("interval -25 hours").err().unwrap(),
+            DeltaConfigError::Validation(
+                "interval 'interval -25 hours' cannot be negative".to_string()
+            )
+        );
+    }
+}
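Reading a config end-to-end then looks like the sketch below. It assumes the same public surface the tests above use (`DeltaTableMetaData::new`, the public `configuration` map); the one-day value is illustrative:

    use deltalake::delta_config::TOMBSTONE_RETENTION;
    use deltalake::{DeltaTableMetaData, Schema};
    use std::collections::HashMap;

    fn tombstone_retention_millis_example() -> i64 {
        let schema = Schema::new(Vec::new());
        let mut metadata =
            DeltaTableMetaData::new(None, None, None, schema, Vec::new(), HashMap::new());
        // Shorten the retention for this table to one day (illustrative).
        metadata.configuration.insert(
            TOMBSTONE_RETENTION.key.clone(),
            Some("interval 1 day".to_string()),
        );
        // The same conversion `process_action` performs for metaData actions.
        TOMBSTONE_RETENTION
            .get_interval_from_metadata(&metadata)
            .unwrap()
            .as_millis() as i64
    }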
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index 3130890aa7..45387288bb 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -63,6 +63,7 @@ pub mod checkpoints;
 pub mod data_catalog;
 mod delta;
 pub mod delta_arrow;
+pub mod delta_config;
 pub mod partitions;
 mod schema;
 pub mod storage;
diff --git a/rust/tests/azure_test.rs b/rust/tests/azure_test.rs
index 0b3ce4abde..207c91ed8d 100644
--- a/rust/tests/azure_test.rs
+++ b/rust/tests/azure_test.rs
@@ -29,7 +29,7 @@ mod azure {
                 "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
             ]
         );
-        let tombstones = table.get_tombstones();
+        let tombstones = table.get_state().all_tombstones();
         assert_eq!(tombstones.len(), 31);
         assert_eq!(
             tombstones[0],
parse_interval("interval 123 nanosecond").unwrap(), + Duration::from_nanos(123) + ); + + assert_eq!( + parse_interval("interval 123 microsecond").unwrap(), + Duration::from_micros(123) + ); + + assert_eq!( + parse_interval("interval 123 millisecond").unwrap(), + Duration::from_millis(123) + ); + + assert_eq!( + parse_interval("interval 123 second").unwrap(), + Duration::from_secs(123) + ); + + assert_eq!( + parse_interval("interval 123 minute").unwrap(), + Duration::from_secs(123 * 60) + ); + + assert_eq!( + parse_interval("interval 123 hour").unwrap(), + Duration::from_secs(123 * 3600) + ); + + assert_eq!( + parse_interval("interval 123 day").unwrap(), + Duration::from_secs(123 * 86400) + ); + + assert_eq!( + parse_interval("interval 123 week").unwrap(), + Duration::from_secs(123 * 604800) + ); + } + + #[test] + fn parse_interval_invalid_test() { + assert_eq!( + parse_interval("whatever").err().unwrap(), + DeltaConfigError::Validation("'whatever' is not an interval".to_string()) + ); + + assert_eq!( + parse_interval("interval").err().unwrap(), + DeltaConfigError::Validation("'interval' is not an interval".to_string()) + ); + + assert_eq!( + parse_interval("interval 2").err().unwrap(), + DeltaConfigError::Validation("'interval 2' is not an interval".to_string()) + ); + + assert_eq!( + parse_interval("interval 2 years").err().unwrap(), + DeltaConfigError::Validation("Unknown unit 'years'".to_string()) + ); + + assert_eq!( + parse_interval("interval two years").err().unwrap(), + DeltaConfigError::Validation( + "Cannot parse 'two' as integer: invalid digit found in string".to_string() + ) + ); + + assert_eq!( + parse_interval("interval -25 hours").err().unwrap(), + DeltaConfigError::Validation( + "interval 'interval -25 hours' cannot be negative".to_string() + ) + ); + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 3130890aa7..45387288bb 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -63,6 +63,7 @@ pub mod checkpoints; pub mod data_catalog; mod delta; pub mod delta_arrow; +pub mod delta_config; pub mod partitions; mod schema; pub mod storage; diff --git a/rust/tests/azure_test.rs b/rust/tests/azure_test.rs index 0b3ce4abde..207c91ed8d 100644 --- a/rust/tests/azure_test.rs +++ b/rust/tests/azure_test.rs @@ -29,7 +29,7 @@ mod azure { "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet", ] ); - let tombstones = table.get_tombstones(); + let tombstones = table.get_state().all_tombstones(); assert_eq!(tombstones.len(), 31); assert_eq!( tombstones[0], diff --git a/rust/tests/checkpoint_writer_test.rs b/rust/tests/checkpoint_writer_test.rs index b044a10ee4..0354627931 100644 --- a/rust/tests/checkpoint_writer_test.rs +++ b/rust/tests/checkpoint_writer_test.rs @@ -1,8 +1,11 @@ -extern crate deltalake; - -use deltalake::checkpoints; +use chrono::Utc; +use deltalake::action::*; +use deltalake::*; +use maplit::hashmap; +use std::collections::HashMap; use std::fs; use std::path::{Path, PathBuf}; +use uuid::Uuid; // NOTE: The below is a useful external command for inspecting the written checkpoint schema visually: // parquet-tools inspect tests/data/checkpoints/_delta_log/00000000000000000005.checkpoint.parquet @@ -18,7 +21,7 @@ async fn write_simple_checkpoint() { cleanup_checkpoint_files(log_path.as_path()); // Load the delta table at version 5 - let mut table = deltalake::open_table_with_version(table_location, 5) + let table = deltalake::open_table_with_version(table_location, 5) .await .unwrap(); @@ -92,3 +95,115 @@ fn cleanup_checkpoint_files(log_path: &Path) { } } 
+
+mod fs_common;
+
+#[tokio::test]
+async fn test_checkpoints_with_tombstones() {
+    let main_branch = true;
+    if main_branch {
+        test_checkpoints_with_tombstones_main().await
+    } else {
+        test_checkpoints_with_tombstones_map_support().await
+    }
+}
+
+async fn test_checkpoints_with_tombstones_main() {}
+
+async fn test_checkpoints_with_tombstones_map_support() {
+    let path = "./tests/data/checkpoints_rw";
+    let log_dir = Path::new(path).join("_delta_log");
+    fs::create_dir_all(&log_dir).unwrap();
+    fs_common::cleanup_dir_except(log_dir, vec![]);
+
+    let schema = Schema::new(vec![SchemaField::new(
+        "id".to_string(),
+        SchemaDataType::primitive("integer".to_string()),
+        true,
+        HashMap::new(),
+    )]);
+    let config = hashmap! {
+        delta_config::TOMBSTONE_RETENTION.key.clone() => Some("interval 1 minute".to_string())
+    };
+    let mut table = fs_common::create_test_table(path, schema, config).await;
+
+    let a1 = add(3 * 60 * 1000); // 3 mins ago
+    let a2 = add(2 * 60 * 1000); // 2 mins ago
+
+    assert_eq!(1, commit_add(&mut table, &a1).await);
+    assert_eq!(2, commit_add(&mut table, &a2).await);
+    checkpoints::create_checkpoint_from_table(&table)
+        .await
+        .unwrap();
+    table.update().await.unwrap(); // make the table read the checkpoint
+    assert_eq!(table.get_files(), vec![a1.path.as_str(), a2.path.as_str()]);
+
+    let (removes1, opt1) = pseudo_optimize(&mut table, 5 * 59 * 1000).await;
+    assert_eq!(table.get_files(), vec![opt1.path.as_str()]);
+    assert_eq!(
+        table.get_tombstones(),
+        removes1.iter().collect::<Vec<_>>()
+    );
+
+    checkpoints::create_checkpoint_from_table(&table)
+        .await
+        .unwrap();
+    table.update().await.unwrap(); // make the table read the checkpoint
+    assert_eq!(table.get_files(), vec![opt1.path.as_str()]);
+    assert_eq!(table.get_tombstones(), Vec::<&Remove>::new()); // stale removes are deleted from the state
+}
+
+async fn pseudo_optimize(table: &mut DeltaTable, offset_millis: i64) -> (Vec<Remove>, Add) {
+    let removes: Vec<Remove> = table
+        .get_files()
+        .iter()
+        .map(|p| Remove {
+            path: p.to_string(),
+            deletion_timestamp: Some(Utc::now().timestamp_millis() - offset_millis),
+            data_change: false,
+            extended_file_metadata: None,
+            partition_values: None,
+            size: None,
+            tags: None,
+        })
+        .collect();
+
+    let add = Add {
+        data_change: false,
+        ..add(offset_millis)
+    };
+
+    let actions = removes
+        .iter()
+        .cloned()
+        .map(Action::remove)
+        .chain(std::iter::once(Action::add(add.clone())))
+        .collect();
+
+    commit_actions(table, actions).await;
+    (removes, add)
+}
+
+fn add(offset_millis: i64) -> Add {
+    Add {
+        path: Uuid::new_v4().to_string(),
+        size: 100,
+        partition_values: Default::default(),
+        partition_values_parsed: None,
+        modification_time: Utc::now().timestamp_millis() - offset_millis,
+        data_change: true,
+        stats: None,
+        stats_parsed: None,
+        tags: None,
+    }
+}
+
+async fn commit_add(table: &mut DeltaTable, add: &Add) -> i64 {
+    commit_actions(table, vec![Action::add(add.clone())]).await
+}
+
+async fn commit_actions(table: &mut DeltaTable, actions: Vec<Action>) -> i64 {
+    let mut tx = table.create_transaction(None);
+    tx.add_actions(actions);
+    tx.commit(None).await.unwrap()
+}
diff --git a/rust/tests/data/checkpoints_rw/.gitignore b/rust/tests/data/checkpoints_rw/.gitignore
new file mode 100644
index 0000000000..3c4fa72b9b
--- /dev/null
+++ b/rust/tests/data/checkpoints_rw/.gitignore
@@ -0,0 +1 @@
+_delta_log/*
\ No newline at end of file
diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs
index 9c035de46f..1191c70c0b 100644
--- a/rust/tests/fs_common/mod.rs
+++ b/rust/tests/fs_common/mod.rs
@@ -1,3 +1,6 @@
+use deltalake::action::Protocol;
+use deltalake::{storage, DeltaTable, DeltaTableMetaData, Schema};
+use std::collections::HashMap;
 use std::fs;
 use std::path::Path;

@@ -13,3 +16,19 @@ pub fn cleanup_dir_except<P: AsRef<Path>>(path: P, ignore_files: Vec<String>) {
         }
     }
 }
+
+pub async fn create_test_table(
+    path: &str,
+    schema: Schema,
+    config: HashMap<String, Option<String>>,
+) -> DeltaTable {
+    let backend = storage::get_backend_for_uri(path).unwrap();
+    let mut table = DeltaTable::new(path, backend).unwrap();
+    let md = DeltaTableMetaData::new(None, None, None, schema, Vec::new(), config);
+    let protocol = Protocol {
+        min_reader_version: 1,
+        min_writer_version: 2,
+    };
+    table.create(md, protocol, None).await.unwrap();
+    table
+}
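A sketch of how the new helper is meant to be called from a test; the path, column, and retention value are illustrative:

    mod fs_common;
    use deltalake::{delta_config, Schema, SchemaDataType, SchemaField};
    use maplit::hashmap;
    use std::collections::HashMap;

    #[tokio::test]
    async fn create_table_with_short_retention() {
        let schema = Schema::new(vec![SchemaField::new(
            "value".to_string(),
            SchemaDataType::primitive("long".to_string()),
            true,
            HashMap::new(),
        )]);
        let config = hashmap! {
            delta_config::TOMBSTONE_RETENTION.key.clone() => Some("interval 1 hour".to_string())
        };
        let table = fs_common::create_test_table("./tests/data/short_retention", schema, config).await;
        assert!(table.get_files().is_empty()); // freshly created table has no data files
    }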
diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs
index fd5e058634..c11bd415c6 100644
--- a/rust/tests/read_delta_test.rs
+++ b/rust/tests/read_delta_test.rs
@@ -22,7 +22,7 @@ async fn read_delta_2_0_table_without_version() {
             "part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet",
         ]
     );
-    let tombstones = table.get_tombstones();
+    let tombstones = table.get_state().all_tombstones();
     assert_eq!(tombstones.len(), 4);
     assert_eq!(
         tombstones[0],
@@ -132,7 +132,7 @@ async fn read_delta_8_0_table_without_version() {
            .collect::<Vec<i64>>(),
        vec![0, 0]
    );
-    let tombstones = table.get_tombstones();
+    let tombstones = table.get_state().all_tombstones();
     assert_eq!(tombstones.len(), 1);
     assert_eq!(
         tombstones[0],
@@ -297,14 +297,17 @@ async fn vacuum_delta_8_0_table() {
     let dry_run = true;

     assert!(matches!(
-        table.vacuum(retention_hours, dry_run).await.unwrap_err(),
+        table
+            .vacuum(Some(retention_hours), dry_run)
+            .await
+            .unwrap_err(),
         deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
     ));

     let retention_hours = 169;

     assert_eq!(
-        table.vacuum(retention_hours, dry_run).await.unwrap(),
+        table.vacuum(Some(retention_hours), dry_run).await.unwrap(),
         vec![backend.join_paths(&[
             "tests",
             "data",
@@ -320,5 +323,8 @@ async fn vacuum_delta_8_0_table() {
         / 3600;
     let empty: Vec<String> = Vec::new();

-    assert_eq!(table.vacuum(retention_hours, dry_run).await.unwrap(), empty);
+    assert_eq!(
+        table.vacuum(Some(retention_hours), dry_run).await.unwrap(),
+        empty
+    );
 }
diff --git a/rust/tests/read_simple_table_test.rs b/rust/tests/read_simple_table_test.rs
index 3dd6c6d934..5d1d3a4e91 100644
--- a/rust/tests/read_simple_table_test.rs
+++ b/rust/tests/read_simple_table_test.rs
@@ -24,7 +24,7 @@ async fn read_simple_table() {
             "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
         ]
     );
-    let tombstones = table.get_tombstones();
+    let tombstones = table.get_state().all_tombstones();
     assert_eq!(tombstones.len(), 31);
     assert_eq!(
         tombstones[0],
diff --git a/rust/tests/s3_test.rs b/rust/tests/s3_test.rs
index 17d6b28384..efa662137a 100644
--- a/rust/tests/s3_test.rs
+++ b/rust/tests/s3_test.rs
@@ -50,7 +50,7 @@ mod s3 {
                 "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
             ]
         );
-        let tombstones = table.get_tombstones();
+        let tombstones = table.get_state().all_tombstones();
         assert_eq!(tombstones.len(), 31);
         assert_eq!(
             tombstones[0],
@@ -86,7 +86,7 @@ mod s3 {
                 "part-00001-bb70d2ba-c196-4df2-9c85-f34969ad3aa9-c000.snappy.parquet",
             ]
         );
-        let tombstones = table.get_tombstones();
+        let tombstones = table.get_state().all_tombstones();
         assert_eq!(tombstones.len(), 29);
         assert_eq!(
             tombstones[0],