From 9a089624f80b22075c3e09eaf3c2575b7adab8a7 Mon Sep 17 00:00:00 2001 From: Ro Date: Tue, 13 Aug 2024 13:17:26 -0400 Subject: [PATCH] wip: add backup gui --- sdw_backup/migrate_config.py | 255 +++++++++++++++++++++++ sdw_backup/sdw_admin.py | 173 ++++++++++++++++ sdw_backup/sdw_config_app.py | 376 ++++++++++++++++++++++++++++++++++ sdw_backup/validate_config.py | 154 ++++++++++++++ 4 files changed, 958 insertions(+) create mode 100644 sdw_backup/migrate_config.py create mode 100755 sdw_backup/sdw_admin.py create mode 100644 sdw_backup/sdw_config_app.py create mode 100755 sdw_backup/validate_config.py diff --git a/sdw_backup/migrate_config.py b/sdw_backup/migrate_config.py new file mode 100644 index 00000000..aa08242e --- /dev/null +++ b/sdw_backup/migrate_config.py @@ -0,0 +1,255 @@ +#!/usr/bin/python3 +""" +SecureDrop Workstation Qubes 4.1 -> 4.2 Migration helper. +Meant to be run in dom0, this utility will collect credentials and assets for +migration from various parts of your QubesOS system. + +At the end of this script the contents will be placed in a directory called "migration", +that should be transferred to a LUKS-encrypted drive attached to a non-networked VM (eg vault). + +Note: dom0 in Qubes 4.1 uses Python 3.8. +""" + +from enum import Enum +import logging +import os +from pathlib import Path +import shutil +import subprocess +import sys + +SDW_CONFIG_DIR = "/usr/share/securedrop-workstation-dom0-config" +SDW_CONFIG_FILES_DOM0 = ["config.json", "sd-journalist.sec"] +QUBES_DIR = "/etc/qubes" +DOM0_MIN_FREE_SPACE_BUFFER_KILOBYTES = 512000 # 500MB + +logger = logging.getLogger(__name__) + +class BackupStatus(Enum): + COMPLETE = 0, + INCOMPLETE = 1, + +class BackupException(Exception): + pass + +class BackupTarget: + def __init__(self, name, hint): + self.name = name + self.hint = hint + +class BackupConfig: + def __init__(self) -> None: + super().__init__() + self.backup_status = {} + self.vms = [BackupTarget("dom0", "dom0 configuration files and /etc/qubes directory"), BackupTarget("sd-app", "/home/user/.securedrop_client directory"), BackupTarget("sd-gpg", "sd-gpg GPG private keys")] + for vm in self.vms: + self.backup_status[vm.name] = BackupStatus.INCOMPLETE + self.migration_dir = None + self.fingerprint = None + + def _intro(self): + print("SecureDrop Workstation Qubes 4.1 -> 4.2 Migration helper") + print("This script is meant to be run in dom0.") + print("It will collect credentials and assets for migration from various parts of your QubesOS system.") + print("Trust, but verify! Review this script before running it.") + + confirmation = input("Continue? (y/Y to continue, any key to quit)") + if confirmation.lower() != "y": + print("Aborting") + sys.exit(1) + + def _create_migration_dir(self): + self.migration_dir = Path(Path.cwd(), "migration") + Path.mkdir(self.migration_dir) + + def _qvm_run_io(self, vm, args: str, output_fd = None) -> str: + """ + Run command in a given qube via qvm-run and return str-formatted output. + If output_fd, representing an file descriptor, is provided, write directly to the + output file. Otherwise write to stdout. + """ + # qubesadmin.tools.qvm_run.main returns the exit code but prints the output to stderr, + # whereas we want to capture the output, so use subprocess + try: + return subprocess.check_output(["qvm-run", "--pass-io", vm, args]).decode() + except subprocess.CalledProcessError as e: + raise BackupException from e + + def _dom0(self): + self.dom0_dir = Path(self.migration_dir, "dom0") + Path.mkdir(self.dom0_dir) + + # /usr/share/securedrop-workstation-dom0-config + for file in SDW_CONFIG_FILES_DOM0: + + shutil.copy(Path(SDW_CONFIG_DIR, file), self.dom0_dir) + + # Copy dom0 /etc/qubes (such as /etc/qubes/policy.d) to include in a dom0 backup + shutil.copytree(QUBES_DIR, Path(self.dom0_dir, "etc-qubes")) + + # Store the GPG fingerprint from config.json + with open(Path(self.dom0_dir, "config.json")) as f: + for line in f: + if "submission_key_fpr" in line.lower(): + self.fingerprint = line.split()[1].strip() + + if len(self.fingerprint) == 40: + self.backup_status["dom0"] = BackupStatus.COMPLETE + + def _list_fingerprints(self, gpg_console_lines: str): + """ + Helper to parse console output from `gpg` and return PGP fingerprints. + Used with the str-formatted output of `gpg -K`, equivalent to + `gpg -K --with-colons | grep "fpr" | cut -d: -f10` (and `wc -l`) + """ + fingerprints = [] + gpg_console_lines = self._qvm_run_io("sd-gpg", "gpg -K --with-colons").split() + for line in gpg_console_lines: + if "fpr" in line: + fingerprints.append(line.split(":")[9]) + + return fingerprints + + def _sd_gpg(self): + """ + Retrieve secret keys from sd-gpg, checking that at least one matches the fingerprint + in config.json. + """ + sd_gpg_keys_name = "sd_secret_keys_armored.asc" + self.sd_gpg_dir = Path(self.migration_dir, "sd-gpg") + Path.mkdir(self.sd_gpg_dir) + remote_fprs = [] + local_fprs = [] + + try: + # qvm-run --pass-io sd-gpg "gpg -K --with-colons | grep fpr | cut -d: -f10" + output = self._qvm_run_io("sd-gpg", "gpg -K --with-colons").split() + + remote_fprs = self._list_fingerprints(output) + print(f"Found {len(remote_fprs)} key(s) to export from sd-gpg") + + except BackupException as e: + logger.error(f"Failed to check sd-gpg keyring: {e}") + + # qvm-run --pass-io sd-gpg 'gpg -a --export-secret-keys' > sd-gpg/sd_secret_keys_armored.asc + try: + export_keys = self._qvm_run_io("sd-gpg", "gpg -a --export-secret-keys") + with open(Path(self.sd_gpg_dir, sd_gpg_keys_name), "w+") as f: + f.write(export_keys) + + except BackupException as ex: + logger.error(f"Failed to export sd-gpg keyring: {ex}") + + # Create emphemeral gpg home directory and importing key(s) to check that import was + # well-formed.C lean up afterwards by removing this directory. + # Path.mkdir takes mode in decimal instead of octal. 0o700 = 448. + tmp_gpg_dir = Path(self.sd_gpg_dir, "tmp-gpg") + Path.mkdir(tmp_gpg_dir, mode=448) + gpg_args = ["gpg", "--homedir", str(tmp_gpg_dir), "--import", "-q", f"{self.sd_gpg_dir}/{sd_gpg_keys_name}"] + gpg_check_args = ["gpg", "--homedir", str(tmp_gpg_dir), "-K", "--with-colons"] + + try: + subprocess.check_call(gpg_args) + gpg_output = subprocess.check_output(gpg_check_args).decode().split() + + local_fprs = self._list_fingerprints(gpg_output) + print(f"Retrieved the following key(s):\n{local_fprs}") + + except subprocess.CalledProcessError as err: + logger.error(f"Failed to recheck sd-gpg keys on dom0: {err}") + + finally: + self._rm_gpgdir(tmp_gpg_dir) + + if self.fingerprint in local_fprs and len(local_fprs) == len(remote_fprs): + self.backup_status["sd-gpg"] = BackupStatus.COMPLETE + elif self.fingerprint not in local_fprs: + print("Problem: Submission Key Fingerprint in config.json does not match any keys in sd-gpg.") + else: + print("Some keys may not have been imported successfully. Recheck sd-gpg keyring.") + + def _rm_gpgdir(self, target): + """ + Helper. Delete gpg directory, first attempting to shred files in private_keys_v1.d. + """ + try: + if os.path.exists(target): + shutil.rmtree(target) + except OSError: + print(f"Problem: Could not delete {target}. Remove it manually.") + + # def _sd_app(self): + # self.sd_app_dir = Path(self.migration_dir, "sd-app") + # Path.mkdir(self.sd_app_dir) + # # qvm-run --pass-io sd-app 'du -sh --block-size=1k /home/user/.securedrop_client' | cut -f1 + # # kilobytes + # args = "du -sh --block-size=1k /home/user/.securedrop_client" + + # try: + # data_size = self._qvm_run_io("sd-app", args).split()[0] + # print(f"{data_size} of uncompressed data on sd-app") + + # # free_space_dom0=$(df -h /dev/mapper/qubes_dom0-root -k --output=avail | tail -n-1) + # # kilobytes + # dom0_args = ["df", "-h", "/dev/mapper/qubes_dom0-root", "-k", "--output=avail"] + # dom0_space = subprocess.check_output(dom0_args).decode().split()[1] + + # # Small amoumts of coercion + # if int(dom0_space) - int(data_size) <= DOM0_MIN_FREE_SPACE_BUFFER_KILOBYTES: + + # # We'd probably be fine; the backup will compress and we won't be cutting it this close. + # # But err on the side of caution and don't fill up dom0. + # print("Problem: /home/user.securedrop_client on sd-app is too large to transfer.") + # print("Please back up sd-app manually using the Qubes GUI backup tool and a strong backup passphrase.") + # print("For assistance, contact Support.") + + # else: + # # qvm-run --pass-io sd-app 'tar -cz -C /home/user .securedrop_client' > sd-app/securedrop_client.tar.gz + # # This could be a lot of data, so write it to a file instead of stdout so as not to fill the pipe. + # with open(Path(self.sd_app_dir, "securedrop_client.tar.gz"), "w+") as archive: + # popen = subprocess.Popen(["qvm-run", "--pass-io", "sd-app", "tar -cz -C /home/user .securedrop_client"], stdout=archive) + + # except (BackupException, subprocess.CalledProcessError): + # # todo + # pass + + def _print_final_instructions(self): + print("You are responsible for preserving any of your own customizations, eg via the Qubes Backup tool.") + print("Please transfer the migration directory, and all its contents, to a non-networked VM (vault), using qvm-copy-to-vm.") + print("Then, transfer the directory to a LUKS-encrypted transfer device.") + print("Important: at the end of this migration process, wipe and reformat or destroy that drive.") + + def all(self): + """ + Gathers: + - config.json, sd-journalist.sec from /usr/share/securedrop-workstation-dom0-config + - secret keys from sd-gpg + - .securedrop_client directory (compressed) from sd-app, if dom0 disk space permits + + Checks: + - Fingerprint in config.json matches a secret key from sd-gpg + - Fingerprint in config.json matches key exported from sd-gpg into dom0 (successful export) + - sd-app .securedrop_client directory successfully imported into dom0 + + Prints success message if backup has completed successfully, otherwise provides information about + which part of the process failed. + """ + self._intro() + self._create_migration_dir() + self._dom0() + self._sd_gpg() + # self._sd_app() + + if BackupStatus.INCOMPLETE not in self.backup_status.values(): + print("Credentials and secrets have been collected successfully in the 'migration' directory.") + print("This is not a system backup! Only SecureDrop-Workstation specific configuration files and dom0 files in /etc/qubes have been preserved.") + else: + print("Failed to gather all required assets - additional steps required.") + print("You will need to manually add the missing files to the 'migration' directory.") + for entry in self.backup_status.keys(): + print(f"{entry.name}: [success, no action required]") if self.backup_status[entry] == BackupStatus.COMPLETE else print(f"{entry.name}: {entry.hint}") + + self._print_final_instructions() + +if __name__ == "__main__": + BackupConfig().all() diff --git a/sdw_backup/sdw_admin.py b/sdw_backup/sdw_admin.py new file mode 100755 index 00000000..4f56c80b --- /dev/null +++ b/sdw_backup/sdw_admin.py @@ -0,0 +1,173 @@ +#!/usr/bin/python3 +""" +Admin wrapper script for applying salt states for staging and prod scenarios. The rpm +packages only puts the files in place `/srv/salt` but does not apply the state, nor +does it handle the config. +""" + +import os +import subprocess +import sys +from enum import Enum +from typing import List +from PyQt5.QtCore import QObject, pyqtSignal + + +SCRIPTS_PATH = "/usr/share/securedrop-workstation-dom0-config/" +SALT_PATH = "/srv/salt/securedrop_salt/" + +sys.path.insert(1, os.path.join(SCRIPTS_PATH, "scripts/")) +from validate_config import SDWConfigValidator, ValidationError # noqa: E402 + +class SDWAdminException(Exception): + pass + +class ConfigStatus(Enum): + CONNECT_SUCCESS = "CONNECT_SUCCESS", + CONNECT_ERROR = "CONNECT_ERROR", + VALIDATE_SUCCESS = "VALIDATE_SUCCESS", + VALIDATE_ERROR = "VALIDATE_ERROR", + APPLY_SUCCESS = "APPLY_SUCCESS", + APPLY_ERROR = "APPLY_ERROR", + UNKNOWN_ERROR = "UNKNONW_ERROR" + +class SetupWorker(QObject): + # QObject if we want to port to qProcess + + # Emit setup state + state_changed = pyqtSignal(object) + + def __init__(self, parent: QObject | None = ...) -> None: + super().__init__(parent) + self.path_to_config = None + + def _install_pvh_support(self): + """ + Installs grub2-xen-pvh in dom0 - required for PVH with AppVM local kernels + TODO: install this via package requirements instead if possible + """ + try: + subprocess.check_call(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"]) + except subprocess.CalledProcessError: + raise SDWAdminException("Error installing grub2-xen-pvh: local PVH not available.") + + def _copy_config(self): + """ + Copies config.json and sd-journalist.sec to /srv/salt/securedrop_salt + """ + try: + subprocess.check_call(["sudo", "cp", os.path.join(SCRIPTS_PATH, "config.json"), SALT_PATH]) + subprocess.check_call( + ["sudo", "cp", os.path.join(SCRIPTS_PATH, "sd-journalist.sec"), SALT_PATH] + ) + except subprocess.CalledProcessError: + raise SDWAdminException("Error copying configuration") + + def _provision_all(self): + """ + Runs provision-all to apply the salt state.highstate on dom0 and all VMs + """ + # TODO: make this separate scripts + + try: + subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/provision-all")]) + except subprocess.CalledProcessError: + raise SDWAdminException("Error during provision-all") + + print("Provisioning complete. Please reboot to complete the installation.") + + + def _validate_config(self, path): + """ + Calls the validate_config script to validate the config present in the staging/prod directory + """ + try: + validator = SDWConfigValidator(path) # noqa: F841 + except ValidationError: + raise SDWAdminException("Error while validating configuration") + + + def _refresh_salt(self): + """ + Cleans the Salt cache and synchronizes Salt to ensure we are applying states + from the currently installed version + """ + try: + subprocess.check_call(["sudo", "rm", "-rf", "/var/cache/salt"]) + except subprocess.CalledProcessError: + raise SDWAdminException("Error while clearing Salt cache") + + try: + subprocess.check_call(["sudo", "qubesctl", "saltutil.sync_all", "refresh=true"]) + except subprocess.CalledProcessError: + raise SDWAdminException("Error while synchronizing Salt") + + + def _perform_uninstall(self): + try: + subprocess.check_call( + ["sudo", "qubesctl", "state.sls", "securedrop_salt.sd-clean-default-dispvm"] + ) + print("Destroying all VMs") + subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/destroy-vm"), "--all"]) + print("Reverting dom0 configuration") + subprocess.check_call(["sudo", "qubesctl", "state.sls", "securedrop_salt.sd-clean-all"]) + subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/clean-salt")]) + print("Uninstalling dom0 config package") + subprocess.check_call( + ["sudo", "dnf", "-y", "-q", "remove", "securedrop-workstation-dom0-config"] + ) + except subprocess.CalledProcessError: + raise SDWAdminException("Error during uninstall") + + print( + "Instance secrets (Journalist Interface token and Submission private key) are still " + "present on disk. You can delete them in /usr/share/securedrop-workstation-dom0-config" + ) + + def _check_euid(self): + if os.geteuid() == 0: + raise SDWAdminException("This wizard cannot be run as root.") + + def connect(self): + try: + available_vms = self.qubes_app.domains + for vm in available_vms: + if vm.name == "vault" and vm.netvm is None and vm.running: # todo + result = subprocess.run(["/usr/lib/qubes/qrexec-client", "-d", vm, "/etc/qubes-rpc/qubes.SelectDirectory"]) + self.path_to_config = result.decode() + self.state_changed.emit(ConfigStatus.CONNECT_SUCCESS) + except SDWAdminException: + self.state_changed.emit(ConfigStatus.CONNECT_ERROR) + + def validate(self): + try: + self._validate_config(SCRIPTS_PATH) + self.state_changed.emit(ConfigStatus.VALIDATE_SUCCESS) + except SDWAdminException: + self.state_changed.emit(ConfigStatus.VALIDATE_ERROR) + + def apply(self): + try: + self._validate_config(SCRIPTS_PATH) + self._install_pvh_support() + self._copy_config() + self._refresh_salt() + self._provision_all() + + self.state_changed.emit(ConfigStatus.APPLY_SUCCESS) + except SDWAdminException: + self.state_changed.emit(ConfigStatus.APPLY_ERROR) + + def uninstall(self): + try: + print( + "Uninstalling will remove all packages and destroy all VMs associated\n" + "with SecureDrop Workstation. It will also remove all SecureDrop tags\n" + "from other VMs on the system." + ) + self._refresh_salt() + self._perform_uninstall() + except SDWAdminException: + pass # we aren't supporting uninstall + \ No newline at end of file diff --git a/sdw_backup/sdw_config_app.py b/sdw_backup/sdw_config_app.py new file mode 100644 index 00000000..a0d37da8 --- /dev/null +++ b/sdw_backup/sdw_config_app.py @@ -0,0 +1,376 @@ + +import re +from enum import IntEnum +from typing import List + +from PyQt5.QtCore import Qt, pyqtSlot +from PyQt5.QtWidgets import QAbstractButton, QWidget, QWizard, QWizardPage, QPushButton, QMenu, QAction, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QRadioButton, QButtonGroup + +from sdw_admin import SetupWorker, SDWAdminException, ConfigStatus + +import qubesadmin + +class SetupPages(IntEnum): + PREAMBLE_PAGE = 0, + ERROR_PAGE = 1, + CONNECT_SECRETS_PAGE = 2, + # Optional: a Tails setup means both SVS+admin stick have the necessaries on them + CONNECT_TAILSCONFIG_PAGE = 3, + VALIDATE_PAGE = 4, + APPLY_PAGE = 5, + RESULT_PAGE_SUCCESS = 6, + +DISPLAY_TEXT = { + SetupPages.PREAMBLE_PAGE: ("SecureDrop Workstation Configuration", "Welcome to SDW Configuration Tool. This tool will guide you through installing SecureDrop Workstation."), + SetupPages.ERROR_PAGE: ("Setup Error", "Error during setup"), + SetupPages.CONNECT_SECRETS_PAGE: ("Connect Encrypted Storage", "Please connect encrypted storage device to a non-networked VM such as Vault"), + SetupPages.CONNECT_TAILSCONFIG_PAGE: ("Connect Encrypted Storage", "Please connect Tails Admin Workstation to a non-networked VM such as Vault"), + SetupPages.VALIDATE_PAGE: ("Validate Configuratin", "This will validate the configuration you provided"), + SetupPages.APPLY_PAGE: ("Install SecureDrop Workstation", "Applying Configuration (this will take some time...)"), + SetupPages.RESULT_PAGE_SUCCESS: ("Setup Complete", "Successfully configured. Please reboot now."), +} + +BASE_TEMPLATE = "debian-12-minimal" + +class SetupWizard(QWizard): + """ + SDW Setup Wizard. + """ + def __init__(self, worker: SetupWorker, parent: QWidget | None = ..., flags: Qt.WindowFlags | Qt.WindowType = ...) -> None: + super().__init__(parent, flags) + self._set_layout() + self._set_pages() + self.adjustSize() + self.qubes_app = qubesadmin.Qubes() + + # Connect handlers + self.worker.state_changed.connect(self.on_step_complete) + + # Connect buttons + self.next_button: QAbstractButton = self.button(QWizard.WizardButton.NextButton) + self.cancel_button: QAbstractButton = self.button(QWizard.WizardButton.CancelButton) + self.back_button: QAbstractButton = self.button(QWizard.WizardButton.BackButton) + self.finish_button: QAbstractButton = self.button(QWizard.WizardButton.FinishButton) + + self.next_button.clicked.connect(self.run_step) + + def _set_pages(self) -> None: + for page in SetupPages: + + # Set up and populate different pages of the wizard + self.setPage(page.value, SetupWizardPage(page)) + + def _set_layout(self) -> None: + title = f"SDW Configuration Tool" # todo + self.setWindowTitle(title) + self.setObjectName("SDW_Wizard") # Could use same styling as all other wizards eg + self.setModal(False) + self.setOptions( + QWizard.NoBackButtonOnLastPage + | QWizard.NoCancelButtonOnLastPage + | QWizard.NoBackButtonOnStartPage + ) + + @pyqtSlot() + def run_step(self) -> None: + page = self.currentPage() + self.next_button.setEnabled(False) + + # not sure about this + if isinstance(page, SetupWizardPage): + page.set_complete(False) + + if self.currentPage == SetupPages.CONNECT_SECRETS_PAGE: + self.worker.connect() + elif self.currentPage == SetupPages.VALIDATE_PAGE: + self.worker.validate() + elif self.currentPage == SetupPages.APPLY_PAGE: + self.worker.apply() + + @pyqtSlot(object) + def on_step_complete(self, status: ConfigStatus): + self.status = status + self.next_button.setEnabled(True) + + # Confirm + self.currentPage().next() + + @pyqtSlot(object) + def on_step_error(self, err: SDWAdminException): + pass # TODO + + +class SetupWizardPage(QWizardPage): + """ + A page in the SDW Setup wizard. + """ + NO_MARGIN = 0 + + def __init__(self, page_type: SetupPages, parent: QWidget | None = ...) -> None: + super().__init__(parent) + self.page_type = page_type + self.status = None + # By default, pages can't advance without the set_complete method being called + self._is_complete = False + self._build_layout() + self.header_text = DISPLAY_TEXT.get(page_type)[0] # TODO + self.body_text = DISPLAY_TEXT.get(page_type)[1] # TODO + self._layout = self._build_layout() + self.setLayout(self._layout) + + def _build_layout(self) -> QVBoxLayout: + """ + Create parent layout, draw elements, return parent layout. + """ + self.setObjectName("SDW_Setup_Page") + + parent_layout = QVBoxLayout(self) + #parent_layout.setContentsMargins(self.MARGIN, self.MARGIN, self.MARGIN, self.MARGIN) + + # Header for icon and task title + header_container = QWidget() + header_container_layout = QHBoxLayout() + header_container.setLayout(header_container_layout) + header_container.setContentsMargins( + self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN + ) + + header = QLabel() + header.setObjectName("QWizard_header") + header_container_layout.addWidget(header, alignment=Qt.AlignCenter) + header_container_layout.addStretch() + header_line = QWidget() + header_line.setObjectName("QWizard_header_line") # todo + + # Body to display instructions and forms + body = QLabel() + body.setObjectName("QWizard_body") + body.setWordWrap(True) + body.setScaledContents(True) + + body_container = QWidget() + body_layout = QVBoxLayout() + body_layout.setContentsMargins( + self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN + ) + body_container.setLayout(body_layout) + body_layout.addWidget(body) + + # Widget for displaying error messages (hidden by default) + self.error_details = QLabel() + self.error_details.setObjectName("QWizard_error_details") + self.error_details.setWordWrap(True) + self.error_details.hide() + + # Populate text content + header.setText(self.header_text) + body.setText(self.body_text) + + # Add all the layout elements + parent_layout.addWidget(header_container) + parent_layout.addWidget(header_line) + parent_layout.addWidget(body_container) + parent_layout.addWidget(self.error_details) + parent_layout.addStretch() + + return parent_layout + + def nextId(self) -> int: + if self.wizard(): + status = self.wizard().status + if status in [ConfigStatus.APPLY_ERROR, ConfigStatus.CONNECT_ERROR, ConfigStatus.VALIDATE_ERROR]: + return SetupPages.ERROR_PAGE + elif self.page_type == SetupPages.PREAMBLE_PAGE: + return SetupPages.CONNECT_SECRETS_PAGE + + return super().nextId() + + def isComplete(self): + """ + Overrides builtin method, caps case intentional + """ + return super().isComplete() and self._is_complete + + def set_complete(self, is_complete: bool): + self._is_complete = is_complete + +class SelectDeviceTypePage(SetupWizardPage): + """ + Wizard page that lets the user select either + Tails or SDW backup for importing configuration. + """ + def __init__(self, page_type: SetupPages, parent: QWidget | None = ...) -> None: + super().__init__(page_type, parent) + + def _build_layout(self) -> QVBoxLayout: + layout = super()._build_layout() + + choose_storage_type_layout = QHBoxLayout() + choose_header = QLineEdit() + choose_header.setText("IMPORT CONFIGURATION DETAILS") + + choose_radio_group = QButtonGroup() + self.button_tails = QRadioButton() + self.button_tails.setText("FROM TAILS") + self.button_backup = QRadioButton() + self.button_backup.setText("FROM A CONFIGURATION BACKUP") + + choose_radio_group.addButton(self.button_tails) + choose_radio_group.addButton(self.button_backup) + + choose_radio_group.buttonClicked.connect(self._on_radio_selection) + + choose_storage_type_layout.addItem(choose_header) + choose_storage_type_layout.addStretch() + choose_storage_type_layout.addItem(choose_radio_group) + choose_storage_type_layout.addStretch() + + layout.insertWidget(1, choose_storage_type_layout) + return layout + + def _on_radio_selection(self): + """ + The RadioGroup enforces that one button must be selected. + For now, just register the Tails button. If more than two + options are offered in future, all fields should be registered. + """ + self.registerField("button_tails*", self.button_tails) + + # Once a selection is made, this screen is complete + self.set_complete(True) + +class DirectoryPickerPage(SetupWizardPage): + def __init__(self, parent: QWidget | None = ...) -> None: + super().__init__(parent) + self.vm = None # Holds selected vm where USB is attached. + + def _build_layout(self) -> QVBoxLayout: + layout = super()._build_layout() + + # VM chooser + choose_vm_layout = QHBoxLayout() + choose_vm_label = QLabel() + choose_vm_label.setText("VM where storage device is mounted") + choose_vm_menu = QMenu() + vms = self._get_running_vms() + for entry in vms: + choose_vm_menu.addAction(self._menu_action(entry)) + + choose_vm_layout.addItem(choose_vm_label) + choose_vm_layout.addItem(choose_vm_menu) + + # Directory + storage_dir_form = QWidget() + storage_dir_form.setObjectName("QWizard_passphrase_form") + storage_dir_form_layout = QVBoxLayout() + storage_dir_form_layout.setContentsMargins( + self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN + ) + storage_dir_form.setLayout(storage_dir_form_layout) + storage_dir_label = ("Select Storage Directory") + + storage_dir_and_overflow = QHBoxLayout() + self.storage_dir = QLineEdit() + + self.button_select_storage_dir = QPushButton() + self.button_select_storage_dir.setText("...") # TODO - icon instead + self.button_select_storage_dir.clicked.connect(self.get_path_from_vm) + + storage_dir_and_overflow.addItem(self.storage_dir) + storage_dir_and_overflow.addItem(self.button_select_storage_dir) + + storage_dir_form_layout.addWidget(storage_dir_label) + storage_dir_form_layout.addWidget(storage_dir_and_overflow) + + layout.insertWidget(1, choose_vm_layout) + layout.insertWidget(2, storage_dir_form) + return layout + + def _menu_action(self, vm) -> QAction: + action = QAction() + action.setText(vm.name) + action.triggered.connect(lambda vm: self._on_item_clicked(vm)) + return action + + @pyqtSlot() + def _on_item_clicked(self, vm): + self.vm = vm + + def _get_running_vms(self): + vms = [] + available_vms = self.qubes_app.domains + for vm in available_vms: + if vm.name == "vault" and vm.netvm is None and vm.running: # todo + vms.append(vm) + return vms + + # From Qubes manager.py + def get_path_from_vm(self): + """ + Displays a file/directory selection window for the given VM. + + :param vm: vm from which to select path + :return: path to file, checked for validity + """ + vm = self.target_vm + + path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() ?-]*") + path_max_len = 512 + + if not vm: + return None + stdout, _stderr = vm.run_service_for_stdio("qubes.SelectDirectory") + + stdout = stdout.strip() + + untrusted_path = stdout.decode(encoding='ascii')[:path_max_len] + + if untrusted_path and path_re.fullmatch(untrusted_path): + assert '../' not in untrusted_path + assert '\0' not in untrusted_path + self.storage_dir.setText(untrusted_path.strip()) + self.set_complete(True) + + self.registerField("storage_dir_secrets*", self.storage_dir) + + raise ValueError("Unexpected characters in path.") + +class ConfirmationPage(SetupWizardPage): + def __init__(self, page_type: SetupPages, parent: QWidget | None = ...) -> None: + body_text = self._confirm_running_vms() + super().__init__(page_type, parent=parent) + self.set_complete(True) + + def _build_layout(self): + layout = super()._build_layout() + + def _confirm_running_vms(self): + body_text = ( + "SecureDrop Workstation should always be installed on a fresh Qubes OS install. " + "The installation process will overwrite any user modifications to the " + f"{BASE_TEMPLATE} TemplateVM, and will disable old-format qubes-rpc " + "policy directives.\n" + ) + affected_appvms = self._get_appvms_for_template(BASE_TEMPLATE) + if len(affected_appvms) > 0: + body_text += ( + f"{BASE_TEMPLATE} is already in use by the following AppVMS:\n" + f"{affected_appvms}\n" + "Applications and configurations in use by these AppVMs will be\n" + f"removed from {BASE_TEMPLATE}." + ) + + return body_text + + def _get_appvms_for_template(self, vm_name: str) -> List[str]: + """ + Return a list of AppVMs that use the specified VM as a template + """ + try: + template_vm = self.qubes_app.domains[vm_name] + except KeyError: + # No VM implies no appvms, return an empty list + # (The template may just not be installed yet) + return [] + return [x.name for x in list(template_vm.appvms)] + diff --git a/sdw_backup/validate_config.py b/sdw_backup/validate_config.py new file mode 100755 index 00000000..bf87f455 --- /dev/null +++ b/sdw_backup/validate_config.py @@ -0,0 +1,154 @@ +#!/usr/bin/python3 +""" +Utility to verify that SecureDrop Workstation config is properly structured. +Checks for +""" + +import json +import os +import re +import subprocess +import sys +import tempfile + +from qubesadmin import Qubes + +TOR_V3_HOSTNAME_REGEX = r"^[a-z2-7]{56}\.onion$" +TOR_V3_AUTH_REGEX = r"^[A-Z2-7]{52}$" + +# CONFIG_FILEPATH = "/srv/salt/securedrop_salt/config.json" +CONFIG_FILEPATH = "config.json" +SECRET_KEY_FILEPATH = "sd-journalist.sec" + + +class ValidationError(Exception): + pass + + +class SDWConfigValidator: + def __init__(self, config_base_dir=None): + if config_base_dir: + self.config_filepath = os.path.join(config_base_dir, CONFIG_FILEPATH) + self.secret_key_filepath = os.path.join(config_base_dir, SECRET_KEY_FILEPATH) + else: + self.config_filepath = CONFIG_FILEPATH + self.secret_key_filepath = SECRET_KEY_FILEPATH + self.confirm_config_file_exists() + self.config = self.read_config_file() + self.confirm_onion_config_valid() + self.confirm_submission_privkey_file() + self.confirm_submission_privkey_fingerprint() + self.confirm_environment_valid() + self.validate_existing_size() + + def confirm_config_file_exists(self): + if not os.path.exists(self.config_filepath): + raise ValidationError( + f"Config file does not exist: {self.config_filepath}. " + "Create from config.json.example" + ) + + def confirm_environment_valid(self): + """ + The 'environment' config item is required to determine + whether prod or dev URLs are used for installing packages. + """ + if "environment" not in self.config: + raise ValidationError + if self.config["environment"] not in ("prod", "dev", "staging"): + raise ValidationError(f"Invalid environment: {self.config['environment']}") + + def confirm_onion_config_valid(self): + """ + Only v3 onion services are supported. + """ + if "hidserv" not in self.config: + raise ValidationError('"hidserv" is not defined in config.json') + + # Verify the hostname + if "hostname" not in self.config["hidserv"]: + raise ValidationError("hidden service hostname is not defined in config.json") + if not re.match(TOR_V3_HOSTNAME_REGEX, self.config["hidserv"]["hostname"]): + raise ValidationError("Invalid hidden service hostname specified") + + # Verify the key + if "key" not in self.config["hidserv"]: + raise ValidationError("hidden service key is not defined in config.json") + if not re.match(TOR_V3_AUTH_REGEX, self.config["hidserv"]["key"]): + raise ValidationError("Invalid hidden service key specified") + + def confirm_submission_privkey_file(self): + """ + Import privkey into temporary keyring, to validate. + """ + if not os.path.exists(self.secret_key_filepath): + raise ValidationError(f"PGP secret key file not found: {self.secret_key_filepath}") + gpg_cmd = ["gpg", "--import", self.secret_key_filepath] + result = False + with tempfile.TemporaryDirectory() as d: + gpg_env = {"GNUPGHOME": d} + # Call out to gpg to confirm it's a valid keyfile + try: + subprocess.check_call( + gpg_cmd, env=gpg_env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL + ) + result = True + except subprocess.CalledProcessError: + # suppress error since "result" is checked next + pass + + if not result: + raise ValidationError(f"PGP secret key is not valid: {self.secret_key_filepath}") + + def confirm_submission_privkey_fingerprint(self): + if "submission_key_fpr" not in self.config: + raise ValidationError('"submission_key_fpr" is not defined in config.json') + if not re.match("^[a-fA-F0-9]{40}$", self.config["submission_key_fpr"]): + raise ValidationError("Invalid PGP key fingerprint specified") + gpg_cmd = ["gpg2", "--show-keys", self.secret_key_filepath] + try: + out = subprocess.check_output(gpg_cmd, stderr=subprocess.STDOUT).decode( + sys.stdout.encoding + ) + match = " {}".format(self.config["submission_key_fpr"]) + if not re.search(match, out): + raise ValidationError("Configured fingerprint does not match key!") + + except subprocess.CalledProcessError as e: + raise ValidationError(f"Key validation failed: {e.output.decode(sys.stdout.encoding)}") + + def read_config_file(self): + with open(self.config_filepath) as f: + return json.load(f) + + def validate_existing_size(self): + """This method checks for existing private volume size and new + values in the config.json""" + if "vmsizes" not in self.config: + raise ValidationError('Private volume sizes ("vmsizes") are not defined in config.json') + if "sd_app" not in self.config["vmsizes"]: + raise ValidationError("Private volume size of sd-app must be defined in config.json") + if "sd_log" not in self.config["vmsizes"]: + raise ValidationError("Private volume size of sd-log must be defined in config.json") + + if not isinstance(self.config["vmsizes"]["sd_app"], int): + raise ValidationError("Private volume size of sd-app must be an integer value.") + if not isinstance(self.config["vmsizes"]["sd_log"], int): + raise ValidationError("Private volume size of sd-log must be an integer value.") + + app = Qubes() + if "sd-app" in app.domains: + vm = app.domains["sd-app"] + vol = vm.volumes["private"] + if not (vol.size <= self.config["vmsizes"]["sd_app"] * 1024 * 1024 * 1024): + raise ValidationError("sd-app private volume is already bigger than configuration.") + + if "sd-log" in app.domains: + vm = app.domains["sd-log"] + vol = vm.volumes["private"] + if not (vol.size <= self.config["vmsizes"]["sd_log"] * 1024 * 1024 * 1024): + raise ValidationError("sd-log private volume is already bigger than configuration.") + + +if __name__ == "__main__": + validator = SDWConfigValidator()