From 4131e4ba0ef813cbc200fa0d7a6da373f7824e71 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 02:14:31 +0100 Subject: [PATCH 1/8] combine loads --- python/deltalake/table.py | 53 +++++++++++++++++++++++++++++++++ python/tests/test_table_read.py | 18 +++++++---- 2 files changed, 65 insertions(+), 6 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index a2d6189fb6..98d0456579 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -453,13 +453,58 @@ def file_uris( file_uris.__doc__ = "" + def load_to(self, version: int | str | datetime) -> None: + """ + Load/time travel a DeltaTable with a specified version number, or a timestamp version of the table. If a + string is passed then the argument should be an RFC 3339 and ISO 8601 date and time string format. + + Args: + version: the identifier of the version of the DeltaTable to load + + Example: + **Use a version number** + ``` + 1 + ``` + + **Use a datetime object** + ``` + datetime(2023,1,1) + ``` + + **Use a datetime in string format** + ``` + "2018-01-26T18:30:09Z" + "2018-12-19T16:39:57-08:00" + "2018-01-26T18:30:09.453+00:00" + ``` + """ + if isinstance(version, int): + self._table.load_version(version) + elif isinstance(version, datetime): + self._table.load_with_datetime(version.isoformat()) + elif isinstance(version, str): + self._table.load_with_datetime(version) + else: + raise TypeError( + "Invalid datatype provided for version, only int, str or datetime are accepted." + ) + def load_version(self, version: int) -> None: """ Load a DeltaTable with a specified version. + !!! warning "Deprecated" + Load_version and load_with_datetime have been combined into `DeltaTable.load_to`. + Args: version: the identifier of the version of the DeltaTable to load """ + warnings.warn( + "Call to deprecated method DeltaTable.load_version. 
Use DeltaTable.load_to() instead.", + category=DeprecationWarning, + stacklevel=2, + ) self._table.load_version(version) def load_with_datetime(self, datetime_string: str) -> None: @@ -467,6 +512,9 @@ def load_with_datetime(self, datetime_string: str) -> None: Time travel Delta table to the latest version that's created at or before provided `datetime_string` argument. The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string. + !!! warning "Deprecated" + Load_version and load_with_datetime have been combined into `DeltaTable.load_to`. + Args: datetime_string: the identifier of the datetime point of the DeltaTable to load @@ -477,6 +525,11 @@ def load_with_datetime(self, datetime_string: str) -> None: "2018-01-26T18:30:09.453+00:00" ``` """ + warnings.warn( + "Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_to() instead.", + category=DeprecationWarning, + stacklevel=2, + ) self._table.load_with_datetime(datetime_string) @property diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index a49374e710..b366e98b50 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -63,7 +63,7 @@ def test_read_simple_table_using_options_to_dict(): assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]} -def test_load_with_datetime(): +def test_load_to_datetime(): log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log" log_mtime_pair = [ ("00000000000000000000.json", 1588398451.0), @@ -78,15 +78,21 @@ def test_load_with_datetime(): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) - dt.load_with_datetime("2020-05-01T00:47:31-07:00") + dt.load_to("2020-05-01T00:47:31-07:00") assert dt.version() == 0 - dt.load_with_datetime("2020-05-02T22:47:31-07:00") + dt.load_to("2020-05-02T22:47:31-07:00") assert dt.version() == 1 - dt.load_with_datetime("2020-05-25T22:47:31-07:00") + 
dt.load_to("2020-05-25T22:47:31-07:00") + assert dt.version() == 4 + dt.load_to(datetime.fromisoformat("2020-05-01T00:47:31-07:00")) + assert dt.version() == 0 + dt.load_to(datetime.fromisoformat("2020-05-02T22:47:31-07:00")) + assert dt.version() == 1 + dt.load_to(datetime.fromisoformat("2020-05-25T22:47:31-07:00")) assert dt.version() == 4 -def test_load_with_datetime_bad_format(): +def test_load_to_datetime_bad_format(): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) @@ -96,7 +102,7 @@ def test_load_with_datetime_bad_format(): "2020-05-01T00:47:31+08", ]: with pytest.raises(Exception, match="Failed to parse datetime string:"): - dt.load_with_datetime(bad_format) + dt.load_to(bad_format) def test_read_simple_table_update_incremental(): From 2a004d601c33a5fa8e6adfbac69aaeabdbff1aaf Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 02:18:43 +0100 Subject: [PATCH 2/8] lint --- python/deltalake/table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 98d0456579..73df8ac697 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -453,7 +453,7 @@ def file_uris( file_uris.__doc__ = "" - def load_to(self, version: int | str | datetime) -> None: + def load_to(self, version: Union[int, str, datetime]) -> None: """ Load/time travel a DeltaTable with a specified version number, or a timestamp version of the table. If a string is passed then the argument should be an RFC 3339 and ISO 8601 date and time string format. 
From 135fc7ab2c3a07cbca5468c612708270db8e0fc4 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:35:39 +0100 Subject: [PATCH 3/8] fix timestamp to millis --- crates/deltalake-core/src/table/mod.rs | 9 ++++----- python/deltalake/table.py | 13 +++++++------ python/tests/test_table_read.py | 27 +++++++++++++------------- 3 files changed, 25 insertions(+), 24 deletions(-) diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index de6a176e91..ee155f0da7 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -601,10 +601,10 @@ impl DeltaTable { .object_store() .head(&commit_uri_from_version(version)) .await?; - let ts = meta.last_modified.timestamp(); + let ts = meta.last_modified.timestamp_millis(); // also cache timestamp for version self.version_timestamp.insert(version, ts); - + Ok(ts) } } @@ -832,14 +832,13 @@ impl DeltaTable { let mut min_version = 0; let mut max_version = self.get_latest_version().await?; let mut version = min_version; - let target_ts = datetime.timestamp(); - + let target_ts = datetime.timestamp_millis(); + // binary search while min_version <= max_version { let pivot = (max_version + min_version) / 2; version = pivot; let pts = self.get_version_timestamp(pivot).await?; - match pts.cmp(&target_ts) { Ordering::Equal => { break; diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 73df8ac697..ec2a5d249a 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -455,7 +455,7 @@ def file_uris( def load_to(self, version: Union[int, str, datetime]) -> None: """ - Load/time travel a DeltaTable with a specified version number, or a timestamp version of the table. If a + Load/time travel a DeltaTable to a specified version number, or a timestamp version of the table. If a string is passed then the argument should be an RFC 3339 and ISO 8601 date and time string format. 
Args: @@ -464,19 +464,20 @@ def load_to(self, version: Union[int, str, datetime]) -> None: Example: **Use a version number** ``` - 1 + dt = DeltaTable("test_table") + dt.load_to(1) ``` **Use a datetime object** ``` - datetime(2023,1,1) + dt.load_to(datetime(2023,1,1)) ``` **Use a datetime in string format** ``` - "2018-01-26T18:30:09Z" - "2018-12-19T16:39:57-08:00" - "2018-01-26T18:30:09.453+00:00" + dt.load_to("2018-01-26T18:30:09Z") + dt.load_to("2018-12-19T16:39:57-08:00") + dt.load_to("2018-01-26T18:30:09.453+00:00") ``` """ if isinstance(version, int): diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index b366e98b50..d54b928a45 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -63,7 +63,15 @@ def test_read_simple_table_using_options_to_dict(): assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"value": [1, 2, 3]} -def test_load_to_datetime(): +@pytest.mark.parametrize( + ["date_value", "expected_version"], + [ + ("2020-05-01T00:47:31-07:00", 0), + ("2020-05-02T22:47:31-07:00", 1), + ("2020-05-25T22:47:31-07:00", 4), + ], +) +def test_load_to_datetime(date_value: str, expected_version): log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log" log_mtime_pair = [ ("00000000000000000000.json", 1588398451.0), @@ -78,18 +86,11 @@ def test_load_to_datetime(): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) - dt.load_to("2020-05-01T00:47:31-07:00") - assert dt.version() == 0 - dt.load_to("2020-05-02T22:47:31-07:00") - assert dt.version() == 1 - dt.load_to("2020-05-25T22:47:31-07:00") - assert dt.version() == 4 - dt.load_to(datetime.fromisoformat("2020-05-01T00:47:31-07:00")) - assert dt.version() == 0 - dt.load_to(datetime.fromisoformat("2020-05-02T22:47:31-07:00")) - assert dt.version() == 1 - dt.load_to(datetime.fromisoformat("2020-05-25T22:47:31-07:00")) - assert dt.version() == 4 + dt.load_to(date_value) + assert dt.version() == 
expected_version + dt = DeltaTable(table_path) + dt.load_to(datetime.fromisoformat(date_value)) + assert dt.version() == expected_version def test_load_to_datetime_bad_format(): From 3c54b9e4e988ede616c4728ca29d60182c7aa21f Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 11:38:00 +0100 Subject: [PATCH 4/8] fmt --- crates/deltalake-core/src/table/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/deltalake-core/src/table/mod.rs b/crates/deltalake-core/src/table/mod.rs index ee155f0da7..2df417839c 100644 --- a/crates/deltalake-core/src/table/mod.rs +++ b/crates/deltalake-core/src/table/mod.rs @@ -604,7 +604,7 @@ impl DeltaTable { let ts = meta.last_modified.timestamp_millis(); // also cache timestamp for version self.version_timestamp.insert(version, ts); - + Ok(ts) } } @@ -833,7 +833,7 @@ impl DeltaTable { let mut max_version = self.get_latest_version().await?; let mut version = min_version; let target_ts = datetime.timestamp_millis(); - + // binary search while min_version <= max_version { let pivot = (max_version + min_version) / 2; From 7cfba9e8d20f74dfc357b55405e957df2b1cd709 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 17:20:22 +0100 Subject: [PATCH 5/8] read version --- crates/deltalake-core/tests/command_restore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs index 80c2083261..ac33ed3c0e 100644 --- a/crates/deltalake-core/tests/command_restore.rs +++ b/crates/deltalake-core/tests/command_restore.rs @@ -115,7 +115,7 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> { let context = setup_test().await?; let mut table = context.table; let history = table.history(Some(10)).await?; - let timestamp = history.get(1).unwrap().timestamp.unwrap(); + let timestamp = 
history.get(2).unwrap().timestamp.unwrap(); let naive = NaiveDateTime::from_timestamp_millis(timestamp).unwrap(); let datetime: DateTime<Utc> = Utc.from_utc_datetime(&naive); From 958cccb6039d01d456db8da17a4f0016405656ab Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:21:36 +0100 Subject: [PATCH 6/8] zzzzzz --- crates/deltalake-core/tests/command_restore.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/deltalake-core/tests/command_restore.rs b/crates/deltalake-core/tests/command_restore.rs index ac33ed3c0e..2c1c06cbb6 100644 --- a/crates/deltalake-core/tests/command_restore.rs +++ b/crates/deltalake-core/tests/command_restore.rs @@ -11,6 +11,8 @@ use rand::Rng; use std::error::Error; use std::fs; use std::sync::Arc; +use std::thread; +use std::time::Duration; use tempdir::TempDir; #[derive(Debug)] @@ -42,19 +44,21 @@ async fn setup_test() -> Result<Context, Box<dyn Error>> { .await?; let batch = get_record_batch(); - + thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Append) .await .unwrap(); + thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Overwrite) .await .unwrap(); + thread::sleep(Duration::from_secs(1)); let table = DeltaOps(table) .write(vec![batch.clone()]) .with_save_mode(SaveMode::Append) @@ -115,7 +119,7 @@ async fn test_restore_by_datetime() -> Result<(), Box<dyn Error>> { let context = setup_test().await?; let mut table = context.table; let history = table.history(Some(10)).await?; - let timestamp = history.get(2).unwrap().timestamp.unwrap(); + let timestamp = history.get(1).unwrap().timestamp.unwrap(); let naive = NaiveDateTime::from_timestamp_millis(timestamp).unwrap(); let datetime: DateTime<Utc> = Utc.from_utc_datetime(&naive); From f2d90a690404b9b403eb27f7d4066fecf333d5ef Mon Sep 17 00:00:00 2001 From: ion-elgreco 
<15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:38:25 +0100 Subject: [PATCH 7/8] rename --- python/deltalake/table.py | 20 ++++++++++---------- python/tests/test_table_read.py | 10 +++++----- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index ec2a5d249a..f843faeb5c 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -453,7 +453,7 @@ def file_uris( file_uris.__doc__ = "" - def load_to(self, version: Union[int, str, datetime]) -> None: + def load_as_version(self, version: Union[int, str, datetime]) -> None: """ Load/time travel a DeltaTable to a specified version number, or a timestamp version of the table. If a string is passed then the argument should be an RFC 3339 and ISO 8601 date and time string format. @@ -465,19 +465,19 @@ def load_to(self, version: Union[int, str, datetime]) -> None: **Use a version number** ``` dt = DeltaTable("test_table") - dt.load_to(1) + dt.load_as_version(1) ``` **Use a datetime object** ``` - dt.load_to(datetime(2023,1,1)) + dt.load_as_version(datetime(2023,1,1)) ``` **Use a datetime in string format** ``` - dt.load_to("2018-01-26T18:30:09Z") - dt.load_to("2018-12-19T16:39:57-08:00") - dt.load_to("2018-01-26T18:30:09.453+00:00") + dt.load_as_version("2018-01-26T18:30:09Z") + dt.load_as_version("2018-12-19T16:39:57-08:00") + dt.load_as_version("2018-01-26T18:30:09.453+00:00") ``` """ if isinstance(version, int): @@ -496,13 +496,13 @@ def load_version(self, version: int) -> None: Load a DeltaTable with a specified version. !!! warning "Deprecated" - Load_version and load_with_datetime have been combined into `DeltaTable.load_to`. + Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`. Args: version: the identifier of the version of the DeltaTable to load """ warnings.warn( - "Call to deprecated method DeltaTable.load_version. 
Use DeltaTable.load_to() instead.", + "Call to deprecated method DeltaTable.load_version. Use DeltaTable.load_as_version() instead.", category=DeprecationWarning, stacklevel=2, ) @@ -514,7 +514,7 @@ def load_with_datetime(self, datetime_string: str) -> None: The `datetime_string` argument should be an RFC 3339 and ISO 8601 date and time string. !!! warning "Deprecated" - Load_version and load_with_datetime have been combined into `DeltaTable.load_to`. + Load_version and load_with_datetime have been combined into `DeltaTable.load_as_version`. Args: datetime_string: the identifier of the datetime point of the DeltaTable to load @@ -527,7 +527,7 @@ def load_with_datetime(self, datetime_string: str) -> None: ``` """ warnings.warn( - "Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_to() instead.", + "Call to deprecated method DeltaTable.load_with_datetime. Use DeltaTable.load_as_version() instead.", category=DeprecationWarning, stacklevel=2, ) diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index d54b928a45..74c7a1b339 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -71,7 +71,7 @@ def test_read_simple_table_using_options_to_dict(): ("2020-05-25T22:47:31-07:00", 4), ], ) -def test_load_to_datetime(date_value: str, expected_version): +def test_load_as_version_datetime(date_value: str, expected_version): log_dir = "../crates/deltalake-core/tests/data/simple_table/_delta_log" log_mtime_pair = [ ("00000000000000000000.json", 1588398451.0), @@ -86,14 +86,14 @@ def test_load_to_datetime(date_value: str, expected_version): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) - dt.load_to(date_value) + dt.load_as_version(date_value) assert dt.version() == expected_version dt = DeltaTable(table_path) - dt.load_to(datetime.fromisoformat(date_value)) + dt.load_as_version(datetime.fromisoformat(date_value)) assert dt.version() == expected_version 
-def test_load_to_datetime_bad_format(): +def test_load_as_version_datetime_bad_format(): table_path = "../crates/deltalake-core/tests/data/simple_table" dt = DeltaTable(table_path) @@ -103,7 +103,7 @@ def test_load_to_datetime_bad_format(): "2020-05-01T00:47:31+08", ]: with pytest.raises(Exception, match="Failed to parse datetime string:"): - dt.load_to(bad_format) + dt.load_as_version(bad_format) def test_read_simple_table_update_incremental(): From e07bd8c780aee63529ebccbe1d8ccf87fe7b12e4 Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 14 Dec 2023 18:43:12 +0100 Subject: [PATCH 8/8] Empty-Commit