index: fetch: check for changed files
efiop committed Jan 24, 2024
1 parent c5a89dd commit 118c4cb
Showing 2 changed files with 77 additions and 19 deletions.
48 changes: 44 additions & 4 deletions src/dvc_data/index/fetch.py
@@ -5,12 +5,13 @@
from dvc_objects.fs.callbacks import DEFAULT_CALLBACK, TqdmCallback

from dvc_data.hashfile.db import get_index
from dvc_data.hashfile.meta import Meta
from dvc_data.hashfile.transfer import transfer

from .build import build
from .checkout import apply, compare
from .collect import collect # noqa: F401
from .index import ObjectStorage
from .index import DataIndex, ObjectStorage
from .save import md5, save

if TYPE_CHECKING:
@@ -45,6 +46,44 @@ def _onerror(data, cache, failed_keys, src_path, dest_path, exc):
)


def _filter_changed(index):

ret = DataIndex()
ret.storage_map = index.storage_map

for _, entry in index.items():
if entry.meta and entry.meta.isdir:
ret.add(entry)
continue

if not entry.meta or entry.meta.version_id:
ret.add(entry)
continue

try:
data_fs, data_path = index.storage_map.get_data(entry)
except ValueError:
continue

try:
info = data_fs.info(data_path)
except FileNotFoundError:
continue

if getattr(data_fs, "immutable", None):
ret.add(entry)
continue

meta = Meta.from_info(info)
old = getattr(entry.meta, data_fs.PARAM_CHECKSUM, None) if entry.meta else None
new = getattr(meta, data_fs.PARAM_CHECKSUM, None)
if old and new:
if old == new:
ret.add(entry)

return ret


def fetch(
idxs,
callback: "Callback" = DEFAULT_CALLBACK,
@@ -94,7 +133,7 @@ def fetch(
fetched += len(result.transferred)
failed += len(result.failed)
elif isinstance(cache, ObjectStorage):
md5(fs_index, check_meta=False)
updated = md5(fs_index, check_meta=True)

def _on_error(failed, oid, exc):
if isinstance(exc, FileNotFoundError):
@@ -107,14 +146,15 @@ def _on_error(failed, oid, exc):
)

fetched += save(
fs_index,
updated,
jobs=jobs,
callback=cb,
on_error=partial(_on_error, failed),
)
else:
old = build(cache.path, cache.fs)
diff = compare(old, fs_index)
filtered = _filter_changed(fs_index)
diff = compare(old, filtered)
cache.fs.makedirs(cache.fs.parent(cache.path), exist_ok=True)

failed_keys: Set["DataIndexKey"] = set()
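The `_filter_changed` helper added above decides, entry by entry, whether the data a fetch would rely on still matches what the index recorded: directory entries, versioned objects, and entries without metadata pass through, entries whose file has disappeared are dropped, immutable filesystems are trusted, and everything else is kept only when the stored checksum equals the live one. The snippet below restates that rule in plain Python as a rough sketch; it uses dicts and a `live_info()` callable instead of dvc_data's `DataIndex`, `Meta`, and `fs.info()`, and `checksum_field` stands in for `fs.PARAM_CHECKSUM`, so the names and shapes here are illustrative only.

```python
from typing import Callable, Dict, List, Optional


def filter_changed(
    entries: List[Dict],
    live_info: Callable[[str], Optional[Dict]],
    checksum_field: str = "etag",   # stand-in for fs.PARAM_CHECKSUM
    immutable: bool = False,
) -> List[Dict]:
    """Rough restatement of the _filter_changed rule over plain dicts."""
    kept = []
    for entry in entries:
        meta = entry.get("meta")
        if meta and meta.get("isdir"):
            kept.append(entry)  # keep directory entries as-is
            continue
        if not meta or meta.get("version_id"):
            kept.append(entry)  # nothing to compare, or a versioned object
            continue

        info = live_info(entry["path"])
        if info is None:
            continue  # file no longer exists at the data location: drop it

        if immutable:
            kept.append(entry)  # immutable storage cannot change under us
            continue

        old, new = meta.get(checksum_field), info.get(checksum_field)
        if old and new and old == new:
            kept.append(entry)  # unchanged: safe to rely on for the fetch
    return kept


# Example: the changed file ("bar") drops out before any comparison is made.
live = {"foo": {"etag": "abc"}, "bar": {"etag": "zzz"}}
index = [
    {"path": "foo", "meta": {"etag": "abc"}},
    {"path": "bar", "meta": {"etag": "def"}},
]
assert [e["path"] for e in filter_changed(index, live.get)] == ["foo"]
```

In the diff itself this pruning happens before `compare(old, filtered)`, so the later `apply()` step is never asked to copy files that have changed or vanished on the data filesystem.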
48 changes: 33 additions & 15 deletions src/dvc_data/index/save.py
@@ -14,7 +14,7 @@
from dvc_data.hashfile.db import HashFileDB
from dvc_data.hashfile.state import StateBase

from .index import BaseDataIndex, DataIndexKey
from .index import BaseDataIndex, DataIndex, DataIndexKey


def md5(
@@ -23,16 +23,22 @@ def md5(
storage: str = "data",
name: str = DEFAULT_ALGORITHM,
check_meta: bool = True,
) -> None:
from .index import DataIndexEntry
) -> "DataIndex":
from .index import DataIndex, DataIndexEntry

entries = {}
ret = DataIndex()

for key, entry in index.iteritems():
for _, entry in index.iteritems():
if entry.meta and entry.meta.isdir:
ret.add(entry)
continue

hash_info = None
if entry.hash_info and entry.hash_info.name in ("md5", "md5-dos2unix"):
hash_info = entry.hash_info

if hash_info and not check_meta:
ret.add(entry)
continue

try:
@@ -47,23 +53,35 @@
except FileNotFoundError:
continue

meta = Meta.from_info(info, fs.protocol)
if entry.meta != meta:
continue
if getattr(fs, "immutable", False):
ret.add(entry)
else:
meta = Meta.from_info(info, fs.protocol)
old = getattr(entry.meta, fs.PARAM_CHECKSUM, None) if entry.meta else None
new = getattr(meta, fs.PARAM_CHECKSUM, None)
if old and new:
if old == new:
ret.add(entry)
continue

try:
_, hash_info = hash_file(path, fs, name, state=state, info=info)
_, hi = hash_file(path, fs, name, state=state, info=info)
except FileNotFoundError:
continue

entries[key] = DataIndexEntry(
key=entry.key,
meta=entry.meta,
hash_info=hash_info,
if hash_info and hi != hash_info:
continue

ret.add(
DataIndexEntry(
key=entry.key,
meta=entry.meta,
hash_info=hi,
)
)

for key, entry in entries.items():
index[key] = entry
ret.storage_map = index.storage_map
return ret


def build_tree(
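On the save.py side, `md5()` now returns a fresh `DataIndex` of verified entries rather than writing hashes back into the index it was given, and with `check_meta=True` it drops any entry whose content no longer produces the md5 the index recorded (the `if hash_info and hi != hash_info: continue` guard above). `fetch()` then passes only that verified subset to `save()` as `updated = md5(fs_index, check_meta=True)`. Below is a minimal, self-contained illustration of the re-hash guard using `hashlib`; the dict-based entries and the `content` argument are assumptions made for the example, not dvc_data structures.

```python
import hashlib
from typing import Dict, Optional


def rehash_entry(entry: Dict, content: Optional[bytes]) -> Optional[Dict]:
    """Re-hash live content and keep the entry only if it still produces
    the md5 the index recorded (or if no md5 was recorded yet)."""
    if content is None:
        return None  # file disappeared from the data location: drop it
    actual = hashlib.md5(content).hexdigest()
    expected = entry.get("md5")
    if expected and actual != expected:
        return None  # content changed since the index was built: drop it
    return dict(entry, md5=actual)  # verified (or newly hashed) entry


# Toy usage: only entries that still hash to the recorded value survive,
# mirroring how fetch() saves `updated = md5(fs_index, check_meta=True)`.
files = {"good.txt": b"hello", "stale.txt": b"rewritten"}
index = {
    "good.txt": {"md5": hashlib.md5(b"hello").hexdigest()},
    "stale.txt": {"md5": hashlib.md5(b"original").hexdigest()},
}
updated = {
    path: kept
    for path, entry in index.items()
    if (kept := rehash_entry(entry, files.get(path))) is not None
}
assert list(updated) == ["good.txt"]
```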
