archinstall/archinstall/lib/luks.py

231 lines
6.4 KiB
Python

from __future__ import annotations
import shlex
from dataclasses import dataclass
from pathlib import Path
from subprocess import CalledProcessError
from archinstall.lib.disk.utils import get_lsblk_info
from .exceptions import DiskError, SysCallError
from .general import SysCommand, SysCommandWorker, generate_password, run
from .output import debug, info
@dataclass
class Luks2:
luks_dev_path: Path
mapper_name: str | None = None
password: str | None = None
key_file: Path | None = None
auto_unmount: bool = False
@property
def mapper_dev(self) -> Path | None:
if self.mapper_name:
return Path(f'/dev/mapper/{self.mapper_name}')
return None
def isLuks(self) -> bool:
try:
SysCommand(f'cryptsetup isLuks {self.luks_dev_path}')
return True
except SysCallError:
return False
def erase(self) -> None:
debug(f'Erasing luks partition: {self.luks_dev_path}')
worker = SysCommandWorker(f'cryptsetup erase {self.luks_dev_path}')
worker.poll()
worker.write(b'YES\n', line_ending=False)
def __post_init__(self) -> None:
if self.luks_dev_path is None:
raise ValueError('Partition must have a path set')
def __enter__(self) -> None:
self.unlock(self.key_file)
def __exit__(self, *args: str, **kwargs: str) -> None:
if self.auto_unmount:
self.lock()
def _password_bytes(self) -> bytes:
if not self.password:
raise ValueError('Password for luks2 device was not specified')
if isinstance(self.password, bytes):
return self.password
else:
return bytes(self.password, 'UTF-8')
def _get_passphrase_args(
self,
key_file: Path | None = None
) -> tuple[list[str], bytes | None]:
key_file = key_file or self.key_file
if key_file:
return ['--key-file', str(key_file)], None
return [], self._password_bytes()
def encrypt(
self,
key_size: int = 512,
hash_type: str = 'sha512',
iter_time: int = 10000,
key_file: Path | None = None
) -> Path | None:
debug(f'Luks2 encrypting: {self.luks_dev_path}')
key_file_arg, passphrase = self._get_passphrase_args(key_file)
cmd = [
'cryptsetup',
'--batch-mode',
'--verbose',
'--type', 'luks2',
'--pbkdf', 'argon2id',
'--hash', hash_type,
'--key-size', str(key_size),
'--iter-time', str(iter_time),
*key_file_arg,
'--use-urandom',
'luksFormat', str(self.luks_dev_path)
]
debug(f'cryptsetup format: {shlex.join(cmd)}')
try:
result = run(cmd, input_data=passphrase)
except CalledProcessError as err:
output = err.stdout.decode().rstrip()
raise DiskError(f'Could not encrypt volume "{self.luks_dev_path}": {output}')
debug(f'cryptsetup luksFormat output: {result.stdout.decode().rstrip()}')
self.key_file = key_file
return key_file
def _get_luks_uuid(self) -> str:
command = f'cryptsetup luksUUID {self.luks_dev_path}'
try:
return SysCommand(command).decode()
except SysCallError as err:
info(f'Unable to get UUID for Luks device: {self.luks_dev_path}')
raise err
def is_unlocked(self) -> bool:
return self.mapper_name is not None and Path(f'/dev/mapper/{self.mapper_name}').exists()
def unlock(self, key_file: Path | None = None) -> None:
"""
Unlocks the luks device, an optional key file location for unlocking can be specified,
otherwise a default location for the key file will be used.
:param key_file: An alternative key file
:type key_file: Path
"""
debug(f'Unlocking luks2 device: {self.luks_dev_path}')
if not self.mapper_name:
raise ValueError('mapper name missing')
key_file_arg, passphrase = self._get_passphrase_args(key_file)
cmd = [
'cryptsetup', 'open',
str(self.luks_dev_path),
str(self.mapper_name),
*key_file_arg,
'--type', 'luks2'
]
result = run(cmd, input_data=passphrase)
debug(f'cryptsetup open output: {result.stdout.decode().rstrip()}')
if not self.mapper_dev or not self.mapper_dev.is_symlink():
raise DiskError(f'Failed to open luks2 device: {self.luks_dev_path}')
def lock(self) -> None:
from archinstall.lib.disk.device_handler import device_handler
device_handler.umount(self.luks_dev_path)
# Get crypt-information about the device by doing a reverse lookup starting with the partition path
# For instance: /dev/sda
lsblk_info = get_lsblk_info(self.luks_dev_path)
# For each child (sub-partition/sub-device)
for child in lsblk_info.children:
# Unmount the child location
for mountpoint in child.mountpoints:
debug(f'Unmounting {mountpoint}')
device_handler.umount(mountpoint, recursive=True)
# And close it if possible.
debug(f"Closing crypt device {child.name}")
SysCommand(f"cryptsetup close {child.name}")
def create_keyfile(self, target_path: Path, override: bool = False) -> None:
"""
Routine to create keyfiles, so it can be moved elsewhere
"""
if self.mapper_name is None:
raise ValueError('Mapper name must be provided')
# Once we store the key as ../xyzloop.key systemd-cryptsetup can
# automatically load this key if we name the device to "xyzloop"
kf_path = Path(f'/etc/cryptsetup-keys.d/{self.mapper_name}.key')
key_file = target_path / kf_path.relative_to(kf_path.root)
crypttab_path = target_path / 'etc/crypttab'
if key_file.exists():
if not override:
info(f'Key file {key_file} already exists, keeping existing')
return
else:
info(f'Key file {key_file} already exists, overriding')
key_file.parent.mkdir(parents=True, exist_ok=True)
pwd = generate_password(length=512)
key_file.write_text(pwd)
key_file.chmod(0o400)
self._add_key(key_file)
self._crypttab(crypttab_path, kf_path, options=["luks", "key-slot=1"])
def _add_key(self, key_file: Path) -> None:
debug(f'Adding additional key-file {key_file}')
command = f'cryptsetup -q -v luksAddKey {self.luks_dev_path} {key_file}'
worker = SysCommandWorker(command)
pw_injected = False
while worker.is_alive():
if b'Enter any existing passphrase' in worker and pw_injected is False:
worker.write(self._password_bytes())
pw_injected = True
if worker.exit_code != 0:
raise DiskError(f'Could not add encryption key {key_file} to {self.luks_dev_path}: {worker.decode()}')
def _crypttab(
self,
crypttab_path: Path,
key_file: Path,
options: list[str]
) -> None:
debug(f'Adding crypttab entry for key {key_file}')
with open(crypttab_path, 'a') as crypttab:
opt = ','.join(options)
uuid = self._get_luks_uuid()
row = f"{self.mapper_name} UUID={uuid} {key_file} {opt}\n"
crypttab.write(row)