From e7dd4da07bff0737025403df07ee64186d2c90e7 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 10:33:58 +0000 Subject: [PATCH 01/22] [ELI-688] - renaming function to generic since it doesn't actually commune with aws --- src/eligibility_signposting_api/audit/audit_context.py | 2 +- src/eligibility_signposting_api/views/eligibility.py | 2 +- tests/unit/audit/test_audit_context.py | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/eligibility_signposting_api/audit/audit_context.py b/src/eligibility_signposting_api/audit/audit_context.py index 66325669..f9a47f59 100644 --- a/src/eligibility_signposting_api/audit/audit_context.py +++ b/src/eligibility_signposting_api/audit/audit_context.py @@ -140,7 +140,7 @@ def add_response_details(response_id: UUID, last_updated: datetime) -> None: g.audit_log.response.last_updated = last_updated @staticmethod - def write_to_firehose(service: AuditService) -> None: + def write_audit_record(service: AuditService) -> None: service.audit(g.audit_log.model_dump(by_alias=True)) @staticmethod diff --git a/src/eligibility_signposting_api/views/eligibility.py b/src/eligibility_signposting_api/views/eligibility.py index b935678f..d043f0b6 100644 --- a/src/eligibility_signposting_api/views/eligibility.py +++ b/src/eligibility_signposting_api/views/eligibility.py @@ -66,7 +66,7 @@ def check_eligibility( return handle_unknown_person_error(nhs_number) else: response: eligibility_response.EligibilityResponse = build_eligibility_response(eligibility_status) - AuditContext.write_to_firehose(audit_service) + AuditContext.write_audit_record(audit_service) return make_response(response.model_dump(by_alias=True, mode="json", exclude_none=True), HTTPStatus.OK) diff --git a/tests/unit/audit/test_audit_context.py b/tests/unit/audit/test_audit_context.py index 63749422..8c40c8c7 100644 --- a/tests/unit/audit/test_audit_context.py +++ b/tests/unit/audit/test_audit_context.py @@ -330,7 +330,7 @@ def 
test_add_response_details_adds_to_audit_log_on_g(app): assert g.audit_log.response.last_updated is last_updated -def test_write_to_firehose_calls_audit_service_with_correct_data_from_g(app): +def test_write_audit_record_calls_audit_service_with_correct_data_from_g(app): mock_audit_service = Mock(spec=AuditService) response_id = uuid.uuid4() last_updated = datetime(2023, 1, 1, 0, 0, tzinfo=UTC) @@ -339,7 +339,7 @@ def test_write_to_firehose_calls_audit_service_with_correct_data_from_g(app): g.audit_log = AuditEvent() AuditContext.add_response_details(response_id, last_updated) - AuditContext.write_to_firehose(mock_audit_service) + AuditContext.write_audit_record(mock_audit_service) assert g.audit_log.response.response_id == response_id assert g.audit_log.response.last_updated == last_updated From 4eae536e96ce469152e79f08fc55913176db617d Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 11:12:37 +0000 Subject: [PATCH 02/22] [ELI-688] - renaming firehose delivery stream and adding a kinesis client and mock stream --- .../audit/audit_service.py | 2 +- src/eligibility_signposting_api/config/config.py | 8 ++++---- tests/integration/conftest.py | 13 ++++++++++++- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/eligibility_signposting_api/audit/audit_service.py b/src/eligibility_signposting_api/audit/audit_service.py index 41fddb12..4b2d778d 100644 --- a/src/eligibility_signposting_api/audit/audit_service.py +++ b/src/eligibility_signposting_api/audit/audit_service.py @@ -16,7 +16,7 @@ class AuditService: # pragma: no cover def __init__( self, firehose: Annotated[BaseClient, Inject(qualifier="firehose")], - audit_delivery_stream: Annotated[AwsKinesisFirehoseStreamName, Inject(param="kinesis_audit_stream_to_s3")], + audit_delivery_stream: Annotated[AwsKinesisFirehoseStreamName, Inject(param="firehose_audit_stream_to_s3")], ) -> None: super().__init__() self.firehose = firehose diff --git a/src/eligibility_signposting_api/config/config.py 
b/src/eligibility_signposting_api/config/config.py index bb23991a..98f5b3ff 100644 --- a/src/eligibility_signposting_api/config/config.py +++ b/src/eligibility_signposting_api/config/config.py @@ -27,8 +27,8 @@ def config() -> dict[str, Any]: hashing_secret_name = HashSecretName(os.getenv("HASHING_SECRET_NAME", "test_secret")) aws_default_region = AwsRegion(os.getenv("AWS_DEFAULT_REGION", "eu-west-1")) enable_xray_patching = os.getenv("ENABLE_XRAY_PATCHING", "false").lower() == "true" - kinesis_audit_stream_to_s3 = AwsKinesisFirehoseStreamName( - os.getenv("KINESIS_AUDIT_STREAM_TO_S3", "test_kinesis_audit_stream_to_s3") + firehose_audit_stream_to_s3 = AwsKinesisFirehoseStreamName( + os.getenv("FIREHOSE_AUDIT_STREAM_TO_S3", "test_firehose_audit_stream_to_s3") ) log_level = LOG_LEVEL @@ -44,7 +44,7 @@ def config() -> dict[str, Any]: "audit_bucket_name": audit_bucket_name, "consumer_mapping_bucket_name": consumer_mapping_bucket_name, "firehose_endpoint": None, - "kinesis_audit_stream_to_s3": kinesis_audit_stream_to_s3, + "firehose_audit_stream_to_s3": firehose_audit_stream_to_s3, "enable_xray_patching": enable_xray_patching, "secretsmanager_endpoint": None, "hashing_secret_name": hashing_secret_name, @@ -63,7 +63,7 @@ def config() -> dict[str, Any]: "audit_bucket_name": audit_bucket_name, "consumer_mapping_bucket_name": consumer_mapping_bucket_name, "firehose_endpoint": URL(os.getenv("FIREHOSE_ENDPOINT", moto_server_endpoint)), - "kinesis_audit_stream_to_s3": kinesis_audit_stream_to_s3, + "firehose_audit_stream_to_s3": firehose_audit_stream_to_s3, "enable_xray_patching": enable_xray_patching, "secretsmanager_endpoint": URL(os.getenv("SECRET_MANAGER_ENDPOINT", moto_server_endpoint)), "hashing_secret_name": hashing_secret_name, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 41098728..41112355 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -158,6 +158,9 @@ def s3_client(boto3_session: Session, 
moto_server: URL) -> BaseClient: def firehose_client(boto3_session: Session, moto_server: URL) -> BaseClient: return boto3_session.client("firehose", endpoint_url=str(moto_server)) +@pytest.fixture(scope="session") +def kinesis_client(boto3_session: Session, moto_server: URL) -> BaseClient: + return boto3_session.client("kinesis", endpoint_url=str(moto_server)) @pytest.fixture(scope="session") def secretsmanager_client(boto3_session: Session, moto_server: URL) -> BaseClient: @@ -568,7 +571,7 @@ def audit_bucket(s3_client: BaseClient) -> Generator[BucketName]: @pytest.fixture(autouse=True) def firehose_delivery_stream(firehose_client: BaseClient, audit_bucket: BucketName) -> dict[str, Any]: - stream_name = "test_kinesis_audit_stream_to_s3" + stream_name = "test_firehose_audit_stream_to_s3" try: return firehose_client.create_delivery_stream( @@ -585,6 +588,14 @@ def firehose_delivery_stream(firehose_client: BaseClient, audit_bucket: BucketNa except firehose_client.exceptions.ResourceInUseException: return firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name) +@pytest.fixture +def kinesis_delivery_stream(kinesis_client: BaseClient) -> Generator[str]: + stream_name = "test-kinesis-audit-stream" + try: + return kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) + except kinesis_client.exceptions.ResourceInUseException: + return kinesis_client.describe_stream(StreamName=stream_name) + @pytest.fixture def rsv_campaign_config(s3_client: BaseClient, rules_bucket: BucketName) -> Generator[CampaignConfig]: From 49993b8502f994a2ae16ca9b852fc3fb00906dfb Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 11:30:55 +0000 Subject: [PATCH 03/22] [ELI-688] - updating firehose mock config to use kinesis as source --- tests/integration/conftest.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 41112355..8d02ed37 100644 --- 
a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -570,13 +570,27 @@ def audit_bucket(s3_client: BaseClient) -> Generator[BucketName]: @pytest.fixture(autouse=True) -def firehose_delivery_stream(firehose_client: BaseClient, audit_bucket: BucketName) -> dict[str, Any]: +def kinesis_delivery_stream(kinesis_client: BaseClient) -> Generator[str]: + stream_name = "test-kinesis-audit-stream" + try: + return kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) + except kinesis_client.exceptions.ResourceInUseException: + return kinesis_client.describe_stream(StreamName=stream_name) + +@pytest.fixture(autouse=True) +def firehose_delivery_stream(firehose_client: BaseClient, + audit_bucket: BucketName, + kinesis_delivery_stream: str) -> dict[str, Any]: stream_name = "test_firehose_audit_stream_to_s3" try: return firehose_client.create_delivery_stream( DeliveryStreamName=stream_name, - DeliveryStreamType="DirectPut", + DeliveryStreamType="KinesisStreamAsSource", + KinesisStreamSourceConfiguration={ + "KinesisStreamARN": f"arn:aws:kinesis:eu-west-2:123456789012:stream/{kinesis_delivery_stream}", + "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role", + }, ExtendedS3DestinationConfiguration={ "BucketARN": f"arn:aws:s3:::{audit_bucket}", "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role", @@ -588,14 +602,6 @@ def firehose_delivery_stream(firehose_client: BaseClient, audit_bucket: BucketNa except firehose_client.exceptions.ResourceInUseException: return firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name) -@pytest.fixture -def kinesis_delivery_stream(kinesis_client: BaseClient) -> Generator[str]: - stream_name = "test-kinesis-audit-stream" - try: - return kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) - except kinesis_client.exceptions.ResourceInUseException: - return kinesis_client.describe_stream(StreamName=stream_name) - @pytest.fixture def rsv_campaign_config(s3_client: 
BaseClient, rules_bucket: BucketName) -> Generator[CampaignConfig]: From 8fbec9edc0bccb242e1783b515e89b1e27953c4f Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 12:25:25 +0000 Subject: [PATCH 04/22] [ELI-688] - update service to put to kinesis and added a factory to inject --- .../audit/audit_service.py | 16 +++++++++------- src/eligibility_signposting_api/config/config.py | 7 +++++++ src/eligibility_signposting_api/repos/factory.py | 8 ++++++++ tests/docker-compose.mock_aws.yml | 1 + 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/eligibility_signposting_api/audit/audit_service.py b/src/eligibility_signposting_api/audit/audit_service.py index 4b2d778d..d153356c 100644 --- a/src/eligibility_signposting_api/audit/audit_service.py +++ b/src/eligibility_signposting_api/audit/audit_service.py @@ -7,6 +7,7 @@ from wireup import Inject, service from eligibility_signposting_api.config.config import AwsKinesisFirehoseStreamName +from eligibility_signposting_api.config.config import AwsKinesisStreamName logger = logging.getLogger(__name__) @@ -15,11 +16,11 @@ class AuditService: # pragma: no cover def __init__( self, - firehose: Annotated[BaseClient, Inject(qualifier="firehose")], - audit_delivery_stream: Annotated[AwsKinesisFirehoseStreamName, Inject(param="firehose_audit_stream_to_s3")], + kinesis: Annotated[BaseClient, Inject(qualifier="kinesis")], + audit_delivery_stream: Annotated[AwsKinesisStreamName, Inject(param="kinesis_audit_stream")], ) -> None: super().__init__() - self.firehose = firehose + self.kinesis = kinesis self.audit_delivery_stream = audit_delivery_stream @xray_recorder.capture("AuditService.audit") # pyright: ignore[reportCallIssue] @@ -34,8 +35,9 @@ def audit(self, audit_record: dict) -> None: str: The Firehose record ID. 
""" data = json.dumps(audit_record, default=str) - response = self.firehose.put_record( - DeliveryStreamName=self.audit_delivery_stream, - Record={"Data": (data + "\n").encode("utf-8")}, + response = self.kinesis.put_record( + StreamName=self.audit_delivery_stream, + Data=(data + "\n").encode("utf-8"), + PartitionKey="audit", ) - logger.info("Successfully sent to the Firehose", extra={"firehose_record_id": response["RecordId"]}) + logger.info("Successfully sent to kinesis") diff --git a/src/eligibility_signposting_api/config/config.py b/src/eligibility_signposting_api/config/config.py index 98f5b3ff..5ee990b5 100644 --- a/src/eligibility_signposting_api/config/config.py +++ b/src/eligibility_signposting_api/config/config.py @@ -15,6 +15,7 @@ AwsAccessKey = NewType("AwsAccessKey", str) AwsSecretAccessKey = NewType("AwsSecretAccessKey", str) AwsKinesisFirehoseStreamName = NewType("AwsKinesisFirehoseStreamName", str) +AwsKinesisStreamName = NewType("AwsKinesisStreamName", str) ApiDomainName = NewType("ApiDomainName", str) @@ -30,6 +31,9 @@ def config() -> dict[str, Any]: firehose_audit_stream_to_s3 = AwsKinesisFirehoseStreamName( os.getenv("FIREHOSE_AUDIT_STREAM_TO_S3", "test_firehose_audit_stream_to_s3") ) + kinesis_audit_stream = AwsKinesisFirehoseStreamName( + os.getenv("KINESIS_AUDIT_STREAM", "test-kinesis-audit-stream") + ) log_level = LOG_LEVEL if os.getenv("ENV"): @@ -45,6 +49,7 @@ def config() -> dict[str, Any]: "consumer_mapping_bucket_name": consumer_mapping_bucket_name, "firehose_endpoint": None, "firehose_audit_stream_to_s3": firehose_audit_stream_to_s3, + "kinesis_audit_stream": kinesis_audit_stream, "enable_xray_patching": enable_xray_patching, "secretsmanager_endpoint": None, "hashing_secret_name": hashing_secret_name, @@ -64,6 +69,8 @@ def config() -> dict[str, Any]: "consumer_mapping_bucket_name": consumer_mapping_bucket_name, "firehose_endpoint": URL(os.getenv("FIREHOSE_ENDPOINT", moto_server_endpoint)), "firehose_audit_stream_to_s3": 
firehose_audit_stream_to_s3, + "kinesis_audit_stream": kinesis_audit_stream, + "kinesis_endpoint": URL(os.getenv("KINESIS_ENDPOINT", moto_server_endpoint)), "enable_xray_patching": enable_xray_patching, "secretsmanager_endpoint": URL(os.getenv("SECRET_MANAGER_ENDPOINT", moto_server_endpoint)), "hashing_secret_name": hashing_secret_name, diff --git a/src/eligibility_signposting_api/repos/factory.py b/src/eligibility_signposting_api/repos/factory.py index 686252ef..037fc884 100644 --- a/src/eligibility_signposting_api/repos/factory.py +++ b/src/eligibility_signposting_api/repos/factory.py @@ -44,6 +44,14 @@ def firehose_client_factory( endpoint_url = str(firehose_endpoint) if firehose_endpoint is not None else None return session.client("firehose", endpoint_url=endpoint_url) +@service(qualifier="kinesis") +def kinesis_client_factory( + session: Session, + kinesis_endpoint: Annotated[URL, Inject(param="kinesis_endpoint")], +) -> BaseClient: + endpoint_url = str(kinesis_endpoint) if kinesis_endpoint is not None else None + return session.client("kinesis", endpoint_url=endpoint_url) + @service(qualifier="secretsmanager") def secretsmanager_client_factory( diff --git a/tests/docker-compose.mock_aws.yml b/tests/docker-compose.mock_aws.yml index a4388136..87010a5f 100644 --- a/tests/docker-compose.mock_aws.yml +++ b/tests/docker-compose.mock_aws.yml @@ -31,6 +31,7 @@ services: - S3_ENDPOINT=http://moto-server:5000 - SECRET_MANAGER_ENDPOINT=http://moto-server:5000 - FIREHOSE_ENDPOINT=http://moto-server:5000 + - KINESIS_ENDPOINT=http://moto-server:5000 - LOG_LEVEL=INFO entrypoint: /bin/sh command: From 1f23a9e0423b446200c56a0b0062b515cc54c5c0 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 13:57:49 +0000 Subject: [PATCH 05/22] [ELI-688] - adding a bridge between fake aws clients --- tests/integration/conftest.py | 51 +++++++++++++++++-- .../in_process/test_eligibility_endpoint.py | 18 ++++++- .../lambda/test_app_running_as_lambda.py | 27 +++++++++- 
tests/unit/repos/test_factory.py | 23 ++++++++- 4 files changed, 111 insertions(+), 8 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 8d02ed37..943928dc 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -577,6 +577,7 @@ def kinesis_delivery_stream(kinesis_client: BaseClient) -> Generator[str]: except kinesis_client.exceptions.ResourceInUseException: return kinesis_client.describe_stream(StreamName=stream_name) + @pytest.fixture(autouse=True) def firehose_delivery_stream(firehose_client: BaseClient, audit_bucket: BucketName, @@ -586,11 +587,7 @@ def firehose_delivery_stream(firehose_client: BaseClient, try: return firehose_client.create_delivery_stream( DeliveryStreamName=stream_name, - DeliveryStreamType="KinesisStreamAsSource", - KinesisStreamSourceConfiguration={ - "KinesisStreamARN": f"arn:aws:kinesis:eu-west-2:123456789012:stream/{kinesis_delivery_stream}", - "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role", - }, + DeliveryStreamType="DirectPut", ExtendedS3DestinationConfiguration={ "BucketARN": f"arn:aws:s3:::{audit_bucket}", "RoleARN": "arn:aws:iam::123456789012:role/firehose_delivery_role", @@ -602,6 +599,50 @@ def firehose_delivery_stream(firehose_client: BaseClient, except firehose_client.exceptions.ResourceInUseException: return firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name) +def bridge_latest_kinesis_record_to_firehose( + kinesis_client: BaseClient, + kinesis_stream_name: str, + firehose_client: BaseClient, + firehose_delivery_stream_name: str, +) -> dict[str, Any]: + """ + Read the latest record from the Kinesis stream and forward it into Firehose. + + This is a test-only bridge to simulate Firehose consuming from Kinesis when + running against Moto. + + Returns the Firehose put_record response. + Raises AssertionError if no Kinesis records are found. 
+ """ + stream_description = kinesis_client.describe_stream(StreamName=kinesis_stream_name) + shards = stream_description["StreamDescription"]["Shards"] + + if not shards: + raise AssertionError(f"No shards found for Kinesis stream: {kinesis_stream_name}") + + shard_id = shards[0]["ShardId"] + + iterator_response = kinesis_client.get_shard_iterator( + StreamName=kinesis_stream_name, + ShardId=shard_id, + ShardIteratorType="TRIM_HORIZON", + ) + shard_iterator = iterator_response["ShardIterator"] + + records_response = kinesis_client.get_records(ShardIterator=shard_iterator) + records = records_response.get("Records", []) + + if not records: + raise AssertionError(f"No records found in Kinesis stream: {kinesis_stream_name}") + + latest_record = records[-1] + latest_data = latest_record["Data"] + + return firehose_client.put_record( + DeliveryStreamName=firehose_delivery_stream_name, + Record={"Data": latest_data}, + ) + @pytest.fixture def rsv_campaign_config(s3_client: BaseClient, rules_bucket: BucketName) -> Generator[CampaignConfig]: diff --git a/tests/integration/in_process/test_eligibility_endpoint.py b/tests/integration/in_process/test_eligibility_endpoint.py index a7ce2aea..b05bdec1 100644 --- a/tests/integration/in_process/test_eligibility_endpoint.py +++ b/tests/integration/in_process/test_eligibility_endpoint.py @@ -30,7 +30,7 @@ ) from eligibility_signposting_api.repos.campaign_repo import BucketName from tests.fixtures.builders.model import rule -from tests.integration.conftest import UNIQUE_CONSUMER_HEADER +from tests.integration.conftest import UNIQUE_CONSUMER_HEADER, bridge_latest_kinesis_record_to_firehose def today() -> date: @@ -1256,6 +1256,8 @@ def test_if_correct_campaign_is_chosen_for_the_consumer_when_multiple_campaign_e requested_conditions: str, requested_category: str, expected_campaign_id: list[str], + kinesis_client, + firehose_client, ): # Given headers = {"nhs-login-nhs-number": str(persisted_person), UNIQUE_CONSUMER_HEADER: consumer_id} 
@@ -1265,6 +1267,12 @@ def test_if_correct_campaign_is_chosen_for_the_consumer_when_multiple_campaign_e f"/patient-check/{persisted_person}?includeActions=Y&category={requested_category}&conditions={requested_conditions}", headers=headers, ) + bridge_latest_kinesis_record_to_firehose( + kinesis_client=kinesis_client, + kinesis_stream_name="test-kinesis-audit-stream", + firehose_client=firehose_client, + firehose_delivery_stream_name="test_firehose_audit_stream_to_s3", + ) objects = s3_client.list_objects_v2(Bucket=audit_bucket).get("Contents", []) object_keys = [obj["Key"] for obj in objects] @@ -1345,6 +1353,8 @@ def test_if_cc_with_latest_active_iteration_is_chosen_if_exists_multiple_campaig postcode_for_comparator: str, cohort_for_comparator: str, expected_campaign_id: NHSNumber, + kinesis_client, + firehose_client, ): # Given consumer_id = "consumer-n3bs-jo4hn-ce4na" @@ -1433,6 +1443,12 @@ def test_if_cc_with_latest_active_iteration_is_chosen_if_exists_multiple_campaig # When client.get(f"/patient-check/{persisted_person_pc_sw19}", headers=headers) + bridge_latest_kinesis_record_to_firehose( + kinesis_client=kinesis_client, + kinesis_stream_name="test-kinesis-audit-stream", + firehose_client=firehose_client, + firehose_delivery_stream_name="test_firehose_audit_stream_to_s3", + ) objects = s3_client.list_objects_v2(Bucket=audit_bucket).get("Contents", []) object_keys = [obj["Key"] for obj in objects] diff --git a/tests/integration/lambda/test_app_running_as_lambda.py b/tests/integration/lambda/test_app_running_as_lambda.py index e823aed4..c41d1556 100644 --- a/tests/integration/lambda/test_app_running_as_lambda.py +++ b/tests/integration/lambda/test_app_running_as_lambda.py @@ -23,7 +23,7 @@ from eligibility_signposting_api.model.consumer_mapping import ConsumerId, ConsumerMapping from eligibility_signposting_api.model.eligibility_status import NHSNumber from eligibility_signposting_api.repos.campaign_repo import BucketName -from tests.integration.conftest 
import UNIQUE_CONSUMER_HEADER +from tests.integration.conftest import UNIQUE_CONSUMER_HEADER, bridge_latest_kinesis_record_to_firehose logger = logging.getLogger(__name__) @@ -174,6 +174,8 @@ def test_given_nhs_number_in_path_matches_with_nhs_number_in_headers_and_check_i invoke_with_mock_apigw_request, lambda_logs: Callable[[], list[str]], secretsmanager_client: BaseClient, # noqa:ARG001 + kinesis_client, + firehose_client, ): # Given invoke_path = f"/patient-check/{persisted_person}" @@ -190,6 +192,13 @@ def test_given_nhs_number_in_path_matches_with_nhs_number_in_headers_and_check_i # When response = invoke_with_mock_apigw_request(path=invoke_path, headers=headers, params=params) + bridge_latest_kinesis_record_to_firehose( + kinesis_client=kinesis_client, + kinesis_stream_name="test-kinesis-audit-stream", + firehose_client=firehose_client, + firehose_delivery_stream_name="test_firehose_audit_stream_to_s3", + ) + # Then assert_that( response, @@ -414,6 +423,8 @@ def test_given_person_has_unique_status_for_different_conditions_with_audit( # audit_bucket: BucketName, invoke_with_mock_apigw_request, secretsmanager_client: BaseClient, # noqa: ARG001 + kinesis_client, + firehose_client, ): # Given invoke_path = f"/patient-check/{persisted_person_all_cohorts}" @@ -429,6 +440,12 @@ def test_given_person_has_unique_status_for_different_conditions_with_audit( # # When response = invoke_with_mock_apigw_request(path=invoke_path, headers=headers, params=params) + bridge_latest_kinesis_record_to_firehose( + kinesis_client=kinesis_client, + kinesis_stream_name="test-kinesis-audit-stream", + firehose_client=firehose_client, + firehose_delivery_stream_name="test_firehose_audit_stream_to_s3", + ) # Then assert_that( @@ -597,10 +614,18 @@ def test_token_formatting_in_eligibility_response_and_audit( # noqa: PLR0913 s3_client: BaseClient, audit_bucket: BucketName, invoke_with_mock_apigw_request, + kinesis_client, + firehose_client, ): invoke_path = 
f"/patient-check/{person_with_all_data}" headers = {"nhs-login-nhs-number": str(person_with_all_data), UNIQUE_CONSUMER_HEADER: consumer_id} response = invoke_with_mock_apigw_request(invoke_path, headers) + bridge_latest_kinesis_record_to_firehose( + kinesis_client=kinesis_client, + kinesis_stream_name="test-kinesis-audit-stream", + firehose_client=firehose_client, + firehose_delivery_stream_name="test_firehose_audit_stream_to_s3", + ) assert_that( response, diff --git a/tests/unit/repos/test_factory.py b/tests/unit/repos/test_factory.py index b6a8a7c6..c5b39953 100644 --- a/tests/unit/repos/test_factory.py +++ b/tests/unit/repos/test_factory.py @@ -9,7 +9,7 @@ from eligibility_signposting_api.repos.factory import ( dynamodb_resource_factory, firehose_client_factory, - s3_service_factory, + s3_service_factory, kinesis_client_factory, ) @@ -79,3 +79,24 @@ def test_firehose_service_factory_without_endpoint(mock_session): mock_session.client.assert_called_once_with("firehose", endpoint_url=None) assert result is mock_client + + +def test_kinesis_service_factory_with_endpoint(mock_session): + mock_client = MagicMock(spec=BaseClient) + mock_session.client = MagicMock(return_value=mock_client) + endpoint = URL("http://localhost:4566") + + result = kinesis_client_factory(mock_session, endpoint) + + mock_session.client.assert_called_once_with("kinesis", endpoint_url="http://localhost:4566") + assert result is mock_client + + +def test_kinesis_service_factory_without_endpoint(mock_session): + mock_client = MagicMock(spec=BaseClient) + mock_session.client = MagicMock(return_value=mock_client) + + result = kinesis_client_factory(mock_session, None) + + mock_session.client.assert_called_once_with("kinesis", endpoint_url=None) + assert result is mock_client From df722ebfc1f921dd8fee7d25d7f37b86f4ac9a38 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 15:09:04 +0000 Subject: [PATCH 06/22] [ELI-688] - adding a none placeholder to stop wireup complaining --- 
src/eligibility_signposting_api/config/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/eligibility_signposting_api/config/config.py b/src/eligibility_signposting_api/config/config.py index 5ee990b5..81451fdc 100644 --- a/src/eligibility_signposting_api/config/config.py +++ b/src/eligibility_signposting_api/config/config.py @@ -50,6 +50,7 @@ def config() -> dict[str, Any]: "firehose_endpoint": None, "firehose_audit_stream_to_s3": firehose_audit_stream_to_s3, "kinesis_audit_stream": kinesis_audit_stream, + "kinesis_endpoint": None, "enable_xray_patching": enable_xray_patching, "secretsmanager_endpoint": None, "hashing_secret_name": hashing_secret_name, From 3380d9c052b2631e2f3efae033e0646fd56c4a16 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 15:41:54 +0000 Subject: [PATCH 07/22] [ELI-688] - updating firehose module and adding new data stream as a source --- .../kinesis_firehose_delivery_stream.tf | 10 ++++++++-- .../modules/kinesis_firehose/variables.tf | 9 +++++++-- .../stacks/api-layer/kinesis_data_stream.tf | 18 ++++++++++++++++++ .../stacks/api-layer/kinesis_firehose.tf | 5 +++-- 4 files changed, 36 insertions(+), 6 deletions(-) create mode 100644 infrastructure/stacks/api-layer/kinesis_data_stream.tf diff --git a/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf b/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf index ef6bbba3..a01cf836 100644 --- a/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf +++ b/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf @@ -2,6 +2,11 @@ resource "aws_kinesis_firehose_delivery_stream" "eligibility_audit_firehose_deli name = "${terraform.workspace == "default" ? 
"" : "${terraform.workspace}-"}${var.project_name}-${var.environment}-${var.audit_firehose_delivery_stream_name}" destination = "extended_s3" + kinesis_source_configuration { + kinesis_stream_arn = kinesis_source_stream_arn + role_arn = var.audit_firehose_role.arn + } + extended_s3_configuration { role_arn = var.audit_firehose_role.arn bucket_arn = var.s3_audit_bucket_arn @@ -14,8 +19,8 @@ resource "aws_kinesis_firehose_delivery_stream" "eligibility_audit_firehose_deli cloudwatch_logging_options { enabled = true - log_group_name = var.kinesis_cloud_watch_log_group_name - log_stream_name = var.kinesis_cloud_watch_log_stream + log_group_name = var.firehose_cloud_watch_log_group_name + log_stream_name = var.firehose_cloud_watch_log_stream } } @@ -27,6 +32,7 @@ resource "aws_kinesis_firehose_delivery_stream" "eligibility_audit_firehose_deli depends_on = [ aws_kms_key.firehose_cmk, + var.kinesis_source_stream_arn, var.audit_firehose_role ] diff --git a/infrastructure/modules/kinesis_firehose/variables.tf b/infrastructure/modules/kinesis_firehose/variables.tf index 03d81eba..c1f0804f 100644 --- a/infrastructure/modules/kinesis_firehose/variables.tf +++ b/infrastructure/modules/kinesis_firehose/variables.tf @@ -13,12 +13,12 @@ variable "s3_audit_bucket_arn" { type = string } -variable "kinesis_cloud_watch_log_group_name" { +variable "firehose_cloud_watch_log_group_name" { description = "kinesis cloud watch log group name" type = string } -variable "kinesis_cloud_watch_log_stream" { +variable "firehose_cloud_watch_log_stream" { description = "kinesis cloud watch log stream" type = string } @@ -28,6 +28,11 @@ variable "eligibility_lambda_role_arn" { type = any } +variable "kinesis_source_stream_arn" { + description = "the arn of the kinesis data stream the lambda puts records to" + type = any +} + diff --git a/infrastructure/stacks/api-layer/kinesis_data_stream.tf b/infrastructure/stacks/api-layer/kinesis_data_stream.tf new file mode 100644 index 00000000..cba21dbd --- 
/dev/null +++ b/infrastructure/stacks/api-layer/kinesis_data_stream.tf @@ -0,0 +1,18 @@ +resource "aws_kms_key" "kinesis_data_stream_kms_key" { + description = "${terraform.workspace == "default" ? "" : "${terraform.workspace}-"} kinesis_data_stream_kms Master Key" + deletion_window_in_days = 14 + is_enabled = true + enable_key_rotation = true +} + +resource "aws_kinesis_stream" "kinesis_source_stream" { + name = "${var.project_name}-${var.environment}-kinesis-audit-stream" + retention_period = 24 + + stream_mode_details { + stream_mode = "ON_DEMAND" # can discuss later + } + + encryption_type = "KMS" + kms_key_id = aws_kms_key.kinesis_data_stream_kms_key.arn +} diff --git a/infrastructure/stacks/api-layer/kinesis_firehose.tf b/infrastructure/stacks/api-layer/kinesis_firehose.tf index 850097b9..9c47bd89 100644 --- a/infrastructure/stacks/api-layer/kinesis_firehose.tf +++ b/infrastructure/stacks/api-layer/kinesis_firehose.tf @@ -7,7 +7,8 @@ module "eligibility_audit_firehose_delivery_stream" { stack_name = local.stack_name workspace = local.workspace tags = local.tags - kinesis_cloud_watch_log_group_name = aws_cloudwatch_log_group.firehose_audit.name - kinesis_cloud_watch_log_stream = aws_cloudwatch_log_stream.firehose_audit_stream.name + firehose_cloud_watch_log_group_name = aws_cloudwatch_log_group.firehose_audit.name + firehose_cloud_watch_log_stream = aws_cloudwatch_log_stream.firehose_audit_stream.name eligibility_lambda_role_arn = aws_iam_role.eligibility_lambda_role.arn + kinesis_source_stream_arn = aws_kinesis_stream.kinesis_source_stream.arn } From fa0395c8f156cc432d7f25e5c8fdd892d8f89449 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 15:51:53 +0000 Subject: [PATCH 08/22] [ELI-688] - plumbing into lambda --- infrastructure/modules/lambda/lambda.tf | 2 +- infrastructure/modules/lambda/variables.tf | 2 +- infrastructure/stacks/api-layer/lambda.tf | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/infrastructure/modules/lambda/lambda.tf b/infrastructure/modules/lambda/lambda.tf index 6485be67..1df2637a 100644 --- a/infrastructure/modules/lambda/lambda.tf +++ b/infrastructure/modules/lambda/lambda.tf @@ -20,7 +20,7 @@ resource "aws_lambda_function" "eligibility_signposting_lambda" { PERSON_TABLE_NAME = var.eligibility_status_table_name, RULES_BUCKET_NAME = var.eligibility_rules_bucket_name, CONSUMER_MAPPING_BUCKET_NAME = var.eligibility_consumer_mappings_bucket_name, - KINESIS_AUDIT_STREAM_TO_S3 = var.kinesis_audit_stream_to_s3_name + KINESIS_AUDIT_STREAM = var.kinesis_audit_stream_name ENV = var.environment LOG_LEVEL = var.log_level ENABLE_XRAY_PATCHING = var.enable_xray_patching diff --git a/infrastructure/modules/lambda/variables.tf b/infrastructure/modules/lambda/variables.tf index 6f238e14..ffc76497 100644 --- a/infrastructure/modules/lambda/variables.tf +++ b/infrastructure/modules/lambda/variables.tf @@ -54,7 +54,7 @@ variable "eligibility_status_table_name" { type = string } -variable "kinesis_audit_stream_to_s3_name" { +variable "kinesis_audit_stream_name" { description = "kinesis audit stream to s3 name" type = string } diff --git a/infrastructure/stacks/api-layer/lambda.tf b/infrastructure/stacks/api-layer/lambda.tf index f87c3658..dfb061ba 100644 --- a/infrastructure/stacks/api-layer/lambda.tf +++ b/infrastructure/stacks/api-layer/lambda.tf @@ -25,7 +25,7 @@ module "eligibility_signposting_lambda_function" { eligibility_rules_bucket_name = module.s3_rules_bucket.storage_bucket_name eligibility_consumer_mappings_bucket_name = module.s3_consumer_mappings_bucket.storage_bucket_name eligibility_status_table_name = module.eligibility_status_table.table_name - kinesis_audit_stream_to_s3_name = module.eligibility_audit_firehose_delivery_stream.firehose_stream_name + kinesis_audit_stream_name = aws_kinesis_stream.kinesis_source_stream.name hashing_secret_name = module.secrets_manager.aws_hashing_secret_name lambda_insights_extension_version = 38 
log_level = "INFO" From 4b7d1b13bc81391f7df4bed5eafe8fb07e108deb Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 18:10:18 +0000 Subject: [PATCH 09/22] [ELI-688] - adding and attaching policies, extending permissions boundary --- .../stacks/api-layer/iam_policies.tf | 52 +++++++++++++++++++ .../iams_permissions_boundary.tf | 16 ++++++ 2 files changed, 68 insertions(+) diff --git a/infrastructure/stacks/api-layer/iam_policies.tf b/infrastructure/stacks/api-layer/iam_policies.tf index 560e9266..22fc8a1d 100644 --- a/infrastructure/stacks/api-layer/iam_policies.tf +++ b/infrastructure/stacks/api-layer/iam_policies.tf @@ -256,6 +256,58 @@ resource "aws_iam_role_policy_attachment" "lambda_insights_policy" { policy_arn = "arn:aws:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy" } +# Policy document for Kinesis Firehose to read from Kinesis Source stream +data "aws_iam_policy_document" "kinesis_source_access" { + statement { + sid = "AllowReadFromKinesisSourceStream" + effect = "Allow" + + actions = [ + "kinesis:DescribeStream", + "kinesis:DescribeStreamSummary", + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:ListShards", + "kinesis:SubscribeToShard", + ] + + resources = [ + aws_kinesis_stream.kinesis_source_stream, + ] + } +} + +# Attach kinesis read policy to firehose role +resource "aws_iam_role_policy" "kinesis_firehose_s3_write_policy" { + name = "KinesisSourceReadAccess" + role = aws_iam_role.eligibility_audit_firehose_role.id + policy = data.aws_iam_policy_document.kinesis_source_access.json +} + +# Policy document for Lambda to write to Kinesis stream +data "aws_iam_policy_document" "kinesis_write_access" { + statement { + sid = "AllowWriteToKinesisStream" + effect = "Allow" + + actions = [ + "kinesis:PutRecord", + "kinesis:PutRecords" + ] + + resources = [ + aws_kinesis_stream.kinesis_source_stream + ] + } +} + +# Attach kinesis write policy to Lambda role +resource "aws_iam_role_policy" "lambda_kinesis_write_policy" { + 
name = "KinesisWriteAccess" + role = aws_iam_role.eligibility_lambda_role.id + policy = data.aws_iam_policy_document.kinesis_write_access.json +} + # Policy doc for S3 Audit bucket data "aws_iam_policy_document" "s3_audit_bucket_policy" { statement { diff --git a/infrastructure/stacks/iams-developer-roles/iams_permissions_boundary.tf b/infrastructure/stacks/iams-developer-roles/iams_permissions_boundary.tf index 91c1e94d..10dca466 100644 --- a/infrastructure/stacks/iams-developer-roles/iams_permissions_boundary.tf +++ b/infrastructure/stacks/iams-developer-roles/iams_permissions_boundary.tf @@ -86,6 +86,22 @@ data "aws_iam_policy_document" "permissions_boundary" { "firehose:StartDeliveryStreamEncryption", "firehose:StopDeliveryStreamEncryption", + # Kinesis Stream - audit log streaming + "kinesis:CreateStream", + "kinesis:DeleteStream", + "kinesis:DescribeStream", + "kinesis:ListStreams", + "kinesis:PutRecord", + "kinesis:PutRecords", + "kinesis:TagStream", + "kinesis:ListTagsForStream", + "kinesis:UntagStream", + "kinesis:GetShardIterator", + "kinesis:GetRecords", + "kinesis:ListShards", + "kinesis:SubscribeToShard", + "kinesis:DescribeStreamSummary", + # IAM - specific role and policy management "iam:GetRole*", "iam:GetPolicy*", From b151610d1442d08189831cd0e567f9b728fdb583 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 19:07:00 +0000 Subject: [PATCH 10/22] [ELI-688] - adding and attaching kms policies --- .../stacks/api-layer/iam_policies.tf | 57 ++++++++++++++++++- 1 file changed, 54 insertions(+), 3 deletions(-) diff --git a/infrastructure/stacks/api-layer/iam_policies.tf b/infrastructure/stacks/api-layer/iam_policies.tf index 22fc8a1d..5d03e676 100644 --- a/infrastructure/stacks/api-layer/iam_policies.tf +++ b/infrastructure/stacks/api-layer/iam_policies.tf @@ -256,7 +256,7 @@ resource "aws_iam_role_policy_attachment" "lambda_insights_policy" { policy_arn = "arn:aws:iam::aws:policy/CloudWatchLambdaInsightsExecutionRolePolicy" } -# Policy document 
for Kinesis Firehose to read from Kinesis Source stream +# Policy document to read from Kinesis Source stream data "aws_iam_policy_document" "kinesis_source_access" { statement { sid = "AllowReadFromKinesisSourceStream" @@ -272,7 +272,25 @@ data "aws_iam_policy_document" "kinesis_source_access" { ] resources = [ - aws_kinesis_stream.kinesis_source_stream, + aws_kinesis_stream.kinesis_source_stream.arn, + ] + } +} + +# Policy document to use KMS key for reading from Kinesis Source stream +data "aws_iam_policy_document" "kinesis_source_kms_read_access" { + statement { + sid = "AllowUseOfKinesisSourceKeyForReads" + effect = "Allow" + + actions = [ + "kms:Decrypt", + "kms:GenerateDataKey", + "kms:DescribeKey" + ] + + resources = [ + aws_kms_key.kinesis_data_stream_kms_key.arn ] } } @@ -284,6 +302,13 @@ resource "aws_iam_role_policy" "kinesis_firehose_s3_write_policy" { policy = data.aws_iam_policy_document.kinesis_source_access.json } +# Attach kinesis source stream KMS read policy to firehose role +resource "aws_iam_role_policy" "firehose_kinesis_source_kms_policy" { + name = "KinesisSourceKmsReadAccess" + role = aws_iam_role.eligibility_audit_firehose_role.id + policy = data.aws_iam_policy_document.kinesis_source_kms_read_access.json +} + # Policy document for Lambda to write to Kinesis stream data "aws_iam_policy_document" "kinesis_write_access" { statement { @@ -296,11 +321,30 @@ data "aws_iam_policy_document" "kinesis_write_access" { ] resources = [ - aws_kinesis_stream.kinesis_source_stream + aws_kinesis_stream.kinesis_source_stream.arn ] } } +# Policy document to use the KMS key +data "aws_iam_policy_document" "kinesis_kms_write_access" { + statement { + sid = "AllowUseOfKinesisStreamKeyForWrites" + effect = "Allow" + + actions = [ + "kms:Encrypt", + "kms:GenerateDataKey", + "kms:DescribeKey" + ] + + resources = [ + aws_kms_key.kinesis_data_stream_kms_key.arn + ] + } +} + + # Attach kinesis write policy to Lambda role resource "aws_iam_role_policy" 
"lambda_kinesis_write_policy" { name = "KinesisWriteAccess" @@ -308,6 +352,13 @@ resource "aws_iam_role_policy" "lambda_kinesis_write_policy" { policy = data.aws_iam_policy_document.kinesis_write_access.json } +# Attach kinesis KMS access policy to Lambda role +resource "aws_iam_role_policy" "lambda_kinesis_kms_policy" { + name = "KinesisStreamKmsWriteAccess" + role = aws_iam_role.eligibility_lambda_role.id + policy = data.aws_iam_policy_document.kinesis_kms_write_access.json +} + # Policy doc for S3 Audit bucket data "aws_iam_policy_document" "s3_audit_bucket_policy" { statement { From 5fa446841da276893046c37cb3b8b7cedc96e0c1 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 19:23:16 +0000 Subject: [PATCH 11/22] [ELI-688] - clean up and removing server side encryption as incompatible --- .../kinesis_firehose/kinesis_firehose_delivery_stream.tf | 8 +------- infrastructure/stacks/api-layer/iam_policies.tf | 2 +- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf b/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf index a01cf836..29681856 100644 --- a/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf +++ b/infrastructure/modules/kinesis_firehose/kinesis_firehose_delivery_stream.tf @@ -3,7 +3,7 @@ resource "aws_kinesis_firehose_delivery_stream" "eligibility_audit_firehose_deli destination = "extended_s3" kinesis_source_configuration { - kinesis_stream_arn = kinesis_source_stream_arn + kinesis_stream_arn = var.kinesis_source_stream_arn role_arn = var.audit_firehose_role.arn } @@ -24,12 +24,6 @@ resource "aws_kinesis_firehose_delivery_stream" "eligibility_audit_firehose_deli } } - server_side_encryption { - enabled = true - key_arn = aws_kms_key.firehose_cmk.arn - key_type = "CUSTOMER_MANAGED_CMK" - } - depends_on = [ aws_kms_key.firehose_cmk, var.kinesis_source_stream_arn, diff --git 
a/infrastructure/stacks/api-layer/iam_policies.tf b/infrastructure/stacks/api-layer/iam_policies.tf index 5d03e676..9c27f386 100644 --- a/infrastructure/stacks/api-layer/iam_policies.tf +++ b/infrastructure/stacks/api-layer/iam_policies.tf @@ -296,7 +296,7 @@ data "aws_iam_policy_document" "kinesis_source_kms_read_access" { } # Attach kinesis read policy to firehose role -resource "aws_iam_role_policy" "kinesis_firehose_s3_write_policy" { +resource "aws_iam_role_policy" "kinesis_firehose_read_policy" { name = "KinesisSourceReadAccess" role = aws_iam_role.eligibility_audit_firehose_role.id policy = data.aws_iam_policy_document.kinesis_source_access.json From 0fc6e3e1f9f7721e0b17fc8397d1140b6e6917b8 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 19:29:38 +0000 Subject: [PATCH 12/22] [ELI-688] - adding github perms --- .../github_actions_policies.tf | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/infrastructure/stacks/iams-developer-roles/github_actions_policies.tf b/infrastructure/stacks/iams-developer-roles/github_actions_policies.tf index ac774d86..d67f197c 100644 --- a/infrastructure/stacks/iams-developer-roles/github_actions_policies.tf +++ b/infrastructure/stacks/iams-developer-roles/github_actions_policies.tf @@ -674,6 +674,41 @@ resource "aws_iam_policy" "firehose_readonly" { tags = merge(local.tags, { Name = "firehose-describe-access" }) } +resource "aws_iam_policy" "kinesis_management" { + name = "kinesis-management" + description = "Allow GitHub Actions to manage project Kinesis streams" + path = "/service-policies/" + + policy = jsonencode({ + Version = "2012-10-17", + Statement = [ + { + Effect = "Allow", + Action = [ + "kinesis:CreateStream", + "kinesis:DeleteStream", + "kinesis:DescribeStream", + "kinesis:DescribeStreamSummary", + "kinesis:ListStreams", + "kinesis:ListTagsForStream", + "kinesis:AddTagsToStream", + "kinesis:RemoveTagsFromStream", + "kinesis:ListShards", + "kinesis:IncreaseStreamRetentionPeriod", + 
"kinesis:DecreaseStreamRetentionPeriod", + "kinesis:StartStreamEncryption", + "kinesis:StopStreamEncryption" + ], + Resource = [ + "arn:aws:kinesis:${var.default_aws_region}:${data.aws_caller_identity.current.account_id}:stream/eligibility-signposting-api-*" + ] + } + ] + }) + + tags = merge(local.tags, { Name = "kinesis-management" }) +} + resource "aws_iam_policy" "cloudwatch_management" { #checkov:skip=CKV_AWS_355: GetMetricWidgetImage requires wildcard resource #checkov:skip=CKV_AWS_290: GetMetricWidgetImage requires wildcard resource @@ -788,3 +823,8 @@ resource "aws_iam_role_policy_attachment" "cloudwatch_management" { role = aws_iam_role.github_actions.name policy_arn = aws_iam_policy.cloudwatch_management.arn } + +resource "aws_iam_role_policy_attachment" "kinesis_management_attach" { + role = aws_iam_role.github_actions.name + policy_arn = aws_iam_policy.kinesis_management.arn +} From 959d744195570ea4803bfb6cea8d3889b252efc1 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 19:39:21 +0000 Subject: [PATCH 13/22] [ELI-688] - adding and attaching kms policy to the stream --- .../stacks/api-layer/kinesis_data_stream.tf | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/infrastructure/stacks/api-layer/kinesis_data_stream.tf b/infrastructure/stacks/api-layer/kinesis_data_stream.tf index cba21dbd..e0d0fdb3 100644 --- a/infrastructure/stacks/api-layer/kinesis_data_stream.tf +++ b/infrastructure/stacks/api-layer/kinesis_data_stream.tf @@ -5,6 +5,70 @@ resource "aws_kms_key" "kinesis_data_stream_kms_key" { enable_key_rotation = true } +resource "aws_kms_alias" "kinesis_data_stream_kms_key" { + name = "alias/${var.project_name}-${var.environment}-kinesis-audit-stream" + target_key_id = aws_kms_key.kinesis_data_stream_kms_key.key_id +} + + +data "aws_iam_policy_document" "kinesis_stream_kms_key_policy" { + statement { + sid = "EnableRootPermissions" + effect = "Allow" + + principals { + type = "AWS" + identifiers = 
["arn:aws:iam::${data.aws_caller_identity.current.account_id}:root"] + } + + actions = ["kms:*"] + resources = ["*"] + } + + statement { + sid = "AllowLambdaUseOfKey" + effect = "Allow" + + principals { + type = "AWS" + identifiers = [aws_iam_role.eligibility_lambda_role.arn] + } + + actions = [ + "kms:Encrypt", + "kms:Decrypt", + "kms:ReEncrypt*", + "kms:GenerateDataKey*", + "kms:DescribeKey" + ] + + resources = ["*"] + } + + statement { + sid = "AllowFirehoseRoleUseOfKey" + effect = "Allow" + + principals { + type = "AWS" + identifiers = [aws_iam_role.eligibility_audit_firehose_role.arn] + } + + actions = [ + "kms:Decrypt", + "kms:GenerateDataKey*", + "kms:DescribeKey" + ] + + resources = ["*"] + } +} + +resource "aws_kms_key_policy" "kinesis_stream_kms_key_policy" { + key_id = aws_kms_key.kinesis_data_stream_kms_key.id + policy = data.aws_iam_policy_document.kinesis_stream_kms_key_policy.json +} + resource "aws_kinesis_stream" "kinesis_source_stream" { name = "${var.project_name}-${var.environment}-kinesis-audit-stream" retention_period = 24 From c1211b05a501e4fc552dad43c936e8dac9b44770 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 20:06:10 +0000 Subject: [PATCH 14/22] [ELI-688] - addressing some initial comments --- .../audit/audit_service.py | 18 +++++++++--------- .../config/config.py | 2 +- .../repos/factory.py | 1 + tests/integration/conftest.py | 5 ++--- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/eligibility_signposting_api/audit/audit_service.py b/src/eligibility_signposting_api/audit/audit_service.py index d153356c..d8556c1a 100644 --- a/src/eligibility_signposting_api/audit/audit_service.py +++ b/src/eligibility_signposting_api/audit/audit_service.py @@ -6,7 +6,6 @@ from botocore.client import BaseClient from wireup import Inject, service -from eligibility_signposting_api.config.config import AwsKinesisFirehoseStreamName from eligibility_signposting_api.config.config import AwsKinesisStreamName logger = 
logging.getLogger(__name__) @@ -17,27 +16,28 @@ class AuditService: # pragma: no cover def __init__( self, kinesis: Annotated[BaseClient, Inject(qualifier="kinesis")], - audit_delivery_stream: Annotated[AwsKinesisStreamName, Inject(param="kinesis_audit_stream")], + audit_stream: Annotated[AwsKinesisStreamName, Inject(param="kinesis_audit_stream")], ) -> None: super().__init__() self.kinesis = kinesis - self.audit_delivery_stream = audit_delivery_stream + self.audit_stream = audit_stream @xray_recorder.capture("AuditService.audit") # pyright: ignore[reportCallIssue] def audit(self, audit_record: dict) -> None: """ - Sends an audit record to the configured Firehose delivery stream. + Sends an audit record to the configured kinesis data stream. Args: audit_record (dict): The audit data to send. - - Returns: - str: The Firehose record ID. """ data = json.dumps(audit_record, default=str) response = self.kinesis.put_record( - StreamName=self.audit_delivery_stream, + StreamName=self.audit_stream, Data=(data + "\n").encode("utf-8"), PartitionKey="audit", ) - logger.info("Successfully sent to kinesis") + logger.info("Successfully sent to kinesis", extra={ + "stream_name": self.audit_stream, + "kinesis_sequence_number": response.get("SequenceNumber"), + "kinesis_shard_id": response.get("ShardId"), + },) diff --git a/src/eligibility_signposting_api/config/config.py b/src/eligibility_signposting_api/config/config.py index 81451fdc..52dd868b 100644 --- a/src/eligibility_signposting_api/config/config.py +++ b/src/eligibility_signposting_api/config/config.py @@ -31,7 +31,7 @@ def config() -> dict[str, Any]: firehose_audit_stream_to_s3 = AwsKinesisFirehoseStreamName( os.getenv("FIREHOSE_AUDIT_STREAM_TO_S3", "test_firehose_audit_stream_to_s3") ) - kinesis_audit_stream = AwsKinesisFirehoseStreamName( + kinesis_audit_stream = AwsKinesisStreamName( os.getenv("KINESIS_AUDIT_STREAM", "test-kinesis-audit-stream") ) log_level = LOG_LEVEL diff --git 
a/src/eligibility_signposting_api/repos/factory.py b/src/eligibility_signposting_api/repos/factory.py index 037fc884..d3996e77 100644 --- a/src/eligibility_signposting_api/repos/factory.py +++ b/src/eligibility_signposting_api/repos/factory.py @@ -44,6 +44,7 @@ def firehose_client_factory( endpoint_url = str(firehose_endpoint) if firehose_endpoint is not None else None return session.client("firehose", endpoint_url=endpoint_url) + @service(qualifier="kinesis") def kinesis_client_factory( session: Session, diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 943928dc..9fa15307 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -570,7 +570,7 @@ def audit_bucket(s3_client: BaseClient) -> Generator[BucketName]: @pytest.fixture(autouse=True) -def kinesis_delivery_stream(kinesis_client: BaseClient) -> Generator[str]: +def kinesis_stream(kinesis_client: BaseClient) -> dict[str, Any]: stream_name = "test-kinesis-audit-stream" try: return kinesis_client.create_stream(StreamName=stream_name, ShardCount=1) @@ -580,8 +580,7 @@ def kinesis_delivery_stream(kinesis_client: BaseClient) -> Generator[str]: @pytest.fixture(autouse=True) def firehose_delivery_stream(firehose_client: BaseClient, - audit_bucket: BucketName, - kinesis_delivery_stream: str) -> dict[str, Any]: + audit_bucket: BucketName) -> dict[str, Any]: stream_name = "test_firehose_audit_stream_to_s3" try: From cf1a135ac13c585900ed1326a5b2f270c10db6fd Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 20:40:39 +0000 Subject: [PATCH 15/22] [ELI-688] - adding a more useful partition key that will split up into 32 buckets --- .../audit/audit_service.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/eligibility_signposting_api/audit/audit_service.py b/src/eligibility_signposting_api/audit/audit_service.py index d8556c1a..5933d4b6 100644 --- a/src/eligibility_signposting_api/audit/audit_service.py +++ 
b/src/eligibility_signposting_api/audit/audit_service.py @@ -1,3 +1,4 @@ +import hashlib import json import logging from typing import Annotated @@ -22,6 +23,12 @@ def __init__( self.kinesis = kinesis self.audit_stream = audit_stream + @staticmethod + def get_partition_key(response_id: str) -> str: + h = int(hashlib.sha256(response_id.encode()).hexdigest(), 16) + bucket = h % 32 + return f"audit-{bucket:02d}" + @xray_recorder.capture("AuditService.audit") # pyright: ignore[reportCallIssue] def audit(self, audit_record: dict) -> None: """ @@ -31,10 +38,12 @@ def audit(self, audit_record: dict) -> None: audit_record (dict): The audit data to send. """ data = json.dumps(audit_record, default=str) + response_id = audit_record.get("response", {}).get("responseId") + partition_key = self.get_partition_key(str(response_id)) response = self.kinesis.put_record( StreamName=self.audit_stream, Data=(data + "\n").encode("utf-8"), - PartitionKey="audit", + PartitionKey=partition_key, ) logger.info("Successfully sent to kinesis", extra={ "stream_name": self.audit_stream, From 89dd5886d2ee72f866310584b414734ae4274409 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 21:10:29 +0000 Subject: [PATCH 16/22] [ELI-688] - formatting --- tests/unit/repos/test_factory.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/repos/test_factory.py b/tests/unit/repos/test_factory.py index c5b39953..63b43462 100644 --- a/tests/unit/repos/test_factory.py +++ b/tests/unit/repos/test_factory.py @@ -9,7 +9,8 @@ from eligibility_signposting_api.repos.factory import ( dynamodb_resource_factory, firehose_client_factory, - s3_service_factory, kinesis_client_factory, + s3_service_factory, + kinesis_client_factory, ) From c2c46f8f8ee90ce3185096a8bc20f31024439864 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 21:17:14 +0000 Subject: [PATCH 17/22] [ELI-688] - formatting --- .../audit/audit_service.py | 13 ++++++++----- 
src/eligibility_signposting_api/config/config.py | 4 +--- tests/integration/conftest.py | 6 ++++-- tests/unit/repos/test_factory.py | 2 +- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/eligibility_signposting_api/audit/audit_service.py b/src/eligibility_signposting_api/audit/audit_service.py index 5933d4b6..aee541e7 100644 --- a/src/eligibility_signposting_api/audit/audit_service.py +++ b/src/eligibility_signposting_api/audit/audit_service.py @@ -45,8 +45,11 @@ def audit(self, audit_record: dict) -> None: Data=(data + "\n").encode("utf-8"), PartitionKey=partition_key, ) - logger.info("Successfully sent to kinesis", extra={ - "stream_name": self.audit_stream, - "kinesis_sequence_number": response.get("SequenceNumber"), - "kinesis_shard_id": response.get("ShardId"), - },) + logger.info( + "Successfully sent to kinesis", + extra={ + "stream_name": self.audit_stream, + "kinesis_sequence_number": response.get("SequenceNumber"), + "kinesis_shard_id": response.get("ShardId"), + }, + ) diff --git a/src/eligibility_signposting_api/config/config.py b/src/eligibility_signposting_api/config/config.py index 52dd868b..6dc32b22 100644 --- a/src/eligibility_signposting_api/config/config.py +++ b/src/eligibility_signposting_api/config/config.py @@ -31,9 +31,7 @@ def config() -> dict[str, Any]: firehose_audit_stream_to_s3 = AwsKinesisFirehoseStreamName( os.getenv("FIREHOSE_AUDIT_STREAM_TO_S3", "test_firehose_audit_stream_to_s3") ) - kinesis_audit_stream = AwsKinesisStreamName( - os.getenv("KINESIS_AUDIT_STREAM", "test-kinesis-audit-stream") - ) + kinesis_audit_stream = AwsKinesisStreamName(os.getenv("KINESIS_AUDIT_STREAM", "test-kinesis-audit-stream")) log_level = LOG_LEVEL if os.getenv("ENV"): diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9fa15307..9157c373 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -158,10 +158,12 @@ def s3_client(boto3_session: Session, moto_server: URL) -> BaseClient: 
def firehose_client(boto3_session: Session, moto_server: URL) -> BaseClient: return boto3_session.client("firehose", endpoint_url=str(moto_server)) + @pytest.fixture(scope="session") def kinesis_client(boto3_session: Session, moto_server: URL) -> BaseClient: return boto3_session.client("kinesis", endpoint_url=str(moto_server)) + @pytest.fixture(scope="session") def secretsmanager_client(boto3_session: Session, moto_server: URL) -> BaseClient: """ @@ -579,8 +581,7 @@ def kinesis_stream(kinesis_client: BaseClient) -> dict[str, Any]: @pytest.fixture(autouse=True) -def firehose_delivery_stream(firehose_client: BaseClient, - audit_bucket: BucketName) -> dict[str, Any]: +def firehose_delivery_stream(firehose_client: BaseClient, audit_bucket: BucketName) -> dict[str, Any]: stream_name = "test_firehose_audit_stream_to_s3" try: @@ -598,6 +599,7 @@ def firehose_delivery_stream(firehose_client: BaseClient, except firehose_client.exceptions.ResourceInUseException: return firehose_client.describe_delivery_stream(DeliveryStreamName=stream_name) + def bridge_latest_kinesis_record_to_firehose( kinesis_client: BaseClient, kinesis_stream_name: str, diff --git a/tests/unit/repos/test_factory.py b/tests/unit/repos/test_factory.py index 63b43462..a654cd5c 100644 --- a/tests/unit/repos/test_factory.py +++ b/tests/unit/repos/test_factory.py @@ -9,8 +9,8 @@ from eligibility_signposting_api.repos.factory import ( dynamodb_resource_factory, firehose_client_factory, - s3_service_factory, kinesis_client_factory, + s3_service_factory, ) From c07dd857223fa8226281ddcbef472af9eea00a89 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 21:30:19 +0000 Subject: [PATCH 18/22] [ELI-688] - checkov skips --- infrastructure/stacks/api-layer/kinesis_data_stream.tf | 3 +++ 1 file changed, 3 insertions(+) diff --git a/infrastructure/stacks/api-layer/kinesis_data_stream.tf b/infrastructure/stacks/api-layer/kinesis_data_stream.tf index e0d0fdb3..d26ea272 100644 --- 
a/infrastructure/stacks/api-layer/kinesis_data_stream.tf +++ b/infrastructure/stacks/api-layer/kinesis_data_stream.tf @@ -12,6 +12,9 @@ resource "aws_kms_alias" "kinesis_data_stream_kms_key" { data "aws_iam_policy_document" "kinesis_stream_kms_key_policy" { + #checkov:skip=CKV_AWS_111 Root user needs full KMS key management + #checkov:skip=CKV_AWS_109 Root user needs full KMS key management + #checkov:skip=CKV_AWS_356 Root user needs full KMS key management statement { sid = "EnableRootPermissions" effect = "Allow" From 28592fe35da5868b25763e58b945d97a672342dd Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 21:38:20 +0000 Subject: [PATCH 19/22] [ELI-688] - linting --- tests/integration/conftest.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 9157c373..5493868b 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -619,7 +619,8 @@ def bridge_latest_kinesis_record_to_firehose( shards = stream_description["StreamDescription"]["Shards"] if not shards: - raise AssertionError(f"No shards found for Kinesis stream: {kinesis_stream_name}") + msg = f"No shards in {kinesis_stream_name}" + raise AssertionError(msg) shard_id = shards[0]["ShardId"] @@ -634,7 +635,8 @@ def bridge_latest_kinesis_record_to_firehose( records = records_response.get("Records", []) if not records: - raise AssertionError(f"No records found in Kinesis stream: {kinesis_stream_name}") + msg = f"No records in {kinesis_stream_name}" + raise AssertionError(msg) latest_record = records[-1] latest_data = latest_record["Data"] From 70b94940b6cabc780b2604c1dd00b8dc45acb134 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Wed, 18 Mar 2026 22:19:51 +0000 Subject: [PATCH 20/22] [ELI-688] - slightly tighter security --- .../stacks/api-layer/kinesis_data_stream.tf | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/infrastructure/stacks/api-layer/kinesis_data_stream.tf 
b/infrastructure/stacks/api-layer/kinesis_data_stream.tf index d26ea272..6684f30d 100644 --- a/infrastructure/stacks/api-layer/kinesis_data_stream.tf +++ b/infrastructure/stacks/api-layer/kinesis_data_stream.tf @@ -46,6 +46,12 @@ data "aws_iam_policy_document" "kinesis_stream_kms_key_policy" { ] resources = ["*"] + + condition { + test = "StringEquals" + variable = "kms:ViaService" + values = ["kinesis.${var.default_aws_region}.amazonaws.com"] + } } statement { @@ -64,6 +70,12 @@ data "aws_iam_policy_document" "kinesis_stream_kms_key_policy" { ] resources = ["*"] + + condition { + test = "StringEquals" + variable = "kms:ViaService" + values = ["firehose.${var.default_aws_region}.amazonaws.com"] + } } } From c14931eaafa7c078c491060b3db4a63b162fc9e7 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Fri, 20 Mar 2026 09:06:51 +0000 Subject: [PATCH 21/22] [ELI-688] - adding missing fixtures and bridge from new tests --- .../integration/in_process/test_eligibility_endpoint.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integration/in_process/test_eligibility_endpoint.py b/tests/integration/in_process/test_eligibility_endpoint.py index fc2c4e1f..1ec589e6 100644 --- a/tests/integration/in_process/test_eligibility_endpoint.py +++ b/tests/integration/in_process/test_eligibility_endpoint.py @@ -1703,6 +1703,8 @@ def test_iteration_selection_by_datetime_with_multiple_campaigns_same_target( # rules_bucket: BucketName, audit_bucket: BucketName, secretsmanager_client: BaseClient, # noqa: ARG002 + kinesis_client, + firehose_client ): # Given consumer_id = "consumer-n3bs-jo4hn-ce4na" @@ -1795,6 +1797,13 @@ def test_iteration_selection_by_datetime_with_multiple_campaigns_same_target( # # When response = client.get(f"/patient-check/{persisted_person_pc_sw19}", headers=headers) + bridge_latest_kinesis_record_to_firehose( + kinesis_client=kinesis_client, + kinesis_stream_name="test-kinesis-audit-stream", + firehose_client=firehose_client,
firehose_delivery_stream_name="test_firehose_audit_stream_to_s3", + ) + # Then assert_that(response, is_response().with_status_code(HTTPStatus.OK)) From c4670bf09dd76fee27678d21e5ce35379597a813 Mon Sep 17 00:00:00 2001 From: TOEL2 Date: Fri, 20 Mar 2026 09:24:14 +0000 Subject: [PATCH 22/22] [ELI-688] - formatting --- tests/integration/in_process/test_eligibility_endpoint.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/in_process/test_eligibility_endpoint.py b/tests/integration/in_process/test_eligibility_endpoint.py index 1ec589e6..2c036a38 100644 --- a/tests/integration/in_process/test_eligibility_endpoint.py +++ b/tests/integration/in_process/test_eligibility_endpoint.py @@ -1704,7 +1704,7 @@ def test_iteration_selection_by_datetime_with_multiple_campaigns_same_target( # audit_bucket: BucketName, secretsmanager_client: BaseClient, # noqa: ARG002 kinesis_client, - firehose_client + firehose_client, ): # Given consumer_id = "consumer-n3bs-jo4hn-ce4na"