diff --git a/README.md b/README.md index 8a84aa4d..3b8c326c 100644 --- a/README.md +++ b/README.md @@ -153,11 +153,20 @@ specfile.add_changelog_entry( #### Sources and patches ```python -print(specfile.sources) -print(specfile.patches) -print(specfile.sources[0].filename) -specfile.sources.append('tests.tar.gz') -specfile.patches[0] = 'downstream.patch' +with specfile.sources() as sources: + # expanded URL of the first source + print(sources[0].expanded_url) + # adding a source + sources.append('tests.tar.gz') + +with specfile.patches() as patches: + # modifying URL of the first patch + patches[0].url = 'downstream.patch' + # removing associated comments of the last patch + patches[-1].comments.clear() + # adding and removing patches + patches.append('another.patch') + del patches[2] # fetching non-local sources (including patches) specfile.download_remote_sources() diff --git a/specfile/sourcelist.py b/specfile/sourcelist.py new file mode 100644 index 00000000..62569e42 --- /dev/null +++ b/specfile/sourcelist.py @@ -0,0 +1,116 @@ +# Copyright Contributors to the Packit project. +# SPDX-License-Identifier: MIT + +import collections +from typing import List, Optional, overload + +from specfile.rpm import Macros +from specfile.sections import Section +from specfile.tags import Comments + + +class Source: + """ + Class that represents a spec file source/patch. + + Attributes: + url: Literal URL of the source/patch as stored in the spec file. + comments: List of comments associated with the source/patch. + """ + + def __init__(self, url: str, comments: Comments) -> None: + self.url = url + self.comments = comments.copy() + + def __repr__(self) -> str: + comments = repr(self.comments) + return f"Source('{self.url}', {comments})" + + @property + def expanded_url(self) -> str: + """URL of the source/patch after expanding macros.""" + return Macros.expand(self.url) + + +class SourceList(collections.UserList): + """ + Class that represents entries in a %sourcelist/%patchlist section. + + Attributes: + data: List of individual sources/patches. + """ + + def __init__( + self, data: Optional[List[Source]] = None, remainder: Optional[List[str]] = None + ) -> None: + """ + Constructs a `SourceList` object. + + Args: + data: List of individual sources/patches. + remainder: Leftover lines in a section that can't be parsed into sources/patches. + + Returns: + Constructed instance of `SourceList` class. + """ + super().__init__() + if data is not None: + self.data = data.copy() + self._remainder = remainder.copy() if remainder is not None else [] + + def __repr__(self) -> str: + data = repr(self.data) + remainder = repr(self._remainder) + return f"SourceList({data}, {remainder})" + + @overload + def __getitem__(self, i: int) -> Source: + pass + + @overload + def __getitem__(self, i: slice) -> "SourceList": + pass + + def __getitem__(self, i): + if isinstance(i, slice): + return SourceList(self.data[i], self._remainder) + else: + return self.data[i] + + def copy(self) -> "SourceList": + return SourceList(self.data, self._remainder) + + @staticmethod + def parse(section: Section) -> "SourceList": + """ + Parses a section into sources/patches. + + Args: + section: %sourcelist/%patchlist section. + + Returns: + Constructed instance of `SourceList` class. + """ + data = [] + buffer: List[str] = [] + for line in section: + if line and not line.lstrip().startswith("#"): + data.append(Source(line, Comments.parse(buffer))) + buffer = [] + else: + buffer.append(line) + return SourceList(data, buffer) + + def get_raw_section_data(self) -> List[str]: + """ + Reconstructs section data from sources/patches. + + Returns: + List of lines forming the reconstructed section data. + """ + result = [] + for source in self.data: + result.extend(source.comments.get_raw_data()) + result.append(source.url) + result.extend(self._remainder) + return result diff --git a/specfile/sources.py b/specfile/sources.py new file mode 100644 index 00000000..d6f6883e --- /dev/null +++ b/specfile/sources.py @@ -0,0 +1,398 @@ +# Copyright Contributors to the Packit project. +# SPDX-License-Identifier: MIT + +import re +from typing import List, Tuple, Union, overload + +from specfile.exceptions import SpecfileException +from specfile.rpm import Macros +from specfile.sourcelist import Source, SourceList +from specfile.tags import Comments, Tag, Tags + + +class GenericSource: + """Class that represents a source.""" + + @property + def index(self) -> int: + """Numeric index of the source.""" + raise NotImplementedError() + + @property + def url(self) -> str: + """Literal URL of the source as stored in the spec file.""" + raise NotImplementedError() + + @url.setter + def url(self, value: str) -> None: + raise NotImplementedError() + + @property + def expanded_url(self) -> str: + """URL of the source after expanding macros.""" + raise NotImplementedError() + + @property + def comments(self) -> Comments: + """List of comments associated with the source.""" + raise NotImplementedError() + + # TODO: filename + + +class TagSource(GenericSource): + """ + Class that represents a source backed by a spec file tag. + + Attributes: + tag: Tag that this source represents. + """ + + def __init__(self, tag: Tag) -> None: + self.tag = tag + + def __repr__(self) -> str: + tag = repr(self.tag) + return f"TagSource({tag})" + + def _get_index(self): + """ + Extracts numeric index from tag name. + + Returns: + Extracted index or None if there isn't one. + """ + tokens = re.split(r"(\d+)", self.tag.name, maxsplit=1) + if len(tokens) > 1: + return tokens[1] + return None + + @property + def index(self) -> int: + """Numeric index of the source.""" + return int(self._get_index()) + + @property + def index_digits(self) -> int: + """Number of digits the index is formed by.""" + return len(self._get_index()) + + @property + def url(self) -> str: + """Literal URL of the source as stored in the spec file.""" + return self.tag.value + + @url.setter + def url(self, value: str) -> None: + self.tag.value = value + + @property + def expanded_url(self) -> str: + """URL of the source after expanding macros.""" + return self.tag.expanded_value + + @property + def comments(self) -> Comments: + """List of comments associated with the source.""" + return self.tag.comments + + +class ListSource(GenericSource): + """ + Class that represents a source backed by a line in a %sourcelist/%patchlist section. + + Attributes: + source: Source object that this source represents. + """ + + def __init__(self, source: Source, index: int) -> None: + self.source = source + self._index = index + + def __repr__(self) -> str: + source = repr(self.source) + return f"ListSource({source}, {self._index})" + + @property + def index(self) -> int: + """Numeric index of the source.""" + return self._index + + @property + def url(self) -> str: + """Literal URL of the source as stored in the spec file.""" + return self.source.url + + @url.setter + def url(self, value: str) -> None: + self.source.url = value + + @property + def expanded_url(self) -> str: + """URL of the source after expanding macros.""" + return self.source.expanded_url + + @property + def comments(self) -> Comments: + """List of comments associated with the source.""" + return self.source.comments + + +class Sources: + """ + Class that represents a list of all sources. + + Attributes: + tags: All spec file tags. + sourcelists: List of all %sourcelist sections. + allow_duplicates: Whether to allow duplicate entries when adding new sources. + """ + + prefix = "Source" + + def __init__( + self, tags: Tags, sourcelists: List[SourceList], allow_duplicates: bool = False + ) -> None: + self.tags = tags + self.sourcelists = sourcelists + self.allow_duplicates = allow_duplicates + + def __repr__(self) -> str: + tags = repr(self.tags) + sourcelists = repr(self.sourcelists) + allow_duplicates = repr(self.allow_duplicates) + return f"{self.__class__.__name__}({tags}, {sourcelists}, {allow_duplicates})" + + def __contains__(self, url: str) -> bool: + items = self._get_items() + if not items: + return False + return url in [s.url for s in list(zip(*items))[0]] + + def __len__(self) -> int: + return len(self._get_items()) + + @overload + def __getitem__(self, i: int) -> GenericSource: + pass + + @overload + def __getitem__(self, i: slice) -> "Sources": + pass + + def __getitem__(self, i): + items = self._get_items() + if isinstance(i, slice): + return list(zip(*items[i]))[0] + else: + return items[i][0] + + def __delitem__(self, i: Union[int, slice]) -> None: + items = self._get_items() + if isinstance(i, slice): + for _, container, index in reversed(items[i]): + del container[index] + else: + _, container, index = items[i] + del container[index] + + def _get_tags(self) -> List[Tuple[GenericSource, Union[Tags, SourceList], int]]: + """ + Gets all tag sources. + + Returns: + List of tuples in the form of (source, container, index), + where source is an instance of `TagSource` representing a tag, + container is the container the tag is part of and index + is its index within that container. + """ + return [ + (TagSource(t), self.tags, i) + for i, t in enumerate(self.tags) + if t.name.capitalize().startswith(self.prefix.capitalize()) + ] + + def _get_items(self) -> List[Tuple[GenericSource, Union[Tags, SourceList], int]]: + """ + Gets all sources. + + Returns: + List of tuples in the form of (source, container, index), + where source is an instance of `TagSource` or `ListSource` + representing a source, container is the container the source + is part of and index is its index within that container. + """ + result = self._get_tags() + last_index = result[-1][0].index if result else -1 + result.extend( + (ListSource(sl[i], last_index + 1 + i), sl, i) + for sl in self.sourcelists + for i in range(len(sl)) + ) + return result + + def _get_tag_format(self, reference: TagSource, index: int) -> Tuple[str, str]: + """ + Determines name and separator of a new source tag based on + a reference tag and the requested index. + + The new name has the same number of digits as the reference + and the length of the separator is adjusted accordingly. + + Args: + reference: Reference tag source. + index: Requested index. + + Returns: + Tuple in the form of (name, separator). + """ + prefix = self.prefix.capitalize() + name = f"{prefix}{index:0{reference.index_digits}}" + diff = len(name) - len(reference.tag.name) + if diff >= 0: + return name, reference.tag._separator + " " * diff + return name, reference.tag._separator[:diff] or ":" + + def _get_initial_tag_setup(self) -> Tuple[int, str, str]: + """ + Determines the initial placement, name and separator of + a new source tag. The placement is expressed as an index + in the list of all tags. + + Returns: + Tuple in the form of (index, name, separator). + """ + prefix = self.prefix.capitalize() + return len(self.tags) if self.tags else 0, f"{prefix}0", ": " + + def _deduplicate_tag_names(self) -> None: + """Eliminates duplicate indexes in source tag names.""" + tags = self._get_tags() + if not tags: + return + tag_sources = sorted(list(zip(*tags))[0], key=lambda ts: ts.index) + for ts0, ts1 in zip(tag_sources, tag_sources[1:]): + if ts1.index <= ts0.index: + ts1.tag.name, ts1.tag._separator = self._get_tag_format( + ts0, ts0.index + 1 + ) + + def append(self, url: str) -> None: + """ + Adds a new source. + + Args: + url: URL of the new source. + """ + self.insert(len(self), url) + + def insert(self, i: int, url: str) -> None: + """ + Inserts a new source at a specified index. + + Args: + i: Requested index. + url: URL of the new source. + """ + if not self.allow_duplicates and url in self: + raise SpecfileException(f"Source '{url}' already exists") + items = self._get_items() + if i > len(items): + i = len(items) + if items: + if i == len(items): + source, container, index = items[-1] + index += 1 + source_index = source.index + 1 + else: + source, container, index = items[i] + source_index = source.index + if isinstance(source, TagSource): + name, separator = self._get_tag_format(source, source_index) + container.insert( + index, Tag(name, url, Macros.expand(url), separator, Comments()) + ) + self._deduplicate_tag_names() + else: + container.insert(index, Source(url, Comments())) + elif self.sourcelists: + self.sourcelists[-1].append(Source(url, Comments())) + else: + index, name, separator = self._get_initial_tag_setup() + self.tags.insert( + index, Tag(name, url, Macros.expand(url), separator, Comments()) + ) + + def remove(self, url: str) -> None: + """ + Removes sources by URL. + + Args: + url: URL of the sources to be removed. + """ + for source, container, index in reversed(self._get_items()): + if source.url == url: + del container[index] + + def clear(self) -> None: + """Removes all sources.""" + for _, container, index in reversed(self._get_items()): + del container[index] + + def count(self, url: str) -> int: + """ + Counts sources by URL. + + Args: + url: URL of the sources to be counted. + + Returns: + Number of sources with the specified URL. + """ + items = self._get_items() + if not items: + return 0 + return len([s for s in list(zip(*items))[0] if s.url == url]) + + def extend(self, urls: List[str]) -> None: + """ + Extends the sources by a list of URLs. + + Args: + urls: List of URLs of the sources to be added. + """ + for url in urls: + self.append(url) + + +class Patches(Sources): + """ + Class that represents a list of all patches. + + Attributes: + tags: All spec file tags. + sourcelists: List of all %patchlist sections. + allow_duplicates: Whether to allow duplicate entries when adding new patches. + """ + + prefix = "Patch" + + def _get_initial_tag_setup(self) -> Tuple[int, str, str]: + """ + Determines the initial placement, name and separator of + a new source tag. The placement is expressed as an index + in the list of all tags. + + Returns: + Tuple in the form of (index, name, separator). + """ + try: + index, source = [ + (i, TagSource(t)) + for i, t in enumerate(self.tags) + if t.name.capitalize().startswith("Source") + ][-1] + except IndexError: + return super()._get_initial_tag_setup() + name, separator = self._get_tag_format(source, 0) + return index + 1, name, separator diff --git a/specfile/specfile.py b/specfile/specfile.py index 2a714b38..db6bc076 100644 --- a/specfile/specfile.py +++ b/specfile/specfile.py @@ -14,6 +14,8 @@ from specfile.exceptions import SpecfileException from specfile.rpm import RPM, Macros from specfile.sections import Sections +from specfile.sourcelist import SourceList +from specfile.sources import Patches, Sources from specfile.tags import Tags @@ -136,6 +138,56 @@ def changelog(self) -> Iterator[Optional[Changelog]]: finally: section.data = changelog.get_raw_section_data() + @contextlib.contextmanager + def sources(self, allow_duplicates: bool = False) -> Iterator[Sources]: + """ + Context manager for accessing sources. + + Args: + allow_duplicates: Whether to allow duplicate entries when adding new sources. + + Yields: + Spec file sources as `Sources` object. + """ + with self.sections() as sections, self.tags() as tags: + sourcelists = [ + (s, SourceList.parse(s)) for s in sections if s.name == "sourcelist" + ] + try: + yield Sources( + tags, + list(zip(*sourcelists))[1] if sourcelists else [], + allow_duplicates, + ) + finally: + for section, sourcelist in sourcelists: + section.data = sourcelist.get_raw_section_data() + + @contextlib.contextmanager + def patches(self, allow_duplicates: bool = False) -> Iterator[Patches]: + """ + Context manager for accessing patches. + + Args: + allow_duplicates: Whether to allow duplicate entries when adding new patches. + + Yields: + Spec file patches as `Patches` object. + """ + with self.sections() as sections, self.tags() as tags: + patchlists = [ + (s, SourceList.parse(s)) for s in sections if s.name == "patchlist" + ] + try: + yield Patches( + tags, + list(zip(*patchlists))[1] if patchlists else [], + allow_duplicates, + ) + finally: + for section, patchlist in patchlists: + section.data = patchlist.get_raw_section_data() + def add_changelog_entry( self, entry: Union[str, List[str]],