diff --git a/sdw_backup/migrate_config.py b/sdw_backup/migrate_config.py
new file mode 100644
index 00000000..aa08242e
--- /dev/null
+++ b/sdw_backup/migrate_config.py
@@ -0,0 +1,255 @@
+#!/usr/bin/python3
+"""
+SecureDrop Workstation Qubes 4.1 -> 4.2 Migration helper.
+Meant to be run in dom0, this utility will collect credentials and assets for
+migration from various parts of your QubesOS system.
+
+At the end of this script the contents will be placed in a directory called "migration",
+that should be transferred to a LUKS-encrypted drive attached to a non-networked VM (eg vault).
+
+Note: dom0 in Qubes 4.1 uses Python 3.8.
+"""
+
+from enum import Enum
+import logging
+import os
+from pathlib import Path
+import shutil
+import subprocess
+import sys
+
+SDW_CONFIG_DIR = "/usr/share/securedrop-workstation-dom0-config"
+SDW_CONFIG_FILES_DOM0 = ["config.json", "sd-journalist.sec"]
+QUBES_DIR = "/etc/qubes"
+DOM0_MIN_FREE_SPACE_BUFFER_KILOBYTES = 512000 # 500MB
+
+logger = logging.getLogger(__name__)
+
+class BackupStatus(Enum):
+ COMPLETE = 0,
+ INCOMPLETE = 1,
+
+class BackupException(Exception):
+ pass
+
+class BackupTarget:
+ def __init__(self, name, hint):
+ self.name = name
+ self.hint = hint
+
+class BackupConfig:
+ def __init__(self) -> None:
+ super().__init__()
+ self.backup_status = {}
+ self.vms = [BackupTarget("dom0", "dom0 configuration files and /etc/qubes directory"), BackupTarget("sd-app", "/home/user/.securedrop_client directory"), BackupTarget("sd-gpg", "sd-gpg GPG private keys")]
+ for vm in self.vms:
+ self.backup_status[vm.name] = BackupStatus.INCOMPLETE
+ self.migration_dir = None
+ self.fingerprint = None
+
+ def _intro(self):
+ print("SecureDrop Workstation Qubes 4.1 -> 4.2 Migration helper")
+ print("This script is meant to be run in dom0.")
+ print("It will collect credentials and assets for migration from various parts of your QubesOS system.")
+ print("Trust, but verify! Review this script before running it.")
+
+ confirmation = input("Continue? (y/Y to continue, any key to quit)")
+ if confirmation.lower() != "y":
+ print("Aborting")
+ sys.exit(1)
+
+ def _create_migration_dir(self):
+ self.migration_dir = Path(Path.cwd(), "migration")
+ Path.mkdir(self.migration_dir)
+
+ def _qvm_run_io(self, vm, args: str, output_fd = None) -> str:
+ """
+ Run command in a given qube via qvm-run and return str-formatted output.
+ If output_fd, representing an file descriptor, is provided, write directly to the
+ output file. Otherwise write to stdout.
+ """
+ # qubesadmin.tools.qvm_run.main returns the exit code but prints the output to stderr,
+ # whereas we want to capture the output, so use subprocess
+ try:
+ return subprocess.check_output(["qvm-run", "--pass-io", vm, args]).decode()
+ except subprocess.CalledProcessError as e:
+ raise BackupException from e
+
+ def _dom0(self):
+ self.dom0_dir = Path(self.migration_dir, "dom0")
+ Path.mkdir(self.dom0_dir)
+
+ # /usr/share/securedrop-workstation-dom0-config
+ for file in SDW_CONFIG_FILES_DOM0:
+
+ shutil.copy(Path(SDW_CONFIG_DIR, file), self.dom0_dir)
+
+ # Copy dom0 /etc/qubes (such as /etc/qubes/policy.d) to include in a dom0 backup
+ shutil.copytree(QUBES_DIR, Path(self.dom0_dir, "etc-qubes"))
+
+ # Store the GPG fingerprint from config.json
+ with open(Path(self.dom0_dir, "config.json")) as f:
+ for line in f:
+ if "submission_key_fpr" in line.lower():
+ self.fingerprint = line.split()[1].strip()
+
+ if len(self.fingerprint) == 40:
+ self.backup_status["dom0"] = BackupStatus.COMPLETE
+
+ def _list_fingerprints(self, gpg_console_lines: str):
+ """
+ Helper to parse console output from `gpg` and return PGP fingerprints.
+ Used with the str-formatted output of `gpg -K`, equivalent to
+ `gpg -K --with-colons | grep "fpr" | cut -d: -f10` (and `wc -l`)
+ """
+ fingerprints = []
+ gpg_console_lines = self._qvm_run_io("sd-gpg", "gpg -K --with-colons").split()
+ for line in gpg_console_lines:
+ if "fpr" in line:
+ fingerprints.append(line.split(":")[9])
+
+ return fingerprints
+
+ def _sd_gpg(self):
+ """
+ Retrieve secret keys from sd-gpg, checking that at least one matches the fingerprint
+ in config.json.
+ """
+ sd_gpg_keys_name = "sd_secret_keys_armored.asc"
+ self.sd_gpg_dir = Path(self.migration_dir, "sd-gpg")
+ Path.mkdir(self.sd_gpg_dir)
+ remote_fprs = []
+ local_fprs = []
+
+ try:
+ # qvm-run --pass-io sd-gpg "gpg -K --with-colons | grep fpr | cut -d: -f10"
+ output = self._qvm_run_io("sd-gpg", "gpg -K --with-colons").split()
+
+ remote_fprs = self._list_fingerprints(output)
+ print(f"Found {len(remote_fprs)} key(s) to export from sd-gpg")
+
+ except BackupException as e:
+ logger.error(f"Failed to check sd-gpg keyring: {e}")
+
+ # qvm-run --pass-io sd-gpg 'gpg -a --export-secret-keys' > sd-gpg/sd_secret_keys_armored.asc
+ try:
+ export_keys = self._qvm_run_io("sd-gpg", "gpg -a --export-secret-keys")
+ with open(Path(self.sd_gpg_dir, sd_gpg_keys_name), "w+") as f:
+ f.write(export_keys)
+
+ except BackupException as ex:
+ logger.error(f"Failed to export sd-gpg keyring: {ex}")
+
+ # Create emphemeral gpg home directory and importing key(s) to check that import was
+ # well-formed.C lean up afterwards by removing this directory.
+ # Path.mkdir takes mode in decimal instead of octal. 0o700 = 448.
+ tmp_gpg_dir = Path(self.sd_gpg_dir, "tmp-gpg")
+ Path.mkdir(tmp_gpg_dir, mode=448)
+ gpg_args = ["gpg", "--homedir", str(tmp_gpg_dir), "--import", "-q", f"{self.sd_gpg_dir}/{sd_gpg_keys_name}"]
+ gpg_check_args = ["gpg", "--homedir", str(tmp_gpg_dir), "-K", "--with-colons"]
+
+ try:
+ subprocess.check_call(gpg_args)
+ gpg_output = subprocess.check_output(gpg_check_args).decode().split()
+
+ local_fprs = self._list_fingerprints(gpg_output)
+ print(f"Retrieved the following key(s):\n{local_fprs}")
+
+ except subprocess.CalledProcessError as err:
+ logger.error(f"Failed to recheck sd-gpg keys on dom0: {err}")
+
+ finally:
+ self._rm_gpgdir(tmp_gpg_dir)
+
+ if self.fingerprint in local_fprs and len(local_fprs) == len(remote_fprs):
+ self.backup_status["sd-gpg"] = BackupStatus.COMPLETE
+ elif self.fingerprint not in local_fprs:
+ print("Problem: Submission Key Fingerprint in config.json does not match any keys in sd-gpg.")
+ else:
+ print("Some keys may not have been imported successfully. Recheck sd-gpg keyring.")
+
+ def _rm_gpgdir(self, target):
+ """
+ Helper. Delete gpg directory, first attempting to shred files in private_keys_v1.d.
+ """
+ try:
+ if os.path.exists(target):
+ shutil.rmtree(target)
+ except OSError:
+ print(f"Problem: Could not delete {target}. Remove it manually.")
+
+ # def _sd_app(self):
+ # self.sd_app_dir = Path(self.migration_dir, "sd-app")
+ # Path.mkdir(self.sd_app_dir)
+ # # qvm-run --pass-io sd-app 'du -sh --block-size=1k /home/user/.securedrop_client' | cut -f1
+ # # kilobytes
+ # args = "du -sh --block-size=1k /home/user/.securedrop_client"
+
+ # try:
+ # data_size = self._qvm_run_io("sd-app", args).split()[0]
+ # print(f"{data_size} of uncompressed data on sd-app")
+
+ # # free_space_dom0=$(df -h /dev/mapper/qubes_dom0-root -k --output=avail | tail -n-1)
+ # # kilobytes
+ # dom0_args = ["df", "-h", "/dev/mapper/qubes_dom0-root", "-k", "--output=avail"]
+ # dom0_space = subprocess.check_output(dom0_args).decode().split()[1]
+
+ # # Small amoumts of coercion
+ # if int(dom0_space) - int(data_size) <= DOM0_MIN_FREE_SPACE_BUFFER_KILOBYTES:
+
+ # # We'd probably be fine; the backup will compress and we won't be cutting it this close.
+ # # But err on the side of caution and don't fill up dom0.
+ # print("Problem: /home/user.securedrop_client on sd-app is too large to transfer.")
+ # print("Please back up sd-app manually using the Qubes GUI backup tool and a strong backup passphrase.")
+ # print("For assistance, contact Support.")
+
+ # else:
+ # # qvm-run --pass-io sd-app 'tar -cz -C /home/user .securedrop_client' > sd-app/securedrop_client.tar.gz
+ # # This could be a lot of data, so write it to a file instead of stdout so as not to fill the pipe.
+ # with open(Path(self.sd_app_dir, "securedrop_client.tar.gz"), "w+") as archive:
+ # popen = subprocess.Popen(["qvm-run", "--pass-io", "sd-app", "tar -cz -C /home/user .securedrop_client"], stdout=archive)
+
+ # except (BackupException, subprocess.CalledProcessError):
+ # # todo
+ # pass
+
+ def _print_final_instructions(self):
+ print("You are responsible for preserving any of your own customizations, eg via the Qubes Backup tool.")
+ print("Please transfer the migration directory, and all its contents, to a non-networked VM (vault), using qvm-copy-to-vm.")
+ print("Then, transfer the directory to a LUKS-encrypted transfer device.")
+ print("Important: at the end of this migration process, wipe and reformat or destroy that drive.")
+
+ def all(self):
+ """
+ Gathers:
+ - config.json, sd-journalist.sec from /usr/share/securedrop-workstation-dom0-config
+ - secret keys from sd-gpg
+ - .securedrop_client directory (compressed) from sd-app, if dom0 disk space permits
+
+ Checks:
+ - Fingerprint in config.json matches a secret key from sd-gpg
+ - Fingerprint in config.json matches key exported from sd-gpg into dom0 (successful export)
+ - sd-app .securedrop_client directory successfully imported into dom0
+
+ Prints success message if backup has completed successfully, otherwise provides information about
+ which part of the process failed.
+ """
+ self._intro()
+ self._create_migration_dir()
+ self._dom0()
+ self._sd_gpg()
+ # self._sd_app()
+
+ if BackupStatus.INCOMPLETE not in self.backup_status.values():
+ print("Credentials and secrets have been collected successfully in the 'migration' directory.")
+ print("This is not a system backup! Only SecureDrop-Workstation specific configuration files and dom0 files in /etc/qubes have been preserved.")
+ else:
+ print("Failed to gather all required assets - additional steps required.")
+ print("You will need to manually add the missing files to the 'migration' directory.")
+ for entry in self.backup_status.keys():
+ print(f"{entry.name}: [success, no action required]") if self.backup_status[entry] == BackupStatus.COMPLETE else print(f"{entry.name}: {entry.hint}")
+
+ self._print_final_instructions()
+
+if __name__ == "__main__":
+ BackupConfig().all()
diff --git a/sdw_backup/sdw_admin.py b/sdw_backup/sdw_admin.py
new file mode 100755
index 00000000..4f56c80b
--- /dev/null
+++ b/sdw_backup/sdw_admin.py
@@ -0,0 +1,173 @@
+#!/usr/bin/python3
+"""
+Admin wrapper script for applying salt states for staging and prod scenarios. The rpm
+packages only puts the files in place `/srv/salt` but does not apply the state, nor
+does it handle the config.
+"""
+
+import os
+import subprocess
+import sys
+from enum import Enum
+from typing import List
+from PyQt5.QtCore import QObject, pyqtSignal
+
+
+SCRIPTS_PATH = "/usr/share/securedrop-workstation-dom0-config/"
+SALT_PATH = "/srv/salt/securedrop_salt/"
+
+sys.path.insert(1, os.path.join(SCRIPTS_PATH, "scripts/"))
+from validate_config import SDWConfigValidator, ValidationError # noqa: E402
+
+class SDWAdminException(Exception):
+ pass
+
+class ConfigStatus(Enum):
+ CONNECT_SUCCESS = "CONNECT_SUCCESS",
+ CONNECT_ERROR = "CONNECT_ERROR",
+ VALIDATE_SUCCESS = "VALIDATE_SUCCESS",
+ VALIDATE_ERROR = "VALIDATE_ERROR",
+ APPLY_SUCCESS = "APPLY_SUCCESS",
+ APPLY_ERROR = "APPLY_ERROR",
+ UNKNOWN_ERROR = "UNKNONW_ERROR"
+
+class SetupWorker(QObject):
+ # QObject if we want to port to qProcess
+
+ # Emit setup state
+ state_changed = pyqtSignal(object)
+
+ def __init__(self, parent: QObject | None = ...) -> None:
+ super().__init__(parent)
+ self.path_to_config = None
+
+ def _install_pvh_support(self):
+ """
+ Installs grub2-xen-pvh in dom0 - required for PVH with AppVM local kernels
+ TODO: install this via package requirements instead if possible
+ """
+ try:
+ subprocess.check_call(["sudo", "qubes-dom0-update", "-y", "-q", "grub2-xen-pvh"])
+ except subprocess.CalledProcessError:
+ raise SDWAdminException("Error installing grub2-xen-pvh: local PVH not available.")
+
+ def _copy_config(self):
+ """
+ Copies config.json and sd-journalist.sec to /srv/salt/securedrop_salt
+ """
+ try:
+ subprocess.check_call(["sudo", "cp", os.path.join(SCRIPTS_PATH, "config.json"), SALT_PATH])
+ subprocess.check_call(
+ ["sudo", "cp", os.path.join(SCRIPTS_PATH, "sd-journalist.sec"), SALT_PATH]
+ )
+ except subprocess.CalledProcessError:
+ raise SDWAdminException("Error copying configuration")
+
+ def _provision_all(self):
+ """
+ Runs provision-all to apply the salt state.highstate on dom0 and all VMs
+ """
+ # TODO: make this separate scripts
+
+ try:
+ subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/provision-all")])
+ except subprocess.CalledProcessError:
+ raise SDWAdminException("Error during provision-all")
+
+ print("Provisioning complete. Please reboot to complete the installation.")
+
+
+ def _validate_config(self, path):
+ """
+ Calls the validate_config script to validate the config present in the staging/prod directory
+ """
+ try:
+ validator = SDWConfigValidator(path) # noqa: F841
+ except ValidationError:
+ raise SDWAdminException("Error while validating configuration")
+
+
+ def _refresh_salt(self):
+ """
+ Cleans the Salt cache and synchronizes Salt to ensure we are applying states
+ from the currently installed version
+ """
+ try:
+ subprocess.check_call(["sudo", "rm", "-rf", "/var/cache/salt"])
+ except subprocess.CalledProcessError:
+ raise SDWAdminException("Error while clearing Salt cache")
+
+ try:
+ subprocess.check_call(["sudo", "qubesctl", "saltutil.sync_all", "refresh=true"])
+ except subprocess.CalledProcessError:
+ raise SDWAdminException("Error while synchronizing Salt")
+
+
+ def _perform_uninstall(self):
+ try:
+ subprocess.check_call(
+ ["sudo", "qubesctl", "state.sls", "securedrop_salt.sd-clean-default-dispvm"]
+ )
+ print("Destroying all VMs")
+ subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/destroy-vm"), "--all"])
+ print("Reverting dom0 configuration")
+ subprocess.check_call(["sudo", "qubesctl", "state.sls", "securedrop_salt.sd-clean-all"])
+ subprocess.check_call([os.path.join(SCRIPTS_PATH, "scripts/clean-salt")])
+ print("Uninstalling dom0 config package")
+ subprocess.check_call(
+ ["sudo", "dnf", "-y", "-q", "remove", "securedrop-workstation-dom0-config"]
+ )
+ except subprocess.CalledProcessError:
+ raise SDWAdminException("Error during uninstall")
+
+ print(
+ "Instance secrets (Journalist Interface token and Submission private key) are still "
+ "present on disk. You can delete them in /usr/share/securedrop-workstation-dom0-config"
+ )
+
+ def _check_euid(self):
+ if os.geteuid() == 0:
+ raise SDWAdminException("This wizard cannot be run as root.")
+
+ def connect(self):
+ try:
+ available_vms = self.qubes_app.domains
+ for vm in available_vms:
+ if vm.name == "vault" and vm.netvm is None and vm.running: # todo
+ result = subprocess.run(["/usr/lib/qubes/qrexec-client", "-d", vm, "/etc/qubes-rpc/qubes.SelectDirectory"])
+ self.path_to_config = result.decode()
+ self.state_changed.emit(ConfigStatus.CONNECT_SUCCESS)
+ except SDWAdminException:
+ self.state_changed.emit(ConfigStatus.CONNECT_ERROR)
+
+ def validate(self):
+ try:
+ self._validate_config(SCRIPTS_PATH)
+ self.state_changed.emit(ConfigStatus.VALIDATE_SUCCESS)
+ except SDWAdminException:
+ self.state_changed.emit(ConfigStatus.VALIDATE_ERROR)
+
+ def apply(self):
+ try:
+ self._validate_config(SCRIPTS_PATH)
+ self._install_pvh_support()
+ self._copy_config()
+ self._refresh_salt()
+ self._provision_all()
+
+ self.state_changed.emit(ConfigStatus.APPLY_SUCCESS)
+ except SDWAdminException:
+ self.state_changed.emit(ConfigStatus.APPLY_ERROR)
+
+ def uninstall(self):
+ try:
+ print(
+ "Uninstalling will remove all packages and destroy all VMs associated\n"
+ "with SecureDrop Workstation. It will also remove all SecureDrop tags\n"
+ "from other VMs on the system."
+ )
+ self._refresh_salt()
+ self._perform_uninstall()
+ except SDWAdminException:
+ pass # we aren't supporting uninstall
+
\ No newline at end of file
diff --git a/sdw_backup/sdw_config_app.py b/sdw_backup/sdw_config_app.py
new file mode 100644
index 00000000..a0d37da8
--- /dev/null
+++ b/sdw_backup/sdw_config_app.py
@@ -0,0 +1,376 @@
+
+import re
+from enum import IntEnum
+from typing import List
+
+from PyQt5.QtCore import Qt, pyqtSlot
+from PyQt5.QtWidgets import QAbstractButton, QWidget, QWizard, QWizardPage, QPushButton, QMenu, QAction, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QRadioButton, QButtonGroup
+
+from sdw_admin import SetupWorker, SDWAdminException, ConfigStatus
+
+import qubesadmin
+
+class SetupPages(IntEnum):
+ PREAMBLE_PAGE = 0,
+ ERROR_PAGE = 1,
+ CONNECT_SECRETS_PAGE = 2,
+ # Optional: a Tails setup means both SVS+admin stick have the necessaries on them
+ CONNECT_TAILSCONFIG_PAGE = 3,
+ VALIDATE_PAGE = 4,
+ APPLY_PAGE = 5,
+ RESULT_PAGE_SUCCESS = 6,
+
+DISPLAY_TEXT = {
+ SetupPages.PREAMBLE_PAGE: ("SecureDrop Workstation Configuration", "Welcome to SDW Configuration Tool. This tool will guide you through installing SecureDrop Workstation."),
+ SetupPages.ERROR_PAGE: ("Setup Error", "Error during setup"),
+ SetupPages.CONNECT_SECRETS_PAGE: ("Connect Encrypted Storage", "Please connect encrypted storage device to a non-networked VM such as Vault"),
+ SetupPages.CONNECT_TAILSCONFIG_PAGE: ("Connect Encrypted Storage", "Please connect Tails Admin Workstation to a non-networked VM such as Vault"),
+ SetupPages.VALIDATE_PAGE: ("Validate Configuratin", "This will validate the configuration you provided"),
+ SetupPages.APPLY_PAGE: ("Install SecureDrop Workstation", "Applying Configuration (this will take some time...)"),
+ SetupPages.RESULT_PAGE_SUCCESS: ("Setup Complete", "Successfully configured. Please reboot now."),
+}
+
+BASE_TEMPLATE = "debian-12-minimal"
+
+class SetupWizard(QWizard):
+ """
+ SDW Setup Wizard.
+ """
+ def __init__(self, worker: SetupWorker, parent: QWidget | None = ..., flags: Qt.WindowFlags | Qt.WindowType = ...) -> None:
+ super().__init__(parent, flags)
+ self._set_layout()
+ self._set_pages()
+ self.adjustSize()
+ self.qubes_app = qubesadmin.Qubes()
+
+ # Connect handlers
+ self.worker.state_changed.connect(self.on_step_complete)
+
+ # Connect buttons
+ self.next_button: QAbstractButton = self.button(QWizard.WizardButton.NextButton)
+ self.cancel_button: QAbstractButton = self.button(QWizard.WizardButton.CancelButton)
+ self.back_button: QAbstractButton = self.button(QWizard.WizardButton.BackButton)
+ self.finish_button: QAbstractButton = self.button(QWizard.WizardButton.FinishButton)
+
+ self.next_button.clicked.connect(self.run_step)
+
+ def _set_pages(self) -> None:
+ for page in SetupPages:
+
+ # Set up and populate different pages of the wizard
+ self.setPage(page.value, SetupWizardPage(page))
+
+ def _set_layout(self) -> None:
+ title = f"SDW Configuration Tool" # todo
+ self.setWindowTitle(title)
+ self.setObjectName("SDW_Wizard") # Could use same styling as all other wizards eg
+ self.setModal(False)
+ self.setOptions(
+ QWizard.NoBackButtonOnLastPage
+ | QWizard.NoCancelButtonOnLastPage
+ | QWizard.NoBackButtonOnStartPage
+ )
+
+ @pyqtSlot()
+ def run_step(self) -> None:
+ page = self.currentPage()
+ self.next_button.setEnabled(False)
+
+ # not sure about this
+ if isinstance(page, SetupWizardPage):
+ page.set_complete(False)
+
+ if self.currentPage == SetupPages.CONNECT_SECRETS_PAGE:
+ self.worker.connect()
+ elif self.currentPage == SetupPages.VALIDATE_PAGE:
+ self.worker.validate()
+ elif self.currentPage == SetupPages.APPLY_PAGE:
+ self.worker.apply()
+
+ @pyqtSlot(object)
+ def on_step_complete(self, status: ConfigStatus):
+ self.status = status
+ self.next_button.setEnabled(True)
+
+ # Confirm
+ self.currentPage().next()
+
+ @pyqtSlot(object)
+ def on_step_error(self, err: SDWAdminException):
+ pass # TODO
+
+
+class SetupWizardPage(QWizardPage):
+ """
+ A page in the SDW Setup wizard.
+ """
+ NO_MARGIN = 0
+
+ def __init__(self, page_type: SetupPages, parent: QWidget | None = ...) -> None:
+ super().__init__(parent)
+ self.page_type = page_type
+ self.status = None
+ # By default, pages can't advance without the set_complete method being called
+ self._is_complete = False
+ self._build_layout()
+ self.header_text = DISPLAY_TEXT.get(page_type)[0] # TODO
+ self.body_text = DISPLAY_TEXT.get(page_type)[1] # TODO
+ self._layout = self._build_layout()
+ self.setLayout(self._layout)
+
+ def _build_layout(self) -> QVBoxLayout:
+ """
+ Create parent layout, draw elements, return parent layout.
+ """
+ self.setObjectName("SDW_Setup_Page")
+
+ parent_layout = QVBoxLayout(self)
+ #parent_layout.setContentsMargins(self.MARGIN, self.MARGIN, self.MARGIN, self.MARGIN)
+
+ # Header for icon and task title
+ header_container = QWidget()
+ header_container_layout = QHBoxLayout()
+ header_container.setLayout(header_container_layout)
+ header_container.setContentsMargins(
+ self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN
+ )
+
+ header = QLabel()
+ header.setObjectName("QWizard_header")
+ header_container_layout.addWidget(header, alignment=Qt.AlignCenter)
+ header_container_layout.addStretch()
+ header_line = QWidget()
+ header_line.setObjectName("QWizard_header_line") # todo
+
+ # Body to display instructions and forms
+ body = QLabel()
+ body.setObjectName("QWizard_body")
+ body.setWordWrap(True)
+ body.setScaledContents(True)
+
+ body_container = QWidget()
+ body_layout = QVBoxLayout()
+ body_layout.setContentsMargins(
+ self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN
+ )
+ body_container.setLayout(body_layout)
+ body_layout.addWidget(body)
+
+ # Widget for displaying error messages (hidden by default)
+ self.error_details = QLabel()
+ self.error_details.setObjectName("QWizard_error_details")
+ self.error_details.setWordWrap(True)
+ self.error_details.hide()
+
+ # Populate text content
+ header.setText(self.header_text)
+ body.setText(self.body_text)
+
+ # Add all the layout elements
+ parent_layout.addWidget(header_container)
+ parent_layout.addWidget(header_line)
+ parent_layout.addWidget(body_container)
+ parent_layout.addWidget(self.error_details)
+ parent_layout.addStretch()
+
+ return parent_layout
+
+ def nextId(self) -> int:
+ if self.wizard():
+ status = self.wizard().status
+ if status in [ConfigStatus.APPLY_ERROR, ConfigStatus.CONNECT_ERROR, ConfigStatus.VALIDATE_ERROR]:
+ return SetupPages.ERROR_PAGE
+ elif self.page_type == SetupPages.PREAMBLE_PAGE:
+ return SetupPages.CONNECT_SECRETS_PAGE
+
+ return super().nextId()
+
+ def isComplete(self):
+ """
+ Overrides builtin method, caps case intentional
+ """
+ return super().isComplete() and self._is_complete
+
+ def set_complete(self, is_complete: bool):
+ self._is_complete = is_complete
+
+class SelectDeviceTypePage(SetupWizardPage):
+ """
+ Wizard page that lets the user select either
+ Tails or SDW backup for importing configuration.
+ """
+ def __init__(self, page_type: SetupPages, parent: QWidget | None = ...) -> None:
+ super().__init__(page_type, parent)
+
+ def _build_layout(self) -> QVBoxLayout:
+ layout = super()._build_layout()
+
+ choose_storage_type_layout = QHBoxLayout()
+ choose_header = QLineEdit()
+ choose_header.setText("IMPORT CONFIGURATION DETAILS")
+
+ choose_radio_group = QButtonGroup()
+ self.button_tails = QRadioButton()
+ self.button_tails.setText("FROM TAILS")
+ self.button_backup = QRadioButton()
+ self.button_backup.setText("FROM A CONFIGURATION BACKUP")
+
+ choose_radio_group.addButton(self.button_tails)
+ choose_radio_group.addButton(self.button_backup)
+
+ choose_radio_group.buttonClicked.connect(self._on_radio_selection)
+
+ choose_storage_type_layout.addItem(choose_header)
+ choose_storage_type_layout.addStretch()
+ choose_storage_type_layout.addItem(choose_radio_group)
+ choose_storage_type_layout.addStretch()
+
+ layout.insertWidget(1, choose_storage_type_layout)
+ return layout
+
+ def _on_radio_selection(self):
+ """
+ The RadioGroup enforces that one button must be selected.
+ For now, just register the Tails button. If more than two
+ options are offered in future, all fields should be registered.
+ """
+ self.registerField("button_tails*", self.button_tails)
+
+ # Once a selection is made, this screen is complete
+ self.set_complete(True)
+
+class DirectoryPickerPage(SetupWizardPage):
+ def __init__(self, parent: QWidget | None = ...) -> None:
+ super().__init__(parent)
+ self.vm = None # Holds selected vm where USB is attached.
+
+ def _build_layout(self) -> QVBoxLayout:
+ layout = super()._build_layout()
+
+ # VM chooser
+ choose_vm_layout = QHBoxLayout()
+ choose_vm_label = QLabel()
+ choose_vm_label.setText("VM where storage device is mounted")
+ choose_vm_menu = QMenu()
+ vms = self._get_running_vms()
+ for entry in vms:
+ choose_vm_menu.addAction(self._menu_action(entry))
+
+ choose_vm_layout.addItem(choose_vm_label)
+ choose_vm_layout.addItem(choose_vm_menu)
+
+ # Directory
+ storage_dir_form = QWidget()
+ storage_dir_form.setObjectName("QWizard_passphrase_form")
+ storage_dir_form_layout = QVBoxLayout()
+ storage_dir_form_layout.setContentsMargins(
+ self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN, self.NO_MARGIN
+ )
+ storage_dir_form.setLayout(storage_dir_form_layout)
+ storage_dir_label = ("Select Storage Directory")
+
+ storage_dir_and_overflow = QHBoxLayout()
+ self.storage_dir = QLineEdit()
+
+ self.button_select_storage_dir = QPushButton()
+ self.button_select_storage_dir.setText("...") # TODO - icon instead
+ self.button_select_storage_dir.clicked.connect(self.get_path_from_vm)
+
+ storage_dir_and_overflow.addItem(self.storage_dir)
+ storage_dir_and_overflow.addItem(self.button_select_storage_dir)
+
+ storage_dir_form_layout.addWidget(storage_dir_label)
+ storage_dir_form_layout.addWidget(storage_dir_and_overflow)
+
+ layout.insertWidget(1, choose_vm_layout)
+ layout.insertWidget(2, storage_dir_form)
+ return layout
+
+ def _menu_action(self, vm) -> QAction:
+ action = QAction()
+ action.setText(vm.name)
+ action.triggered.connect(lambda vm: self._on_item_clicked(vm))
+ return action
+
+ @pyqtSlot()
+ def _on_item_clicked(self, vm):
+ self.vm = vm
+
+ def _get_running_vms(self):
+ vms = []
+ available_vms = self.qubes_app.domains
+ for vm in available_vms:
+ if vm.name == "vault" and vm.netvm is None and vm.running: # todo
+ vms.append(vm)
+ return vms
+
+ # From Qubes manager.py
+ def get_path_from_vm(self):
+ """
+ Displays a file/directory selection window for the given VM.
+
+ :param vm: vm from which to select path
+ :return: path to file, checked for validity
+ """
+ vm = self.target_vm
+
+ path_re = re.compile(r"[a-zA-Z0-9/:.,_+=() ?-]*")
+ path_max_len = 512
+
+ if not vm:
+ return None
+ stdout, _stderr = vm.run_service_for_stdio("qubes.SelectDirectory")
+
+ stdout = stdout.strip()
+
+ untrusted_path = stdout.decode(encoding='ascii')[:path_max_len]
+
+ if untrusted_path and path_re.fullmatch(untrusted_path):
+ assert '../' not in untrusted_path
+ assert '\0' not in untrusted_path
+ self.storage_dir.setText(untrusted_path.strip())
+ self.set_complete(True)
+
+ self.registerField("storage_dir_secrets*", self.storage_dir)
+
+ raise ValueError("Unexpected characters in path.")
+
+class ConfirmationPage(SetupWizardPage):
+ def __init__(self, page_type: SetupPages, parent: QWidget | None = ...) -> None:
+ body_text = self._confirm_running_vms()
+ super().__init__(page_type, parent=parent)
+ self.set_complete(True)
+
+ def _build_layout(self):
+ layout = super()._build_layout()
+
+ def _confirm_running_vms(self):
+ body_text = (
+ "SecureDrop Workstation should always be installed on a fresh Qubes OS install. "
+ "The installation process will overwrite any user modifications to the "
+ f"{BASE_TEMPLATE} TemplateVM, and will disable old-format qubes-rpc "
+ "policy directives.\n"
+ )
+ affected_appvms = self._get_appvms_for_template(BASE_TEMPLATE)
+ if len(affected_appvms) > 0:
+ body_text += (
+ f"{BASE_TEMPLATE} is already in use by the following AppVMS:\n"
+ f"{affected_appvms}\n"
+ "Applications and configurations in use by these AppVMs will be\n"
+ f"removed from {BASE_TEMPLATE}."
+ )
+
+ return body_text
+
+ def _get_appvms_for_template(self, vm_name: str) -> List[str]:
+ """
+ Return a list of AppVMs that use the specified VM as a template
+ """
+ try:
+ template_vm = self.qubes_app.domains[vm_name]
+ except KeyError:
+ # No VM implies no appvms, return an empty list
+ # (The template may just not be installed yet)
+ return []
+ return [x.name for x in list(template_vm.appvms)]
+
diff --git a/sdw_backup/validate_config.py b/sdw_backup/validate_config.py
new file mode 100755
index 00000000..bf87f455
--- /dev/null
+++ b/sdw_backup/validate_config.py
@@ -0,0 +1,154 @@
+#!/usr/bin/python3
+"""
+Utility to verify that SecureDrop Workstation config is properly structured.
+Checks for
+"""
+
+import json
+import os
+import re
+import subprocess
+import sys
+import tempfile
+
+from qubesadmin import Qubes
+
+TOR_V3_HOSTNAME_REGEX = r"^[a-z2-7]{56}\.onion$"
+TOR_V3_AUTH_REGEX = r"^[A-Z2-7]{52}$"
+
+# CONFIG_FILEPATH = "/srv/salt/securedrop_salt/config.json"
+CONFIG_FILEPATH = "config.json"
+SECRET_KEY_FILEPATH = "sd-journalist.sec"
+
+
+class ValidationError(Exception):
+ pass
+
+
+class SDWConfigValidator:
+ def __init__(self, config_base_dir=None):
+ if config_base_dir:
+ self.config_filepath = os.path.join(config_base_dir, CONFIG_FILEPATH)
+ self.secret_key_filepath = os.path.join(config_base_dir, SECRET_KEY_FILEPATH)
+ else:
+ self.config_filepath = CONFIG_FILEPATH
+ self.secret_key_filepath = SECRET_KEY_FILEPATH
+ self.confirm_config_file_exists()
+ self.config = self.read_config_file()
+ self.confirm_onion_config_valid()
+ self.confirm_submission_privkey_file()
+ self.confirm_submission_privkey_fingerprint()
+ self.confirm_environment_valid()
+ self.validate_existing_size()
+
+ def confirm_config_file_exists(self):
+ if not os.path.exists(self.config_filepath):
+ raise ValidationError(
+ f"Config file does not exist: {self.config_filepath}. "
+ "Create from config.json.example"
+ )
+
+ def confirm_environment_valid(self):
+ """
+ The 'environment' config item is required to determine
+ whether prod or dev URLs are used for installing packages.
+ """
+ if "environment" not in self.config:
+ raise ValidationError
+ if self.config["environment"] not in ("prod", "dev", "staging"):
+ raise ValidationError(f"Invalid environment: {self.config['environment']}")
+
+ def confirm_onion_config_valid(self):
+ """
+ Only v3 onion services are supported.
+ """
+ if "hidserv" not in self.config:
+ raise ValidationError('"hidserv" is not defined in config.json')
+
+ # Verify the hostname
+ if "hostname" not in self.config["hidserv"]:
+ raise ValidationError("hidden service hostname is not defined in config.json")
+ if not re.match(TOR_V3_HOSTNAME_REGEX, self.config["hidserv"]["hostname"]):
+ raise ValidationError("Invalid hidden service hostname specified")
+
+ # Verify the key
+ if "key" not in self.config["hidserv"]:
+ raise ValidationError("hidden service key is not defined in config.json")
+ if not re.match(TOR_V3_AUTH_REGEX, self.config["hidserv"]["key"]):
+ raise ValidationError("Invalid hidden service key specified")
+
+ def confirm_submission_privkey_file(self):
+ """
+ Import privkey into temporary keyring, to validate.
+ """
+ if not os.path.exists(self.secret_key_filepath):
+ raise ValidationError(f"PGP secret key file not found: {self.secret_key_filepath}")
+ gpg_cmd = ["gpg", "--import", self.secret_key_filepath]
+ result = False
+ with tempfile.TemporaryDirectory() as d:
+ gpg_env = {"GNUPGHOME": d}
+ # Call out to gpg to confirm it's a valid keyfile
+ try:
+ subprocess.check_call(
+ gpg_cmd, env=gpg_env, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL
+ )
+ result = True
+ except subprocess.CalledProcessError:
+ # suppress error since "result" is checked next
+ pass
+
+ if not result:
+ raise ValidationError(f"PGP secret key is not valid: {self.secret_key_filepath}")
+
+ def confirm_submission_privkey_fingerprint(self):
+ if "submission_key_fpr" not in self.config:
+ raise ValidationError('"submission_key_fpr" is not defined in config.json')
+ if not re.match("^[a-fA-F0-9]{40}$", self.config["submission_key_fpr"]):
+ raise ValidationError("Invalid PGP key fingerprint specified")
+ gpg_cmd = ["gpg2", "--show-keys", self.secret_key_filepath]
+ try:
+ out = subprocess.check_output(gpg_cmd, stderr=subprocess.STDOUT).decode(
+ sys.stdout.encoding
+ )
+ match = " {}".format(self.config["submission_key_fpr"])
+ if not re.search(match, out):
+ raise ValidationError("Configured fingerprint does not match key!")
+
+ except subprocess.CalledProcessError as e:
+ raise ValidationError(f"Key validation failed: {e.output.decode(sys.stdout.encoding)}")
+
+ def read_config_file(self):
+ with open(self.config_filepath) as f:
+ return json.load(f)
+
+ def validate_existing_size(self):
+ """This method checks for existing private volume size and new
+ values in the config.json"""
+ if "vmsizes" not in self.config:
+ raise ValidationError('Private volume sizes ("vmsizes") are not defined in config.json')
+ if "sd_app" not in self.config["vmsizes"]:
+ raise ValidationError("Private volume size of sd-app must be defined in config.json")
+ if "sd_log" not in self.config["vmsizes"]:
+ raise ValidationError("Private volume size of sd-log must be defined in config.json")
+
+ if not isinstance(self.config["vmsizes"]["sd_app"], int):
+ raise ValidationError("Private volume size of sd-app must be an integer value.")
+ if not isinstance(self.config["vmsizes"]["sd_log"], int):
+ raise ValidationError("Private volume size of sd-log must be an integer value.")
+
+ app = Qubes()
+ if "sd-app" in app.domains:
+ vm = app.domains["sd-app"]
+ vol = vm.volumes["private"]
+ if not (vol.size <= self.config["vmsizes"]["sd_app"] * 1024 * 1024 * 1024):
+ raise ValidationError("sd-app private volume is already bigger than configuration.")
+
+ if "sd-log" in app.domains:
+ vm = app.domains["sd-log"]
+ vol = vm.volumes["private"]
+ if not (vol.size <= self.config["vmsizes"]["sd_log"] * 1024 * 1024 * 1024):
+ raise ValidationError("sd-log private volume is already bigger than configuration.")
+
+
+if __name__ == "__main__":
+ validator = SDWConfigValidator()