diff --git a/AGENTS.md b/AGENTS.md index a10eb35..26e2f89 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -9,7 +9,7 @@ It should track the code in `main.py`, not stale assumptions from earlier iterat - This is an OpenCTI external-import connector for Double Extortion Platform (DEP) announcements. - The connector authenticates against DEP AWS Cognito, fetches announcement records from the DEP REST API, converts them to STIX 2.1, and sends bundles to OpenCTI with `update=True`. -- The connector scope is `incident,identity,indicator`. +- The connector scope is `report,incident,identity,indicator`. - The implementation is concentrated in a single runtime file: `main.py`. ## Runtime and configuration truths @@ -54,7 +54,7 @@ It should track the code in `main.py`, not stale assumptions from earlier iterat - `sector`, `actor`, and `country` are whitespace-normalized; empty strings, `n/a`, and `none` become `None`. - Indicator domain extraction prefers `victimDomain`, then falls back to `site`. - Domain normalization uses `urlsplit`, extracts the hostname, and lowercases it. -- `annDescription` is URL-decoded with `urllib.parse.unquote` before the incident is created. +- `annDescription` is URL-decoded with `urllib.parse.unquote` before the report or incident is created. ## Filtering rules @@ -80,14 +80,41 @@ It should track the code in `main.py`, not stale assumptions from earlier iterat - type: `Identity` - identity_class: `organization` - contact: `https://doubleextortion.com/` -- Every emitted object and relationship carries the label `DigIntLab`. +- Every emitted object and relationship created from DEP content carries the label `DigIntLab`. - Confidence is consistently taken from `DEP_CONFIDENCE`. - Bundles are deduplicated by STIX ID before sending to OpenCTI. - Prefer deterministic IDs for DEP-derived entities and relationships to keep re-imports idempotent. ## Data model mappings -### Incident +### Primary object + +- Controlled by `DEP_PRIMARY_OBJECT` (default: `report`). +- `report`: each announcement is wrapped in a STIX `Report` container whose `object_refs` includes all correlated entities and relationships. This is the default and preferred mode for Knowledge Graph analysis. +- `incident`: each announcement is modeled as a standalone STIX `Incident` with explicit relationship edges (`targets`, `attributed-to`, `indicates`). + +### Report (default mode) + +- One report is created per DEP announcement. +- The report is always created, even when no victim identity is created. +- Deterministic report ID is based on normalized DEP `hashid`: + - `report--uuid5(NAMESPACE_URL, "dep-announcement:")` +- Report name format: + - `DEP announcement - ` + - fallback to `victimDomain` + - fallback to `Unknown Victim` +- `published` is derived from the DEP `date` at `00:00:00Z`. +- `report_types`: `["threat-report"]` +- Report custom properties (when present): + - `dep_actor` + - `dep_country` +- Report labels always include `DigIntLab`, plus one label per announcement type: + - `dep:announcement-type:` +- Report external reference prefers `annLink`; if absent, it falls back to `site`. +- `annTitle` is attached as the external reference description when present. +- `object_refs` contains all objects in the bundle (author identity, victim, indicators, intrusion set, country, sector, and all relationships between them). + +### Incident (incident mode) - One incident is created per DEP announcement. - The incident is always created, even when no victim identity is created. @@ -172,14 +199,30 @@ It should track the code in `main.py`, not stale assumptions from earlier iterat - pattern: `[file:hashes.'' = '']` - Indicator IDs are deterministic because they are generated from the STIX pattern. - Indicator `valid_from` uses current UTC processing time, so timestamps are not deterministic even though IDs are. -- Indicators are linked to incidents, not to victims. +- Indicators are also linked to the victim with `related-to`. +- In incident mode, indicators are linked to the incident with `indicates`. +- In report mode, indicators are included in the report's `object_refs` and can also have explicit `related-to -> victim` edges. ## Relationships emitted +### In report mode (default) + +- `victim -> sector` with `part-of` +- `victim -> country` with `located-at` +- `indicator -> victim` with `related-to` +- `intrusion-set -> sector` with `targets` +- `intrusion-set -> country` with `targets` +- `sector -> country` with `related-to` + +All of the above, plus the victim, indicators, and intrusion set, are referenced in the Report's `object_refs`. There is no `attributed-to` edge from the Report itself because the Report is a container, not a relationship endpoint. + +### In incident mode + - `incident -> victim` with `targets` - `victim -> sector` with `part-of` - `incident -> intrusion-set` with `attributed-to` - `victim -> country` with `located-at` +- `indicator -> victim` with `related-to` - `intrusion-set -> sector` with `targets` - `intrusion-set -> country` with `targets` - `sector -> country` with `related-to` @@ -198,6 +241,7 @@ These links are created automatically when both related objects exist. There are - `DEP_CREATE_INTRUSION_SETS` - `DEP_CREATE_COUNTRY_LOCATIONS` - Important non-boolean knobs: + - `DEP_PRIMARY_OBJECT` (default: `report`; valid values: `report`, `incident`) - `DEP_DSET` - `DEP_LOOKBACK_DAYS` - `DEP_OVERLAP_HOURS` @@ -217,7 +261,7 @@ These links are created automatically when both related objects exist. There are - Keep optional enrichment behind the existing feature flags. - Do not reintroduce removed compatibility flags for cross-entity relationships. - If you change modeling, update `README.md`, `config.yml.sample`, and `AGENTS.md` together. -- If you touch incident or indicator generation, verify idempotency assumptions still hold under `update=True`. +- If you touch report, incident, or indicator generation, verify idempotency assumptions still hold under `update=True`. ## Validation and local workflow @@ -234,16 +278,14 @@ These links are created automatically when both related objects exist. There are - Run type checks: - `task type-check` - Main quality gate: - - `task check` -- Additional syntax check: - - `python -m compileall main.py` + - `task format check type-check` - Docker-based runtime validation can be satisfied by either: - building and running the connector image directly - using `docker compose up` with the local stack when broader integration checks are needed - Never start the connector before the OpenCTI API/platform is ready and reachable. - During Docker-based validation, wait for OpenCTI readiness first, then start the connector. -`task check` is the canonical combined gate from `Taskfile.yml` because it runs format check, lint, and mypy. +Use `task format check type-check` for complete local checks before considering code changes done. There is a `task test` target, but there is currently no first-party test suite in this repository. Do not assume automated test coverage exists. For code changes, do not stop at static checks alone; perform Docker-based runtime validation as well. diff --git a/DOCKERHUB.md b/DOCKERHUB.md index cae9316..1994697 100644 --- a/DOCKERHUB.md +++ b/DOCKERHUB.md @@ -9,11 +9,12 @@ An [OpenCTI](https://github.com/OpenCTI-Platform/OpenCTI) external-import connec ## What it does - Authenticates against the DEP AWS Cognito identity provider -- Polls the DEP REST API on a configurable interval and maps each announcement to an OpenCTI **Incident** +- Polls the DEP REST API on a configurable interval and maps each announcement to an OpenCTI **Report** by default, or an **Incident** when `DEP_PRIMARY_OBJECT=incident` - Creates **Organization** identities for victim companies - Optionally creates **Sector** identities and links victims via a `part-of` relationship - Optionally generates **Indicators** for victim domains and leak hash identifiers -- Attaches announcement-type labels (e.g. `dep:announcement-type:pii`) to incidents +- Links generated indicators to the victim with `related-to` +- Attaches announcement-type labels (e.g. `dep:announcement-type:pii`) to the primary object - Maintains connector state with a configurable overlap window to capture late DEP updates --- @@ -37,31 +38,32 @@ All values can be set via environment variables (which take precedence) or via a ### Required -| Environment variable | Description | -|---|---| -| `OPENCTI_URL` | URL of your OpenCTI platform | -| `OPENCTI_TOKEN` | OpenCTI API token | -| `DEP_USERNAME` | DEP portal username | -| `DEP_PASSWORD` | DEP portal password | -| `DEP_API_KEY` | API key issued by DEP | -| `DEP_CLIENT_ID` | AWS Cognito App Client ID | +| Environment variable | Description | +| -------------------- | ---------------------------- | +| `OPENCTI_URL` | URL of your OpenCTI platform | +| `OPENCTI_TOKEN` | OpenCTI API token | +| `DEP_USERNAME` | DEP portal username | +| `DEP_PASSWORD` | DEP portal password | +| `DEP_API_KEY` | API key issued by DEP | +| `DEP_CLIENT_ID` | AWS Cognito App Client ID | ### Optional -| Environment variable | Default | Description | -|---|---|---| -| `CONNECTOR_RUN_INTERVAL` | `3600` | Polling interval in seconds | -| `DEP_CONFIDENCE` | `70` | Confidence score on generated STIX objects | -| `DEP_LOOKBACK_DAYS` | `7` | Days to look back on first run | -| `DEP_OVERLAP_HOURS` | `72` | Overlap hours from previous run to catch late updates | -| `DEP_DSET` | `ext` | Dataset to query (e.g. `ext`, `sanctions`) | -| `DEP_EXTENDED_RESULTS` | `true` | Request extended leak information | -| `DEP_ENABLE_SITE_INDICATOR` | `true` | Create a domain indicator per victim | -| `DEP_ENABLE_HASH_INDICATOR` | `true` | Create a hash indicator when a hash is provided | -| `DEP_SKIP_EMPTY_VICTIM` | `true` | Skip items where victim name is empty or n/a | -| `DEP_CREATE_SECTOR_IDENTITIES` | `true` | Create sector identities and link victims | -| `DEP_LOGIN_ENDPOINT` | `https://cognito-idp.eu-west-1.amazonaws.com/` | Cognito login endpoint | -| `DEP_API_ENDPOINT` | `https://api.eu-ep1.doubleextortion.com/v1/dbtr/privlist` | DEP REST endpoint | +| Environment variable | Default | Description | +| ------------------------------ | --------------------------------------------------------- | ----------------------------------------------------- | +| `CONNECTOR_RUN_INTERVAL` | `3600` | Polling interval in seconds | +| `DEP_CONFIDENCE` | `70` | Confidence score on generated STIX objects | +| `DEP_LOOKBACK_DAYS` | `7` | Days to look back on first run | +| `DEP_OVERLAP_HOURS` | `72` | Overlap hours from previous run to catch late updates | +| `DEP_DSET` | `ext` | Dataset to query (e.g. `ext`, `sanctions`) | +| `DEP_PRIMARY_OBJECT` | `report` | Primary STIX object to emit: `report` or `incident` | +| `DEP_EXTENDED_RESULTS` | `true` | Request extended leak information | +| `DEP_ENABLE_SITE_INDICATOR` | `true` | Create a domain indicator per victim | +| `DEP_ENABLE_HASH_INDICATOR` | `true` | Create a hash indicator when a hash is provided | +| `DEP_SKIP_EMPTY_VICTIM` | `true` | Skip items where victim name is empty or n/a | +| `DEP_CREATE_SECTOR_IDENTITIES` | `true` | Create sector identities and link victims | +| `DEP_LOGIN_ENDPOINT` | `https://cognito-idp.eu-west-1.amazonaws.com/` | Cognito login endpoint | +| `DEP_API_ENDPOINT` | `https://api.eu-ep1.doubleextortion.com/v1/dbtr/privlist` | DEP REST endpoint | --- diff --git a/README.md b/README.md index e099908..39de10f 100644 --- a/README.md +++ b/README.md @@ -7,16 +7,16 @@ The Double Extortion connector ingests ransomware and data-leak announcements pu ## Features - Authenticates against the DoubleExtortion AWS Cognito identity provider. -- Collects Double Extortion announcements and models them as **Incidents**. +- Collects Double Extortion announcements and models them as **Reports** (default) or **Incidents** (configurable via `DEP_PRIMARY_OBJECT`). - Creates **Organization** identities for victims. - Optionally materializes **Intrusion Sets** from DEP actor names. - Optionally materializes **Country** locations and links victims to them. - Automatically links intrusion sets to sectors, intrusion sets to countries, and sectors to countries when those entities are created. - Generates optional **Indicators** for advertised victim domains and leak hash identifiers. -- Adds announcement-type labels to incidents (for example `dep:announcement-type:pii`). +- Adds announcement-type labels to reports or incidents (for example `dep:announcement-type:pii`). - Supports querying different Double Extortion Platform datasets via `DEP_DSET`. - Maintains connector state with a configurable overlap window to capture late DEP updates. -- Uses stable incident identifiers (based on DEP `hashid`) so refreshed DEP records update existing incidents. +- Uses stable identifiers (based on DEP `hashid`) for both reports and incidents so refreshed DEP records update existing objects. Screenshot 2025-11-30 114440 @@ -49,22 +49,23 @@ All configuration values can be supplied via the `config.yml` file or through en ### Optional values -| YAML path | Environment variable | Default | Description | -| --------------------------- | --------------------------- | --------------------------------------------------------- | ----------------------------------------------------------------------------------- | -| `connector.interval` | `CONNECTOR_RUN_INTERVAL` | `3600` | Interval in seconds between executions. | -| `dep.confidence` | `DEP_CONFIDENCE` | `70` | Confidence score attached to generated STIX objects. | -| `dep.login_endpoint` | `DEP_LOGIN_ENDPOINT` | `https://cognito-idp.eu-west-1.amazonaws.com/` | Cognito login endpoint. | -| `dep.api_endpoint` | `DEP_API_ENDPOINT` | `https://api.eu-ep1.doubleextortion.com/v1/dbtr/privlist` | REST endpoint for announcements. | -| `dep.lookback_days` | `DEP_LOOKBACK_DAYS` | `7` | Days to look back on the first run. | -| `dep.overlap_hours` | `DEP_OVERLAP_HOURS` | `72` | Hours to overlap from the previous `last_run` when fetching, to catch late updates. | -| `dep.extended_results` | `DEP_EXTENDED_RESULTS` | `true` | Request extended leak information. | -| `dep.dset` | `DEP_DSET` | `ext` | Dataset to query (for example `ext`, `sanctions`). | -| `dep.enable_site_indicator` | `DEP_ENABLE_SITE_INDICATOR` | `true` | Create a domain indicator per victim. | -| `dep.enable_hash_indicator` | `DEP_ENABLE_HASH_INDICATOR` | `true` | Create a hash indicator when a hash is provided. | -| `dep.skip_empty_victim` | `DEP_SKIP_EMPTY_VICTIM` | `true` | Skip items where victim is empty, `n/a`, or `none`. | -| `dep.create_sector_identities` | `DEP_CREATE_SECTOR_IDENTITIES` | `true` | Create sector identities and link victims with a `part-of` relationship. | -| `dep.create_intrusion_sets` | `DEP_CREATE_INTRUSION_SETS` | `true` | Create intrusion sets from DEP actor values and link incidents with `attributed-to`. | -| `dep.create_country_locations` | `DEP_CREATE_COUNTRY_LOCATIONS` | `true` | Create country locations and link victim identities with `located-at`. | +| YAML path | Environment variable | Default | Description | +| ------------------------------ | ------------------------------ | --------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------- | +| `connector.interval` | `CONNECTOR_RUN_INTERVAL` | `3600` | Interval in seconds between executions. | +| `dep.confidence` | `DEP_CONFIDENCE` | `70` | Confidence score attached to generated STIX objects. | +| `dep.login_endpoint` | `DEP_LOGIN_ENDPOINT` | `https://cognito-idp.eu-west-1.amazonaws.com/` | Cognito login endpoint. | +| `dep.api_endpoint` | `DEP_API_ENDPOINT` | `https://api.eu-ep1.doubleextortion.com/v1/dbtr/privlist` | REST endpoint for announcements. | +| `dep.lookback_days` | `DEP_LOOKBACK_DAYS` | `7` | Days to look back on the first run. | +| `dep.overlap_hours` | `DEP_OVERLAP_HOURS` | `72` | Hours to overlap from the previous `last_run` when fetching, to catch late updates. | +| `dep.extended_results` | `DEP_EXTENDED_RESULTS` | `true` | Request extended leak information. | +| `dep.dset` | `DEP_DSET` | `ext` | Dataset to query (for example `ext`, `sanctions`). | +| `dep.enable_site_indicator` | `DEP_ENABLE_SITE_INDICATOR` | `true` | Create a domain indicator per victim. | +| `dep.enable_hash_indicator` | `DEP_ENABLE_HASH_INDICATOR` | `true` | Create a hash indicator when a hash is provided. | +| `dep.skip_empty_victim` | `DEP_SKIP_EMPTY_VICTIM` | `true` | Skip items where victim is empty, `n/a`, or `none`. | +| `dep.create_sector_identities` | `DEP_CREATE_SECTOR_IDENTITIES` | `true` | Create sector identities and link victims with a `part-of` relationship. | +| `dep.create_intrusion_sets` | `DEP_CREATE_INTRUSION_SETS` | `true` | Create intrusion sets from DEP actor values and link incidents with `attributed-to` (incident mode only). | +| `dep.primary_object` | `DEP_PRIMARY_OBJECT` | `report` | Primary object: `report` wraps all objects in a STIX Report container; `incident` creates a standalone Incident object. | +| `dep.create_country_locations` | `DEP_CREATE_COUNTRY_LOCATIONS` | `true` | Create country locations and link victim identities with `located-at`. | ## Why `IntrusionSet` for DEP actor values @@ -73,7 +74,7 @@ DEP `actor` values are modeled as STIX `IntrusionSet` objects instead of `Threat - DEP actor strings usually represent campaign/operator labels, not high-confidence real-world identities. - `IntrusionSet` is a safer semantic fit for recurring malicious activity clusters. - This avoids over-claiming attribution when source data quality is limited. -- It supports incident and targeting analysis directly through `attributed-to` (incident -> intrusion set) and `targets` links from intrusion sets to sectors and countries. +- It supports targeting analysis directly through `attributed-to` (incident -> intrusion set, in incident mode) and `targets` links from intrusion sets to sectors and countries. A `ThreatActor` model can be adopted later if the feed includes stronger attribution context (persona, role, motivation, sophistication). @@ -102,11 +103,14 @@ docker run --rm \ - The project uses [**go-task**](https://github.com/go-task/task) with a `Taskfile.yml` to streamline common development commands. - The project uses [**uv**](https://docs.astral.sh/uv/) as the Python virtual environment and dependency management tool. - The connector stores `last_run` in OpenCTI worker state and fetches with an overlap (`DEP_OVERLAP_HOURS`) to catch delayed DEP changes. Delete the state in OpenCTI to force a full backfill window from `DEP_LOOKBACK_DAYS`. -- Incidents are created with deterministic IDs derived from DEP `hashid`, and bundles are sent with `update=True`, so repeated records update existing incidents instead of creating duplicates. +- Reports and incidents are created with deterministic IDs derived from DEP `hashid`, and bundles are sent with `update=True`, so repeated records update existing objects instead of creating duplicates. +- In `report` mode each announcement is wrapped in a STIX `Report` object whose `object_refs` contains all correlated entities (victim, indicators, intrusion set, country, sector and their relationships). This produces a pre-correlated Knowledge Graph view directly in OpenCTI, consistent with most other connectors and feeds. +- In `incident` mode the announcement is modeled as a STIX `Incident` with explicit `targets`, `attributed-to`, and `indicates` relationships. - Sector names are normalized before sector-identity generation to reduce duplicates caused by inconsistent casing or whitespace in DEP data. - The API occasionally URL-encodes announcement descriptions. The connector automatically decodes the description before sending it to OpenCTI. - DEP actor and country values can be materialized as entities using `DEP_CREATE_INTRUSION_SETS` and `DEP_CREATE_COUNTRY_LOCATIONS`. -- DEP actor and country values are also stored in incident custom properties (`dep_actor`, `dep_country`) for source traceability. +- DEP actor and country values are also stored in the primary object custom properties (`dep_actor`, `dep_country`) for source traceability. +- Generated indicators are also linked to the victim with `related-to` so those indicator nodes are connected in the Knowledge Graph. - Cross-entity links are automatic: intrusion set -> sector (`targets`), intrusion set -> country (`targets`), and sector -> country (`related-to`) when both entities are present. - Generic low-quality actor values (for example `unknown`, `anonymous`, `ransomware group`) are ignored for intrusion-set creation. - To reload the connector code in the platform, run: `docker compose build dep-connector; docker compose up -d dep-connector; docker compose logs -f dep-connector` diff --git a/config.yml.sample b/config.yml.sample index 0120bef..7fb1478 100644 --- a/config.yml.sample +++ b/config.yml.sample @@ -6,7 +6,7 @@ connector: id: ChangeMe type: EXTERNAL_IMPORT name: DEP Connector - scope: incident,identity,indicator + scope: report,incident,identity,indicator log_level: info interval: 3600 # In seconds @@ -32,3 +32,4 @@ dep: create_sector_identities: true create_intrusion_sets: true create_country_locations: true + primary_object: report # "report" (default) or "incident" diff --git a/main.py b/main.py index 9d567c8..52d4b1f 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,5 @@ import json import os -import time from datetime import UTC, datetime, timedelta from datetime import date as dt_date from enum import StrEnum @@ -14,7 +13,8 @@ import yaml from pydantic import ConfigDict, Field, field_validator from pydantic.dataclasses import dataclass -from stix2 import v21 as stix2 # type: ignore[import-untyped] +from stix2 import TLP_AMBER # type: ignore[import-untyped] +from stix2 import v21 as stix2 class AnnouncementType(StrEnum): @@ -31,6 +31,11 @@ class AnnouncementType(StrEnum): SENSITIVES = "SENSITIVES" +class PrimaryObject(StrEnum): + REPORT = "report" + INCIDENT = "incident" + + @dataclass(config=ConfigDict(extra="allow", frozen=True)) class LeakRecord: date: dt_date @@ -130,7 +135,9 @@ def __init__(self) -> None: description="We Track and Monitor the Cyber Space", contact_information="https://doubleextortion.com/", identity_class="organization", + object_marking_refs=[TLP_AMBER], ) + self._current_work_id: str | None = None self.interval = pycti.get_config_variable( "CONNECTOR_RUN_INTERVAL", @@ -233,6 +240,22 @@ def __init__(self) -> None: config, default=True, ) + primary_object_value = str( + pycti.get_config_variable( + "DEP_PRIMARY_OBJECT", + ["dep", "primary_object"], + config, + default=PrimaryObject.REPORT.value, + ) + ).strip() + try: + self.primary_object = PrimaryObject(primary_object_value.lower()) + except ValueError as exc: + error = ( + "DEP primary object must be one of: report, incident " + f"(got: {primary_object_value})" + ) + raise ValueError(error) from exc @staticmethod def _load_config() -> dict[str, Any]: @@ -363,6 +386,7 @@ def _create_victim_identity(self, item: LeakRecord) -> stix2.Identity | None: labels=[self.label_value], created_by_ref=self.author_identity, external_references=external_references or None, + object_marking_refs=[TLP_AMBER], ) def _create_sector_identity(self, sector: str) -> stix2.Identity: @@ -374,6 +398,7 @@ def _create_sector_identity(self, sector: str) -> stix2.Identity: created_by_ref=self.author_identity, confidence=self.confidence, labels=[self.label_value], + object_marking_refs=[TLP_AMBER], ) def _create_intrusion_set(self, actor: str) -> stix2.IntrusionSet: @@ -387,6 +412,7 @@ def _create_intrusion_set(self, actor: str) -> stix2.IntrusionSet: confidence=self.confidence, labels=[self.label_value], created_by_ref=self.author_identity, + object_marking_refs=[TLP_AMBER], ) def _create_country_location(self, country: str) -> stix2.Location: @@ -399,20 +425,25 @@ def _create_country_location(self, country: str) -> stix2.Location: confidence=self.confidence, labels=[self.label_value], created_by_ref=self.author_identity, + object_marking_refs=[TLP_AMBER], custom_properties={"x_opencti_location_type": "Country"}, allow_custom=True, ) - def _create_incident(self, item: LeakRecord) -> stix2.Incident: - victim_name = item.victim or item.victim_domain - if not victim_name: - victim_name = "Unknown Victim" - incident_name = f"DEP announcement - {victim_name}" - description = item.ann_description - if description: - description = unquote(description) - first_seen = datetime.combine(item.date, datetime.min.time(), tzinfo=UTC) - external_reference = {"source_name": "dep"} + @staticmethod + def _build_primary_name(item: LeakRecord) -> str: + victim_name = item.victim or item.victim_domain or "Unknown Victim" + return f"DEP announcement - {victim_name}" + + @staticmethod + def _build_primary_description(item: LeakRecord) -> str | None: + if item.ann_description: + return unquote(item.ann_description) + return None + + @staticmethod + def _build_primary_external_reference(item: LeakRecord) -> dict[str, Any]: + external_reference: dict[str, Any] = {"source_name": "dep"} if item.ann_link: external_reference["url"] = item.ann_link elif item.site: @@ -422,16 +453,29 @@ def _create_incident(self, item: LeakRecord) -> stix2.Incident: ) if item.ann_title: external_reference["description"] = item.ann_title + return external_reference + + @staticmethod + def _build_primary_custom_properties(item: LeakRecord) -> dict[str, Any]: + custom_properties: dict[str, Any] = {} + if item.actor: + custom_properties["dep_actor"] = item.actor + if item.country: + custom_properties["dep_country"] = item.country + return custom_properties + + def _create_incident(self, item: LeakRecord) -> stix2.Incident: + incident_name = self._build_primary_name(item) + description = self._build_primary_description(item) + first_seen = datetime.combine(item.date, datetime.min.time(), tzinfo=UTC) + external_reference = self._build_primary_external_reference(item) # incident_id must be deterministic to allow updates incident_id = f"incident--{uuid5(NAMESPACE_URL, f'dep-announcement:{item.hashid.strip().lower()}')}" - custom_properties: dict[str, Any] = { + custom_properties = { "incident_type": "cybercrime", "first_seen": first_seen, + **self._build_primary_custom_properties(item), } - if item.actor: - custom_properties["dep_actor"] = item.actor - if item.country: - custom_properties["dep_country"] = item.country return stix2.Incident( id=incident_id, @@ -439,13 +483,40 @@ def _create_incident(self, item: LeakRecord) -> stix2.Incident: description=description, created=first_seen, confidence=self.confidence, - labels=self._build_incident_labels(item), + labels=self._build_labels(item), created_by_ref=self.author_identity, external_references=[external_reference], + object_marking_refs=[TLP_AMBER], custom_properties=custom_properties, ) - def _build_incident_labels(self, item: LeakRecord) -> list[str]: + def _create_report(self, item: LeakRecord, object_refs: list[str]) -> stix2.Report: + report_name = self._build_primary_name(item) + description = self._build_primary_description(item) + published = datetime.combine(item.date, datetime.min.time(), tzinfo=UTC) + external_reference = self._build_primary_external_reference(item) + # report_id must be deterministic to allow updates + report_id = f"report--{uuid5(NAMESPACE_URL, f'dep-announcement:{item.hashid.strip().lower()}')}" + custom_properties = self._build_primary_custom_properties(item) + + kwargs: dict[str, Any] = { + "id": report_id, + "name": report_name, + "description": description, + "published": published, + "report_types": ["threat-report"], + "confidence": self.confidence, + "labels": self._build_labels(item), + "created_by_ref": self.author_identity, + "external_references": [external_reference], + "object_refs": object_refs, + "object_marking_refs": [TLP_AMBER], + } + if custom_properties: + kwargs["custom_properties"] = custom_properties + return stix2.Report(**kwargs) + + def _build_labels(self, item: LeakRecord) -> list[str]: labels = {self.label_value} labels.update( f"dep:announcement-type:{announcement_type.value.lower()}" @@ -471,6 +542,7 @@ def _create_site_indicator(self, item: LeakRecord) -> stix2.Indicator | None: confidence=self.confidence, labels=[self.label_value], created_by_ref=self.author_identity, + object_marking_refs=[TLP_AMBER], ) def _create_hash_indicator(self, item: LeakRecord) -> stix2.Indicator | None: @@ -494,6 +566,7 @@ def _create_hash_indicator(self, item: LeakRecord) -> stix2.Indicator | None: confidence=self.confidence, labels=[self.label_value], created_by_ref=self.author_identity, + object_marking_refs=[TLP_AMBER], ) @staticmethod @@ -521,6 +594,7 @@ def _build_relationship( created_by_ref=self.author_identity, confidence=self.confidence, labels=[self.label_value], + object_marking_refs=[TLP_AMBER], ) def _send_objects(self, objects: list[stix2._STIXBase21]) -> None: @@ -528,7 +602,12 @@ def _send_objects(self, objects: list[stix2._STIXBase21]) -> None: return deduped = {obj.id: obj for obj in objects if getattr(obj, "id", None)} bundle = stix2.Bundle(objects=list(deduped.values()), allow_custom=True) - self.helper.send_stix2_bundle(bundle.serialize(), update=True) + self.helper.send_stix2_bundle( + bundle.serialize(), + update=True, + work_id=self._current_work_id, + cleanup_inconsistent_bundle=True, + ) def _should_skip_item(self, victim: str | None) -> bool: if not self.skip_empty_victim: @@ -546,11 +625,50 @@ def _build_indicators(self, item: LeakRecord) -> list[stix2.Indicator]: indicators.append(hash_indicator) return indicators + def _build_indicator_victim_relationships( + self, + indicators: list[stix2.Indicator], + victim: stix2.Identity | None, + ) -> list[stix2.Relationship]: + if victim is None: + return [] + return [ + self._build_relationship("related-to", indicator.id, victim.id) + for indicator in indicators + ] + + def _build_cross_entity_relationships( + self, + intrusion_set: stix2.IntrusionSet | None, + sector_identity: stix2.Identity | None, + country_location: stix2.Location | None, + ) -> list[stix2._STIXBase21]: + objects: list[stix2._STIXBase21] = [] + if intrusion_set and sector_identity: + objects.append( + self._build_relationship( + "targets", intrusion_set.id, sector_identity.id + ) + ) + if intrusion_set and country_location: + objects.append( + self._build_relationship( + "targets", intrusion_set.id, country_location.id + ) + ) + if sector_identity and country_location: + objects.append( + self._build_relationship( + "related-to", sector_identity.id, country_location.id + ) + ) + return objects + def _build_optional_entities( self, item: LeakRecord, victim: stix2.Identity | None, - incident: stix2.Incident, + incident_id: str | None = None, ) -> list[stix2._STIXBase21]: objects: list[stix2._STIXBase21] = [] sector_identity: stix2.Identity | None = None @@ -571,9 +689,12 @@ def _build_optional_entities( intrusion_set = self._create_intrusion_set(item.actor) if intrusion_set: objects.append(intrusion_set) - objects.append( - self._build_relationship("attributed-to", incident.id, intrusion_set.id) - ) + if incident_id is not None: + objects.append( + self._build_relationship( + "attributed-to", incident_id, intrusion_set.id + ) + ) country_location: stix2.Location | None = None if self.create_country_locations and item.country and victim: @@ -583,26 +704,28 @@ def _build_optional_entities( objects.append( self._build_relationship("located-at", victim.id, country_location.id) ) - if intrusion_set and sector_identity: - objects.append( - self._build_relationship( - "targets", intrusion_set.id, sector_identity.id - ) - ) - if intrusion_set and country_location: - objects.append( - self._build_relationship( - "targets", intrusion_set.id, country_location.id - ) - ) - if sector_identity and country_location: - objects.append( - self._build_relationship( - "related-to", sector_identity.id, country_location.id - ) + objects.extend( + self._build_cross_entity_relationships( + intrusion_set, sector_identity, country_location ) + ) return objects + def _build_content( + self, + item: LeakRecord, + victim: stix2.Identity | None, + indicators: list[stix2.Indicator], + incident_id: str | None = None, + ) -> list[stix2._STIXBase21]: + content: list[stix2._STIXBase21] = [self.author_identity] + if victim: + content.append(victim) + content.extend(self._build_optional_entities(item, victim, incident_id)) + content.extend(indicators) + content.extend(self._build_indicator_victim_relationships(indicators, victim)) + return content + def _process_item(self, item: LeakRecord) -> None: if self._should_skip_item(item.victim): self.helper.log_info( @@ -610,23 +733,40 @@ def _process_item(self, item: LeakRecord) -> None: ) return victim = self._create_victim_identity(item) - incident = self._create_incident(item) indicators = self._build_indicators(item) + if self.primary_object is PrimaryObject.INCIDENT: + self._process_item_as_incident(item, victim, indicators) + else: + self._process_item_as_report(item, victim, indicators) - objects: list[stix2._STIXBase21] = [self.author_identity] - if victim: - objects.append(victim) + def _process_item_as_incident( + self, + item: LeakRecord, + victim: stix2.Identity | None, + indicators: list[stix2.Indicator], + ) -> None: + incident = self._create_incident(item) + objects = self._build_content(item, victim, indicators, incident.id) objects.append(incident) if victim: objects.append(self._build_relationship("targets", incident.id, victim.id)) - objects.extend(self._build_optional_entities(item, victim, incident)) - for indicator in indicators: - objects.append(indicator) - objects.append( - self._build_relationship("indicates", indicator.id, incident.id) - ) + objects.extend( + self._build_relationship("indicates", indicator.id, incident.id) + for indicator in indicators + ) self._send_objects(objects) + def _process_item_as_report( + self, + item: LeakRecord, + victim: stix2.Identity | None, + indicators: list[stix2.Indicator], + ) -> None: + content = self._build_content(item, victim, indicators) + object_refs = [obj.id for obj in content if getattr(obj, "id", None)] + report = self._create_report(item, object_refs) + self._send_objects([*content, report]) + def _run_cycle(self) -> None: now = datetime.now(UTC) start = now - timedelta(days=self.lookback_days) @@ -653,31 +793,43 @@ def _run_cycle(self) -> None: f"(overlap: {self.overlap_hours}h)" ) + self._current_work_id = self.helper.api.work.initiate_work( + self.helper.connect_id, + f"DEP connector - {now.strftime('%Y-%m-%d %H:%M:%S')} UTC", + ) try: - items = self._fetch_data(start, end) - except Exception as error: # pylint: disable=broad-except - self.helper.log_error(f"Failed to fetch DEP data: {error}") - return - - self.helper.log_info(f"Received {len(items)} entries from DEP API") - - for item in items: try: - self._process_item(item) + items = self._fetch_data(start, end) except Exception as error: # pylint: disable=broad-except - self.helper.log_error( - f"Failed to process DEP item for victim {item.victim}: {error}" - ) + self.helper.log_error(f"Failed to fetch DEP data: {error}") + return - self.helper.log_info("Persisting connector state") - self.helper.set_state({"last_run": end.isoformat()}) - self.helper.log_info("DEP run completed") + self.helper.log_info(f"Received {len(items)} entries from DEP API") + + for item in items: + try: + self._process_item(item) + except Exception as error: # pylint: disable=broad-except + self.helper.log_error( + f"Failed to process DEP item for victim {item.victim}: {error}" + ) + + self.helper.log_info("Persisting connector state") + self.helper.set_state({"last_run": end.isoformat()}) + self.helper.log_info("DEP run completed") + finally: + self.helper.api.work.to_processed( + self._current_work_id, + f"DEP connector run completed, last_run: {end.isoformat()}", + ) + self._current_work_id = None def run(self) -> None: self.helper.log_info("Starting DEP connector") - while True: - self._run_cycle() - time.sleep(self.interval) + self.helper.schedule_iso( + message_callback=self._run_cycle, + duration_period=f"PT{self.interval}S", + ) if __name__ == "__main__": diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 8419a6f..0000000 --- a/requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -pycti>=6.8.13 -pyyaml>=6.0.3 -requests>=2.32.5 -stix2>=3.0.1 -pydantic>=2.12.5 \ No newline at end of file