Skip to content

Commit

Permalink
push: use index push
Browse files Browse the repository at this point in the history
  • Loading branch information
efiop committed Oct 25, 2023
1 parent 4a0d56a commit a7b437b
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 185 deletions.
12 changes: 7 additions & 5 deletions dvc/repo/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ def _collect_indexes( # noqa: PLR0913
types=types,
)

data = idx.data["repo"]
data.onerror = _make_index_onerror(onerror, rev)
idx.data["repo"].onerror = _make_index_onerror(onerror, rev)

indexes[idx.data_tree.hash_info.value] = data
indexes[rev or "workspace"] = idx
except Exception as exc: # pylint: disable=broad-except
if onerror:
onerror(rev, None, exc)
Expand Down Expand Up @@ -138,15 +137,18 @@ def fetch( # noqa: C901, PLR0913
onerror=onerror,
)

cache_key = ("fetch", tokenize(sorted(indexes.keys())))
cache_key = (
"fetch",
tokenize(sorted(idx.data_tree.hash_info.value for idx in indexes.values())),
)

with ui.progress(
desc="Collecting",
unit="entry",
leave=True,
) as pb:
data = collect(
indexes.values(),
[idx.data["repo"] for idx in indexes.values()],
"remote",
cache_index=self.data_index,
cache_key=cache_key,
Expand Down
174 changes: 84 additions & 90 deletions dvc/repo/push.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
from contextlib import suppress
from typing import TYPE_CHECKING, Optional, Sequence

from dvc.config import NoRemoteError
from dvc.exceptions import InvalidArgumentError, UploadError
from dvc.utils import glob_targets
from dvc.exceptions import UploadError

from . import locked

if TYPE_CHECKING:
from dvc.data_cloud import Remote
from dvc.repo import Repo
from dvc.types import TargetType
from dvc_objects.db import ObjectDB

def _update_meta(index):
from dvc.repo.index import build_data_index
from dvc.repo.worktree import _merge_push_meta, worktree_view_by_remotes

stages = set()
for remote_name, idx in worktree_view_by_remotes(index):
remote = index.repo.cloud.get_remote(remote_name)

new = build_data_index(idx, remote.path, remote.fs)

for out in idx.outs:
if not remote.fs.version_aware:
continue

_merge_push_meta(out, new, remote.name)
stages.add(out.stage)

for stage in stages:
stage.dump(with_files=True, update_pipeline=False)


@locked
Expand All @@ -28,92 +38,76 @@ def push( # noqa: C901, PLR0913
run_cache=False,
revs=None,
glob=False,
odb: Optional["ObjectDB"] = None,
include_imports=False,
):
worktree_remote: Optional["Remote"] = None
with suppress(NoRemoteError):
_remote = self.cloud.get_remote(name=remote)
if _remote and (_remote.worktree or _remote.fs.version_aware):
worktree_remote = _remote
from fsspec.utils import tokenize

from dvc.fs.callbacks import Callback
from dvc.utils import glob_targets
from dvc_data.index.fetch import collect
from dvc_data.index.push import push as ipush

pushed = 0
used_run_cache = self.stage_cache.push(remote, odb=odb) if run_cache else []
pushed += len(used_run_cache)
from .fetch import _collect_indexes

failed_count = 0
transferred_count = 0

used_run_cache = self.stage_cache.push(remote) if run_cache else []
transferred_count += len(used_run_cache)

if isinstance(targets, str):
targets = [targets]

expanded_targets = glob_targets(targets, glob=glob)

if worktree_remote is not None:
pushed += _push_worktree(
self,
worktree_remote,
revs=revs,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
targets=expanded_targets,
jobs=jobs,
with_deps=with_deps,
recursive=recursive,
)
else:
used = self.used_objs(
expanded_targets,
all_branches=all_branches,
all_tags=all_tags,
all_commits=all_commits,
with_deps=with_deps,
force=True,
remote=remote,
jobs=jobs,
recursive=recursive,
used_run_cache=used_run_cache,
revs=revs,
indexes = _collect_indexes(
self,
targets=glob_targets(targets, glob=glob),
remote=remote,
all_branches=all_branches,
with_deps=with_deps,
all_tags=all_tags,
recursive=recursive,
all_commits=all_commits,
revs=revs,
)

cache_key = (
"push",
tokenize(sorted(idx.data_tree.hash_info.value for idx in indexes.values())),
)

with Callback.as_tqdm_callback(
desc="Collecting",
unit="entry",
) as cb:
data = collect(
[idx.data["repo"] for idx in indexes.values()],
"remote",
cache_index=self.data_index,
cache_key=cache_key,
callback=cb,
push=True,
)

if odb:
all_ids = set()
for dest_odb, obj_ids in used.items():
if not include_imports and dest_odb and dest_odb.read_only:
continue
all_ids.update(obj_ids)
result = self.cloud.push(all_ids, jobs, remote=remote, odb=odb)
if result.failed:
raise UploadError(len(result.failed))
pushed += len(result.transferred)
else:
for dest_odb, obj_ids in used.items():
if dest_odb and dest_odb.read_only:
continue
result = self.cloud.push(
obj_ids, jobs, remote=remote, odb=odb or dest_odb
)
if result.failed:
raise UploadError(len(result.failed))
pushed += len(result.transferred)
return pushed


def _push_worktree(
repo: "Repo",
remote: "Remote",
revs: Optional[Sequence[str]] = None,
all_branches: bool = False,
all_tags: bool = False,
all_commits: bool = False,
targets: Optional["TargetType"] = None,
jobs: Optional[int] = None,
**kwargs,
) -> int:
from dvc.repo.worktree import push_worktree

if revs or all_branches or all_tags or all_commits:
raise InvalidArgumentError(
"Multiple rev push is unsupported for cloud versioned remotes"
)
with Callback.as_tqdm_callback(
desc="Pushing",
unit="file",
) as cb:
try:
push_transferred, push_failed = ipush(
data,
jobs=jobs,
callback=cb,
) # pylint: disable=assignment-from-no-return
finally:
ws_idx = indexes.get("workspace")
if ws_idx is not None:
_update_meta(self.index) # fixme this should be targets_view

for fs_index in data:
fs_index.close()

transferred_count += push_transferred
failed_count += push_failed
if failed_count:
raise UploadError(failed_count)

return push_worktree(repo, remote, targets=targets, jobs=jobs, **kwargs)
return transferred_count
87 changes: 1 addition & 86 deletions dvc/repo/worktree.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import logging
from functools import partial
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Set, Tuple, Union
from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Tuple, Union

from funcy import first

from dvc.exceptions import DvcException
from dvc.fs.callbacks import Callback
from dvc.stage.exceptions import StageUpdateError

Expand Down Expand Up @@ -104,90 +103,6 @@ def _get_remote(
return repo.cloud.get_remote(name, command)


def push_worktree(
repo: "Repo",
remote: "Remote",
targets: Optional["TargetType"] = None,
jobs: Optional[int] = None,
**kwargs: Any,
) -> int:
from dvc.repo.index import build_data_index
from dvc_data.index.checkout import VersioningNotSupported, apply, compare

pushed = 0
stages: Set["Stage"] = set()

for remote_name, view in worktree_view_by_remotes(
repo.index, push=True, targets=targets, **kwargs
):
remote_obj = _get_remote(repo, remote_name, remote, "push")
new_index = view.data["repo"]
if remote_obj.worktree:
logger.debug("indexing latest worktree for '%s'", remote_obj.path)
old_index = build_data_index(view, remote_obj.path, remote_obj.fs)
logger.debug("Pushing worktree changes to '%s'", remote_obj.path)
else:
old_index = None
logger.debug("Pushing version-aware files to '%s'", remote_obj.path)

if remote_obj.worktree:
diff_kwargs: Dict[str, Any] = {
"meta_only": True,
"meta_cmp_key": partial(_meta_checksum, remote_obj.fs),
}
else:
diff_kwargs = {}

with Callback.as_tqdm_callback(
unit="entry",
desc=f"Comparing indexes for remote {remote_obj.name!r}",
) as cb:
diff = compare(
old_index,
new_index,
callback=cb,
delete=remote_obj.worktree,
**diff_kwargs,
)

total = len(new_index)
with Callback.as_tqdm_callback(
unit="file",
desc=f"Pushing to remote {remote_obj.name!r}",
disable=total == 0,
) as cb:
cb.set_size(total)
try:
apply(
diff,
remote_obj.path,
remote_obj.fs,
callback=cb,
latest_only=remote_obj.worktree,
jobs=jobs,
)
pushed += len(diff.files_create)
except VersioningNotSupported:
logger.exception("")
raise DvcException(
f"remote {remote_obj.name!r} does not support versioning"
) from None

if remote_obj.index is not None:
for key, entry in new_index.iteritems():
remote_obj.index[key] = entry
remote_obj.index.commit()

for out in view.outs:
workspace, _key = out.index_key
_merge_push_meta(out, repo.index.data[workspace], remote_obj.name)
stages.add(out.stage)

for stage in stages:
stage.dump(with_files=True, update_pipeline=False)
return pushed


def _merge_push_meta(
out: "Output",
index: Union["DataIndex", "DataIndexView"],
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies = [
"configobj>=5.0.6",
"distro>=1.3",
"dpath<3,>=2.1.0",
"dvc-data>=2.18.1,<2.19.0",
"dvc-data>=2.18.2,<2.19.0",
"dvc-http>=2.29.0",
"dvc-render>=0.3.1,<1",
"dvc-studio-client>=0.13.0,<1",
Expand Down
4 changes: 2 additions & 2 deletions tests/func/test_data_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def test_hash_recalculation(mocker, dvc, tmp_dir, local_remote):
assert ret == 0
ret = main(["push"])
assert ret == 0
assert test_file_md5.mock.call_count == 1
assert test_file_md5.mock.call_count == 3


def test_missing_cache(tmp_dir, dvc, local_remote, caplog):
Expand Down Expand Up @@ -486,7 +486,7 @@ def test_pull_partial(tmp_dir, dvc, local_remote):
clean(["foo"], dvc)

stats = dvc.pull(os.path.join("foo", "bar"))
assert stats["fetched"] == 3
assert stats["fetched"] == 2
assert (tmp_dir / "foo").read_text() == {"bar": {"baz": "baz"}}


Expand Down
2 changes: 1 addition & 1 deletion tests/func/test_virtual_directory.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def test_partial_checkout_and_update(M, tmp_dir, dvc, remote):

assert dvc.pull("dir/subdir") == M.dict(
added=[join("dir", "")],
fetched=3,
fetched=2,
)
assert (tmp_dir / "dir").read_text() == {"subdir": {"lorem": "lorem"}}

Expand Down

0 comments on commit a7b437b

Please sign in to comment.