Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ It should track the code in `main.py`, not stale assumptions from earlier iterat
- This is an OpenCTI external-import connector for Double Extortion Platform (DEP) announcements.
- The connector authenticates against DEP AWS Cognito, fetches announcement records from the DEP REST API, converts them to STIX 2.1, and sends bundles to OpenCTI with `update=True`.
- The connector scope is `report,incident,identity,indicator`.
- The implementation is concentrated in a single runtime file: `main.py`.
- The implementation is split across the `dep_connector/` package (`converter_to_stix.py`, `client_api.py`, `config_loader.py`, `connector.py`) with `main.py` as the thin entrypoint.

## Runtime and configuration truths

Expand Down Expand Up @@ -292,7 +292,12 @@ For code changes, do not stop at static checks alone; perform Docker-based runti

## File map

- Connector runtime and STIX mapping: `main.py`
- Connector entrypoint: `main.py`
- Data models and STIX converter: `dep_connector/converter_to_stix.py`
- DEP API client (auth + fetch): `dep_connector/client_api.py`
- Configuration loader: `dep_connector/config_loader.py`
- Connector orchestration (run cycle): `dep_connector/connector.py`
- Package re-export: `dep_connector/__init__.py`
- Sample connector config: `config.yml.sample`
- Local development stack: `docker-compose.yml`
- Runtime image definition: `Dockerfile`
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ RUN --mount=type=cache,target=/root/.cache \
--no-install-project --no-editable
COPY pyproject.toml uv.lock ./
COPY *.py ./
COPY dep_connector/ dep_connector/
RUN --mount=type=cache,target=/root/.cache/uv \
uv sync --locked --no-editable --no-dev

Expand Down
3 changes: 3 additions & 0 deletions dep_connector/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from dep_connector.connector import DepConnector

__all__ = ["DepConnector"]
92 changes: 92 additions & 0 deletions dep_connector/client_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import json
import logging
from typing import TypeAlias

import requests

logger = logging.getLogger(__name__)

JsonPrimitive: TypeAlias = str | int | float | bool | None
JsonValue: TypeAlias = JsonPrimitive | list["JsonValue"] | dict[str, "JsonValue"]
DepApiItem: TypeAlias = dict[str, JsonValue]


class DepClient:
def __init__(
self,
*,
login_endpoint: str,
api_endpoint: str,
api_key: str | None,
username: str | None,
password: str | None,
client_id: str,
dataset: str,
extended_results: bool,
) -> None:
self.login_endpoint = login_endpoint
self.api_endpoint = api_endpoint
self.api_key = api_key
self.username = username
self.password = password
self.client_id = client_id
self.dataset = dataset
self.extended_results = extended_results

def authenticate(self) -> str:
headers = {
"Content-Type": "application/x-amz-json-1.1",
"X-Amz-Target": "AWSCognitoIdentityProviderService.InitiateAuth",
}
payload = {
"AuthParameters": {"USERNAME": self.username, "PASSWORD": self.password},
"AuthFlow": "USER_PASSWORD_AUTH",
"ClientId": self.client_id,
}
response = requests.post(
self.login_endpoint,
headers=headers,
json=payload,
timeout=30,
)
response.raise_for_status()
auth_payload: dict[str, dict[str, str]] = response.json()
token = auth_payload["AuthenticationResult"]["IdToken"]
if not token:
error = "Unable to retrieve IdToken from authentication response"
raise ValueError(error)
return token

def fetch_raw(
self,
start_date: str,
end_date: str,
) -> list[DepApiItem]:
token = self.authenticate()
params: dict[str, str] = {
"ts": start_date,
"te": end_date,
"dset": self.dataset,
"full": "true",
}
if self.extended_results:
params["extended"] = "true"

headers = {
"X-Api-Key": self.api_key,
"Authorization": token,
}

response = requests.get(
self.api_endpoint,
headers=headers,
params=params,
timeout=60,
)
response.raise_for_status()
try:
payload: list[DepApiItem] = response.json()
except json.JSONDecodeError as exception:
message = "Unable to decode DEP API response"
raise ValueError(message) from exception
return payload
26 changes: 26 additions & 0 deletions dep_connector/config_loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os
from pathlib import Path

import yaml

_PROJECT_ROOT = Path(__file__).resolve().parent.parent


def load_config() -> dict[str, object]:
config_path = (
os.environ["OPENCTI_CONFIG_FILE"]
if "OPENCTI_CONFIG_FILE" in os.environ
else _PROJECT_ROOT / "config.yml"
)
config_path = Path(config_path)
if not config_path.exists():
return {}

with config_path.open(encoding="utf-8") as config_file:
loaded = yaml.safe_load(config_file) or {}

if not isinstance(loaded, dict):
error = "Configuration root must be a mapping"
raise TypeError(error)

return loaded
Loading
Loading