Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ repos:
[
"mypy==1.11.1",
"types-PyYAML",
"types-requests",
"types-python-slugify",
"types-docker",
"types-paramiko",
"types-psycopg2",
"types-python-slugify",
"types-requests",
]
- repo: https://github.com/python-jsonschema/check-jsonschema
rev: 0.28.6
Expand Down
66 changes: 33 additions & 33 deletions pixl_core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,26 @@ readme = "README.md"
requires-python = ">=3.11"
classifiers = ["Programming Language :: Python :: 3"]
dependencies = [
"aio_pika==9.5.3",
"azure-identity==1.19.0",
"azure-keyvault==4.2.0",
"fastapi==0.115.6",
"jsonpickle==4.0.0",
"loguru==0.7.3",
"pandas==2.2.3",
"pika==1.3.2",
"psycopg2-binary==2.9.10",
"pyarrow==18.1.0",
"pydantic==2.10.3",
"python-decouple==3.8",
"python-slugify==8.0.4",
"PyYAML==6.0.2",
"requests==2.32.3",
"sqlalchemy==2.0.36",
"token-bucket==0.3.0",
"xnat==0.6.2",
"aio_pika==9.5.3",
"azure-identity==1.19.0",
"azure-keyvault==4.2.0",
"docker==7.1.0",
"fastapi==0.115.6",
"jsonpickle==4.0.0",
"loguru==0.7.3",
"pandas==2.2.3",
"paramiko==3.5.1",
"pika==1.3.2",
"psycopg2-binary==2.9.10",
"pyarrow==18.1.0",
"pydantic==2.10.3",
"python-decouple==3.8",
"python-slugify==8.0.4",
"PyYAML==6.0.2",
"requests==2.32.3",
"sqlalchemy==2.0.36",
"token-bucket==0.3.0",
"xnat==0.6.2",
]


Expand All @@ -33,9 +35,7 @@ requires = ["hatchling>=1.0.0"]
build-backend = "hatchling.build"

[tool.hatch.build.targets.wheel]
dev-mode-dirs = [
"src"
]
dev-mode-dirs = ["src"]


[tool.pytest.ini_options]
Expand All @@ -50,16 +50,16 @@ extend = "../ruff.toml"

[tool.coverage.report]
exclude_also = [
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"except subprocess.CalledProcessError as exception:",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
"if typing.TYPE_CHECKING",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"except subprocess.CalledProcessError as exception:",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
"if typing.TYPE_CHECKING",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]
148 changes: 148 additions & 0 deletions pixl_core/src/core/uploader/_sftp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright (c) University College London Hospitals NHS Foundation Trust
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Uploader subclass for SFTP."""

import os
from collections.abc import Generator
from contextlib import contextmanager
from typing import BinaryIO, Optional

import paramiko
from loguru import logger

from core.exports import ParquetExport
from core.uploader._orthanc import StudyTags, get_study_zip_archive
from core.uploader.base import Uploader


class SFTPUploader(Uploader):
"""Upload strategy for an SFTP server."""

def __init__(self, project_slug: str, keyvault_alias: Optional[str]) -> None:
"""Create instance of parent class"""
super().__init__(project_slug, keyvault_alias)

def _set_config(self) -> None:
"""Set the configuration for the SFTP uploader."""
# Use the Azure KV alias as prefix if it exists, otherwise use the project name
az_prefix = self.keyvault_alias
az_prefix = az_prefix if az_prefix else self.project_slug

# Get SFTP connection details from keyvault
self.host = self.keyvault.fetch_secret(f"{az_prefix}--sftp--host")
self.username = self.keyvault.fetch_secret(f"{az_prefix}--sftp--username")
self.password = self.keyvault.fetch_secret(f"{az_prefix}--sftp--password")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably makes more sense to pass in a private SSH key here for authentication, instead of password.

self.port = int(self.keyvault.fetch_secret(f"{az_prefix}--sftp--port"))
self.known_hosts_path = self.keyvault.fetch_secret(f"{az_prefix}--sftp--known-hosts-path")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might make more sense to pass in the host key itself, instead of a known_hosts file. On the other hand, the known_hosts file is what you get from the TRE.


def _upload_dicom_image(self, study_id: str, study_tags: StudyTags) -> None:
"""
Upload DICOM image via SFTP.

:param study_id: Orthanc Study ID
:param study_tags: Study tags containing metadata
"""
# Get DICOM zip archive from Orthanc
zip_content = get_study_zip_archive(study_id)
self.send_via_sftp(
zip_content,
study_tags.pseudo_anon_image_id,
remote_directory=self.project_slug,
)

def send_via_sftp(
self, zip_content: BinaryIO, pseudo_anon_image_id: str, remote_directory: str
) -> None:
"""Send the zip content to the SFTP server."""
filename = f"{pseudo_anon_image_id}.zip"

with self._connect_client() as sftp_client:
_sftp_create_remote_directory(sftp_client, remote_directory)
sftp_client.chdir(remote_directory)
sftp_client.putfo(zip_content, filename)

def upload_parquet_files(self, parquet_export: ParquetExport, remote_directory: str) -> None:
"""
Upload parquet to SFTP under <project name>/<extract datetime>/parquet.
:param parquet_export: instance of the ParquetExport class
The final directory structure will look like this:
<project-slug>
├── <extract_datetime_slug>
│ └── parquet
│ ├── omop
│ │ └── public
│ │ └── PROCEDURE_OCCURRENCE.parquet
│ └── radiology
│ └── IMAGE_LINKER.parquet
├── <pseudonymised_ID_DICOM_dataset_1>.zip
└── <pseudonymised_ID_DICOM_dataset_2>.zip
...
"""
logger.info("Starting SFTP upload of files for '{}'", parquet_export.project_slug)

source_root_dir = parquet_export.current_extract_base
source_files = [f for f in source_root_dir.rglob("*.parquet") if f.is_file()]
if not source_files:
msg = f"No files found in {source_root_dir}"
raise FileNotFoundError(msg)

parquet_dir = f"{parquet_export.project_slug}/{parquet_export.extract_time_slug}/parquet"
upload_dir = f"{remote_directory}/{parquet_dir}"
with self._connect_client() as sftp_client:
_sftp_create_remote_directory(sftp_client, upload_dir)
for source_path in source_files:
sftp_client.chdir(None) # reset
sftp_client.chdir(upload_dir)

source_rel_path = source_path.relative_to(source_root_dir)
source_rel_dir = source_rel_path.parent
source_filename_only = source_rel_path.relative_to(source_rel_dir)
_sftp_create_remote_directory(sftp_client, str(source_rel_dir))
sftp_client.chdir(str(source_rel_dir))
sftp_client.put(source_path, str(source_filename_only))

logger.info("Finished SFTP upload of files for '{}'", parquet_export.project_slug)

@contextmanager
def _connect_client(self) -> Generator[paramiko.SFTPClient, None, None]:
"""Connect to the SFTP client as a context manager"""
ssh_client = paramiko.SSHClient()
ssh_client.set_missing_host_key_policy(paramiko.RejectPolicy())
ssh_client.load_host_keys(self.known_hosts_path)
ssh_client.connect(
self.host, port=self.port, username=self.username, password=self.password
)
sftp_client = ssh_client.open_sftp()
try:
yield sftp_client
finally:
sftp_client.close()
ssh_client.close()


def _sftp_create_remote_directory(sftp_client: paramiko.SFTPClient, directory: str) -> None:
"""
Create remote directory and its parents if it doesn't exist.

:param sftp_client: SFTP client instance
:param directory: Directory path to create
"""
try:
sftp_client.stat(directory)
except OSError:
parent_dir = os.path.dirname(directory) # noqa: PTH120
_sftp_create_remote_directory(sftp_client, str(parent_dir))
sftp_client.mkdir(directory)
logger.debug(f"Created remote directory: {directory}")
Loading
Loading