The Advanced Persistent Threat (APT) Intelligence Engine is a high-performance, modular framework designed for the real-time ingestion, normalization, and behavioral analysis of multi-modal security telemetry. Moving beyond static Indicators of Compromise (IoCs), this engine leverages adversarial behavioral fingerprinting, MITRE ATT&CK mapping, and unsupervised machine learning (K-Means clustering) to identify, track, and correlate complex intrusion campaigns across disparate data sources.
Built on a security-first, extensible architecture, the engine integrates with enterprise SOC infrastructure (Splunk, ELK, Kafka) to provide immediate tactical insights and strategic intelligence reporting.
The engine operates on a pipeline architecture consisting of four primary stages: Ingestion, Mapping, Analytics, and Reporting.
```mermaid
graph TD
    A[Telemetry Ingestor<br/>Kafka / Splunk / Syslog / WebSocket] --> B[Adversarial Mapping Layer<br/>mitre_mapper.py / technique_inference]
    B --> C[Analytics & Fingerprinting<br/>fingerprint_engine.py / similarity_engine.py]
    C --> D[Clustering Engine<br/>clustering_engine.py / K-Means]
    C --> E[Graph Builder<br/>graph_builder.py / Attack Path Network]
    D --> F[Intelligence Reporting<br/>intelligence_report.py]
    E --> F
    F --> G[Strategic Intelligence Reports]
    style A fill:#2b5c8f,stroke:#1a365d,stroke-width:2px,color:#fff
    style B fill:#319795,stroke:#234e52,stroke-width:2px,color:#fff
    style C fill:#d69e2e,stroke:#744210,stroke-width:2px,color:#fff
    style D fill:#dd6b20,stroke:#7b341e,stroke-width:2px,color:#fff
    style E fill:#805ad5,stroke:#44337a,stroke-width:2px,color:#fff
    style F fill:#e53e3e,stroke:#742a2a,stroke-width:2px,color:#fff
```
The ingestion layer is a thread-safe, scalable streaming engine that processes high-velocity event streams using either sliding time windows or batch configurations.
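The sliding-window behavior can be sketched with a lock-protected `deque`. This is a minimal illustration, not the contents of `realtime_ingestor.py`: the class `SlidingWindowBuffer` and its methods are hypothetical, and it assumes events arrive roughly in timestamp order so expired events can be evicted from the left end in O(1) per event.

```python
import threading
from collections import deque
from datetime import datetime, timedelta, timezone
from typing import Any, Deque, Dict, List, Tuple


class SlidingWindowBuffer:
    """Hypothetical thread-safe buffer holding only events inside a time window."""

    def __init__(self, window_seconds: float) -> None:
        self.window = timedelta(seconds=window_seconds)
        self._events: Deque[Tuple[datetime, Dict[str, Any]]] = deque()
        self._lock = threading.Lock()

    def add(self, event: Dict[str, Any], ts: datetime) -> None:
        # The lock lets several worker threads share one buffer safely.
        with self._lock:
            self._events.append((ts, event))
            self._evict(ts)

    def _evict(self, now: datetime) -> None:
        # Assumes roughly in-order arrival: the oldest events sit at the left end,
        # so expired ones are dropped with O(1) popleft() calls.
        while self._events and now - self._events[0][0] > self.window:
            self._events.popleft()

    def snapshot(self) -> List[Dict[str, Any]]:
        # Copy under the lock so callers can analyze the window without holding it.
        with self._lock:
            return [event for _, event in self._events]


# Usage: a 60-second window; the first event expires once the third arrives.
buf = SlidingWindowBuffer(window_seconds=60)
t0 = datetime(2024, 1, 1, tzinfo=timezone.utc)
buf.add({"event_id": "a"}, t0)
buf.add({"event_id": "b"}, t0 + timedelta(seconds=30))
buf.add({"event_id": "c"}, t0 + timedelta(seconds=90))
print([e["event_id"] for e in buf.snapshot()])  # ['b', 'c']
```

Evicting on every `add` keeps memory bounded by the event rate times the window length; a batch configuration would instead drain the buffer once it reaches a fixed size.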
- Real-Time Ingestor (`realtime_ingestor.py`): The core ingestion loop, which uses concurrent worker threads and deque-based buffers for non-blocking event processing. Supports dynamic batch sizing and sliding time windows.
- Enterprise Connectors (`connectors.py`): Pluggable generator functions providing native integration with Kafka, Elasticsearch, Splunk, REST APIs, WebSockets, Syslog, and Windows Event Logs.
Example: Kafka Connector Integration

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Generator, List


def kafka_event_stream(
    topic: str,
    bootstrap_servers: List[str],
    group_id: str = "threat-intelligence-engine",
) -> Generator[Dict[str, Any], None, None]:
    # ... connection logic ...
    for message in consumer:
        raw_event = message.value
        # Normalize to the standard event schema, filling defaults for
        # fields the upstream producer omitted
        yield {
            "campaign_id": raw_event.get("campaign_id", "KAFKA-STREAM"),
            "event_id": raw_event.get("event_id", str(uuid.uuid4())),
            "timestamp": raw_event.get("timestamp", str(datetime.now(timezone.utc))),
            "host": raw_event.get("host", "UNKNOWN"),
            "technique_id": raw_event.get("technique_id", None),
            "source": "kafka",
            "_raw": raw_event,  # keep the original payload for auditing
        }
```

- Sysmon Parser (`sysmon_parser.py`): Deterministic parser that infers adversarial techniques from raw Sysmon logs.
Example: Deterministic Technique Inference

```python
def infer_technique(event):
    """Map a single Sysmon event to a MITRE ATT&CK technique ID, or None."""
    event_id = event.get("EventID")
    image = str(event.get("Image", "")).lower()
    command = str(event.get("CommandLine", "")).lower()

    # Process Creation (Event ID 1)
    if event_id == 1:
        # Encoded PowerShell -> Execution (T1059)
        if "powershell" in image and "-enc" in command:
            return "T1059"
        # Mimikatz execution -> Credential Dumping (T1003)
        if "mimikatz" in command:
            return "T1003"

    # Network Connection (Event ID 3) -> Exfiltration (T1041)
    # (coarse heuristic: every network connection is treated as potential exfiltration)
    if event_id == 3:
        return "T1041"

    return None
```

- MITRE ATT&CK Mapper (`mitre_mapper.py`): Normalizes disparate event streams by translating raw telemetry and inferred techniques into standardized MITRE ATT&CK tactics (e.g., Execution, Persistence, Credential Access).
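The technique-to-tactic translation can be sketched with a static lookup table. This is a minimal illustration, not the contents of `mitre_mapper.py`: `TECHNIQUE_TO_TACTICS` and `map_event` are hypothetical names, and the table covers only the five techniques used in this README. A production mapper would more likely load the full ATT&CK dataset than hardcode entries.

```python
from typing import Any, Dict, List

# Hypothetical lookup table for the five techniques used throughout this README.
# One technique can serve several tactics: ATT&CK lists T1547 under both
# Persistence and Privilege Escalation.
TECHNIQUE_TO_TACTICS: Dict[str, List[str]] = {
    "T1059": ["Execution"],
    "T1547": ["Persistence", "Privilege Escalation"],
    "T1003": ["Credential Access"],
    "T1021": ["Lateral Movement"],
    "T1041": ["Exfiltration"],
}


def map_event(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the event enriched with its ATT&CK tactic names."""
    technique = event.get("technique_id")
    # Sub-techniques such as "T1059.001" fall back to their parent technique.
    parent = technique.split(".")[0] if technique else None
    tactics = list(TECHNIQUE_TO_TACTICS.get(parent, [])) if parent else []
    return {**event, "tactics": tactics}


print(map_event({"host": "HOST-1", "technique_id": "T1059.001"})["tactics"])  # ['Execution']
```

Returning a new dictionary rather than mutating the input keeps the raw event intact for auditing, and events with no inferred technique simply receive an empty tactic list.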
The analytics layer is the computational core that translates discrete events into actionable intelligence.
- Fingerprint Engine (`fingerprint_engine.py`): Generates fixed-length behavioral vectors representing adversary behavior. Computes technique frequency, tactic sequences, host spread, and a custom Campaign Complexity Score (distinct techniques × affected hosts).
Example: Behavioral Vectorization

```python
from collections import Counter

import numpy as np

# Fixed technique universe: each index is one vector dimension
ALL_TECHNIQUES = ["T1059", "T1547", "T1003", "T1021", "T1041"]


def generate_fingerprint(mapped_logs):
    techniques = [e["technique_id"] for e in mapped_logs]
    hosts = set(e["host"] for e in mapped_logs)
    technique_freq = Counter(techniques)

    # Behavioral vector: each dimension is the frequency of one technique
    # (techniques outside ALL_TECHNIQUES are not counted)
    vector = np.array([technique_freq.get(t, 0) for t in ALL_TECHNIQUES])

    # Behavioral complexity metric: distinct techniques x affected hosts
    complexity_score = len(set(techniques)) * len(hosts)

    # ... returns comprehensive fingerprint dict ...
```

- Clustering Engine (`clustering_engine.py`): Uses `scikit-learn` K-Means clustering on behavioral vectors to group structurally similar campaigns and identify threat actor overlap.
Example: K-Means Clustering on Behavioral Vectors

```python
import numpy as np
from sklearn.cluster import KMeans


def cluster_campaigns(fingerprints, n_clusters=2):
    # Stack behavioral vectors into an (n_campaigns, n_techniques) matrix
    vectors = np.array([fp["vector"] for fp in fingerprints])

    # K-Means cannot form more meaningful clusters than there are distinct
    # points, so cap n_clusters at the number of unique vectors
    n_distinct = len(set(map(tuple, vectors)))
    n_clusters = min(n_clusters, n_distinct)

    # A fixed random_state makes cluster labels reproducible across runs
    kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
    labels = kmeans.fit_predict(vectors)
    return labels
```

- Similarity Engine (`similarity_engine.py`): Calculates cosine similarity between campaign vectors to quantify operational divergence.
Example: Quantifying Campaign Divergence

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


def compute_similarity(fp1, fp2):
    # scikit-learn expects 2-D arrays of shape (n_samples, n_features)
    v1 = np.array(fp1["vector"]).reshape(1, -1)
    v2 = np.array(fp2["vector"]).reshape(1, -1)

    # Frequency vectors are non-negative, so the score lies in [0, 1]:
    # 1.0 = same technique proportions, 0.0 = no techniques in common
    score = cosine_similarity(v1, v2)[0][0]
    return float(score)
```

- Graph Builder (`graph_builder.py`): Constructs directed graphs using `networkx` to visualize the chronological progression and relationship of attack techniques within a campaign.
Example: Building Attack Progression Graphs

```python
import networkx as nx


def build_attack_graph(mapped_logs):
    """Build a directed graph of technique transitions (logs in chronological order)."""
    G = nx.DiGraph()
    # Skip events without an inferred technique; networkx rejects None as a node
    techniques = [e["technique_id"] for e in mapped_logs if e.get("technique_id")]
    # Each consecutive pair of techniques becomes a directed edge
    for src, dst in zip(techniques, techniques[1:]):
        G.add_edge(src, dst)
    return G
```

- Report Generator (`intelligence_report.py`): Synthesizes analytical outputs into strategic threat intelligence reports detailing risk levels, cluster classifications, and behavioral assessments.
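The report generator is not shown in this README. As a hypothetical sketch of how a risk level could be derived from the Campaign Complexity Score, the code below assumes the fingerprint dictionary exposes `complexity_score` and `techniques` keys (the actual keys returned by `generate_fingerprint` are elided), and the `RISK_THRESHOLDS` cut-offs are invented for illustration. It uses plain f-strings to stay dependency-free; a real report would more likely use the `jinja2` dependency from the installation instructions.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical cut-offs on the Campaign Complexity Score
# (distinct techniques x affected hosts); tune them for your environment.
RISK_THRESHOLDS: List[Tuple[int, str]] = [(15, "HIGH"), (6, "MEDIUM"), (0, "LOW")]


def risk_level(complexity_score: int) -> str:
    # Thresholds are ordered from highest to lowest, so the first match wins.
    for threshold, level in RISK_THRESHOLDS:
        if complexity_score >= threshold:
            return level
    return "LOW"


def render_report(campaign_id: str, fingerprint: Dict[str, Any], cluster: int) -> str:
    """Render a plain-text summary for one campaign (hypothetical layout and keys)."""
    score = fingerprint["complexity_score"]
    techniques = ", ".join(sorted(set(fingerprint["techniques"])))
    return "\n".join([
        f"Campaign:            {campaign_id}",
        f"Risk level:          {risk_level(score)} (complexity score {score})",
        f"Behavioral cluster:  {cluster}",
        f"Techniques observed: {techniques}",
    ])


print(render_report(
    "CAMPAIGN-A",
    {"complexity_score": 8, "techniques": ["T1059", "T1003", "T1059"]},
    cluster=0,
))
```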
The engine supports several operating modes, selected with the `MODE` variable in `main.py`:

```python
# ==========================================
# CONFIGURATION (main.py)
# ==========================================
# "simulate" → synthetic campaigns (batch mode)
# "sysmon"   → real Sysmon ingestion (batch mode)
# "realtime" → real-time streaming ingestion (simulated)
# "kafka"    → Kafka streaming
# "splunk"   → Splunk integration
MODE = "realtime"
```

Install the required dependencies for your chosen operating mode:
```bash
# Core dependencies
pip install pandas scikit-learn networkx numpy jinja2

# Connector dependencies (install as needed)
pip install kafka-python elasticsearch splunk-sdk requests websocket-client
```

Modify `CONNECTOR_CONFIG` in `main.py` with your enterprise credentials. Refer to `ENDPOINT_CONFIGURATIONS.py` for comprehensive examples.
Example: Hybrid Cloud/On-Prem Setup

```python
CONNECTOR_CONFIG = {
    "splunk": {
        "host": "splunk.internal.local:8089",
        "username": "threat_intel",
        "password": "SecurePassword123",  # Use env vars in prod
        "search_query": "sourcetype=sysmon index=main earliest=-30m latest=now",
    },
    "kafka": {
        "bootstrap_servers": ["kafka1.cloud.local:9092", "kafka2.cloud.local:9092"],
        "topic": "security.alerts",
        "group_id": "apt_threat_intelligence",
    },
}
```

1. Interactive Demo (`connector_demo.py`)
For a quick overview of all supported enterprise integrations and configurations, run the interactive demo:

```bash
python connector_demo.py
```

This utility provides a terminal UI comparing connector speed, scale, and deployment commands.
2. Main Pipeline Execution (`main.py`)

To run the engine in your configured mode (batch, simulated, or live connector):

```bash
python main.py
```

The project includes a suite of mock servers for validating the ingestion pipeline without access to production systems:
```bash
# Start all mock servers (REST, WebSocket, Splunk)
python setup_infrastructure.py

# Send simulated Syslog events to test ingestion
python send_syslog.py localhost 514 10
```

The engine departs from traditional signature matching by employing Behavioral Vectorization:
- Extraction: Telemetry is parsed to identify execution artifacts, network connections, and registry modifications.
- Inference: Artifacts are mapped to specific MITRE ATT&CK techniques (e.g., PowerShell execution with obfuscation -> T1059).
- Vectorization: Techniques are compiled into a frequency distribution vector across a predefined technique universe.
- Clustering: Each vector is a point in n-dimensional space, where n is the size of the technique universe. K-Means clustering identifies centroids representing distinct operational playbooks, which helps correlate seemingly disparate campaigns that may be attributable to the same threat actor.
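The Vectorization and Clustering steps can be illustrated in pure Python. This sketch (standard library only, not the engine's code) builds frequency vectors over the same five-technique universe used in `fingerprint_engine.py` and highlights one design consideration: K-Means uses Euclidean distance, so two campaigns running the same playbook at different volumes can sit far apart on raw counts. L2-normalizing the vectors first, a step the example code above does not show, makes Euclidean distance track cosine similarity, since for unit vectors the squared distance equals 2 − 2·cos.

```python
import math
from collections import Counter
from typing import List, Sequence

# Same fixed technique universe as the fingerprint example
ALL_TECHNIQUES = ["T1059", "T1547", "T1003", "T1021", "T1041"]


def vectorize(techniques: Sequence[str]) -> List[float]:
    # One dimension per technique; IDs outside the universe are ignored.
    counts = Counter(techniques)
    return [float(counts.get(t, 0)) for t in ALL_TECHNIQUES]


def l2_normalize(v: List[float]) -> List[float]:
    # Scale to unit length; an all-zero vector is returned unchanged.
    norm = math.sqrt(sum(x * x for x in v))
    return [x / norm for x in v] if norm else v


def euclidean(u: List[float], v: List[float]) -> float:
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(u, v)))


small = vectorize(["T1059", "T1003", "T1041"])
large = vectorize(["T1059", "T1003", "T1041"] * 10)  # same playbook, 10x the volume
print(euclidean(small, large))                              # ~15.59 on raw counts
print(euclidean(l2_normalize(small), l2_normalize(large)))  # effectively 0.0 after normalization
```

With scikit-learn, the same effect comes from passing the vectors through `sklearn.preprocessing.normalize` before fitting `KMeans`.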
To facilitate testing and verification, the repository includes pre-packaged, sanitized sample datasets under the data/ directory:
- Raw Logs (`data/raw_logs_*.json`): Raw event logs capturing execution, network connection, and registry modification signals from various hosts. Sensitive fields such as machine IP addresses, domain names, and user accounts have been sanitized (e.g., mapped to generic structures like `HOST-1` and dummy hashes).
- Campaign Data (`data/campaign_*.json`): Normalized telemetry structured into specific attack campaigns, mapping events chronologically with inferred MITRE ATT&CK techniques such as:
  - `T1059` (Command and Scripting Interpreter)
  - `T1003` (Credential Dumping)
  - `T1547` (Boot or Logon Autostart Execution)
  - `T1021` (Remote Services)
  - `T1041` (Exfiltration Over C2 Channel)
These datasets allow you to run the clustering and similarity engines immediately and verify output results.
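A minimal loader for the campaign files might look like the sketch below. The file schema is not documented here, so it assumes (hypothetically) that each `data/campaign_*.json` file holds a JSON array of events using the normalized keys from the Kafka connector example (`campaign_id`, `timestamp`, `host`, `technique_id`); `load_campaigns` is an illustrative name, not a function from the repository.

```python
import json
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List


def load_campaigns(data_dir: str = "data") -> Dict[str, List[Dict[str, Any]]]:
    """Group events from campaign_*.json files by campaign_id, oldest first.

    Hypothetical schema: each file holds a JSON array of event objects that use
    the normalized keys from the Kafka connector example.
    """
    campaigns: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for path in sorted(Path(data_dir).glob("campaign_*.json")):
        with path.open(encoding="utf-8") as fh:
            for event in json.load(fh):
                # Use the file name as the campaign key when campaign_id is missing.
                campaigns[event.get("campaign_id", path.stem)].append(event)
    for events in campaigns.values():
        # ISO-8601 strings sort chronologically if they share one format and offset.
        events.sort(key=lambda e: e.get("timestamp", ""))
    return dict(campaigns)


if __name__ == "__main__":
    for campaign_id, events in load_campaigns().items():
        print(campaign_id, [e.get("technique_id") for e in events])
```

Sorting by timestamp matters because the attack-graph builder derives edges from consecutive events, so out-of-order input would produce spurious transitions.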
- Credential Management: Never hardcode credentials in `CONNECTOR_CONFIG` or `ENDPOINT_CONFIGURATIONS.py`. Use environment variables or secure vault integrations for production deployments.
- Data Privacy: Ensure appropriate anonymization or masking is applied to telemetry streams that contain PII or sensitive hostnames.
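One way to follow the credential guidance is to assemble the connector configuration from environment variables at startup. This is a sketch under assumptions: the variable names (`SPLUNK_USERNAME`, `SPLUNK_PASSWORD`, `KAFKA_BOOTSTRAP_SERVERS`, and so on) and the helper `build_connector_config` are hypothetical, and the non-secret defaults are copied from the hybrid setup example.

```python
import os
from typing import Any, Dict, List


def _split_servers(raw: str) -> List[str]:
    # "kafka1:9092, kafka2:9092" -> ["kafka1:9092", "kafka2:9092"]
    return [s.strip() for s in raw.split(",") if s.strip()]


def build_connector_config() -> Dict[str, Any]:
    """Assemble CONNECTOR_CONFIG from environment variables (hypothetical names)."""
    return {
        "splunk": {
            "host": os.environ.get("SPLUNK_HOST", "splunk.internal.local:8089"),
            # Required secrets: os.environ[...] raises KeyError instead of
            # silently falling back to a hardcoded value.
            "username": os.environ["SPLUNK_USERNAME"],
            "password": os.environ["SPLUNK_PASSWORD"],
            "search_query": os.environ.get(
                "SPLUNK_SEARCH_QUERY",
                "sourcetype=sysmon index=main earliest=-30m latest=now",
            ),
        },
        "kafka": {
            "bootstrap_servers": _split_servers(os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "")),
            "topic": os.environ.get("KAFKA_TOPIC", "security.alerts"),
            "group_id": os.environ.get("KAFKA_GROUP_ID", "apt_threat_intelligence"),
        },
    }
```

In `main.py`, `CONNECTOR_CONFIG = build_connector_config()` would then replace the literal dictionary, with the secrets supplied by the shell, a process manager, or a vault agent. Failing fast on a missing password is deliberate: a silent default would hide misconfiguration until the first connection attempt.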
Developed for advanced threat hunting and proactive adversary characterization.