diff --git a/src/dvc_data/index/push.py b/src/dvc_data/index/push.py index 63bd6534..5e6e555b 100644 --- a/src/dvc_data/index/push.py +++ b/src/dvc_data/index/push.py @@ -1,6 +1,6 @@ import logging from functools import partial -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Optional, Set from dvc_objects.fs.callbacks import DEFAULT_CALLBACK @@ -18,6 +18,7 @@ from dvc_data.hashfile.meta import Meta + from .index import DataIndexKey logger = logging.getLogger(__name__) @@ -31,7 +32,10 @@ def _meta_checksum(fs: "FileSystem", meta: "Meta") -> Any: return getattr(meta, fs.PARAM_CHECKSUM) -def _onerror(src_path, dest_path, _exc): +def _onerror(cache, data, failed_keys, src_path, dest_path, exc): + if not isinstance(exc, FileNotFoundError) or cache.fs.exists(src_path): + failed_keys.add(data.fs.path.relparts(dest_path, data.path)) + logger.debug( "failed to create '%s' from '%s'", src_path, @@ -45,7 +49,7 @@ def push( callback: "Callback" = DEFAULT_CALLBACK, jobs: Optional[int] = None, ): - fetched, failed = 0, 0 + pushed, failed = 0, 0 for fs_index in idxs: data = fs_index.storage_map[()].data cache = fs_index.storage_map[()].cache @@ -75,7 +79,7 @@ def push( validate_status=_log_missing, callback=cb, ) - fetched += len(result.transferred) + pushed += len(result.transferred) failed += len(result.failed) else: old = build(data.path, data.fs) @@ -86,6 +90,9 @@ def push( meta_cmp_key=partial(_meta_checksum, data.fs), ) data.fs.makedirs(data.fs.path.parent(data.path), exist_ok=True) + + failed_keys: Set["DataIndexKey"] = set() + apply( diff, data.path, @@ -96,8 +103,11 @@ def push( jobs=jobs, callback=cb, links=["reflink", "copy"], - onerror=_onerror, + onerror=partial(_onerror, cache, data, failed_keys), ) - fetched += len(diff.changes.get("added", [])) - return fetched, failed + added_keys = {ch.key for ch in diff.changes.get("added", [])} + pushed += len(added_keys - failed_keys) + failed += len(failed_keys) + + return pushed, failed