diff --git a/dlt/common/storages/load_package.py b/dlt/common/storages/load_package.py index 0e1c0ddf9c..1cd73daf2a 100644 --- a/dlt/common/storages/load_package.py +++ b/dlt/common/storages/load_package.py @@ -739,7 +739,7 @@ def is_package_partially_loaded(package_info: LoadPackageInfo) -> bool: @staticmethod def _job_elapsed_time_seconds(file_path: str, now_ts: float = None) -> float: - return (now_ts or pendulum.now().timestamp()) - os.path.getmtime(file_path) + return (now_ts or precise_time()) - os.path.getmtime(file_path) @staticmethod def filter_jobs_for_table( diff --git a/dlt/destinations/impl/dummy/dummy.py b/dlt/destinations/impl/dummy/dummy.py index ce83caefd9..5baf5589ff 100644 --- a/dlt/destinations/impl/dummy/dummy.py +++ b/dlt/destinations/impl/dummy/dummy.py @@ -34,6 +34,7 @@ WithStagingDataset, LoadJob, ) +from dlt.common.time import precise_time from dlt.destinations.sql_jobs import SqlMergeFollowupJob from dlt.destinations.exceptions import ( @@ -47,7 +48,7 @@ class LoadDummyBaseJob(RunnableLoadJob): def __init__(self, file_name: str, config: DummyClientConfiguration) -> None: super().__init__(file_name) self.config = copy(config) - self.start_time: float = pendulum.now().timestamp() + self.start_time = precise_time() if self.config.fail_terminally_in_init: raise DestinationTerminalException(self._exception) @@ -63,7 +64,7 @@ def run(self) -> None: raise Exception("Dummy job status raised exception") # timeout condition (terminal) - n = pendulum.now().timestamp() + n = precise_time() if n - self.start_time > self.config.timeout: # this will make the the job go to a failed state raise DestinationTerminalException("failed due to timeout") diff --git a/tests/common/storages/test_load_package.py b/tests/common/storages/test_load_package.py index 8c4d5a439b..678f8a214a 100644 --- a/tests/common/storages/test_load_package.py +++ b/tests/common/storages/test_load_package.py @@ -225,17 +225,20 @@ def test_job_elapsed_time_seconds(load_storage: LoadStorage) -> None: load_storage.normalized_packages.get_job_file_path(load_id, "started_jobs", fn) ) elapsed = PackageStorage._job_elapsed_time_seconds(fp) - sleep(0.3) + sleep(0.5) # do not touch file elapsed_2 = PackageStorage._job_elapsed_time_seconds(fp) - assert elapsed_2 - elapsed >= 0.3 + # Python 3.13 + Windows often shows elapsed values less than 0.5 + # it may be related to https://github.com/python/cpython/issues/65501 + # most probably + assert elapsed_2 - elapsed >= 0.47 # rename the file fp = load_storage.normalized_packages.retry_job(load_id, fn) # retry_job increases retry number in file name so the line below does not work # fp = storage.storage._make_path(storage._get_job_file_path(load_id, "new_jobs", fn)) elapsed_2 = PackageStorage._job_elapsed_time_seconds(fp) # it should keep its mod original date after rename - assert elapsed_2 - elapsed >= 0.3 + assert elapsed_2 - elapsed >= 0.47 def test_retry_job(load_storage: LoadStorage) -> None: