Skip to content
This repository has been archived by the owner on Jan 5, 2024. It is now read-only.

Support VeraCrypt export devices #126

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 69 additions & 33 deletions securedrop_export/disk/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def _get_dev_mapper_entries(self) -> List[str]:
"""
try:
ls = subprocess.check_output(["ls", "/dev/mapper/"], stderr=subprocess.PIPE)
cfm marked this conversation as resolved.
Show resolved Hide resolved
entries = ls.decode().rstrip().split("\n")
entries = ls.decode("utf-8").rstrip().split("\n")

return [r for r in entries if r not in _DEVMAPPER_SYSTEM]

Expand All @@ -322,49 +322,85 @@ def _get_dev_mapper_entries(self) -> List[str]:

def _attempt_get_unlocked_veracrypt_volume(self, device_name: str) -> MountedVolume:
"""
Looks for an already-unlocked VeraCrypt volume in /dev/mapper and see if the
device ID matches given device name.
Looks for an already-unlocked volume in /dev/mapper to see if the name matches
given device name.
Returns MountedVolume object if a drive is found. Otherwise, raises ExportException.
"""
try:
for entry in self._get_dev_mapper_entries():
res = subprocess.check_output(
[
"lsblk",
"--noheadings",
"-o",
"NAME,TYPE,MOUNTPOINT",
f"/dev/mapper/{entry}",
]
devmapper_entries = self._get_dev_mapper_entries()
for item in devmapper_entries:
# Check it out with cryptsetup, see if it's a VeraCrypt/TrueCrypt drive.
# Example format (some lines ommitted for brevity):
#
# b'/dev/mapper/vc is active and is in use.\n type: TCRYPT\n cipher:
# aes-xts-plain64\n keysize: 512 bits\n key location: dm-crypt\n device:
# /dev/sdc\n sector size: 512\noffset: 256 sectors\n size:
# 1968640 sectors\n skipped: 256 sectors\n mode: read/write\n'
#
# (A mapped entry can also have a null device, if it wasn't properly removed
# from /dev/mapper using `cryptsetup close`.)
status = (
subprocess.check_output(
["sudo", "cryptsetup", "status", f"/dev/mapper/{item}"]
cfm marked this conversation as resolved.
Show resolved Hide resolved
)
.decode("utf-8")
.split("\n ")
)
name, crypt_type, mountpoint = (
res.decode().rstrip().split()
) # Space-separated

# Notes for our future selves: there is also a `tcrypt-system` identifier.
# It *should* only be used for FDE with TrueCrypt, not for a non-bootable drive.
if crypt_type == "tcrypt" and name == device_name:
vol = Volume(
device_name=name,
mapped_name=entry,

logger.debug(f"{status}")

if "type: TCRYPT" in status and f"device: {device_name}" in status:
logger.info("Unlocked VeraCrypt volume detected")
volume = Volume(
device_name=device_name,
mapped_name=item,
encryption=EncryptionScheme.VERACRYPT,
)

# Is it mounted?
mountpoint = (
subprocess.check_output(
[
"lsblk",
f"/dev/mapper/{item}",
"--noheadings",
"-o",
cfm marked this conversation as resolved.
Show resolved Hide resolved
"MOUNTPOINT",
]
)
.decode()
.strip()
)
if mountpoint:
return MountedVolume.from_volume(vol, mountpoint)
# Note: Here we're accepting the user's choice of how they
# have mounted the drive, including whatever permissions/
# options they have set.
logger.info(f"Drive is already mounted at {mountpoint}")
return MountedVolume.from_volume(volume, mountpoint)
else:
return self._mount_at_mountpoint(vol, self._DEFAULT_MOUNTPOINT)
logger.info(
"Drive is not mounted; mounting at default mountpoint"
)

# If we got here, there is no unlocked VC drive present. Not an error, but not
# a state we can continue the workflow in, so raise ExportException.
logger.info("No unlocked Veracrypt drive found.")
raise ExportException(sdstatus=Status.UNKNOWN_DEVICE_DETECTED)
# Fixme: we can't reliably use chown as we do with luks+ext4,
# since we don't know what filesystem is inside the veracrypt container.
return self._mount_at_mountpoint(
volume, self._DEFAULT_MOUNTPOINT
)

# If we got here, there were no entries in `/dev/mapper/` for us to look at.
raise ExportException(sdstatus=Status.DEVICE_ERROR)
else: # somehow it didn't work. dump the device info for now.
# fixme: this isn't necessarily. an error
logger.error(f"Did not parse veracrypt drive from: {status}")

except subprocess.CalledProcessError as e:
logger.error(e)
raise ExportException(sdstatus=Status.DEVICE_ERROR)
# If we got here, there is no unlocked VC drive present. Not an error, but not
# a state we can continue the workflow in, so raise ExportException.
logger.info("No unlocked Veracrypt drive found.")
raise ExportException(sdstatus=Status.UNKNOWN_DEVICE_DETECTED)

except subprocess.CalledProcessError as ex:
logger.error("Encountered exception while checking /dev/mapper entries")
logger.debug(ex)
raise ExportException(sdstatus=Status.DEVICE_ERROR) from ex

def attempt_unlock_veracrypt(
self, volume: Volume, encryption_key: str
Expand Down
70 changes: 66 additions & 4 deletions tests/disk/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
_SAMPLE_OUTPUT_USB = b"/dev/sda" # noqa

_SAMPLE_LUKS_HEADER = b"\n\nUUID:\t123456-DEADBEEF" # noqa
_SAMPLE_CRYPTSETUP_STATUS_OUTPUT = b"/dev/mapper/vc is active and is in use.\n type: \
TCRYPT\n cipher: aes-xts-plain64\n keysize: 512 bits\n key location: dm-crypt\n device: \
/dev/sdc\n sector size: 512\n mode: read/write\n"
_SAMPLE_MOUNTED_VC_OUTPUT = b"/media/custom_mount"


class TestCli:
Expand Down Expand Up @@ -288,7 +292,7 @@ def test_unlock_luks_volume_luksOpen_exception(self, mocked_subprocess):
@mock.patch("os.path.exists", return_value=True)
@mock.patch("subprocess.check_output", return_value=b"\n")
@mock.patch("subprocess.check_call", return_value=0)
def testmount_volume(self, mocked_call, mocked_output, mocked_path):
def test_mount_volume(self, mocked_call, mocked_output, mocked_path):
vol = Volume(
device_name=_DEFAULT_USB_DEVICE_ONE_PART,
mapped_name=_PRETEND_LUKS_ID,
Expand All @@ -303,7 +307,9 @@ def testmount_volume(self, mocked_call, mocked_output, mocked_path):
"subprocess.check_output", return_value=b"/dev/pretend/luks-id-123456\n"
)
@mock.patch("subprocess.check_call", return_value=0)
def testmount_volume_already_mounted(self, mocked_output, mocked_call, mocked_path):
def test_mount_volume_already_mounted(
self, mocked_output, mocked_call, mocked_path
):
md = Volume(
device_name=_DEFAULT_USB_DEVICE_ONE_PART,
mapped_name=_PRETEND_LUKS_ID,
Expand All @@ -316,7 +322,7 @@ def testmount_volume_already_mounted(self, mocked_output, mocked_call, mocked_pa
@mock.patch("os.path.exists", return_value=True)
@mock.patch("subprocess.check_output", return_value=b"\n")
@mock.patch("subprocess.check_call", return_value=0)
def testmount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path):
def test_mount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path):
md = Volume(
device_name=_DEFAULT_USB_DEVICE_ONE_PART,
mapped_name=_PRETEND_LUKS_ID,
Expand All @@ -331,7 +337,7 @@ def testmount_volume_mkdir(self, mocked_output, mocked_subprocess, mocked_path):
"subprocess.check_call",
side_effect=subprocess.CalledProcessError(1, "check_call"),
)
def testmount_volume_error(self, mocked_subprocess, mocked_output):
def test_mount_volume_error(self, mocked_subprocess, mocked_output):
md = Volume(
device_name=_DEFAULT_USB_DEVICE_ONE_PART,
mapped_name=_PRETEND_LUKS_ID,
Expand Down Expand Up @@ -577,3 +583,59 @@ def test_mount_fails_with_locked_device(self):
self.cli.mount_volume(vol)

assert ex.value.sdstatus == Status.ERROR_MOUNT

@mock.patch("os.path.exists", return_value=True)
@mock.patch(
"subprocess.check_output",
side_effect=[b"vc", _SAMPLE_CRYPTSETUP_STATUS_OUTPUT, b""],
)
@mock.patch("subprocess.check_call")
def test_get_unlocked_veracrypt_unmounted(
self, mock_call, mock_subprocess, mock_os
):
vol = self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc")

assert vol.unlocked
assert vol.mountpoint == self.cli._DEFAULT_MOUNTPOINT

@mock.patch("os.path.exists", return_value=True)
@mock.patch(
"subprocess.check_output",
side_effect=[
b"vc",
_SAMPLE_CRYPTSETUP_STATUS_OUTPUT,
_SAMPLE_MOUNTED_VC_OUTPUT,
],
)
def test_get_unlocked_veracrypt_mounted(self, mock_subprocess, mock_os):
v = self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc")

assert v.unlocked
assert v.mountpoint == _SAMPLE_MOUNTED_VC_OUTPUT.decode("utf-8")

@mock.patch("os.path.exists", return_value=True)
@mock.patch(
"subprocess.check_output",
side_effect=subprocess.CalledProcessError(1, "Oh no!"),
)
def test_get_unlocked_veracrypt_lsblk_error(self, mock_subprocess, mock_os):
with pytest.raises(ExportException) as ex:
self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc")

assert ex.value.sdstatus == Status.DEVICE_ERROR

@mock.patch("os.path.exists", return_value=True)
@mock.patch("subprocess.check_output", return_value=b"sdc disk\n")
def test_get_unlocked_veracrypt_no_vc_drive(self, mock_subprocess, mock_os):
with pytest.raises(ExportException) as ex:
self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc")

assert ex.value.sdstatus == Status.UNKNOWN_DEVICE_DETECTED

@mock.patch("os.path.exists", return_value=True)
@mock.patch("subprocess.check_output", return_value=b"\n")
def test_get_unlocked_veracrypt_empty_lsblk_error(self, mock_subprocess, mock_os):
with pytest.raises(ExportException) as ex:
self.cli._attempt_get_unlocked_veracrypt_volume("/dev/sdc")

assert ex.value.sdstatus == Status.UNKNOWN_DEVICE_DETECTED