diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..8166c39 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +max-line-length = 120 +ignore = E203, W503 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..e37ac6f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,62 @@ +name: CI + +on: [push, pull_request] + +jobs: + test: + name: Test Python ${{ matrix.python-version }} + strategy: + fail-fast: false + matrix: + python-version: ['3.7', '3.8', '3.9'] + env: + PYTHON: ${{ matrix.python-version }} + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: set up python + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: install + run: make install-dev + - name: Lint + run: make check + - name: test + run: make test + - name: coverage + run: make coverage + + - uses: codecov/codecov-action@v1 + with: + file: coverage.xml + env_vars: PYTHON + + deploy: + name: Build and Deploy + needs: test + if: "success() && startsWith(github.ref, 'refs/tags/')" + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: set up python + uses: actions/setup-python@v2 + with: + python-version: 3.7 + + - name: install + run: make install-dev + + - name: build + run: make build + + - name: upload to pypi + run: twine upload dist/* + env: + TWINE_USERNAME: __token__ + TWINE_PASSWORD: ${{ secrets.pypi_token }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68e786a --- /dev/null +++ b/.gitignore @@ -0,0 +1,29 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +build/ +dist/ +pip-wheel-metadata/ +wheels/ +share/python-wheels/ +*.egg-info/ +*.egg +MANIFEST + +# Unit test / coverage reports +.coverage +.coverage.* +coverage.xml +.pytest_cache/ + +# Environments +.venv + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..93532d6 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## 0.1.0 2021-07-29 + +- Initial release diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..86361c4 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Naya Verdier + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..4b1f8f7 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,4 @@ +include README.md +include CHANGELOG.md +include LICENSE +include instater/VERSION diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..20eba3a --- /dev/null +++ b/Makefile @@ -0,0 +1,55 @@ +.DEFAULT_GOAL := all + +black = black instater tests +flake8 = flake8 instater tests +isort = isort instater tests +mypy = mypy instater +install-pip = python -m pip install -U setuptools pip wheel +test = pytest --cov=instater --cov-report term-missing tests/ + +.PHONY: install +install: + $(install-pip) + pip install -e . + +.PHONY: install-dev +install-dev: + $(install-pip) + pip install -e ".[dev]" + +.PHONY: format +format: + $(isort) + $(black) + +.PHONY: check +check: + $(isort) --check-only --df + $(black) --check --diff + $(flake8) + $(mypy) + +.PHONY: test +test: + $(test) + +.PHONY: coverage +coverage: + coverage xml + +.PHONY: build +build: + python setup.py sdist bdist_wheel + twine check dist/* + +.PHONY: clean +clean: + rm -rf `find . -name __pycache__` + rm -rf .pytest_cache + rm -rf .mypy_cache + rm -rf build + rm -rf dist + rm -rf *.egg-info + rm -f .coverage + rm -f .coverage.* + rm -f coverage.xml diff --git a/README.md b/README.md new file mode 100644 index 0000000..bdd3028 --- /dev/null +++ b/README.md @@ -0,0 +1,123 @@ +# instater + +An easy solution for system/dotfile configuration + +Loosely based off of Ansible for the task and file organization + +## Installation + +```bash +pip3 install instater +``` + +## Usage + +See the File Structure Example below to set up variables, files, and tasks. + +Once a `setup.yml` file is created, it can be run using + +```bash +instater + +# or: +instater --setup-file setup.yml +``` + +To see what changed will be made, but not actually make then, use `--dry-run`: + +```bash +instater --dry-run +``` + +For a complete example, see [dotfiles](https://github.com/nayaverdier/dotfiles) + +### File Structure Example + +First, create a `setup.yml` file: + +```yaml +# Lots of ways to prompt for data at the beginning of execution +vars_prompt: +- name: my_var +- name: custom_prompt + prompt: Enter something here +- name: private_var + private: true +- name: private_confirm_var + private: true + confirm: true +- name: allow_empty_var + allow_empty: true + +# variables that can be used within tasks/files can be populated +# from a static file, in this case vars/common.yml +vars_files: +- vars/common.yml +# variables can be used within the file names +- "vars/{{ vars_file }}.yml" + +# All of the tasks to perform are enumerated +tasks: +- name: Copy file + # {{ username }} is replaced with the variable `username` + copy: + content: "The contents of a new file in here" + dest: "/home/{{ username }}/Downloads/file1" + mode: "600" + # if desired, the output of this task can be registered to use as + # a condition for subsequent tasks + register: file1_copy +- name: Run a command if file1 written + command: "touch /home/{{ username }}/testfile" + when: file1_copy.changed +``` + +Then, create a `vars/` directory and `common.yml` within: + +```yaml +my_test: 1 +some_var: "{{ my_test + 2 }}" +vars_file: "second" +username: something +``` + +And `vars/second.yml` (since `common.yml` set `vars_file` to `second`): + +```yaml +from_second_yml: data in here +``` + +Now in all of the tasks, `my_test`, `username`, `from_second_yml`, etc will be +present and accessible. + +### Tasks + +All currently defined tasks are listed below + +#### aur + +#### command + +#### copy + +#### debug + +#### directory + +#### file + +#### get_url (alias of `copy`) + +#### git + +#### group + +#### include + +#### pacman + +#### service + +#### template (alias of `copy`) + +#### user diff --git a/instater/VERSION b/instater/VERSION new file mode 100644 index 0000000..6e8bf73 --- /dev/null +++ b/instater/VERSION @@ -0,0 +1 @@ +0.1.0 diff --git a/instater/__init__.py b/instater/__init__.py new file mode 100644 index 0000000..5d7e658 --- /dev/null +++ b/instater/__init__.py @@ -0,0 +1,18 @@ +from importlib import resources + +from rich.traceback import install + +from . import tasks +from .exceptions import InstaterError +from .main import run_tasks + +install() + +VERSION = resources.read_text("instater", "VERSION").strip() + +__all__ = [ + "InstaterError", + "VERSION", + "run_tasks", + "tasks", +] diff --git a/instater/cli.py b/instater/cli.py new file mode 100644 index 0000000..50295ac --- /dev/null +++ b/instater/cli.py @@ -0,0 +1,57 @@ +import json +from argparse import ArgumentParser + +from rich.console import Console + +from instater import InstaterError, run_tasks + + +def _parse_variables(vars: str) -> dict: + if vars is None: + return {} + + try: + parsed = json.loads(vars) + if not isinstance(parsed, dict): + raise InstaterError(f"JSON variables from --vars must be a dictionary, found '{type(parsed)}'") + return parsed + except json.JSONDecodeError: + pass + + split = vars.replace(";", " ").split(" ") + parsed = {} + for item in split: + item = item.strip() + if not item: + continue + + if "=" not in item: + raise InstaterError(f"Invalid argument to --vars: '{item}' (missing '=')") + key, value = item.split("=") + parsed[key] = value + + return parsed + + +def main(): + parser = ArgumentParser(description="An easy solution for system/dotfile configuration") + parser.add_argument("--setup-file", default="setup.yml", help="The setup file to execute") + parser.add_argument("--tags", nargs="*", help="Run only a subset of tasks by their tag") + parser.add_argument("--vars", help="Variables to override prompts or variable files") + parser.add_argument( + "--dry-run", + action="store_true", + help="Display operations that would be performed without actually running them", + ) + + args = parser.parse_args() + + tags = args.tags + + try: + variables = _parse_variables(args.vars) + run_tasks(args.setup_file, variables, tags, args.dry_run) + except InstaterError as e: + console = Console() + console.print(e, style="red") + console.print("Exiting...", style="red bold") diff --git a/instater/context.py b/instater/context.py new file mode 100644 index 0000000..fbd8c1a --- /dev/null +++ b/instater/context.py @@ -0,0 +1,66 @@ +import typing +from collections import Counter +from pathlib import Path +from typing import Iterable, List + +from jinja2 import Environment, FileSystemLoader +from rich.console import Console + +import instater + +from . import util + + +def _jinja_environment(root_directory: Path) -> Environment: + env = Environment(loader=FileSystemLoader(root_directory)) + env.filters["password_hash"] = util.password_hash + return env + + +class Context: + def __init__(self, root_directory: Path, extra_vars: dict, tags: Iterable[str], dry_run: bool = False): + self.root_directory = root_directory + self.tags = set(tags) + self.dry_run = dry_run + + extra_vars["instater_dir"] = str(root_directory) + self.variables = extra_vars + + self.jinja_env = _jinja_environment(root_directory) + self.tasks: List[instater.tasks.Task] = [] + self.statuses: typing.Counter[str] = Counter() + + self.console = Console() + self.print = self.console.print + + def jinja_object(self, template: object, extra_vars: dict = None) -> object: + if isinstance(template, str): + return self.jinja_string(template, extra_vars) + elif isinstance(template, list): + return [self.jinja_object(item, extra_vars) for item in template] + else: + return template + + def jinja_string(self, template: str, extra_vars: dict = None) -> str: + if isinstance(template, str): + vars = self.variables + if extra_vars: + vars = {**vars, **extra_vars} + return self.jinja_env.from_string(template).render(vars) + else: + return template + + def jinja_file(self, template_path: str, extra_vars: dict = None) -> str: + vars = self.variables + if extra_vars: + vars = {**vars, **extra_vars} + return self.jinja_env.get_template(template_path).render(vars) + + def print_summary(self): + skipped = self.statuses["skipped"] + changed = self.statuses["changed"] + + self.print("Summary:", style="bold") + self.print(f" skipped: {skipped}", style="blue") + # if this used style="yellow", the integer count would be turned blue by rich + self.print(f" [yellow]changed: {changed}[/yellow]") diff --git a/instater/exceptions.py b/instater/exceptions.py new file mode 100644 index 0000000..0328c18 --- /dev/null +++ b/instater/exceptions.py @@ -0,0 +1,2 @@ +class InstaterError(Exception): + pass diff --git a/instater/importer.py b/instater/importer.py new file mode 100644 index 0000000..72a5e50 --- /dev/null +++ b/instater/importer.py @@ -0,0 +1,30 @@ +import pkgutil +import sys +from pathlib import Path + + +def _import_exception_handler(module_name: str): + print(f"Exception when importing module {module_name}") + raise + + +_loaded = set() + + +def import_submodules(module_name: str): + if module_name in _loaded: + return + + module = sys.modules[module_name] + + path = Path(module.__file__).parent.absolute() + prefix = module.__name__ + paths = [str(path)] + + for loader, submodule_name, is_pkg in pkgutil.walk_packages(paths, prefix + ".", onerror=_import_exception_handler): + if is_pkg or submodule_name.split(".")[-1].startswith("_"): + continue + + loader.find_module(submodule_name).load_module(submodule_name) # type: ignore + + _loaded.add(module_name) diff --git a/instater/main.py b/instater/main.py new file mode 100644 index 0000000..2332e64 --- /dev/null +++ b/instater/main.py @@ -0,0 +1,216 @@ +import getpass +from glob import glob +from pathlib import Path +from typing import Iterable, List, Union + +import yaml # type: ignore + +from . import util +from .context import Context +from .exceptions import InstaterError +from .tasks import TASKS + + +def _print_start(context: Context, setup_file: Path): + context.print(f"Beginning instater execution from {setup_file.absolute()}", style="green bold") + if context.tags: + colored_tags = ", ".join(f"[bold]{tag}[/bold]" for tag in context.tags) + context.print("Only executing specified tags:", colored_tags, style="blue") + + if context.variables: + formatted_vars = " ".join(f"{key}={value!r}" for key, value in context.variables.items()) + context.print("Overridden variables:", formatted_vars, style="blue") + + print() + + +def _get_raw_input(prompt: str, private: bool) -> str: + if private: + return getpass.getpass(prompt) + else: + return input(prompt) + + +def _get_input(prompt: str, private: bool, allow_empty: bool) -> str: + while True: + raw_input = _get_raw_input(prompt, private) + if allow_empty or raw_input: + return raw_input + + +def _do_prompt(prompt: str, private, confirm, allow_empty) -> str: + prompt += ": " + while True: + input = _get_input(prompt, private, allow_empty) + if not confirm: + return input + + confirm_input = _get_input("Confirm: ", private, allow_empty) + if input == confirm_input: + return input + print("Inputs did not match") + + +def _prompt_variables(prompt_vars, context: Context): + if not prompt_vars: + return + + if not isinstance(prompt_vars, list): + prompt_vars = [prompt_vars] + + for prompt_var in prompt_vars: + if isinstance(prompt_var, str): + prompt_var = {"name": prompt_var} + + name = prompt_var.get("name") + if not name: + raise InstaterError("Missing 'name' in vars_prompt") + + # skip variables that were defined in the override_variables parameter + # to run_tasks (enables skipping user input from the command line) + if name in context.variables: + continue + + prompt = prompt_var.get("prompt") or f"Enter a value for {name}" + private = prompt_var.get("private") + confirm = prompt_var.get("confirm") + allow_empty = prompt_var.get("allow_empty") + context.variables[name] = _do_prompt(prompt, private, confirm, allow_empty) + + +def _file_variables(files, context: Context): + if not files: + return + + if not isinstance(files, list): + files = [files] + + for file in files: + file = context.root_directory / context.jinja_string(file) + with file.open() as f: + raw_vars = yaml.safe_load(f) + + for var, value in raw_vars.items(): + context.variables[var] = context.jinja_string(value) + + +def _extract_with(task_args: dict) -> List[dict]: + fileglob = task_args.pop("with_fileglob", None) + + if not util.single_truthy(fileglob, allow_zero=True): + raise InstaterError("Must provide at most one `with_*` looping attribute") + + if fileglob: + return [{"item": path} for path in glob(fileglob, recursive=True)] + + return [{}] + + +def _load_task_item(args: dict, tags: List[str], item: dict, context: Context): + tags = list(map(context.jinja_string, tags)) + + # special handling for "include" tasks since it needs to be loaded prior to + # actually running the task (and prior to filtering out tags) + if "include" in args: + replaced = {key: context.jinja_object(value, extra_vars=item) for key, value in args.items()} + _include(context, tags, **replaced) # type: ignore + return + + # If a list of tags to execute is given, don't even load tasks that don't match. + if context.tags and not any(tag in context.tags for tag in tags): + return + + # match against a Task class by finding a key in the arguments with the name + # (a key called 'copy', 'pacman', etc) + task_name = next((name for name in TASKS if name in args), None) + if task_name is None: + raise InstaterError(f"No task matched: {args}") + + TaskClass = TASKS[task_name] + action_args = args.pop(task_name) + + # If there aren't nested args, just a scalar arg, keep it under the task name + # (e.g. `command: ""` doesn't have nested task arguments) + if not isinstance(action_args, dict): + action_args = {task_name: action_args} + + all_args = {**args, **action_args} + replaced_args = {key: context.jinja_object(value, extra_vars=item) for key, value in all_args.items()} + + try: + context.tasks.append(TaskClass(**replaced_args)) + except (InstaterError, TypeError) as e: + name = args.get("name") + error_name = f"'{name}' ({task_name})" if name else f"'{task_name}'" + raise InstaterError(f"Error loading task {error_name}: {e}") + + +def _load_task(task_args: dict, tags: List[str], context: Context): + with_items = _extract_with(task_args) + for item in with_items: + _load_task_item(task_args.copy(), tags, item, context) + + +def _load_tasks(task_list, context: Context, extra_tags: List[str] = None): + if not task_list: + return + + if not isinstance(task_list, list): + raise InstaterError("Task definitions must be in a list") + + for task_args in task_list: + tags = task_args.pop("tags", None) or [] + if isinstance(tags, str): + tags = [tags] + + if extra_tags: + tags.extend(extra_tags) + + _load_task(task_args, tags, context) + + +# TODO: keep track of previously included files to prevent circular dependencies +def _include(context: Context, parent_tags: List[str], include: str, tags: Union[str, List[str]] = None): + include_file = context.root_directory / include + if not include_file.exists(): + raise InstaterError(f"Included file does not exist: {include_file}") + + with include_file.open() as f: + tasks = yaml.safe_load(f) + + tags = tags or [] + if isinstance(tags, str): + tags = [tags] + + tags.extend(parent_tags) + + _load_tasks(tasks, context, tags) + + +def run_tasks(setup_file, override_variables: dict = None, tags: Iterable[str] = None, dry_run: bool = False): + setup_file = Path(setup_file) + context = Context(setup_file.parent, override_variables or {}, tags or (), dry_run) + + if not setup_file.exists(): + raise InstaterError(f"Setup file does not exist: {setup_file}") + + _print_start(context, setup_file) + + with setup_file.open() as f: + setup_data = yaml.safe_load(f) + + if isinstance(setup_data, list): + if len(setup_data) > 1: + raise InstaterError(f"Cannot specify multiple root list items in {setup_file}") + setup_data = setup_data[0] + + _prompt_variables(setup_data.get("vars_prompt"), context) + _file_variables(setup_data.get("vars_files"), context) + _load_tasks(setup_data.get("tasks"), context) + + for task in context.tasks: + task.run_task(context) + print() + + context.print_summary() + return context diff --git a/instater/tasks/__init__.py b/instater/tasks/__init__.py new file mode 100644 index 0000000..f934df5 --- /dev/null +++ b/instater/tasks/__init__.py @@ -0,0 +1,6 @@ +from ..importer import import_submodules +from ._task import TASKS, Task + +import_submodules(__name__) + +__all__ = ["TASKS", "Task"] diff --git a/instater/tasks/_task.py b/instater/tasks/_task.py new file mode 100644 index 0000000..b0ed9f3 --- /dev/null +++ b/instater/tasks/_task.py @@ -0,0 +1,45 @@ +from ..context import Context +from ..exceptions import InstaterError +from ..util import snake_case + +TASKS = {} + + +class Task: + def __init__( + self, + name: str = None, + when: str = None, + register: str = None, + ): + self.name = name or "Unnamed " + snake_case(type(self).__name__) + self.when = when + self.register = register + + def __init_subclass__(cls): + TASKS[snake_case(cls.__name__)] = cls + + def run_task(self, context: Context) -> bool: + context.print(f"TASK [{self.name}]", style="black bold on blue", justify="left") + + if self.when and context.jinja_string("{{ (" + self.when + ") == False }}") == "True": + changed = False + else: + changed = self.run_action(context) + + if changed: + context.statuses["changed"] += 1 + context.print("changed", style="yellow bold") + else: + context.statuses["skipped"] += 1 + context.print("skipped", style="blue") + + if self.register: + if self.register in context.variables: + raise InstaterError(f"Task registered as {self.register} conflicts with an existing variable") + context.variables[self.register] = {"changed": changed} + + return changed + + def run_action(self, context: Context) -> bool: + raise NotImplementedError diff --git a/instater/tasks/command.py b/instater/tasks/command.py new file mode 100644 index 0000000..bac23e6 --- /dev/null +++ b/instater/tasks/command.py @@ -0,0 +1,49 @@ +import shlex +from typing import List, Union + +from .. import util +from ..context import Context +from ..exceptions import InstaterError +from . import Task + + +class Command(Task): + def __init__( + self, + command: Union[str, List[str]], + become: str = None, + condition: str = None, + condition_code: str = "0", + directory: str = None, + **kwargs, + ): + super().__init__(**kwargs) + + if isinstance(command, str): + command = [command] + + self.commands = list(map(self._parse_command, command)) + self.condition = self._parse_command(condition) if condition else None + self.condition_code = int(condition_code) + self.become = become + self.directory = directory + + def _parse_command(self, command: str) -> List[str]: + cmd = shlex.split(command) + if not cmd: + raise InstaterError(f"[{self.name}] No valid command specified") + + return cmd + + def run_action(self, context: Context) -> bool: + if self.condition: + # TODO: only consider specific return codes as valid? + result = util.shell(self.condition, self.directory, valid_return_codes=None) + if result.return_code != self.condition_code: + return False + + if not context.dry_run: + for command in self.commands: + util.shell(command, self.directory, become=self.become) + + return True diff --git a/instater/tasks/copy.py b/instater/tasks/copy.py new file mode 100644 index 0000000..556b503 --- /dev/null +++ b/instater/tasks/copy.py @@ -0,0 +1,175 @@ +import shlex +import shutil +from pathlib import Path +from tempfile import NamedTemporaryFile +from typing import Union +from urllib.request import urlopen + +from .. import util +from ..context import Context +from ..exceptions import InstaterError +from . import Task + + +def _read(path: Path) -> bytes: + with path.open("rb") as file: + return file.read() + + +class Copy(Task): + def __init__( + self, + *, + src: str = None, + content: str = None, + url: str = None, + dest: str, + owner: str = None, + group: str = None, + mode: Union[str, int] = None, + is_template: util.Bool = False, + validate: str = None, + **kwargs, + ): + super().__init__(**kwargs) + + if not util.single_truthy(src, content, url): + raise InstaterError("Must provide exactly one source of data to copy") + + if isinstance(mode, str): + mode = int(mode, 8) + + self.src = Path(src) if src else None + self.content = content + self.url = url + self.dest = Path(dest) + self.owner = owner + self.group = group + self.mode = mode + self.is_template = util.boolean(is_template) + self.validate = validate + + def _update_metadata(self, file: Path, context: Context) -> bool: + return util.update_file_metadata(file, self.owner, self.group, self.mode, context.dry_run) + + def _validate(self, path: Path): + if not self.validate: + return + + validate_command = shlex.split(self.validate % path) + # util.shell will raise an error on exit codes > 0 + util.shell(validate_command) + + def _update_file_direct(self, src: Path, dest: Path, context: Context) -> bool: + updated = False + + self._validate(src) + + if not dest.exists(): + if not context.dry_run: + dest.parent.mkdir(parents=True, exist_ok=True) + shutil.copy(src, dest) + updated = True + elif not src.samefile(dest) and _read(src) != _read(dest): + if not context.dry_run: + shutil.copy(src, dest) + updated = True + + updated |= self._update_metadata(dest, context) + return updated + + def _update_file_content(self, content: str, dest: Path, context: Context) -> bool: + updated = False + + if content is not None and not content.endswith("\n"): + content += "\n" + + if self.validate: + with NamedTemporaryFile() as temp_file: + temp_file.write(content.encode("utf-8")) + temp_file.flush() + self._validate(Path(temp_file.name)) + + if not dest.exists(): + if not context.dry_run: + dest.parent.mkdir(parents=True, exist_ok=True) + with dest.open("w") as f: + f.write(content) + updated = True + elif content.encode("utf-8") != _read(dest): + if not context.dry_run: + with dest.open("w") as f: + f.write(content) + updated = True + + updated |= self._update_metadata(dest, context) + return updated + + def _update_file_template(self, src: Path, dest: Path, context: Context) -> bool: + with src.open() as f: + content = context.jinja_string(f.read()) + + return self._update_file_content(content, dest, context) + + def _update_file(self, src: Path, dest: Path, context: Context): + if self.is_template: + return self._update_file_template(src, dest, context) + else: + return self._update_file_direct(src, dest, context) + + def _update_dir(self, context: Context) -> bool: + updated = False + src: Path = self.src # type: ignore + + for path in src.glob("**/*"): + if path.is_file(): + dest_path = self.dest / path.relative_to(src) + if self._update_file(path, dest_path, context): + updated = True + + return updated + + def run_action(self, context: Context) -> bool: + src = self.src + if src and not src.is_absolute(): + src = context.root_directory / src + + content = self.content + if self.url: + content = urlopen(self.url).read().decode("utf-8") + + dest = self.dest + if not self.dest.is_absolute(): + dest = context.root_directory / dest + + if src and not src.exists(): + raise InstaterError(f"Source to copy does not exist: {src}") + + if content is not None or (src and src.is_file()): + if dest.exists() and not dest.is_file(): + raise InstaterError(f"Destination is a directory, expected file: {dest}") + + if src: + return self._update_file(src, dest, context) + else: + content_str: str = content # type: ignore + if self.is_template: + content_str = context.jinja_string(content_str) + + return self._update_file_content(content_str, dest, context) + else: + if dest.exists() and not dest.is_dir(): + raise InstaterError(f"Destination is a file, expected directory: {dest}") + + return self._update_dir(context) + + +class Template(Copy): + def __init__(self, **kwargs): + kwargs.setdefault("is_template", True) + super().__init__(**kwargs) + + +class GetUrl(Copy): + def __init__(self, **kwargs): + super().__init__(**kwargs) diff --git a/instater/tasks/debug.py b/instater/tasks/debug.py new file mode 100644 index 0000000..01d6e4f --- /dev/null +++ b/instater/tasks/debug.py @@ -0,0 +1,13 @@ +from ..context import Context +from . import Task + + +class Debug(Task): + def __init__(self, msg: str, **kwargs): + super().__init__(**kwargs) + + self.msg = msg + + def run_action(self, context: Context) -> bool: + context.print(self.msg, style="white bold") + return False diff --git a/instater/tasks/file.py b/instater/tasks/file.py new file mode 100644 index 0000000..11f947a --- /dev/null +++ b/instater/tasks/file.py @@ -0,0 +1,107 @@ +import os +from pathlib import Path +from typing import Union + +from instater.exceptions import InstaterError + +from .. import util +from ..context import Context +from . import Task + + +class File(Task): + def __init__( + self, + *, + path: str, + target: str = None, + owner: str = None, + group: str = None, + mode: Union[str, int] = None, + directory: util.Bool = False, + symlink: util.Bool = False, + hard_link: util.Bool = False, + **kwargs, + ): + super().__init__(**kwargs) + + if isinstance(mode, str): + mode = int(mode, 8) + + if not util.single_truthy(directory, symlink, hard_link, allow_zero=True): + raise InstaterError(f"[{self.name}] Must only provide ony of directory, symlink, or hard_link") + + if target and (not symlink and not hard_link): + raise InstaterError("Must provide a target with symlink/hard_link") + + self.path = Path(path) + self.target = Path(target) if target is not None else None + self.owner = owner + self.group = group + self.mode = mode + self.directory = util.boolean(directory) + self.symlink = util.boolean(symlink) + self.hard_link = util.boolean(hard_link) + + def _create_file(self): + self.path.parent.mkdir(parents=True, exist_ok=True) + self.path.touch() + + def _create_directory(self): + self.path.mkdir(parents=True, exist_ok=True) + + def _create_symlink(self): + target: Path = self.target # type: ignore + os.symlink(target, self.path) + + def _create_hard_link(self): + target: Path = self.target # type: ignore + os.link(target, self.path) + + def run_action(self, context: Context): + updated = False + + if not self.path.exists(): + if not context.dry_run: + if self.directory: + self._create_directory() + elif self.symlink: + self._create_symlink() + elif self.hard_link: + self._create_hard_link() + else: + self._create_file() + updated = True + elif self.symlink: + if not self.path.is_symlink(): + raise InstaterError(f"Path exists but is not a symlink: {self.path}") + elif self.directory: + if not self.path.is_dir(): + raise InstaterError(f"Path exists but is not a directory: {self.path}") + elif self.hard_link: + pass + else: + if not self.path.is_file(): + raise InstaterError(f"Path exists but is not a file: {self.path}") + + updated |= util.update_file_metadata(self.path, self.owner, self.group, self.mode, context.dry_run) + + return updated + + +class Directory(File): + def __init__(self, **kwargs): + kwargs["directory"] = True + super().__init__(**kwargs) + + +class Symlink(File): + def __init__(self, **kwargs): + kwargs["symlink"] = True + super().__init__(**kwargs) + + +class HardLink(File): + def __init__(self, **kwargs): + kwargs["hard_link"] = True + super().__init__(**kwargs) diff --git a/instater/tasks/git.py b/instater/tasks/git.py new file mode 100644 index 0000000..cca56ea --- /dev/null +++ b/instater/tasks/git.py @@ -0,0 +1,55 @@ +from pathlib import Path + +from instater.exceptions import InstaterError + +from .. import util +from ..context import Context +from . import Task + + +class Git(Task): + def __init__(self, *, repo: str, depth: int = None, dest: str, become: str = None, **kwargs): + super().__init__(**kwargs) + self.repo = repo + self.depth = str(depth) + self.dest = Path(dest) + self.become = become + + def _clone(self): + command = ["git", "clone", self.repo] + if self.depth is not None: + command += ["--depth", self.depth] + + command.append(str(self.dest)) + util.shell(command, become=self.become) + + def _get_remote(self) -> str: + result = util.shell(["git", "config", "--get", "remote.origin.url"], directory=self.dest, become=self.become) + return result.stdout + + def _should_pull(self) -> bool: + result = util.shell(["git", "fetch", "--dry-run"], directory=self.dest, become=self.become) + return result.stdout != "" or result.stderr != "" + + def _pull(self): + branch = util.shell(["git", "branch", "--show-current"], directory=self.dest, become=self.become).stdout + util.shell(["git", "pull", "origin", branch], directory=self.dest, become=self.become) + + def run_action(self, context: Context): + if not self.dest.exists(): + if not context.dry_run: + self._clone() + return True + + if not (self.dest / ".git").exists(): + raise InstaterError(f"Git destination directory exists, but is not a git repo: {self.dest}") + + if self._get_remote() != self.repo: + raise InstaterError(f"Git remote does not match current local git repo: {self.dest}") + + if self._should_pull(): + if not context.dry_run: + self._pull() + return True + + return False diff --git a/instater/tasks/group.py b/instater/tasks/group.py new file mode 100644 index 0000000..35589e5 --- /dev/null +++ b/instater/tasks/group.py @@ -0,0 +1,23 @@ +from .. import util +from ..context import Context +from . import Task + + +class Group(Task): + def __init__(self, group: str, **kwargs): + super().__init__(**kwargs) + + self.group = group + + def _group_exists(self) -> bool: + result = util.shell(["getent", "group", self.group], valid_return_codes=(0, 2)) + return result.return_code == 0 + + def run_action(self, context: Context) -> bool: + if self._group_exists(): + return False + + if not context.dry_run: + util.shell(["groupadd", self.group]) + + return True diff --git a/instater/tasks/pacman.py b/instater/tasks/pacman.py new file mode 100644 index 0000000..af6557c --- /dev/null +++ b/instater/tasks/pacman.py @@ -0,0 +1,59 @@ +from typing import List, Union + +from .. import util +from ..context import Context +from . import Task + + +def _not_installed(package: str) -> bool: + result = util.shell(["pacman", "-Qi", package], valid_return_codes=(0, 1)) + return result.return_code != 0 + + +class Pacman(Task): + def __init__(self, *, packages: Union[str, List[str]], **kwargs): + super().__init__(**kwargs) + + if isinstance(packages, str): + packages = [packages] + + self.packages = packages + + def _install(self, packages: List[str]): + util.shell(["pacman", "-Sy", "--noconfirm", "--noprogressbar", "--needed", *packages]) + + def run_action(self, context: Context) -> bool: + not_installed = list(filter(_not_installed, self.packages)) + + if not not_installed: + return False + + if not context.dry_run: + self._install(not_installed) + + return True + + +# TODO: possible to just combine this with Pacman class, with +# a flag to decide makepkg/yay/pacman for installation method? +class Aur(Task): + def __init__(self, *, packages: Union[str, List[str]], become: str = None, **kwargs): + super().__init__(**kwargs) + + if isinstance(packages, str): + packages = [packages] + + self.packages = packages + self.become = become + + def run_action(self, context: Context) -> bool: + not_installed = list(filter(_not_installed, self.packages)) + + if not not_installed: + return False + + if not context.dry_run: + # TODO: support makepkg directly instead of just yay? + util.shell(["yay", "-Sy", "--noconfirm", "--needed", "--cleanafter", *not_installed], become=self.become) + + return True diff --git a/instater/tasks/service.py b/instater/tasks/service.py new file mode 100644 index 0000000..7bd7667 --- /dev/null +++ b/instater/tasks/service.py @@ -0,0 +1,39 @@ +from .. import util +from ..context import Context +from . import Task + + +class Service(Task): + def __init__(self, service: str, started: util.Bool = False, enabled: util.Bool = False, **kwargs): + super().__init__(**kwargs) + + self.service = service + self.started = util.boolean(started) + self.enabled = util.boolean(enabled) + + def _is_started(self) -> bool: + result = util.shell(["systemctl", "is-active", self.service], valid_return_codes=(0, 3)) + return result.stdout == "active" + + def _is_enabled(self) -> bool: + result = util.shell(["systemctl", "is-enabled", self.service], valid_return_codes=(0, 1)) + return result.stdout == "enabled" + + def run_action(self, context: Context) -> bool: + updated = False + + if self.started and not self._is_started(): + if not context.dry_run: + util.shell(["systemctl", "start", self.service]) + updated = True + + if self.enabled and not self._is_enabled(): + if not context.dry_run: + util.shell(["systemctl", "enable", self.service]) + updated = True + elif not self.enabled and self._is_enabled(): + if not context.dry_run: + util.shell(["systemctl", "disable", self.service]) + updated = True + + return updated diff --git a/instater/tasks/user.py b/instater/tasks/user.py new file mode 100644 index 0000000..ed00f31 --- /dev/null +++ b/instater/tasks/user.py @@ -0,0 +1,90 @@ +from typing import Iterable, List, Optional, Union + +from .. import util +from ..context import Context +from . import Task + + +def _user_exists(user: str) -> bool: + result = util.shell(["getent", "passwd", user], valid_return_codes=(0, 2)) + return result.return_code == 0 + + +def _create_user(user: str, system: bool, password: Optional[str]): + command = ["useradd", user] + + if system: + command.append("--system") + + if password: + command.append("--password") + command.append(password) + + util.shell(command) + + +def _list_groups(user: str) -> List[str]: + result = util.shell(["groups", user]) + return result.stdout.split(" ") + + +def _add_groups(user: str, groups: Iterable[str]): + group_str = ",".join(groups) + util.shell(["usermod", "-a", "-G", group_str, user]) + + +def _get_shell(user: str) -> str: + result = util.shell(["getent", "passwd", user]) + return result.stdout.split(":")[-1] + + +def _set_shell(user: str, shell: str): + util.shell(["usermod", "-s", shell, user]) + + +class User(Task): + def __init__( + self, + user: str, + system: util.Bool = None, + password: str = None, + shell: str = None, + groups: Union[str, List[str]] = None, + **kwargs + ): + super().__init__(**kwargs) + + if isinstance(groups, str): + groups = [groups] + + self.user = user + self.system = util.boolean(system) + self.password = password + self.shell = shell + self.groups = groups or [] + + def run_action(self, context: Context) -> bool: + updated = False + + user_exists = _user_exists(self.user) + if not user_exists: + if not context.dry_run: + _create_user(self.user, self.system, self.password) + updated = True + missing_groups: Iterable[str] = self.groups + else: + all_groups = _list_groups(self.user) + missing_groups = set(self.groups) - set(all_groups) + + if missing_groups: + if not context.dry_run: + _add_groups(self.user, self.groups) + updated = True + + if self.shell is not None and self.shell != _get_shell(self.user): + _set_shell(self.user, self.shell) + updated = True + + # TODO: detect if password needs to change? + + return updated diff --git a/instater/util.py b/instater/util.py new file mode 100644 index 0000000..0a83c08 --- /dev/null +++ b/instater/util.py @@ -0,0 +1,90 @@ +import re +import shutil +import subprocess +from pathlib import Path +from typing import Iterable, List, Optional, Union + +from passlib.hash import sha512_crypt # type: ignore + +from .exceptions import InstaterError + +Bool = Union[str, bool, int, None] + + +class ShellResult: + def __init__(self, return_code: int, stdout: str, stderr: str): + self.return_code = return_code + self.stdout = stdout + self.stderr = stderr + + +def shell( + command: List[str], + directory: Union[str, Path] = None, + become: str = None, + valid_return_codes: Optional[Iterable[int]] = (0,), +): + if become: + command = ["sudo", "-u", become] + command + + result = subprocess.run(command, cwd=directory, capture_output=True) + if valid_return_codes and result.returncode not in valid_return_codes: + command_str = " ".join(command) + error = result.stderr.decode("utf-8") + raise InstaterError(f"Unexpected error from '{command_str}' (exit code {result.returncode}):\n\n{error}") + + return ShellResult(result.returncode, result.stdout.decode("utf-8").strip(), result.stderr.decode("utf-8").strip()) + + +def update_file_metadata( + path: Path, + owner: Optional[str], + group: Optional[str], + mode: Optional[int], + dry_run: bool, +) -> bool: + updated = False + + # If running with dry_run and the path does not exist, the below checks error + if dry_run and not path.exists(): + return True + + if (owner or group) and (path.owner() != owner or path.group() != group): + if not dry_run: + shutil.chown(path, owner, group) + updated = True + + current_mode = path.stat().st_mode & 0o777 + if mode is not None and (current_mode != mode): + if not dry_run: + path.chmod(mode) + updated = True + + return updated + + +def boolean(input: Bool) -> bool: + # converting to string covers int (0) and bool (False) cases + return bool(input) and str(input).lower() not in ("no", "false", "0") + + +def password_hash(password: str, hashtype: str = "sha512") -> str: + if hashtype != "sha512": + raise InstaterError(f"password_hash hashtype must be sha512, found '{hashtype}'") + + return sha512_crypt.hash(password) + + +_SNAKE_CASE = re.compile(r"(? str: + return _SNAKE_CASE.sub("_", camel_case).lower() + + +def single_truthy(*args, allow_zero: bool = False) -> bool: + count = tuple(map(bool, args)).count(True) + if allow_zero: + return count <= 1 + else: + return count == 1 diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..96fa357 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,6 @@ +[tool.black] +line-length = 120 + +[tool.isort] +profile = "black" +line_length = 120 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..91cbed4 --- /dev/null +++ b/setup.py @@ -0,0 +1,54 @@ +from pathlib import Path + +from setuptools import setup + + +ROOT_DIRECTORY = Path(__file__).resolve().parent + +description = "An easy solution for system/dotfile configuration" +readme = (ROOT_DIRECTORY / "README.md").read_text() +changelog = (ROOT_DIRECTORY / "CHANGELOG.md").read_text() +long_description = readme + "\n\n" + changelog + +version = (ROOT_DIRECTORY / "instater" / "VERSION").read_text().strip() + + +DEV_REQUIRES = [ + "black==21.9b0", + "coverage==6.0.2", + "flake8==4.0.1", + "flake8-bugbear==21.9.2", + "isort==5.9.3", + "mypy==0.910", + "pytest==6.2.5", + "pytest-cov==3.0.0", + "twine==3.4.2", +] + +setup( + name="instater", + version=version, + description=description, + long_description=long_description, + long_description_content_type="text/markdown", + classifiers=[ + "Programming Language :: Python", + "Programming Language :: Python :: 3", + ], + author="Naya Verdier", + url="https://github.com/nayaverdier/instater", + license="MIT", + packages=["instater"], + entry_points={"console_scripts": ["instater = instater.cli:main"]}, + install_requires=[ + "Jinja2==3.0.2", + "PyYAML==6.0", + "passlib==1.7.4", + "rich==10.12.0", + ], + python_requires=">=3.7", + extras_require={ + "dev": DEV_REQUIRES, + }, + include_package_data=True, +) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_imports.py b/tests/test_imports.py new file mode 100644 index 0000000..3c699ea --- /dev/null +++ b/tests/test_imports.py @@ -0,0 +1,10 @@ +def test_module_import(): + import instater + + assert instater.__all__ == ["InstaterError", "VERSION", "run_tasks", "tasks"] + + +def test_direct_import(): + from instater import VERSION # noqa: F401 + from instater import InstaterError # noqa: F401 + from instater import tasks # noqa: F401