diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index 83374d1657..94fef6ae1b 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -644,7 +644,7 @@ impl DeltaTable { .object_store() .head(&commit_uri_from_version(version)) .await?; - let ts = meta.last_modified.timestamp(); + let ts = meta.last_modified.timestamp_millis(); // also cache timestamp for version self.version_timestamp.insert(version, ts); @@ -875,14 +875,13 @@ impl DeltaTable { let mut min_version = 0; let mut max_version = self.get_latest_version().await?; let mut version = min_version; - let target_ts = datetime.timestamp(); + let target_ts = datetime.timestamp_millis(); // binary search while min_version <= max_version { let pivot = (max_version + min_version) / 2; version = pivot; let pts = self.get_version_timestamp(pivot).await?; - match pts.cmp(&target_ts) { Ordering::Equal => { break; diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs index 80c2083261..2c1c06cbb6 100644 --- a/crates/deltalake-core/tests/command_restore.rs +++ b/crates/deltalake-core/tests/command_restore.rs @@ -11,6 +11,8 @@ use rand::Rng; use std::error::Error; use std::fs; use std::sync::Arc; +use std::thread; +use std::time::Duration; use tempdir::TempDir; #[derive(Debug)] @@ -42,19 +44,21 @@ async fn setup_test() -> Result> { .await?; let batch = get_record_batch(); - + thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Append) .await .unwrap(); + thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Overwrite) .await .unwrap(); + thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Append) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 0fe9c25bb7..6075f64fd2 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -452,13 +452,59 @@ def file_uris( file_uris.__doc__ = "" + def load_as_version(self, version: Union[int, str, datetime]) -> None: + """ + Load/time travel a DeltaTable to a specified version number, or a timestamp version of the table. If a + string is passed then the argument should be an RFC 3339 and ISO 8601 date and time string format. + + Args: + version: the identifier of the version of the DeltaTable to load + + Example: + **Use a version number** + ``` + dt = DeltaTable("test_table") + dt.load_as_version(1) + ``` + + **Use a datetime object** + ``` + dt.load_as_version(datetime(2023,1,1)) + ``` + + **Use a datetime in string format** + ``` + dt.load_as_version("2018-01-26T18:30:09Z") + dt.load_as_version("2018-12-19T16:39:57-08:00") + dt.load_as_version("2018-01-26T18:30:09.453+00:00") + ``` + """ + if isinstance(version, int): + self._table.load_version(version) + elif isinstance(version, datetime): + self._table.load_with_datetime(version.isoformat()) + elif isinstance(version, str): + self._table.load_with_datetime(version) + else: + raise TypeError( + "Invalid datatype provided for version, only int, str or datetime are accepted." + ) + def load_version(self, version: int) -> None: """ Load a DeltaTable with a specified version. + !!! warning "Deprecated" + Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`. + Args: version: the identifier of the version of the DeltaTable to load """ + warnings.warn( + "Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead.", + category=DeprecationWarning, + stacklevel=2, + ) self._table.load_version(version) def load_with_datetime(self, datetime_string: str) -> None: @@ -466,6 +512,9 @@ def load_with_datetime(self, datetime_string: str) -> None: Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument. The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string. + !!! warning "Deprecated" + Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`. + Args: datetime_string: the identifier of the datetime point of the DeltaTable to load @@ -476,6 +525,11 @@ def load_with_datetime(self, datetime_string: str) -> None: "2018-01-26T18:30:09.453+00:00" ``` """ + warnings.warn( + "Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.", + category=DeprecationWarning, + stacklevel=2, + ) self._table.load_with_datetime(datetime_string) @property diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index a49374e710..74c7a1b339 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -63,7 +63,15 @@ def test_read_simple_table_using_options_to_dict(): assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]} -def test_load_with_datetime(): +@pytest.mark.parametrize( + ["date_value", "expected_version"], + [ + ("2020-05-01T00:47:31-07:00", 0), + ("2020-05-02T22:47:31-07:00", 1), + ("2020-05-25T22:47:31-07:00", 4), + ], +) +def test_load_as_version_datetime(date_value: str, expected_version): log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log" log_mtime_pair = [ ("00000000000000000000.json", 1588398451.0), @@ -78,15 +86,14 @@ def test_load_with_datetime(): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) - dt.load_with_datetime("2020-05-01T00:47:31-07:00") - assert dt.version() == 0 - dt.load_with_datetime("2020-05-02T22:47:31-07:00") - assert dt.version() == 1 - dt.load_with_datetime("2020-05-25T22:47:31-07:00") - assert dt.version() == 4 + dt.load_as_version(date_value) + assert dt.version() == expected_version + dt = DeltaTable(table_path) + dt.load_as_version(datetime.fromisoformat(date_value)) + assert dt.version() == expected_version -def test_load_with_datetime_bad_format(): +def test_load_as_version_datetime_bad_format(): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) @@ -96,7 +103,7 @@ def test_load_with_datetime_bad_format(): "2020-05-01T00:47:31+08", ]: with pytest.raises(Exception, match="Failed to parse datetime string:"): - dt.load_with_datetime(bad_format) + dt.load_as_version(bad_format) def test_read_simple_table_update_incremental():