diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 9e1ebc7b..6fe2c616 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -32,9 +32,11 @@ repos:
           [
             "mypy==1.11.1",
             "types-PyYAML",
-            "types-requests",
-            "types-python-slugify",
+            "types-docker",
+            "types-paramiko",
             "types-psycopg2",
+            "types-python-slugify",
+            "types-requests",
           ]
   - repo: https://github.com/python-jsonschema/check-jsonschema
     rev: 0.28.6
diff --git a/pixl_core/pyproject.toml b/pixl_core/pyproject.toml
index fa8b98bc..a37e2f72 100644
--- a/pixl_core/pyproject.toml
+++ b/pixl_core/pyproject.toml
@@ -7,24 +7,26 @@ readme = "README.md"
 requires-python = ">=3.11"
 classifiers = ["Programming Language :: Python :: 3"]
 dependencies = [
-    "aio_pika==9.5.3",
-    "azure-identity==1.19.0",
-    "azure-keyvault==4.2.0",
-    "fastapi==0.115.6",
-    "jsonpickle==4.0.0",
-    "loguru==0.7.3",
-    "pandas==2.2.3",
-    "pika==1.3.2",
-    "psycopg2-binary==2.9.10",
-    "pyarrow==18.1.0",
-    "pydantic==2.10.3",
-    "python-decouple==3.8",
-    "python-slugify==8.0.4",
-    "PyYAML==6.0.2",
-    "requests==2.32.3",
-    "sqlalchemy==2.0.36",
-    "token-bucket==0.3.0",
-    "xnat==0.6.2",
+    "aio_pika==9.5.3",
+    "azure-identity==1.19.0",
+    "azure-keyvault==4.2.0",
+    "docker==7.1.0",
+    "fastapi==0.115.6",
+    "jsonpickle==4.0.0",
+    "loguru==0.7.3",
+    "pandas==2.2.3",
+    "paramiko==3.5.1",
+    "pika==1.3.2",
+    "psycopg2-binary==2.9.10",
+    "pyarrow==18.1.0",
+    "pydantic==2.10.3",
+    "python-decouple==3.8",
+    "python-slugify==8.0.4",
+    "PyYAML==6.0.2",
+    "requests==2.32.3",
+    "sqlalchemy==2.0.36",
+    "token-bucket==0.3.0",
+    "xnat==0.6.2",
 ]
@@ -33,9 +35,7 @@
 requires = ["hatchling>=1.0.0"]
 build-backend = "hatchling.build"
 
 [tool.hatch.build.targets.wheel]
-dev-mode-dirs = [
-    "src"
-]
+dev-mode-dirs = ["src"]
 
 [tool.pytest.ini_options]
@@ -50,16 +50,16 @@ extend = "../ruff.toml"
 
 [tool.coverage.report]
 exclude_also = [
-    "def __repr__",
-    "if self.debug:",
-    "if settings.DEBUG",
-    "except subprocess.CalledProcessError as exception:",
-    "raise AssertionError",
-    "raise NotImplementedError",
-    "if 0:",
-    "if __name__ == .__main__.:",
-    "if TYPE_CHECKING:",
-    "if typing.TYPE_CHECKING",
-    "class .*\\bProtocol\\):",
-    "@(abc\\.)?abstractmethod",
+    "def __repr__",
+    "if self.debug:",
+    "if settings.DEBUG",
+    "except subprocess.CalledProcessError as exception:",
+    "raise AssertionError",
+    "raise NotImplementedError",
+    "if 0:",
+    "if __name__ == .__main__.:",
+    "if TYPE_CHECKING:",
+    "if typing.TYPE_CHECKING",
+    "class .*\\bProtocol\\):",
+    "@(abc\\.)?abstractmethod",
 ]
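The dependency changes above pair the new runtime pins (`docker`, `paramiko`) with matching type stubs so the mypy pre-commit hook can type-check the new SFTP code. As a rough illustration only (the function below is made up, not part of this diff), this is the kind of typed paramiko call those stubs cover:

```python
# Illustrative sketch only -- the function name and arguments are invented.
import paramiko


def open_sftp(host: str, port: int, username: str, password: str) -> paramiko.SFTPClient:
    """Open an SFTP session; mypy checks this against the types-paramiko stubs."""
    ssh = paramiko.SSHClient()
    ssh.load_system_host_keys()  # trust only hosts already in ~/.ssh/known_hosts
    ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
    ssh.connect(host, port=port, username=username, password=password)
    return ssh.open_sftp()
```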
+ +"""Uploader subclass for SFTP.""" + +import os +from collections.abc import Generator +from contextlib import contextmanager +from typing import BinaryIO, Optional + +import paramiko +from loguru import logger + +from core.exports import ParquetExport +from core.uploader._orthanc import StudyTags, get_study_zip_archive +from core.uploader.base import Uploader + + +class SFTPUploader(Uploader): + """Upload strategy for an SFTP server.""" + + def __init__(self, project_slug: str, keyvault_alias: Optional[str]) -> None: + """Create instance of parent class""" + super().__init__(project_slug, keyvault_alias) + + def _set_config(self) -> None: + """Set the configuration for the SFTP uploader.""" + # Use the Azure KV alias as prefix if it exists, otherwise use the project name + az_prefix = self.keyvault_alias + az_prefix = az_prefix if az_prefix else self.project_slug + + # Get SFTP connection details from keyvault + self.host = self.keyvault.fetch_secret(f"{az_prefix}--sftp--host") + self.username = self.keyvault.fetch_secret(f"{az_prefix}--sftp--username") + self.password = self.keyvault.fetch_secret(f"{az_prefix}--sftp--password") + self.port = int(self.keyvault.fetch_secret(f"{az_prefix}--sftp--port")) + self.known_hosts_path = self.keyvault.fetch_secret(f"{az_prefix}--sftp--known-hosts-path") + + def _upload_dicom_image(self, study_id: str, study_tags: StudyTags) -> None: + """ + Upload DICOM image via SFTP. + + :param study_id: Orthanc Study ID + :param study_tags: Study tags containing metadata + """ + # Get DICOM zip archive from Orthanc + zip_content = get_study_zip_archive(study_id) + self.send_via_sftp( + zip_content, + study_tags.pseudo_anon_image_id, + remote_directory=self.project_slug, + ) + + def send_via_sftp( + self, zip_content: BinaryIO, pseudo_anon_image_id: str, remote_directory: str + ) -> None: + """Send the zip content to the SFTP server.""" + filename = f"{pseudo_anon_image_id}.zip" + + with self._connect_client() as sftp_client: + _sftp_create_remote_directory(sftp_client, remote_directory) + sftp_client.chdir(remote_directory) + sftp_client.putfo(zip_content, filename) + + def upload_parquet_files(self, parquet_export: ParquetExport, remote_directory: str) -> None: + """ + Upload parquet to SFTP under //parquet. + :param parquet_export: instance of the ParquetExport class + The final directory structure will look like this: + + ├── + │ └── parquet + │ ├── omop + │ │ └── public + │ │ └── PROCEDURE_OCCURRENCE.parquet + │ └── radiology + │ └── IMAGE_LINKER.parquet + ├── .zip + └── .zip + ... 
+ """ + logger.info("Starting SFTP upload of files for '{}'", parquet_export.project_slug) + + source_root_dir = parquet_export.current_extract_base + source_files = [f for f in source_root_dir.rglob("*.parquet") if f.is_file()] + if not source_files: + msg = f"No files found in {source_root_dir}" + raise FileNotFoundError(msg) + + parquet_dir = f"{parquet_export.project_slug}/{parquet_export.extract_time_slug}/parquet" + upload_dir = f"{remote_directory}/{parquet_dir}" + with self._connect_client() as sftp_client: + _sftp_create_remote_directory(sftp_client, upload_dir) + for source_path in source_files: + sftp_client.chdir(None) # reset + sftp_client.chdir(upload_dir) + + source_rel_path = source_path.relative_to(source_root_dir) + source_rel_dir = source_rel_path.parent + source_filename_only = source_rel_path.relative_to(source_rel_dir) + _sftp_create_remote_directory(sftp_client, str(source_rel_dir)) + sftp_client.chdir(str(source_rel_dir)) + sftp_client.put(source_path, str(source_filename_only)) + + logger.info("Finished SFTP upload of files for '{}'", parquet_export.project_slug) + + @contextmanager + def _connect_client(self) -> Generator[paramiko.SFTPClient, None, None]: + """Connect to the SFTP client as a context manager""" + ssh_client = paramiko.SSHClient() + ssh_client.set_missing_host_key_policy(paramiko.RejectPolicy()) + ssh_client.load_host_keys(self.known_hosts_path) + ssh_client.connect( + self.host, port=self.port, username=self.username, password=self.password + ) + sftp_client = ssh_client.open_sftp() + try: + yield sftp_client + finally: + sftp_client.close() + ssh_client.close() + + +def _sftp_create_remote_directory(sftp_client: paramiko.SFTPClient, directory: str) -> None: + """ + Create remote directory and its parents if it doesn't exist. + + :param sftp_client: SFTP client instance + :param directory: Directory path to create + """ + try: + sftp_client.stat(directory) + except OSError: + parent_dir = os.path.dirname(directory) # noqa: PTH120 + _sftp_create_remote_directory(sftp_client, str(parent_dir)) + sftp_client.mkdir(directory) + logger.debug(f"Created remote directory: {directory}") diff --git a/pixl_core/tests/uploader/helpers/sftpserver.py b/pixl_core/tests/uploader/helpers/sftpserver.py new file mode 100644 index 00000000..f4f6cb25 --- /dev/null +++ b/pixl_core/tests/uploader/helpers/sftpserver.py @@ -0,0 +1,164 @@ +# Copyright (c) University College London Hospitals NHS Foundation Trust +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+import shutil
+import tempfile
+import time
+from pathlib import Path
+from typing import Optional
+
+import docker
+import paramiko
+from decouple import config
+from loguru import logger
+
+
+class SFTPServer:
+    """SFTP server running in a Docker container for testing"""
+
+    def __init__(self, host_key_path: Path) -> None:
+        """Initialise the SFTPServer"""
+        self.username = config("SFTP_USERNAME", default="testuser")
+        self.password = config("SFTP_PASSWORD", default="testpass")
+        self.port = int(config("SFTP_PORT", default=2222))
+        self.docker_client: docker.DockerClient = docker.from_env()
+        self.host_key_path = host_key_path
+        self.container: Optional[docker.models.containers.Container] = None
+        self.mounted_upload_dir: Optional[Path] = None
+
+    def start(self) -> dict:
+        """Start the SFTP server container and return its connection details."""
+        temp_dir = tempfile.mkdtemp()
+
+        # Create users.conf for the SFTP server
+        users_conf = f"{self.username}:{self.password}:::upload"
+        users_conf_path = Path(temp_dir) / "users.conf"
+        users_conf_path.write_text(users_conf)
+
+        self.mounted_upload_dir = Path(temp_dir) / "upload"
+        self.mounted_upload_dir.mkdir(parents=True, exist_ok=True)
+
+        # Start the container, binding the upload directory so tests can
+        # inspect uploaded files on the host
+        self.container = self.docker_client.containers.run(
+            "atmoz/sftp:alpine",
+            command=f"{self.username}:{self.password}:::upload",
+            ports={"22/tcp": self.port},
+            volumes={
+                str(self.mounted_upload_dir): {
+                    "bind": f"/home/{self.username}/upload",
+                    "mode": "rw",
+                }
+            },
+            detach=True,
+            remove=True,
+        )
+
+        # Wait for the container to be ready and extract its host keys
+        self._wait_for_server()
+        self._extract_host_keys()
+
+        return {
+            "host": "localhost",
+            "port": self.port,
+            "username": self.username,
+            "password": self.password,
+            "upload_dir": self.mounted_upload_dir,
+        }
+
+    def _wait_for_server(self, timeout: int = 30) -> None:
+        """Wait for the SFTP server to be ready"""
+        start_time = time.time()
+        while time.time() - start_time < timeout:
+            try:
+                ssh = paramiko.SSHClient()
+                ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())  # noqa: S507
+                ssh.connect(
+                    "localhost",
+                    port=self.port,
+                    username=self.username,
+                    password=self.password,
+                    timeout=5,
+                )
+                sftp = ssh.open_sftp()
+                sftp.close()
+                ssh.close()
+            except (paramiko.SSHException, OSError, ConnectionError, docker.errors.NotFound) as e:
+                logger.info(f"Retrying SFTP connection: {e}")
+                time.sleep(1)
+            else:
+                return  # Connection successful
+
+        err_str = f"SFTP server did not start within {timeout} seconds"
+        self.stop()
+        raise TimeoutError(err_str)
+
+    def _extract_host_keys(self) -> None:
+        """Extract host keys from the running container"""
+        if not self.container:
+            msg = "Container not started"
+            raise RuntimeError(msg)
+
+        key_types = ["ssh_host_ed25519_key", "ssh_host_rsa_key", "ssh_host_ecdsa_key"]
+        host_keys = []
+
+        for key_type in key_types:
+            exit_code, output = self.container.exec_run(f"cat /etc/ssh/{key_type}.pub")
+            if exit_code == 0:
+                host_key_content = output.decode().strip()
+                host_keys.append(host_key_content)
+
+        if not host_keys:
+            msg = "No host keys found in container"
+            raise RuntimeError(msg)
+
+        # Create a known_hosts file with all available keys
+        known_hosts_path = self.host_key_path / "known_hosts"
+        known_hosts_content = ""
+
+        for host_key_content in host_keys:
+            parts = host_key_content.split()
+            if len(parts) >= 2:
+                key_type = parts[0]  # e.g. "ssh-ed25519", "ssh-rsa"
+                key_data = parts[1]  # base64-encoded key
+                known_hosts_content += f"[localhost]:{self.port} {key_type} {key_data}\n"
+
+        known_hosts_path.write_text(known_hosts_content)
+        known_hosts_path.chmod(0o644)
+
+        # Also save the first public key for reference
+        if host_keys:
+            public_key_path = self.host_key_path / "ssh_host_key.pub"
+            public_key_path.write_text(host_keys[0] + "\n")
+            public_key_path.chmod(0o644)
+
+    def stop(self) -> None:
+        """Stop the SFTP server container"""
+        if self.container:
+            self.container.stop()
+            self.container = None
+
+        if self.mounted_upload_dir:
+            shutil.rmtree(self.mounted_upload_dir, ignore_errors=True)
+            self.mounted_upload_dir = None
+
+    def is_running(self) -> bool:
+        """Check if the SFTP server is running"""
+        if not self.container:
+            return False
+        try:
+            self.container.reload()
+        except docker.errors.NotFound:
+            return False
+        else:
+            return self.container.status == "running"
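The helper can also be driven outside pytest; a sketch, assuming a local Docker daemon that can pull `atmoz/sftp:alpine` (the directory name below is arbitrary):

```python
# Sketch: run the test SFTP server standalone, assuming Docker is available.
from pathlib import Path

from pixl_core.tests.uploader.helpers.sftpserver import SFTPServer

keys_dir = Path("host_keys")
keys_dir.mkdir(exist_ok=True)
server = SFTPServer(keys_dir)  # start() writes known_hosts into keys_dir
try:
    details = server.start()
    print(f"SFTP server at {details['host']}:{details['port']}")
    print(f"Uploads appear on the host in {details['upload_dir']}")
finally:
    server.stop()
```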
diff --git a/pixl_core/tests/uploader/test_sftp.py b/pixl_core/tests/uploader/test_sftp.py
new file mode 100644
index 00000000..797e434a
--- /dev/null
+++ b/pixl_core/tests/uploader/test_sftp.py
@@ -0,0 +1,177 @@
+# Copyright (c) University College London Hospitals NHS Foundation Trust
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Test uploading files to an SFTP endpoint."""
+
+import filecmp
+import os
+from collections.abc import Generator
+from datetime import UTC, datetime
+from pathlib import Path
+
+import pandas as pd
+import pytest
+from core.db.models import Image
+from core.db.queries import update_exported_at
+from core.exports import ParquetExport
+from core.uploader._sftp import SFTPUploader
+from pixl_core.tests.uploader.helpers.sftpserver import SFTPServer
+from pydicom.uid import generate_uid
+
+TEST_DIR = Path(__file__).parents[1]
+SFTP_UPLOAD_DIR = "upload"
+
+os.environ["SFTP_HOST"] = "localhost"
+os.environ["SFTP_USERNAME"] = "testuser"
+os.environ["SFTP_PASSWORD"] = "testpass"
+os.environ["SFTP_PORT"] = "2222"
+
+
+class MockSFTPUploader(SFTPUploader):
+    """Mock SFTPUploader for testing."""
+
+    def __init__(self, known_hosts_path: Path) -> None:
+        """Initialise the mock uploader with hardcoded values for the SFTP config."""
+        self.host = os.environ["SFTP_HOST"]
+        self.username = os.environ["SFTP_USERNAME"]
+        self.password = os.environ["SFTP_PASSWORD"]
+        self.port = int(os.environ["SFTP_PORT"])
+        self.known_hosts_path = known_hosts_path
+        self.project_slug = "test-project"
+
+    def _set_config(self) -> None:
+        """Override to avoid the Azure Key Vault dependency in tests."""
+
+
+@pytest.fixture(scope="module")
+def host_keys(tmp_path_factory) -> Path:
+    """Create a temporary directory for host keys (populated by the server on start-up)."""
+    return tmp_path_factory.mktemp("host_keys")
+
+
+@pytest.fixture(scope="module")
+def sftp_server(host_keys) -> Generator[SFTPServer, None, None]:
+    """Return a running SFTP server container."""
+    server = SFTPServer(host_keys)
+    server.start()
+    yield server
+    server.stop()
+
+
+@pytest.fixture()
+def sftp_uploader(host_keys) -> MockSFTPUploader:
+    """Return a MockSFTPUploader object."""
+    return MockSFTPUploader(host_keys / "known_hosts")
+
+
+@pytest.fixture()
+def zip_content() -> Generator:
+    """Open file handle for the test DICOM zip to upload to the SFTP server."""
+    test_zip_file = TEST_DIR / "data" / "public.zip"
+    with test_zip_file.open("rb") as file_content:
+        yield file_content
+
+
+def test_send_via_sftp(
+    zip_content, not_yet_exported_dicom_image, sftp_uploader, sftp_server
+) -> None:
+    """Tests that a DICOM image can be uploaded to the correct location via SFTP"""
+    # ARRANGE
+    pseudo_anon_id = not_yet_exported_dicom_image.pseudo_study_uid
+    project_slug = "some-project-slug"
+    expected_output_file = sftp_server.mounted_upload_dir / project_slug / (pseudo_anon_id + ".zip")
+
+    # The mock SFTP server requires files to be uploaded to the upload/ directory
+    remote_directory = f"{SFTP_UPLOAD_DIR}/{project_slug}"
+
+    # ACT
+    sftp_uploader.send_via_sftp(zip_content, pseudo_anon_id, remote_directory)
+
+    # ASSERT
+    assert expected_output_file.exists()
+
+
+def test_update_exported_and_save(rows_in_session) -> None:
+    """Tests that the exported_at field is updated when a file is uploaded"""
+    # ARRANGE
+    expected_export_time = datetime.now(tz=UTC)
+
+    # ACT
+    pseudo_study_uid = generate_uid(entropy_srcs=["not_yet_exported"])
+    update_exported_at(pseudo_study_uid, expected_export_time)
+    new_row = (
+        rows_in_session.query(Image)
+        .filter(Image.pseudo_study_uid == pseudo_study_uid)
+        .one()
+    )
+    actual_export_time = new_row.exported_at.replace(tzinfo=UTC)
+
+    # ASSERT
+    assert actual_export_time == expected_export_time
+
+
+@pytest.fixture()
+def parquet_export(export_dir) -> ParquetExport:
+    """
+    Return a ParquetExport object.
+
+    This fixture is deliberately not defined in conftest, because it imports the ParquetExport
+    class, which in turn loads the PixlConfig class, which in turn requires the
+    PROJECT_CONFIGS_DIR environment variable to be set. This environment variable is set in
+    conftest, so the import needs to happen after that.
+ """ + return ParquetExport( + project_name_raw="i-am-a-project", + extract_datetime=datetime.now(tz=UTC), + export_dir=export_dir, + ) + + +def test_upload_parquet(parquet_export, sftp_uploader, sftp_server) -> None: + """Tests that parquet files are uploaded to the correct location via SFTP""" + # ARRANGE + # Set up the mock server directory + parquet_export.copy_to_exports(Path(__file__).parents[3] / "test" / "resources" / "omop") + parquet_export.export_radiology_linker(pd.DataFrame(list("dummy"), columns=["D"])) + + # ACT + sftp_uploader.upload_parquet_files(parquet_export, SFTP_UPLOAD_DIR) + + # ASSERT + expected_public_parquet_dir = ( + sftp_server.mounted_upload_dir + / parquet_export.project_slug + / parquet_export.extract_time_slug + / "parquet" + ) + assert expected_public_parquet_dir.exists() + + # Print difference report to aid debugging (it doesn't actually assert anything) + dc = filecmp.dircmp(parquet_export.current_extract_base, expected_public_parquet_dir) + dc.report_full_closure() + assert ( + expected_public_parquet_dir / "omop" / "public" / "PROCEDURE_OCCURRENCE.parquet" + ).exists(), "Public PROCEDURE_OCCURRENCE.parquet file not found" + assert ( + expected_public_parquet_dir / "radiology" / "IMAGE_LINKER.parquet" + ).exists(), "Radiology IMAGE_LINKER.parquet file not found" + + +def test_no_export_to_upload(parquet_export, sftp_uploader, sftp_server) -> None: + """If there is nothing in the export directory, an exception is thrown""" + # ARRANGE + parquet_export.public_output.mkdir(parents=True, exist_ok=True) + + # ACT & ASSERT + with pytest.raises(FileNotFoundError): + sftp_uploader.upload_parquet_files(parquet_export, SFTP_UPLOAD_DIR)