Merged
Commits
20 commits
a21ce7f
First go at FTPS upload. Works!
jeremyestein Dec 11, 2025
210556b
Document that settings should live outside of any repo so they don't …
jeremyestein Dec 12, 2025
1cf2e8c
Control the PIXL install from our pyproject so the dependencies are all
jeremyestein Dec 12, 2025
8f96e8a
Make FTPS upload more usable and check that file to be uploaded is
jeremyestein Dec 12, 2025
b64b299
Convert CSV into parquet with the appropriate decimal array type.
jeremyestein Dec 12, 2025
9edb4ff
Make separate exporter container that runs cron to anonymise and export
jeremyestein Dec 16, 2025
129f8a6
Write a toy hasher so we can develop the rest of the pipeline in the
jeremyestein Dec 18, 2025
e64d155
Merge branch 'dev' into jeremy/pseudon
jeremyestein Dec 18, 2025
e0d64a4
Consistently use Python-style variable names
jeremyestein Dec 18, 2025
7f7dd8a
Move config around to reflect recent container changes
jeremyestein Dec 18, 2025
f6b1a67
Make log message more useful
jeremyestein Dec 18, 2025
00167a9
many env files now
jeremyestein Dec 18, 2025
4ef9683
Document manual pipeline calls
jeremyestein Dec 18, 2025
32ccc74
Fix all but one linting error
jeremyestein Dec 18, 2025
b54d1fb
Remove __init__.py that I had accidentally introduced at the top level,
jeremyestein Dec 18, 2025
9b39a47
Ignore missing import errors
jeremyestein Dec 18, 2025
48e2a83
Example crontab timing value was invalid (too many fields)
jeremyestein Dec 23, 2025
0ceaa39
Apply suggestions from code review
jeremyestein Dec 23, 2025
dfcb845
Clarify pyarrow data type usage
jeremyestein Dec 23, 2025
b4c7405
Add mention of PIXL to readme
jeremyestein Dec 23, 2025
3 changes: 0 additions & 3 deletions .dockerignore

This file was deleted.

3 changes: 3 additions & 0 deletions .gitignore
@@ -11,3 +11,6 @@ wheels/

# IDEs
.idea/

# settings files (should not be in the source tree anyway, but just in case)
*.env
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -20,6 +20,7 @@ repos:
args: [--config-file=pyproject.toml]
additional_dependencies:
[
"pandas-stubs",
"types-psycopg2",
"types-pika"
]
2 changes: 1 addition & 1 deletion .python-version
@@ -1 +1 @@
3.11
3.13
18 changes: 14 additions & 4 deletions Dockerfile
@@ -1,10 +1,20 @@
FROM python:3.14-slim-bookworm
FROM python:3.13-slim-bookworm AS waveform_base
LABEL authors="Stephen Thompson, Jeremy Stein"
# Cron is really small. To avoid having to reinstall it all the time,
# install it in both images even though we only need it in the exporter.
RUN export DEBIAN_FRONTEND=noninteractive && \
apt-get update && \
apt-get install --yes --no-install-recommends cron && \
apt-get autoremove --yes && apt-get clean --yes && rm -rf /var/lib/apt/lists/*
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/
WORKDIR /app
ARG UVCACHE=/root/.cache/uv
COPY pyproject.toml uv.lock* /app/
COPY PIXL /PIXL
WORKDIR /app
COPY waveform-controller/pyproject.toml waveform-controller/uv.lock /app/
RUN --mount=type=cache,target=${UVCACHE} uv pip install --system .
COPY . /app/
COPY waveform-controller/. /app/
RUN --mount=type=cache,target=${UVCACHE} uv pip install --system .
FROM waveform_base AS waveform_controller
CMD ["emap-extract-waveform"]
FROM waveform_base AS waveform_exporter
ENTRYPOINT ["/app/exporter-scripts/entrypoint.sh"]
48 changes: 42 additions & 6 deletions README.md
@@ -35,23 +35,42 @@ emap docker up -d

## 2 Install and deploy waveform controller using docker

Configuration, copy the configuration file to the config directory and edit
as necessary. Remove the comment telling you not to put secrets in it.
Create a root directory for your installation of the waveform-controller project,
separate from the Emap project root.

### Expected top-level dir structure
```
cp settings.env.EXAMPLE config/settings.env
├── PIXL
├── config
├── waveform-controller
└── waveform-export
```

### Instructions for achieving this structure

Clone this repo (`waveform-controller`) and [PIXL](https://github.com/SAFEHR-data/PIXL),
both inside your root directory.

Set up the config files as follows:
```
mkdir config
cp waveform-controller/config.EXAMPLE/controller.env.EXAMPLE config/controller.env
cp waveform-controller/config.EXAMPLE/exporter.env.EXAMPLE config/exporter.env
cp waveform-controller/config.EXAMPLE/hasher.env.EXAMPLE config/hasher.env
```
From each of the new config files, remove the comment telling you not to put secrets in it, as the comment instructs.

If it doesn't already exist, create a directory named
`waveform-export` in the root directory to store the saved waveform
messages.

```
mkdir ../waveform-export
mkdir waveform-export
```

Build and start the controller with docker
Build and start the controller and exporter with docker
```
cd ../waveform-controller
cd waveform-controller
docker compose build
docker compose up -d
```
@@ -67,5 +86,22 @@ Each row of the csv will contain

`csn, mrn, units, samplingRate, observationTime, waveformData`

## Perform a parquet conversion (including de-id)
At the time of writing, the cron pipeline is not set up. This section shows
how to perform an ad-hoc de-id.
```
docker compose run waveform-controller emap-csv-pseudon --csv /waveform-export/original-csv/my_original_csv.csv
```
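
For orientation, here is a minimal sketch of what this de-id step does conceptually: read the
original CSV, replace direct identifiers with hashed values, and write parquet. The column
names and the `pseudonymise_csv` helper below are illustrative assumptions, not the exact
contents of `src/pseudon/pseudon.py`.
```
# Illustrative sketch only; the real pipeline is the emap-csv-pseudon entry point.
# Column names are assumptions; do_hash is the stub from src/pseudon/hashing.py.
from pathlib import Path

import pandas as pd

from locations import WAVEFORM_ORIGINAL_CSV, WAVEFORM_PSEUDONYMISED_PARQUET
from pseudon.hashing import do_hash


def pseudonymise_csv(csv_name: str) -> Path:
    df = pd.read_csv(WAVEFORM_ORIGINAL_CSV / csv_name)
    # Replace direct identifiers with hashed values, prefixed by field type
    df["mrn"] = df["mrn"].astype(str).map(lambda v: do_hash("mrn", v))
    df["csn"] = df["csn"].astype(str).map(lambda v: do_hash("csn", v))
    WAVEFORM_PSEUDONYMISED_PARQUET.mkdir(parents=True, exist_ok=True)
    out_path = WAVEFORM_PSEUDONYMISED_PARQUET / f"{Path(csv_name).stem}.parquet"
    df.to_parquet(out_path)  # parquet writing needs pyarrow (or fastparquet) installed
    return out_path
```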

## Perform an export
At the time of writing, the cron pipeline is not set up. This section shows
how to perform an ad-hoc FTPS upload.

Files to be exported must be under the `WAVEFORM_PSEUDONYMISED_PARQUET` directory
(`/waveform-export/pseudonymised` inside the container), and the file argument must be
given as a path relative to this directory:
```
docker compose run --entrypoint "" waveform-exporter emap-send-ftps my_pseudonymised_file.parquet
```

# Developing
See [developing docs](docs/develop.md)
config.EXAMPLE/controller.env.EXAMPLE
@@ -1,5 +1,5 @@
# This is an EXAMPLE file, do not put real secrets in here.
# Copy it to ./config/settings.env and then DELETE THIS COMMENT.
# Copy it to ../config/controller.env and then DELETE THIS COMMENT.
UDS_DBNAME="fakeuds"
UDS_USERNAME="inform_user"
UDS_PASSWORD="inform"
8 changes: 8 additions & 0 deletions config.EXAMPLE/exporter.env.EXAMPLE
@@ -0,0 +1,8 @@
# This is an EXAMPLE file, do not put real secrets in here.
# Copy it to ../config/exporter.env and then DELETE THIS COMMENT.
# When the exporter runs (a standard 5-field cron expression: minute hour day-of-month month day-of-week)
EXPORTER_CRON_SCHEDULE="14 5 * * *"
FTPS_HOST=myftps.example.com
FTPS_PORT=990
FTPS_USERNAME=
FTPS_PASSWORD=
6 changes: 6 additions & 0 deletions config.EXAMPLE/hasher.env.EXAMPLE
@@ -0,0 +1,6 @@
# This is an EXAMPLE file, do not put real secrets in here.
# Copy it to ../config/hasher.env and then DELETE THIS COMMENT.
HASHER_API_AZ_CLIENT_ID=
HASHER_API_AZ_CLIENT_PASSWORD=
HASHER_API_AZ_TENANT_ID=
HASHER_API_AZ_KEY_VAULT_NAME=
2 changes: 0 additions & 2 deletions config/.gitignore

This file was deleted.

39 changes: 35 additions & 4 deletions docker-compose.yml
@@ -1,15 +1,46 @@
services:
waveform-controller:
build:
context: .
dockerfile: Dockerfile
context: ..
dockerfile: waveform-controller/Dockerfile
target: waveform_controller
args:
HTTP_PROXY: ${HTTP_PROXY}
http_proxy: ${http_proxy}
HTTPS_PROXY: ${HTTPS_PROXY}
https_proxy: ${https_proxy}
# ideally we'd use docker secrets but it's not enabled currently
env_file:
- ./config/settings.env
- ../config/controller.env
volumes:
- ../waveform-export:/app/waveform-export
- ../waveform-export:/waveform-export
waveform-exporter:
build:
context: ..
dockerfile: waveform-controller/Dockerfile
target: waveform_exporter
args:
HTTP_PROXY: ${HTTP_PROXY}
http_proxy: ${http_proxy}
HTTPS_PROXY: ${HTTPS_PROXY}
https_proxy: ${https_proxy}
# ideally we'd use docker secrets but it's not enabled currently
env_file:
- ../config/exporter.env
volumes:
- ../waveform-export:/waveform-export
waveform-hasher:
build:
context: ../PIXL
dockerfile: ./docker/pixl-python/Dockerfile
target: hasher_api
args:
PIXL_PACKAGE_DIR: hasher
HTTP_PROXY: ${HTTP_PROXY}
http_proxy: ${http_proxy}
HTTPS_PROXY: ${HTTPS_PROXY}
https_proxy: ${https_proxy}
ports:
- "127.0.0.1:${HASHER_API_PORT}:8000"
env_file:
- ../config/hasher.env
19 changes: 19 additions & 0 deletions exporter-scripts/entrypoint.sh
@@ -0,0 +1,19 @@
#!/bin/bash

# (can't use -u because we need to check for a potentially unset variable)
set -eo pipefail

# Set up cron schedule according to the environment variable
if [ -z "$EXPORTER_CRON_SCHEDULE" ]; then
    echo "You must set EXPORTER_CRON_SCHEDULE when running this container"
    exit 1
fi
set -x
cat <<EOF | crontab -
PATH=/usr/local/bin:/usr/bin:/bin
SHELL=/usr/bin/bash
$EXPORTER_CRON_SCHEDULE /app/exporter-scripts/scheduled-script.sh
EOF

# cron scheduler is PID 1 in this container
exec cron -f
6 changes: 6 additions & 0 deletions exporter-scripts/scheduled-script.sh
@@ -0,0 +1,6 @@
#!/bin/bash

# Run by the cron scheduler
# Probably want snakemake instead...
emap-csv-pseudon --help
emap-send-ftps --help
12 changes: 11 additions & 1 deletion pyproject.toml
@@ -5,13 +5,23 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"pandas",
#"pyarrow>=22.0",
"pika>=1.3.2",
"pre-commit>=4.5.0",
"psycopg2-binary>=2.9.11",
# need to be compatible with PIXL, which currently pins 2.9.10 (arguably it shouldn't)
"psycopg2-binary>=2.9.10",
# trick for making a "relative" path, works inside or outside container image
"core @ file:///${PROJECT_ROOT}/../PIXL/pixl_core",
]

[project.optional-dependencies]
dev = ["pytest>=9.0.2"]

[project.scripts]
emap-extract-waveform = "controller:receiver"
emap-csv-pseudon = "pseudon.pseudon:main"
emap-send-ftps = "exporter.ftps:do_upload"

[tool.mypy]
ignore_missing_imports = true
13 changes: 9 additions & 4 deletions src/controller.py
@@ -64,12 +64,17 @@ def waveform_callback(ch, method_frame, _header_frame, body):
lookup_success = True
try:
matched_mrn = emap_db.get_row(location_string, observation_time)
except ValueError as e:
except ValueError:
lookup_success = False
logger.error(f"Ambiguous or non existent match: {e}")
logger.error(
"Ambiguous or non existent match for location %s, obs time %s",
location_string,
observation_time,
exc_info=True,
)
matched_mrn = ("unmatched_mrn", "unmatched_nhs", "unmatched_csn")
except ConnectionError as e:
logger.error(f"Database error, will try again: {e}")
except ConnectionError:
logger.error("Database error, will try again", exc_info=True)
reject_message(ch, method_frame.delivery_tag, True)
return
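
The change above replaces f-string log messages with %-style logging arguments plus
`exc_info=True`, so interpolation is deferred to the logging framework and the traceback is
attached automatically. A minimal standalone illustration of the same pattern (the location
and timestamp values are made up):
```
import logging

logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)

try:
    {}["missing_key"]  # stand-in for a failed lookup
except KeyError:
    # Arguments are only interpolated if the record is actually emitted;
    # exc_info=True appends the current exception's traceback to the output.
    logger.error("No match for location %s, obs time %s", "ward-A", "2025-01-01T00:00:00", exc_info=True)
```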

15 changes: 11 additions & 4 deletions src/csv_writer.py
@@ -2,7 +2,7 @@

import csv
from datetime import datetime
from pathlib import Path
from locations import WAVEFORM_ORIGINAL_CSV


def create_file_name(
@@ -33,12 +33,19 @@ def write_frame(
"""
observation_datetime = datetime.fromtimestamp(observation_timestamp)

out_path = "waveform-export/"
Path(out_path).mkdir(exist_ok=True)
WAVEFORM_ORIGINAL_CSV.mkdir(exist_ok=True, parents=False)

filename = out_path + create_file_name(
filename = WAVEFORM_ORIGINAL_CSV / create_file_name(
source_stream_id, observation_datetime, csn, units
)

    # write header if this is a new file
    if not filename.exists():
        with open(filename, "w") as fileout:
            fileout.write(
                "csn,mrn,source_stream_id,units,sampling_rate,timestamp,location,values\n"
            )

    with open(filename, "a") as fileout:
        wv_writer = csv.writer(fileout, delimiter=",")
        waveform_data = waveform_data.get("value", "")
Empty file added src/exporter/__init__.py
Empty file.
64 changes: 64 additions & 0 deletions src/exporter/ftps.py
@@ -0,0 +1,64 @@
import argparse
import logging
import os
from pathlib import Path

import settings
from core.uploader._ftps import _connect_to_ftp, _create_and_set_as_cwd

from locations import WAVEFORM_PSEUDONYMISED_PARQUET

logging.basicConfig(format="%(levelname)s:%(asctime)s: %(message)s")
logger = logging.getLogger(__name__)


def do_upload():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "file_to_upload",
        type=Path,
        help="file to upload relative to pseudonymised folder",
    )
    args = parser.parse_args()
    do_upload_inner(args.file_to_upload)


def do_upload_inner(rel_file_to_upload: Path):
    # Absolute paths override the base path, so disallow that (abspath1 / abspath2 == abspath2)
    if rel_file_to_upload.is_absolute():
        raise ValueError("File must be relative to pseudonymised folder")
    WAVEFORM_PSEUDONYMISED_PARQUET.mkdir(parents=False, exist_ok=True)
    file_to_upload = (WAVEFORM_PSEUDONYMISED_PARQUET / rel_file_to_upload).resolve(
        strict=True
    )
    # Check that even after evaluating ".." and symlinks, the file is still under the "safe" directory
    # for upload.
    if not file_to_upload.is_relative_to(WAVEFORM_PSEUDONYMISED_PARQUET):
        raise ValueError(
            f"File {file_to_upload} must be under {WAVEFORM_PSEUDONYMISED_PARQUET}. "
            f"If this is unexpected, maybe you are using symlinks or '..' in the path?"
        )
    if not file_to_upload.exists():
        raise ValueError(f"File {file_to_upload} does not exist")
    logger.info(
        "Connecting to FTPS server %s:%s, with username %s",
        settings.FTPS_HOST,
        settings.FTPS_PORT,
        settings.FTPS_USERNAME,
    )
    ftp = _connect_to_ftp(
        settings.FTPS_HOST,
        settings.FTPS_PORT,
        settings.FTPS_USERNAME,
        settings.FTPS_PASSWORD,
    )
    remote_project_dir = "waveform-export"
    _create_and_set_as_cwd(ftp, remote_project_dir)
    remote_filename = os.path.basename(file_to_upload)
    command = f"STOR {remote_filename}"
    logger.info("Uploading file %s", file_to_upload)
    with open(file_to_upload, "rb") as file_to_upload_fh:
        ftp.storbinary(command, file_to_upload_fh)
    print("Directory listing: ")
    ftp.dir()
    ftp.quit()
6 changes: 6 additions & 0 deletions src/locations.py
@@ -0,0 +1,6 @@
from pathlib import Path

WAVEFORM_EXPORT_BASE = Path("/waveform-export")
WAVEFORM_ORIGINAL_CSV = WAVEFORM_EXPORT_BASE / "original-csv"
WAVEFORM_ORIGINAL_PARQUET = WAVEFORM_EXPORT_BASE / "original-parquet"
WAVEFORM_PSEUDONYMISED_PARQUET = WAVEFORM_EXPORT_BASE / "pseudonymised"
Empty file added src/pseudon/__init__.py
Empty file.
15 changes: 15 additions & 0 deletions src/pseudon/hashing.py
@@ -0,0 +1,15 @@
from functools import lru_cache


@lru_cache
def do_hash(type_prefix, value: str):
"""Stub implementation of deidentification function for testing purposes.

Not that I think this will happen in practice, but we'd want the CSN
"1234" to hash to a different value than the MRN "1234", so prefix
each value with its type.
"""
SALT = "waveform-exporter"
full_value_to_hash = f"{SALT}:{type_prefix}:{value}"
hash_str = f"{hash(full_value_to_hash) & 0xFFFFFFFF:08x}"
return hash_str
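
One caveat worth noting for this stub: Python's built-in `hash()` of a string is salted per
process (unless `PYTHONHASHSEED` is fixed), so the same input will generally hash to a
different value each time the container restarts. That is fine for developing the rest of the
pipeline, but it is not a stand-in for the real hasher service. A quick check, with the output
values shown purely as examples:
```
from pseudon.hashing import do_hash

# Stable within one process (and cached by lru_cache), but not across runs
# unless PYTHONHASHSEED is pinned in the environment.
print(do_hash("mrn", "1234"))  # e.g. '3f2a9c1b' (8 hex characters)
print(do_hash("csn", "1234"))  # differs from the MRN hash thanks to the type prefix
```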