From f0f36a8b6af32ff5bd55694517521a9daf6e04fd Mon Sep 17 00:00:00 2001 From: Edoardo Rosa <6991986+notdodo@users.noreply.github.com> Date: Sun, 8 Mar 2026 22:09:51 +0100 Subject: [PATCH] enh: link sectors with countries and intrusionsets --- README.md | 21 +++++- config.yml.sample | 2 + main.py | 164 +++++++++++++++++++++++++++++++++++++++------- 3 files changed, 162 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index cca9346..765f73e 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,9 @@ The Double Extortion connector ingests ransomware and data-leak announcements pu - Authenticates against the DoubleExtortion AWS Cognito identity provider. - Collects Double Extortion announcements and models them as **Incidents**. - Creates **Organization** identities for victims. +- Optionally materializes **Intrusion Sets** from DEP actor names. +- Optionally materializes **Country** locations and links victims to them. +- Automatically links intrusion sets to sectors and sectors to countries when those entities are created. - Generates optional **Indicators** for advertised victim domains and leak hash identifiers. - Adds announcement-type labels to incidents (for example `dep:announcement-type:pii`). - Supports querying different Double Extortion Platform datasets via `DEP_DSET`. @@ -60,6 +63,19 @@ All configuration values can be supplied via the `config.yml` file or through en | `dep.enable_hash_indicator` | `DEP_ENABLE_HASH_INDICATOR` | `true` | Create a hash indicator when a hash is provided. | | `dep.skip_empty_victim` | `DEP_SKIP_EMPTY_VICTIM` | `true` | Skip items where victim is empty, `n/a`, or `none`. | | `dep.create_sector_identities` | `DEP_CREATE_SECTOR_IDENTITIES` | `true` | Create sector identities and link victims with a `part-of` relationship. | +| `dep.create_intrusion_sets` | `DEP_CREATE_INTRUSION_SETS` | `true` | Create intrusion sets from DEP actor values and link incidents with `attributed-to`. | +| `dep.create_country_locations` | `DEP_CREATE_COUNTRY_LOCATIONS` | `true` | Create country locations and link victim identities with `located-at`. | + +## Why `IntrusionSet` for DEP actor values + +DEP `actor` values are modeled as STIX `IntrusionSet` objects instead of `ThreatActor` by default. + +- DEP actor strings usually represent campaign/operator labels, not high-confidence real-world identities. +- `IntrusionSet` is a safer semantic fit for recurring malicious activity clusters. +- This avoids over-claiming attribution when source data quality is limited. +- It supports incident and targeting analysis directly through `attributed-to` (incident -> intrusion set) and `targets` (intrusion set -> sector). + +A `ThreatActor` model can be adopted later if the feed includes stronger attribution context (persona, role, motivation, sophistication). ## Docker @@ -89,7 +105,10 @@ docker run --rm \ - Incidents are created with deterministic IDs derived from DEP `hashid`, and bundles are sent with `update=True`, so repeated records update existing incidents instead of creating duplicates. - Sector names are normalized before sector-identity generation to reduce duplicates caused by inconsistent casing or whitespace in DEP data. - The API occasionally URL-encodes announcement descriptions. The connector automatically decodes the description before sending it to OpenCTI. -- Intrusion set creation is disabled by default because not every dataset represents a threat actor. If needed, adapt the logic in `DepConnector._process_item`. +- DEP actor and country values can be materialized as entities using `DEP_CREATE_INTRUSION_SETS` and `DEP_CREATE_COUNTRY_LOCATIONS`. +- DEP actor and country values are also stored in incident custom properties (`dep_actor`, `dep_country`) for source traceability. +- Cross-entity links are automatic: intrusion set -> sector (`targets`) and sector -> country (`related-to`) when both entities are present. +- Generic low-quality actor values (for example `unknown`, `anonymous`, `ransomware group`) are ignored for intrusion-set creation. - To reload the connector code in the platform, run: `docker compose build dep-connector; docker compose up -d dep-connector; docker compose logs -f dep-connector` ## License diff --git a/config.yml.sample b/config.yml.sample index 7f83967..0120bef 100644 --- a/config.yml.sample +++ b/config.yml.sample @@ -30,3 +30,5 @@ dep: enable_hash_indicator: true skip_empty_victim: true create_sector_identities: true + create_intrusion_sets: true + create_country_locations: true diff --git a/main.py b/main.py index 34a23ec..0852d16 100644 --- a/main.py +++ b/main.py @@ -38,6 +38,8 @@ class LeakRecord: victim: str | None = None sector: str | None = None + actor: str | None = None + country: str | None = None revenue: str | None = None @@ -86,16 +88,36 @@ def indicator_domain(self) -> str | None: self.site ) - @field_validator("sector") + @field_validator("sector", "actor", "country") @classmethod - def normalize_sector(cls, v: str | None) -> str | None: + def normalize_named_field(cls, v: str | None) -> str | None: if v is None: return None normalized = " ".join(v.split()).strip() - return normalized or None + if not normalized: + return None + if normalized.lower() in {"n/a", "none"}: + return None + return normalized class DepConnector: + GENERIC_ACTOR_VALUES = frozenset( + { + "unknown", + "unk", + "anonymous", + "unattributed", + "undisclosed", + "not disclosed", + "not-disclosed", + "ransomware group", + "ransomware gang", + "threat actor", + "attacker", + } + ) + def __init__(self) -> None: config = self._load_config() self.helper = pycti.OpenCTIConnectorHelper(config) @@ -199,6 +221,18 @@ def __init__(self) -> None: config, default=True, ) + self.create_intrusion_sets = pycti.get_config_variable( + "DEP_CREATE_INTRUSION_SETS", + ["dep", "create_intrusion_sets"], + config, + default=True, + ) + self.create_country_locations = pycti.get_config_variable( + "DEP_CREATE_COUNTRY_LOCATIONS", + ["dep", "create_country_locations"], + config, + default=True, + ) @staticmethod def _load_config() -> dict[str, Any]: @@ -342,6 +376,33 @@ def _create_sector_identity(self, sector: str) -> stix2.Identity: labels=[self.label_value], ) + def _create_intrusion_set(self, actor: str) -> stix2.IntrusionSet: + actor_key = actor.lower() + intrusion_set_id = ( + f"intrusion-set--{uuid5(NAMESPACE_URL, f'dep-actor:{actor_key}')}" + ) + return stix2.IntrusionSet( + id=intrusion_set_id, + name=actor, + confidence=self.confidence, + labels=[self.label_value], + created_by_ref=self.author_identity, + ) + + def _create_country_location(self, country: str) -> stix2.Location: + country_key = country.lower() + location_id = f"location--{uuid5(NAMESPACE_URL, f'dep-country:{country_key}')}" + return stix2.Location( + id=location_id, + name=country, + country=country, + confidence=self.confidence, + labels=[self.label_value], + created_by_ref=self.author_identity, + custom_properties={"x_opencti_location_type": "Country"}, + allow_custom=True, + ) + def _create_incident(self, item: LeakRecord) -> stix2.Incident: victim_name = item.victim or item.victim_domain if not victim_name: @@ -363,6 +424,15 @@ def _create_incident(self, item: LeakRecord) -> stix2.Incident: external_reference["description"] = item.ann_title # incident_id must be deterministic to allow updates incident_id = f"incident--{uuid5(NAMESPACE_URL, f'dep-announcement:{item.hashid.strip().lower()}')}" + custom_properties: dict[str, Any] = { + "incident_type": "cybercrime", + "first_seen": first_seen, + } + if item.actor: + custom_properties["dep_actor"] = item.actor + if item.country: + custom_properties["dep_country"] = item.country + return stix2.Incident( id=incident_id, name=incident_name, @@ -372,10 +442,7 @@ def _create_incident(self, item: LeakRecord) -> stix2.Incident: labels=self._build_incident_labels(item), created_by_ref=self.author_identity, external_references=[external_reference], - custom_properties={ - "incident_type": "cybercrime", - "first_seen": first_seen, - }, + custom_properties=custom_properties, ) def _build_incident_labels(self, item: LeakRecord) -> list[str]: @@ -434,6 +501,10 @@ def _detect_hash_type(hash_value: str) -> str | None: length_to_type = {32: "MD5", 40: "SHA-1", 64: "SHA-256"} return length_to_type.get(len(hash_value)) + def _is_low_quality_actor(self, actor: str) -> bool: + normalized = " ".join(actor.lower().split()) + return normalized in self.GENERIC_ACTOR_VALUES + def _build_relationship( self, relationship_type: str, @@ -465,15 +536,7 @@ def _should_skip_item(self, victim: str | None) -> bool: normalized = (victim or "").strip().lower() return normalized in {"", "n/a", "none"} - def _process_item(self, item: LeakRecord) -> None: - if self._should_skip_item(item.victim): - self.helper.log_info( - "Skipping DEP item with empty or placeholder victim value" - ) - return - victim = self._create_victim_identity(item) - incident = self._create_incident(item) - + def _build_indicators(self, item: LeakRecord) -> list[stix2.Indicator]: indicators: list[stix2.Indicator] = [] site_indicator = self._create_site_indicator(item) if site_indicator: @@ -481,11 +544,68 @@ def _process_item(self, item: LeakRecord) -> None: hash_indicator = self._create_hash_indicator(item) if hash_indicator: indicators.append(hash_indicator) + return indicators + def _build_optional_entities( + self, + item: LeakRecord, + victim: stix2.Identity | None, + incident: stix2.Incident, + ) -> list[stix2._STIXBase21]: + objects: list[stix2._STIXBase21] = [] sector_identity: stix2.Identity | None = None - sector = item.sector - if self.create_sector_identities and sector and victim: - sector_identity = self._create_sector_identity(sector) + if self.create_sector_identities and item.sector and victim: + sector_identity = self._create_sector_identity(item.sector) + if sector_identity and victim: + objects.append(sector_identity) + objects.append( + self._build_relationship("part-of", victim.id, sector_identity.id) + ) + + intrusion_set: stix2.IntrusionSet | None = None + if ( + self.create_intrusion_sets + and item.actor + and not self._is_low_quality_actor(item.actor) + ): + intrusion_set = self._create_intrusion_set(item.actor) + if intrusion_set: + objects.append(intrusion_set) + objects.append( + self._build_relationship("attributed-to", incident.id, intrusion_set.id) + ) + + country_location: stix2.Location | None = None + if self.create_country_locations and item.country and victim: + country_location = self._create_country_location(item.country) + if country_location and victim: + objects.append(country_location) + objects.append( + self._build_relationship("located-at", victim.id, country_location.id) + ) + if intrusion_set and sector_identity: + objects.append( + self._build_relationship( + "targets", intrusion_set.id, sector_identity.id + ) + ) + if sector_identity and country_location: + objects.append( + self._build_relationship( + "related-to", sector_identity.id, country_location.id + ) + ) + return objects + + def _process_item(self, item: LeakRecord) -> None: + if self._should_skip_item(item.victim): + self.helper.log_info( + "Skipping DEP item with empty or placeholder victim value" + ) + return + victim = self._create_victim_identity(item) + incident = self._create_incident(item) + indicators = self._build_indicators(item) objects: list[stix2._STIXBase21] = [self.author_identity] if victim: @@ -493,11 +613,7 @@ def _process_item(self, item: LeakRecord) -> None: objects.append(incident) if victim: objects.append(self._build_relationship("targets", incident.id, victim.id)) - if sector_identity and victim: - objects.append(sector_identity) - objects.append( - self._build_relationship("part-of", victim.id, sector_identity.id) - ) + objects.extend(self._build_optional_entities(item, victim, incident)) for indicator in indicators: objects.append(indicator) objects.append(