Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ The Double Extortion connector ingests ransomware and data-leak announcements pu
- Authenticates against the DoubleExtortion AWS Cognito identity provider.
- Collects Double Extortion announcements and models them as **Incidents**.
- Creates **Organization** identities for victims.
- Optionally materializes **Intrusion Sets** from DEP actor names.
- Optionally materializes **Country** locations and links victims to them.
- Automatically links intrusion sets to sectors and sectors to countries when those entities are created.
- Generates optional **Indicators** for advertised victim domains and leak hash identifiers.
- Adds announcement-type labels to incidents (for example `dep:announcement-type:pii`).
- Supports querying different Double Extortion Platform datasets via `DEP_DSET`.
Expand Down Expand Up @@ -60,6 +63,19 @@ All configuration values can be supplied via the `config.yml` file or through en
| `dep.enable_hash_indicator` | `DEP_ENABLE_HASH_INDICATOR` | `true` | Create a hash indicator when a hash is provided. |
| `dep.skip_empty_victim` | `DEP_SKIP_EMPTY_VICTIM` | `true` | Skip items where victim is empty, `n/a`, or `none`. |
| `dep.create_sector_identities` | `DEP_CREATE_SECTOR_IDENTITIES` | `true` | Create sector identities and link victims with a `part-of` relationship. |
| `dep.create_intrusion_sets` | `DEP_CREATE_INTRUSION_SETS` | `true` | Create intrusion sets from DEP actor values and link incidents with `attributed-to`. |
| `dep.create_country_locations` | `DEP_CREATE_COUNTRY_LOCATIONS` | `true` | Create country locations and link victim identities with `located-at`. |

## Why `IntrusionSet` for DEP actor values

DEP `actor` values are modeled as STIX `IntrusionSet` objects instead of `ThreatActor` by default.

- DEP actor strings usually represent campaign/operator labels, not high-confidence real-world identities.
- `IntrusionSet` is a safer semantic fit for recurring malicious activity clusters.
- This avoids over-claiming attribution when source data quality is limited.
- It supports incident and targeting analysis directly through `attributed-to` (incident -> intrusion set) and `targets` (intrusion set -> sector).

A `ThreatActor` model can be adopted later if the feed includes stronger attribution context (persona, role, motivation, sophistication).

## Docker

Expand Down Expand Up @@ -89,7 +105,10 @@ docker run --rm \
- Incidents are created with deterministic IDs derived from DEP `hashid`, and bundles are sent with `update=True`, so repeated records update existing incidents instead of creating duplicates.
- Sector names are normalized before sector-identity generation to reduce duplicates caused by inconsistent casing or whitespace in DEP data.
- The API occasionally URL-encodes announcement descriptions. The connector automatically decodes the description before sending it to OpenCTI.
- Intrusion set creation is disabled by default because not every dataset represents a threat actor. If needed, adapt the logic in `DepConnector._process_item`.
- DEP actor and country values can be materialized as entities using `DEP_CREATE_INTRUSION_SETS` and `DEP_CREATE_COUNTRY_LOCATIONS`.
- DEP actor and country values are also stored in incident custom properties (`dep_actor`, `dep_country`) for source traceability.
- Cross-entity links are automatic: intrusion set -> sector (`targets`) and sector -> country (`related-to`) when both entities are present.
- Generic low-quality actor values (for example `unknown`, `anonymous`, `ransomware group`) are ignored for intrusion-set creation.
- To reload the connector code in the platform, run: `docker compose build dep-connector; docker compose up -d dep-connector; docker compose logs -f dep-connector`

## License
Expand Down
2 changes: 2 additions & 0 deletions config.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@ dep:
enable_hash_indicator: true
skip_empty_victim: true
create_sector_identities: true
create_intrusion_sets: true
create_country_locations: true
164 changes: 140 additions & 24 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class LeakRecord:

victim: str | None = None
sector: str | None = None
actor: str | None = None
country: str | None = None

revenue: str | None = None

Expand Down Expand Up @@ -86,16 +88,36 @@ def indicator_domain(self) -> str | None:
self.site
)

@field_validator("sector")
@field_validator("sector", "actor", "country")
@classmethod
def normalize_sector(cls, v: str | None) -> str | None:
def normalize_named_field(cls, v: str | None) -> str | None:
if v is None:
return None
normalized = " ".join(v.split()).strip()
return normalized or None
if not normalized:
return None
if normalized.lower() in {"n/a", "none"}:
return None
return normalized


class DepConnector:
GENERIC_ACTOR_VALUES = frozenset(
{
"unknown",
"unk",
"anonymous",
"unattributed",
"undisclosed",
"not disclosed",
"not-disclosed",
"ransomware group",
"ransomware gang",
"threat actor",
"attacker",
}
)

def __init__(self) -> None:
config = self._load_config()
self.helper = pycti.OpenCTIConnectorHelper(config)
Expand Down Expand Up @@ -199,6 +221,18 @@ def __init__(self) -> None:
config,
default=True,
)
self.create_intrusion_sets = pycti.get_config_variable(
"DEP_CREATE_INTRUSION_SETS",
["dep", "create_intrusion_sets"],
config,
default=True,
)
self.create_country_locations = pycti.get_config_variable(
"DEP_CREATE_COUNTRY_LOCATIONS",
["dep", "create_country_locations"],
config,
default=True,
)

@staticmethod
def _load_config() -> dict[str, Any]:
Expand Down Expand Up @@ -342,6 +376,33 @@ def _create_sector_identity(self, sector: str) -> stix2.Identity:
labels=[self.label_value],
)

def _create_intrusion_set(self, actor: str) -> stix2.IntrusionSet:
actor_key = actor.lower()
intrusion_set_id = (
f"intrusion-set--{uuid5(NAMESPACE_URL, f'dep-actor:{actor_key}')}"
)
return stix2.IntrusionSet(
id=intrusion_set_id,
name=actor,
confidence=self.confidence,
labels=[self.label_value],
created_by_ref=self.author_identity,
)

def _create_country_location(self, country: str) -> stix2.Location:
country_key = country.lower()
location_id = f"location--{uuid5(NAMESPACE_URL, f'dep-country:{country_key}')}"
return stix2.Location(
id=location_id,
name=country,
country=country,
confidence=self.confidence,
labels=[self.label_value],
created_by_ref=self.author_identity,
custom_properties={"x_opencti_location_type": "Country"},
allow_custom=True,
)

def _create_incident(self, item: LeakRecord) -> stix2.Incident:
victim_name = item.victim or item.victim_domain
if not victim_name:
Expand All @@ -363,6 +424,15 @@ def _create_incident(self, item: LeakRecord) -> stix2.Incident:
external_reference["description"] = item.ann_title
# incident_id must be deterministic to allow updates
incident_id = f"incident--{uuid5(NAMESPACE_URL, f'dep-announcement:{item.hashid.strip().lower()}')}"
custom_properties: dict[str, Any] = {
"incident_type": "cybercrime",
"first_seen": first_seen,
}
if item.actor:
custom_properties["dep_actor"] = item.actor
if item.country:
custom_properties["dep_country"] = item.country

return stix2.Incident(
id=incident_id,
name=incident_name,
Expand All @@ -372,10 +442,7 @@ def _create_incident(self, item: LeakRecord) -> stix2.Incident:
labels=self._build_incident_labels(item),
created_by_ref=self.author_identity,
external_references=[external_reference],
custom_properties={
"incident_type": "cybercrime",
"first_seen": first_seen,
},
custom_properties=custom_properties,
)

def _build_incident_labels(self, item: LeakRecord) -> list[str]:
Expand Down Expand Up @@ -434,6 +501,10 @@ def _detect_hash_type(hash_value: str) -> str | None:
length_to_type = {32: "MD5", 40: "SHA-1", 64: "SHA-256"}
return length_to_type.get(len(hash_value))

def _is_low_quality_actor(self, actor: str) -> bool:
normalized = " ".join(actor.lower().split())
return normalized in self.GENERIC_ACTOR_VALUES

def _build_relationship(
self,
relationship_type: str,
Expand Down Expand Up @@ -465,39 +536,84 @@ def _should_skip_item(self, victim: str | None) -> bool:
normalized = (victim or "").strip().lower()
return normalized in {"", "n/a", "none"}

def _process_item(self, item: LeakRecord) -> None:
if self._should_skip_item(item.victim):
self.helper.log_info(
"Skipping DEP item with empty or placeholder victim value"
)
return
victim = self._create_victim_identity(item)
incident = self._create_incident(item)

def _build_indicators(self, item: LeakRecord) -> list[stix2.Indicator]:
indicators: list[stix2.Indicator] = []
site_indicator = self._create_site_indicator(item)
if site_indicator:
indicators.append(site_indicator)
hash_indicator = self._create_hash_indicator(item)
if hash_indicator:
indicators.append(hash_indicator)
return indicators

def _build_optional_entities(
self,
item: LeakRecord,
victim: stix2.Identity | None,
incident: stix2.Incident,
) -> list[stix2._STIXBase21]:
objects: list[stix2._STIXBase21] = []
sector_identity: stix2.Identity | None = None
sector = item.sector
if self.create_sector_identities and sector and victim:
sector_identity = self._create_sector_identity(sector)
if self.create_sector_identities and item.sector and victim:
sector_identity = self._create_sector_identity(item.sector)
if sector_identity and victim:
objects.append(sector_identity)
objects.append(
self._build_relationship("part-of", victim.id, sector_identity.id)
)

intrusion_set: stix2.IntrusionSet | None = None
if (
self.create_intrusion_sets
and item.actor
and not self._is_low_quality_actor(item.actor)
):
intrusion_set = self._create_intrusion_set(item.actor)
if intrusion_set:
objects.append(intrusion_set)
objects.append(
self._build_relationship("attributed-to", incident.id, intrusion_set.id)
)

country_location: stix2.Location | None = None
if self.create_country_locations and item.country and victim:
country_location = self._create_country_location(item.country)
if country_location and victim:
objects.append(country_location)
objects.append(
self._build_relationship("located-at", victim.id, country_location.id)
)
if intrusion_set and sector_identity:
objects.append(
self._build_relationship(
"targets", intrusion_set.id, sector_identity.id
)
)
if sector_identity and country_location:
objects.append(
self._build_relationship(
"related-to", sector_identity.id, country_location.id
)
)
return objects

def _process_item(self, item: LeakRecord) -> None:
if self._should_skip_item(item.victim):
self.helper.log_info(
"Skipping DEP item with empty or placeholder victim value"
)
return
victim = self._create_victim_identity(item)
incident = self._create_incident(item)
indicators = self._build_indicators(item)

objects: list[stix2._STIXBase21] = [self.author_identity]
if victim:
objects.append(victim)
objects.append(incident)
if victim:
objects.append(self._build_relationship("targets", incident.id, victim.id))
if sector_identity and victim:
objects.append(sector_identity)
objects.append(
self._build_relationship("part-of", victim.id, sector_identity.id)
)
objects.extend(self._build_optional_entities(item, victim, incident))
for indicator in indicators:
objects.append(indicator)
objects.append(
Expand Down