Skip to content

Commit

Permalink
pythonGH-127807: pathlib ABCs: move private copying methods to dedica…
Browse files Browse the repository at this point in the history
…ted class

Move 9 private `PathBase` attributes and methods into a new `CopyWorker`
class. Change `PathBase.copy` from a method to a `CopyWorker` instance.

The methods remain private in the `CopyWorker` class. In future we might
make some/all of them public so that user subclasses of `PathBase` can
customize the copying process (in particular reading/writing of metadata,)
but we'd need to make `PathBase` public first.
  • Loading branch information
barneygale committed Dec 11, 2024
1 parent 12b4f1a commit c4ef762
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 247 deletions.
234 changes: 133 additions & 101 deletions Lib/pathlib/_abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,138 @@ def concat_path(path, text):
return path.with_segments(str(path) + text)


class CopyWorker:
"""
Class that implements copying between path objects. An instance of this
class is available from the PathBase.copy property; it's made callable so
that PathBase.copy() can be treated as a method.
The target path's CopyWorker drives the process from its _create() method.
Data is exchanged by calling methods on the source and target paths, and
metadata is exchanged by calling source.copy._read_metadata() and
target.copy._write_metadata().
In user subclasses of PathBase, users may wish override PathBase.copy to
provide their own customized version of CopyWorker. For example, in a
virtual filesystem where directories are implied to exist by their
contents, but usually not directly recorded, it might be reasonable to
re-implement _create_dir() and skip calling mkdir().
"""
__slots__ = ('_path',)

def __init__(self, path):
self._path = path

def __call__(self, target, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not isinstance(target, PathBase):
target = self._path.with_segments(target)

# Delegate to the target path's CopyWorker object.
return target.copy._create(self._path, follow_symlinks, dirs_exist_ok, preserve_metadata)

_readable_metakeys = frozenset()

def _read_metadata(self, metakeys, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise NotImplementedError

_writable_metakeys = frozenset()

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise NotImplementedError

def _create(self, source, follow_symlinks, dirs_exist_ok, preserve_metadata):
self._ensure_distinct_path(source)
if preserve_metadata:
metakeys = self._writable_metakeys & source.copy._readable_metakeys
else:
metakeys = None
if not follow_symlinks and source.is_symlink():
self._create_symlink(source, metakeys)
elif source.is_dir():
self._create_dir(source, metakeys, follow_symlinks, dirs_exist_ok)
else:
self._create_file(source, metakeys)
return self._path

def _create_dir(self, source, metakeys, follow_symlinks, dirs_exist_ok):
"""Copy the given directory to our path."""
children = list(source.iterdir())
self._path.mkdir(exist_ok=dirs_exist_ok)
for src in children:
dst = self._path.joinpath(src.name)
if not follow_symlinks and src.is_symlink():
dst.copy._create_symlink(src, metakeys)
elif src.is_dir():
dst.copy._create_dir(src, metakeys, follow_symlinks, dirs_exist_ok)
else:
dst.copy._create_file(src, metakeys)
if metakeys:
metadata = source.copy._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_file(self, source, metakeys):
"""Copy the given file to our path."""
self._ensure_different_file(source)
with source.open('rb') as source_f:
try:
with self._path.open('wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not self._path.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {self._path}') from e
raise
if metakeys:
metadata = source.copy._read_metadata(metakeys)
if metadata:
self._write_metadata(metadata)

def _create_symlink(self, source, metakeys):
"""Copy the given symbolic link to our path."""
self._path.symlink_to(source.readlink())
if metakeys:
metadata = source.copy._read_metadata(metakeys, follow_symlinks=False)
if metadata:
self._write_metadata(metadata, follow_symlinks=False)

def _ensure_different_file(self, source):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, source):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if source == self._path:
err = OSError(EINVAL, "Source and target are the same path")
elif source in self._path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(source)
err.filename2 = str(self._path)
raise err


class PurePathBase:
"""Base class for pure path objects.
Expand Down Expand Up @@ -417,31 +549,6 @@ def is_symlink(self):
except (OSError, ValueError):
return False

def _ensure_different_file(self, other_path):
"""
Raise OSError(EINVAL) if both paths refer to the same file.
"""
pass

def _ensure_distinct_path(self, other_path):
"""
Raise OSError(EINVAL) if the other path is within this path.
"""
# Note: there is no straightforward, foolproof algorithm to determine
# if one directory is within another (a particularly perverse example
# would be a single network share mounted in one location via NFS, and
# in another location via CIFS), so we simply checks whether the
# other path is lexically equal to, or within, this path.
if self == other_path:
err = OSError(EINVAL, "Source and target are the same path")
elif self in other_path.parents:
err = OSError(EINVAL, "Source path is a parent of target path")
else:
return
err.filename = str(self)
err.filename2 = str(other_path)
raise err

def open(self, mode='r', buffering=-1, encoding=None,
errors=None, newline=None):
"""
Expand Down Expand Up @@ -590,13 +697,6 @@ def symlink_to(self, target, target_is_directory=False):
"""
raise UnsupportedOperation(self._unsupported_msg('symlink_to()'))

def _symlink_to_target_of(self, link):
"""
Make this path a symlink with the same target as the given link. This
is used by copy().
"""
self.symlink_to(link.readlink())

def hardlink_to(self, target):
"""
Make this path a hard link pointing to the same file as *target*.
Expand All @@ -617,75 +717,7 @@ def mkdir(self, mode=0o777, parents=False, exist_ok=False):
"""
raise UnsupportedOperation(self._unsupported_msg('mkdir()'))

# Metadata keys supported by this path type.
_readable_metadata = _writable_metadata = frozenset()

def _read_metadata(self, keys=None, *, follow_symlinks=True):
"""
Returns path metadata as a dict with string keys.
"""
raise UnsupportedOperation(self._unsupported_msg('_read_metadata()'))

def _write_metadata(self, metadata, *, follow_symlinks=True):
"""
Sets path metadata from the given dict with string keys.
"""
raise UnsupportedOperation(self._unsupported_msg('_write_metadata()'))

def _copy_metadata(self, target, *, follow_symlinks=True):
"""
Copies metadata (permissions, timestamps, etc) from this path to target.
"""
# Metadata types supported by both source and target.
keys = self._readable_metadata & target._writable_metadata
if keys:
metadata = self._read_metadata(keys, follow_symlinks=follow_symlinks)
target._write_metadata(metadata, follow_symlinks=follow_symlinks)

def _copy_file(self, target):
"""
Copy the contents of this file to the given target.
"""
self._ensure_different_file(target)
with self.open('rb') as source_f:
try:
with target.open('wb') as target_f:
copyfileobj(source_f, target_f)
except IsADirectoryError as e:
if not target.exists():
# Raise a less confusing exception.
raise FileNotFoundError(
f'Directory does not exist: {target}') from e
else:
raise

def copy(self, target, *, follow_symlinks=True, dirs_exist_ok=False,
preserve_metadata=False):
"""
Recursively copy this file or directory tree to the given destination.
"""
if not isinstance(target, PathBase):
target = self.with_segments(target)
self._ensure_distinct_path(target)
stack = [(self, target)]
while stack:
src, dst = stack.pop()
if not follow_symlinks and src.is_symlink():
dst._symlink_to_target_of(src)
if preserve_metadata:
src._copy_metadata(dst, follow_symlinks=False)
elif src.is_dir():
children = src.iterdir()
dst.mkdir(exist_ok=dirs_exist_ok)
stack.extend((child, dst.joinpath(child.name))
for child in children)
if preserve_metadata:
src._copy_metadata(dst)
else:
src._copy_file(dst)
if preserve_metadata:
src._copy_metadata(dst)
return target
copy = property(CopyWorker, doc=CopyWorker.__call__.__doc__)

def copy_into(self, target_dir, *, follow_symlinks=True,
dirs_exist_ok=False, preserve_metadata=False):
Expand Down
Loading

0 comments on commit c4ef762

Please sign in to comment.