diff --git a/README.md b/README.md index 95e47c2..37b09aa 100644 --- a/README.md +++ b/README.md @@ -6,14 +6,18 @@ Multi-Agent Reinforcement Learning (MARL) cybersecurity simulator mathematically **Project:** NetForge RL **GNN-based Policy Model:** https://github.com/elprofesoriqo/GNN-based-Policy-Model-for-MARL-Cyber -## Architectural Overhaul Notice +## Architectural Changes & State-of-the-Art Modeling -This repository represents a complete structural redesign of the original CybORG framework. I took ownership of this branch because the legacy CybORG environment was fundamentally restricted to single-agent, turn-based paradigms (utilizing nested OpenAI Gym wrappers) which artificially broke parallel gradients and hindered true Multi-Agent research. +This repository is a dramatic evolution from the legacy CybORG / CAGE challenge environment. While acknowledging the incredible fundamental work by DSTG, NetForge RL transitions the paradigm from a synchronous, fully observable game into a high-fidelity, physically constrained network simulation designed for real-world Sim-to-Real transfer. ### What is Different? -1. **Parallel Execution via PettingZoo:** The core simulator is now strictly built upon the `pettingzoo.ParallelEnv` standard instead of monolithic Gym wrappers. Red and Blue teams act in a simultaneous time vacuum, and the engine natively resolves their conflicting action intents. -2. **Abstract Action Engine:** Actions no longer mutate simulator state directly via complex monolithic switch statements. `BaseAction` computes an `ActionEffect` (JSON representation of physical network impact), which the core environment evaluates and securely commits. -3. **No Legacy Bloat:** I have deleted all obsolete OpenAI Gym references, redundant CAGE challenge sub-modules, and unneeded demo code. +1. **Interruptible Tick-Based Engine:** CybORG's instantaneous actions are gone. NetForge RL runs on an asynchronous `current_tick` clock. Actions have a `duration` natively. Real-time interruptions exist: if the SOC isolates a host mid-exfiltration, the attacker's action is aborted. +2. **Strict POMDP Isolation & Fog of War:** Defenders do not see the ground truth. They receive dynamic telemetry alerts generated by a newly implemented `siem_log_buffer` suffering from realistic `log_latency`. Background noise agents obfuscate true malicious alerts. +3. **MultiDiscrete Tensors & Procedural Networks:** To avoid static overfitting and combinatorial explosions, Action spaces utilize `MultiDiscrete` Arrays (e.g. `[ActionType, TargetIP]`). Topologies procedurally generate up to 50 active nodes utilizing padded masking dynamically. +4. **Attack Economics & Cost Mechanics:** Each agent is bounded by Operational Budgets (`agent_funds`, `agent_compute`). Reckless defensive isolation triggers massive Business Downtime mathematical penalties mirroring real-world SLA fines. +5. **Cyber-Physical (OT) Convergence:** Generating distinct `OT_Subnets` featuring `PLC` nodes mapping thermodynamic vulnerabilities. Red operators can inflict catastrophic Kinetic Impacts `(+10000/-10000 rewards)` overriding logical state tracking entirely. +6. **Social Engineering (Stochastics):** DMZ architectures can natively be bypassed by Red teams leveraging `SpearPhishing` arrays scaled against dynamically rolled `human_vulnerability_score` matrix properties. Blue counters this via explicit `SecurityAwarenessTraining` capital expenditure. +7. **Ray RLlib & PyTorch LSTMs:** Packaged natively with Custom PyTorch Models linking Recurrent Memory sequences (LSTMs) alongside mathematical boolean Action Masking dropping invalid tensor networks natively out-of-the-box. ### Simulator Architecture Flow @@ -37,10 +41,10 @@ graph TD The environment is designed to be highly plug-and-play. ```python -from marl_cyborg.environment.parallel_env import ParallelMarlCyborg +from netforge_rl.environment.parallel_env import NetForgeRLEnv # Instantiate the native PettingZoo environment -env = ParallelMarlCyborg(scenario_config={}) +env = NetForgeRLEnv(scenario_config={}) # Reset to get parallel Gymnasium boxes observations, infos = env.reset() @@ -53,50 +57,38 @@ print("Blue Box:", observations["Blue"]) The primary reason for this fork is extensibility. Want to add an *ARP Poisoning* attack? -Simply inherit the `BaseAction` inside `marl_cyborg/actions/network/arp_poison.py`, write how it modifies the theoretical `ActionEffect`, and the engine natively calculates the physics resolution. See `marl_cyborg.actions.network.ip_fragmentation.IPFragmentationAction` for a physical example of this structural implementation. +Simply inherit the `BaseAction` inside `netforge_rl/actions/network/arp_poison.py`, write how it modifies the theoretical `ActionEffect`, and the engine natively calculates the physics resolution. See `netforge_rl.actions.network.ip_fragmentation.IPFragmentationAction` for a physical example of this structural implementation. ## License & Accreditation This project is built upon the foundational work provided by the original CybORG contributors (CyberSecurityCRC / DSTG). The core internal simulator physics remain preserved, while the outward translation layers, action hierarchy, and Multi-Agent APIs have been entirely redesigned by Igor Jankowski. ## Repository Structure -- `marl_cyborg/`: Core simulation environment +- `netforge_rl/`: Core simulation environment - `actions/`: Contains definitions for all `BaseAction` implementations. - - `red_actions.py`: Red team offensive actions. - - `blue_actions.py`: Blue team defensive actions. - - `core/`: State, Observation, and Action abstract base classes. + - `agents/`: Contains specialized algorithmic actors like `GreenAgent` (Background Noise simulation). + - `core/`: State, Observation, and Action abstract base classes enforcing physical constraints. - `environment/`: - - `parallel_env.py`: The primary PettingZoo MARL environment. + - `parallel_env.py`: The primary asynchronous PettingZoo MARL environment. - `pcap_synthesizer.py`: Generates synthetic offline `.pcap` network traffic mappings. - `train_curriculum.py`: Example RL training script. - `test_physics.py`: Physics unit tests. ## Available Actions -All actions are natively available to the RL models through the environment's discrete action space (`Discrete(256)`). The engine dynamically scales and maps these 11 actions per team against all available network IPs. +All actions are natively available to the RL models through the environment's `MultiDiscrete` action space mapped seamlessly via PyTorch Logit structures. ### Red Team (Offensive) -1. **NetworkScan**: Scans a target subnet for active IP addresses. -2. **DiscoverRemoteSystems**: Performs a Ping Sweep to pinpoint active hosts. -3. **DiscoverNetworkServices**: Port scans a host to enumerate running services. -4. **ExploitRemoteService**: Exploits a vulnerability on a target IP to gain User privileges. -5. **PrivilegeEscalate**: Escalates from User to Root access. -6. **Impact**: Destroys/encrypts data on a compromised host (Ransomware/Wiper). -7. **ExploitBlueKeep**: Exploits RDP (CVE-2019-0708) on Port 3389. -8. **ExploitEternalBlue**: Exploits SMB (MS17-010) on Port 445. -9. **ExploitHTTP_RFI**: Remote File Inclusion exploit targeting Port 80. -10. **JuicyPotato**: Local privilege escalation via DCOM (Windows). -11. **V4L2KernelExploit**: Local privilege escalation via Video4Linux kernel vulns (Linux). +1. **NetworkScan / DiscoverRemoteSystems / DiscoverNetworkServices**: Passive/Active reconnaissance probing ports & ping sweeps. +2. **SpearPhishing**: Bypasses corporate structures directly exploiting human error factors inside user networks. +3. **ExploitRemoteService / ExploitEternalBlue...**: Gain user privileges weaponizing CVEs based on specific OS versions and open Ports. +4. **PrivilegeEscalate**: Pivot from constrained user constraints to `Root`/`System`. +5. **Impact**: Ransomware execution mapping standard IT failure metrics. +6. **OverloadPLC (Kinetic)**: Weaponizes thermodynamics on compromised OT Networks forcing episode kinetic destruction sequences. ### Blue Team (Defensive) -1. **IsolateHost**: Disconnects a host completely from the network. -2. **RestoreHost**: Brings an isolated host back online from a clean snapshot. -3. **Monitor**: Actively monitors traffic on a specific subnet or host for anomalies. -4. **Analyze**: Deep scans a specific host for malware signatures or unauthorized user activity. -5. **DeployDecoy**: Deploys a generic fake service (Apache/Tomcat/Femitter) to bait attackers. -6. **Remove**: Removes unauthorized user privileges. -7. **RestoreFromBackup**: Purges an infected host and restores it to a clean baseline from a backup. -8. **DecoyApache**: Deploys a fake Apache web server (Port 80) honeypot. -9. **DecoySSHD**: Deploys a fake SSH daemon (Port 22) honeypot. -10. **DecoyTomcat**: Deploys a fake Tomcat server (Port 8080) honeypot. -11. **Misinform**: Injects false host telemetry or alters logging to feed Red agents fake data. +1. **IsolateHost / RestoreHost**: Logical quarantining of suspected nodes (Incurs heavily tracked SLA Business downtime). +2. **Monitor / Analyze**: Asynchronous deep network/host scans bypassing standard physical delays. +3. **SecurityAwarenessTraining**: Burns financial budget mathematically slashing organic `human_vulnerability_scores` defending against phish payloads. +4. **DeployHoneytoken (Active Deception)**: Secretly seeds RAM-based tokens triggering massive unevadable 0-delay severity 10 SIEM alerts when parsed by automated Red lateral mapping capabilities. +5. **DecoyApache / DecoySSHD / DeployDecoy**: Deploys visible port-80/22 traps binding attacker compute resources across dead execution loops. diff --git a/changelog.md b/changelog.md index 5778ca5..b1d303c 100644 --- a/changelog.md +++ b/changelog.md @@ -1,11 +1,11 @@ # Changelog -All notable changes to the `marl_cyborg` project will be documented in this file. +All notable changes to the `netforge_rl` project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ## [3.0.0] - 2026-02-28 ### Added -- **PettingZoo API Core Integration**: Created `marl_cyborg/environment/parallel_env.py` substituting the legacy wrapper paradigm with `pettingzoo.ParallelEnv`, explicitly allowing concurrent multi-agent action steps. +- **PettingZoo API Core Integration**: Created `netforge_rl/environment/parallel_env.py` substituting the legacy wrapper paradigm with `pettingzoo.ParallelEnv`, explicitly allowing concurrent multi-agent action steps. - **Gymnasium Box Compatibility**: All spaces natively map to `gymnasium.spaces` APIs instead of arbitrary nested classes. - **`BaseAction` / `BaseObservation` Abstract Hierarchy**: Abstracted action mutation. Cyber attacks no longer edit the state directly, but rather return a theoretical JSON impact via `ActionEffect` allowing the environment to resolve simultaneity conflicts natively. - **Python 3.12 Support (Native)**: Enforced via the new `pyproject.toml` definition. diff --git a/marl_cyborg/core/action.py b/marl_cyborg/core/action.py deleted file mode 100644 index 1305546..0000000 --- a/marl_cyborg/core/action.py +++ /dev/null @@ -1,62 +0,0 @@ -from abc import ABC, abstractmethod -from typing import Dict, Any, Optional, TYPE_CHECKING - -if TYPE_CHECKING: - from marl_cyborg.core.state import GlobalNetworkState - - -class ActionEffect: - """Encapsulates the resulting state changes from an action for conflict - - resolution. - """ - - def __init__( - self, - success: bool, - state_deltas: Dict[str, Any], - observation_data: Dict[str, Any], - eta: int = 0, - ): - self.success = success - self.state_deltas = state_deltas - self.observation_data = observation_data - self.eta = eta - - -class BaseAction(ABC): - """Modular Base Action for the MARL CybORG Environment. - - All highly specific network attacks (Layer 2 - Layer 7) inherit from this class. - """ - - def __init__( - self, - agent_id: str, - target_ip: Optional[str] = None, - source_ip: Optional[str] = None, - cost: int = 1, - ): - self.agent_id = agent_id - self.target_ip = target_ip - self.source_ip = source_ip - self.cost = cost - - @abstractmethod - def validate(self, global_state: 'GlobalNetworkState') -> bool: - """Checks if the action is physically possible in the current network - - state (e.g., is there a route? - - is the port open?). - """ - pass - - @abstractmethod - def execute(self, global_state: 'GlobalNetworkState') -> ActionEffect: - """Computes the theoretical effect of the action. - - Note: State is NOT mutated directly here. Mutations are returned via ActionEffect - to allow the Environment to resolve simultaneous multi-agent collisions. - """ - pass diff --git a/marl_cyborg/environment/__init__.py b/marl_cyborg/environment/__init__.py deleted file mode 100644 index 030e31c..0000000 --- a/marl_cyborg/environment/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .base_env import BaseMarlCyborg -from .parallel_env import ParallelMarlCyborg - -__all__ = ['BaseMarlCyborg', 'ParallelMarlCyborg'] diff --git a/marl_cyborg/environment/parallel_env.py b/marl_cyborg/environment/parallel_env.py deleted file mode 100644 index 00c8363..0000000 --- a/marl_cyborg/environment/parallel_env.py +++ /dev/null @@ -1,477 +0,0 @@ -from typing import Dict, Tuple -import numpy as np -import gymnasium as gym - -from marl_cyborg.core.action import BaseAction, ActionEffect -from marl_cyborg.core.observation import BaseObservation -from marl_cyborg.environment.base_env import BaseMarlCyborg -from marl_cyborg.topologies.network_generator import NetworkGenerator - - -class ParallelMarlCyborg(BaseMarlCyborg): - """MARL Environment for CybORG. - - Follows the PettingZoo Parallel API standard for simultaneous Multi- - Agent execution and relies exclusively on Gymnasium spaces natively. - """ - - metadata = {'render_modes': ['ansi'], 'name': 'marl_cyborg_v3'} - - def __init__(self, scenario_config: dict): - # Default to procedural generation if no specific architecture config is provided - topology_path = ( - scenario_config.get('topology_path') if scenario_config else None - ) - self.network_generator = NetworkGenerator(config_path=topology_path) - - scenario_type = ( - scenario_config.get('scenario_type', 'ransomware') - if scenario_config - else 'ransomware' - ) - self.possible_agents = [ - 'red_commander', - 'red_operator', - 'blue_commander', - 'blue_operator', - ] - self.agents = self.possible_agents[:] - - if scenario_type.lower() == 'ransomware': - from marl_cyborg.scenarios.ransomware import RansomwareScenario - - self.scenario = RansomwareScenario(self.agents) - else: - from marl_cyborg.scenarios.apt_espionage import AptEspionageScenario - - self.scenario = AptEspionageScenario(self.agents) - - self.global_state = self.network_generator.generate() - - # Native Gymnasium Spaces for PettingZoo API - self.observation_spaces = { - agent: gym.spaces.Box(low=-1.0, high=1.0, shape=(256,), dtype=np.float32) - for agent in self.possible_agents - } - self.action_spaces = { - agent: gym.spaces.Discrete( - 256 - ) # Expanded to natively support advanced actions across 40+ IPs - for agent in self.possible_agents - } - self.max_steps = 100 - self.current_step = 0 - - def reset( - self, seed=None, options=None - ) -> Tuple[Dict[str, np.ndarray], Dict[str, dict]]: - """Resets the network state to initial configuration natively - - (Gymnasium style + PettingZoo). - """ - self.global_state = self.network_generator.generate(seed=seed) - self.agents = self.possible_agents[:] - self.global_state.agent_energy = {agent: 50 for agent in self.agents} - observations = {} - for agent_id in self.agents: - obs = BaseObservation(agent_id) - obs.update_from_state(self.global_state, []) - observations[agent_id] = obs.to_numpy(max_size=256) - self.current_step = 0 - - return observations, {agent: {} for agent in self.agents} - - def observation_space(self, agent): - return self.observation_spaces[agent] - - def action_space(self, agent): - return self.action_spaces[agent] - - def action_mask(self, agent: str) -> np.ndarray: - """Returns a binary mask denoting valid and distinct action integers for the agent, - pruning out computationally redundant modulo duplicates. - """ - mask = np.zeros(self.action_spaces[agent].n, dtype=np.int8) - - target_ips = sorted(list(self.global_state.all_hosts.keys())) - num_targets = len(target_ips) if target_ips else 1 - - if 'red' in agent.lower(): - valid_groups = 4 if 'commander' in agent.lower() else 9 - else: - valid_groups = 5 if 'commander' in agent.lower() else 7 - - max_valid_action = min(valid_groups * num_targets, self.action_spaces[agent].n) - mask[:max_valid_action] = 1 - return mask - - def step( - self, agent_actions: Dict[str, int] - ) -> Tuple[ - Dict[str, BaseObservation], - Dict[str, float], - Dict[str, bool], - Dict[str, bool], - Dict[str, dict], - ]: - """ - Simultaneous Step Execution Logic: - - 1. VALIDATION: Check if actions are physically possible. - 2. EXECUTION: Compute intended state changes (ActionEffects) WITHOUT mutating state yet. - 3. CONFLICT RESOLUTION: E.g., if Blue drops a connection while Red exploits it, Blue wins. - 4. MUTATION: Apply final resolved effects to the true global state. - 5. OBSERVATION: Re-calculate what each agent can see. - """ - intended_effects = {} - for agent, action_int in agent_actions.items(): - # Validate temporal locks - if self.current_step < self.global_state.agent_locked_until.get(agent, 0): - intended_effects[agent] = ActionEffect( - success=False, - state_deltas={}, - observation_data={ - 'error': 'Agent locked executing previous action' - }, - ) - continue - - if isinstance(action_int, BaseAction): - action = action_int - else: - action = self._decode_action(agent, int(action_int)) - - # Validate temporal energy constraints - if self.global_state.agent_energy.get(agent, 0) < action.cost: - intended_effects[agent] = ActionEffect( - success=False, - state_deltas={}, - observation_data={'error': 'Insufficient Energy'}, - ) - continue - - # Expend energy for the action regardless of success - self.global_state.agent_energy[agent] -= action.cost - - if action.validate(self.global_state): - effect = action.execute(self.global_state) - if getattr(effect, 'eta', 0) > 0: - self.global_state.agent_locked_until[agent] = ( - self.current_step + effect.eta - ) - self.global_state.pending_effects.append( - (self.current_step + effect.eta, agent, effect) - ) - intended_effects[agent] = ActionEffect( - success=False, - state_deltas={}, - observation_data={ - 'status': f'Executing action... ETA {effect.eta} steps' - }, - ) - else: - intended_effects[agent] = effect - else: - intended_effects[agent] = ActionEffect( - success=False, - state_deltas={}, - observation_data={'exploit': 'validation failed natively'}, - ) - - # Process delayed effects that have arrived natively - remaining_pending = [] - for eta_step, p_agent, p_effect in self.global_state.pending_effects: - if self.current_step >= eta_step: - intended_effects[p_agent] = ( - p_effect # Overlay onto their intended effect - ) - else: - remaining_pending.append((eta_step, p_agent, p_effect)) - self.global_state.pending_effects = remaining_pending - - resolved_effects = self._resolve_conflicts(intended_effects) - - self._apply_state_deltas(resolved_effects) - - observations = {} - rewards = {} - terminate = self.scenario.check_termination(self.global_state) - self.current_step += 1 - - # Trigger dynamic topology mutations mid-episode - if self.current_step % 40 == 0: - self.global_state.reallocate_dhcp() - - is_truncated = self.current_step >= self.max_steps - truncate = {agent: is_truncated for agent in self.agents} - - for agent in self.agents: - obs = BaseObservation(agent) - obs.update_from_state(self.global_state, resolved_effects) - - obs_array = obs.to_numpy(max_size=256) - if 'operator' in agent: - commander_id = agent.replace('operator', 'commander') - if commander_id in agent_actions: - cmd_action = agent_actions[commander_id] - # Normalize the Discrete(100) action to a float between 0.0 and 1.0 - cmd_val = ( - (float(cmd_action) / 100.0) - if not isinstance(cmd_action, BaseAction) - else 1.0 - ) - obs_array[0] = cmd_val - - observations[agent] = obs_array - - # Reward shaping applied here natively factoring in immediate action outcomes - agent_effect = resolved_effects.get(agent) - rewards[agent] = self._calculate_reward( - agent, self.global_state, agent_effect - ) - - self.agents = [ - agent - for agent in self.agents - if not terminate[agent] and not truncate[agent] - ] - - # ── Build info dicts with security metrics for callbacks ── - infos = self._extract_agent_infos(observations, resolved_effects) - - return observations, rewards, terminate, truncate, infos - - def render(self): - """Standard PettingZoo GUI logging render hook.""" - pass - - def _decode_action(self, agent_id: str, action_int: int) -> BaseAction: - from marl_cyborg.actions import ( - IsolateHost, - RestoreHost, - Monitor, - Analyze, - DeployDecoy, - Remove, - RestoreFromBackup, - DecoyApache, - DecoySSHD, - DecoyTomcat, - Misinform, - NetworkScan, - DiscoverRemoteSystems, - DiscoverNetworkServices, - ExploitRemoteService, - PrivilegeEscalate, - Impact, - ExploitBlueKeep, - ExploitEternalBlue, - ExploitHTTP_RFI, - JuicyPotato, - V4L2KernelExploit, - KillProcess, - ShareIntelligence, - ConfigureACL, - ) - - target_ips = sorted(list(self.global_state.all_hosts.keys())) - if not target_ips: - target_ips = ['127.0.0.1'] - - target_ip = target_ips[action_int % len(target_ips)] - action_group = action_int // len(target_ips) - - if 'red' in agent_id.lower(): - if 'commander' in agent_id.lower(): - action_type = action_group % 4 - if action_type == 0: - return NetworkScan(agent_id, '10.0.0.0/24') - elif action_type == 1: - return DiscoverRemoteSystems(agent_id, '10.0.0.0/24') - elif action_type == 2: - return DiscoverNetworkServices(agent_id, target_ip) - else: - return ShareIntelligence(agent_id, 'red_operator') - else: - action_type = action_group % 9 - if action_type == 0: - return ExploitRemoteService(agent_id, target_ip) - elif action_type == 1: - return PrivilegeEscalate(agent_id, target_ip) - elif action_type == 2: - return Impact(agent_id, target_ip) - elif action_type == 3: - return ExploitBlueKeep(agent_id, target_ip) - elif action_type == 4: - return ExploitEternalBlue(agent_id, target_ip) - elif action_type == 5: - return ExploitHTTP_RFI(agent_id, target_ip) - elif action_type == 6: - return JuicyPotato(agent_id, target_ip) - elif action_type == 7: - return V4L2KernelExploit(agent_id, target_ip) - else: - return KillProcess(agent_id, target_ip) - else: - if 'commander' in agent_id.lower(): - action_type = action_group % 5 - if action_type == 0: - return DeployDecoy(agent_id, target_ip) - elif action_type == 1: - return DecoyApache(agent_id, target_ip) - elif action_type == 2: - return DecoySSHD(agent_id, target_ip) - elif action_type == 3: - return DecoyTomcat(agent_id, target_ip) - else: - return Misinform(agent_id, target_ip) - else: - action_type = action_group % 7 - if action_type == 0: - return IsolateHost(agent_id, target_ip) - elif action_type == 1: - return RestoreHost(agent_id, target_ip) - elif action_type == 2: - return Monitor(agent_id, target_ip) - elif action_type == 3: - return Analyze(agent_id, target_ip) - elif action_type == 4: - return Remove(agent_id, target_ip) - elif action_type == 5: - return RestoreFromBackup(agent_id, target_ip) - else: - subnet = '.'.join(target_ip.split('.')[:3]) + '.0/24' - return ConfigureACL(agent_id, target_subnet=subnet, port=445) - - def _resolve_conflicts( - self, effects: Dict[str, ActionEffect] - ) -> Dict[str, ActionEffect]: - """Core physics engine. - - Mathematically resolves simultaneous temporal collisions. - Priority: Blue Defensive actions generally supersede Red Offensive actions - on the exact same network node if executed in the exact same fraction of a second. - """ - # Separate offensive and defensive intents - red_agents = [a for a in effects if 'red' in a.lower()] - blue_agents = [a for a in effects if 'blue' in a.lower()] - - # 1. Compile all Blue defensive targets and actions for this timestep - blue_defended_nodes = {} - for blue_id in blue_agents: - eff = effects[blue_id] - if eff.success: - # E.g., eff.state_deltas might contain: {"hosts.10.0.0.5.port.80": "closed"} - for delta_key, delta_val in eff.state_deltas.items(): - if 'hosts/' in delta_key: - target_ip = delta_key.split('/')[1] - if target_ip not in blue_defended_nodes: - blue_defended_nodes[target_ip] = [] - blue_defended_nodes[target_ip].append(delta_val) - - # 2. Evaluate Red attacks against the compiled simultaneous defenses - for red_id in red_agents: - red_eff = effects[red_id] - if not red_eff.success: - continue # Already failed natively, ignore - - collision_detected = False - for delta_key in list(red_eff.state_deltas.keys()): - if 'hosts/' in delta_key: - target_ip = delta_key.split('/')[1] - - # If Red is targeting a node that Blue is simultaneously modifying - if target_ip in blue_defended_nodes: - # For now, we apply a hard Zero-Trust temporal priority: Blue Defense always wins ties - collision_detected = True - break - - if collision_detected: - # Nullify Red's attack effect entirely and alert the network telemetry - effects[red_id].success = False - effects[red_id].state_deltas = {} - effects[red_id].observation_data['alert'] = ( - 'TEMPORAL_COLLISION_DEFENSE_SUPREMACY' - ) - - return effects - - def _apply_state_deltas(self, effects: Dict[str, ActionEffect]): - """Applies validated deltas to the GlobalNetworkState. - - Only called AFTER temporal collisions have been mathematically - resolved. - """ - for agent_id, effect in effects.items(): - if effect.success: - for delta_key, delta_val in effect.state_deltas.items(): - self.global_state.apply_delta(delta_key, delta_val) - - def _calculate_reward( - self, agent_id: str, state, effect: ActionEffect = None - ) -> float: - """Delegates reward logic directly to the localized Scenario module.""" - return self.scenario.calculate_reward(agent_id, state, effect) - - def _extract_agent_infos(self, observations: dict, resolved_effects: dict) -> dict: - """Extracts security metrics for TensorBoard and CSV logging callbacks. - - Args: - observations: Dictionary of agent observations for this step. - resolved_effects: Dictionary of resolved action effects. - - Returns: - Dictionary mapping agent_id to an info dictionary with security metrics. - """ - infos = {} - for agent in list(observations.keys()): - agent_effect = resolved_effects.get(agent) - info: dict = {} - - # Count security-relevant events from this step - false_positives = 0 - successful_exploits = 0 - hosts_isolated = 0 - services_restored = 0 - - if agent_effect and agent_effect.success: - for delta_key, delta_val in agent_effect.state_deltas.items(): - if 'status' in delta_key and delta_val == 'isolated': - hosts_isolated += 1 - # Check if the isolated host was actually compromised - parts = delta_key.split('/') - if len(parts) >= 2: - ip = parts[1] - host = self.global_state.all_hosts.get(ip) - if host and host.compromised_by == 'None': - false_positives += 1 # Isolated a clean host - elif 'privilege' in delta_key and delta_val in ('User', 'Root'): - successful_exploits += 1 - elif 'status' in delta_key and delta_val == 'online': - services_restored += 1 - - info['false_positives'] = float(false_positives) - info['successful_exploits'] = float(successful_exploits) - info['hosts_isolated'] = float(hosts_isolated) - info['services_restored'] = float(services_restored) - - # Extra context for analysis - info['agent_energy'] = float(self.global_state.agent_energy.get(agent, 0)) - info['compromised_hosts'] = float( - sum( - 1 - for h in self.global_state.all_hosts.values() - if h.compromised_by != 'None' - ) - ) - info['isolated_hosts'] = float( - sum( - 1 - for h in self.global_state.all_hosts.values() - if h.status == 'isolated' - ) - ) - - infos[agent] = info - - return infos diff --git a/marl_cyborg/topologies/network_generator.py b/marl_cyborg/topologies/network_generator.py deleted file mode 100644 index 6334a0c..0000000 --- a/marl_cyborg/topologies/network_generator.py +++ /dev/null @@ -1,104 +0,0 @@ -import random -import yaml -from pathlib import Path -from typing import Optional -from marl_cyborg.core.state import GlobalNetworkState, Subnet, Host - - -class NetworkGenerator: - """Procedurally generates or loads dynamic network topologies for MARL - - training. - - Prevents agents from overfitting to a static 10-node architecture. - """ - - def __init__(self, config_path: Optional[str] = None): - self.config_path = config_path - - def generate(self, seed: Optional[int] = None) -> GlobalNetworkState: - """Generates the architecture. - - If a config path was provided, loads deterministically. - Otherwise, procedurally generates a randomized topology. - """ - if seed is not None: - random.seed(seed) - - if self.config_path and Path(self.config_path).exists(): - return self._load_from_yaml(self.config_path) - - return self._generate_procedural() - - def _generate_procedural(self) -> GlobalNetworkState: - """Creates a randomized network with 2-4 subnets and 5-15 hosts. - - Randomizes IP bounds and initial decoy placements. - """ - state = GlobalNetworkState() - - # Determine number of subnets (e.g., DMZ, Corp, Secure, Guest) - num_subnets = random.randint(2, 4) - subnet_names = ['DMZ', 'Corporate', 'Secure', 'Guest'] - base_ips = ['192.168.1', '10.0.0', '10.0.1', '172.16.0'] - - for i in range(num_subnets): - cidr = f'{base_ips[i]}.0/24' - subnet = Subnet(cidr=cidr, name=subnet_names[i]) - state.add_subnet(subnet) - - # 2 to 6 hosts per subnet - num_hosts = random.randint(2, 6) - for j in range(1, num_hosts + 1): - host_ip = f'{base_ips[i]}.{j * random.randint(1, 5)}' - host = Host( - ip=host_ip, hostname=f'{subnet_names[i]}_Node_{j}', subnet_cidr=cidr - ) - - # Randomly place a Blue Team decoy (15% chance) - if random.random() < 0.15: - host.decoy = random.choice(['Apache', 'SSHD', 'Tomcat', 'active']) - else: - # Assign legitimate OS profiles and CVEs to real hosts - profiles = [ - ('Windows_10', ['RDP', 'SMB'], ['CVE-2019-0708', 'MS17-010']), - ( - 'Windows_Server_2016', - ['SMB', 'IIS'], - ['MS17-010', 'CVE-2021-44228'], - ), - ('Linux_Ubuntu', ['SSH', 'Apache'], ['CVE-2021-44228', 'V4L2']), - ('Linux_CentOS', ['SSH', 'Tomcat'], ['CVE-2021-44228']), - ] - chosen_os, chosen_services, potential_cves = random.choice(profiles) - host.os = chosen_os - host.services = chosen_services - # Randomly assign 0 to 2 specific vulnerabilities from the valid pool to prevent guaranteed exploitation - num_vulns = random.randint(0, min(2, len(potential_cves))) - host.vulnerabilities = random.sample(potential_cves, num_vulns) - - state.register_host(host) - - # Ensure Red always knows at least one entry node (DMZ) at step 0 - if '192.168.1.0/24' in state.subnets: - dmz_hosts = list(state.subnets['192.168.1.0/24'].hosts.values()) - if dmz_hosts: - # Add default knowledge for Red Commander - state.update_knowledge('red_commander', dmz_hosts[0].ip) - state.update_knowledge('red_operator', dmz_hosts[0].ip) - - # Blue knows everything initially - for host in state.all_hosts.values(): - state.update_knowledge('blue_commander', host.ip) - state.update_knowledge('blue_operator', host.ip) - - return state - - def _load_from_yaml(self, path: str) -> GlobalNetworkState: - """Loads a deterministic graph from a YAML configuration.""" - with open(path, 'r') as f: - _ = yaml.safe_load(f) - - # Implementation left for future expansion if YAML is required. - # Defaults to procedural if parsing fails. - return self._generate_procedural() diff --git a/marl_cyborg/__init__.py b/netforge_rl/__init__.py similarity index 59% rename from marl_cyborg/__init__.py rename to netforge_rl/__init__.py index 3a33d0d..aae6533 100644 --- a/marl_cyborg/__init__.py +++ b/netforge_rl/__init__.py @@ -1,17 +1,10 @@ -"""MARL_CybORG v3.0 Library Multi-Agent Cybersecurity Simulator based on - -CybORG. -""" - -__version__ = '3.0.0' - -from .environment.parallel_env import ParallelMarlCyborg +from .environment.parallel_env import NetForgeRLEnv from .core.action import BaseAction, ActionEffect from .core.state import GlobalNetworkState, Host, Subnet from .core.observation import BaseObservation __all__ = [ - 'ParallelMarlCyborg', + 'NetForgeRLEnv', 'BaseAction', 'ActionEffect', 'GlobalNetworkState', diff --git a/marl_cyborg/actions/__init__.py b/netforge_rl/actions/__init__.py similarity index 81% rename from marl_cyborg/actions/__init__.py rename to netforge_rl/actions/__init__.py index 348d266..6796da0 100644 --- a/marl_cyborg/actions/__init__.py +++ b/netforge_rl/actions/__init__.py @@ -26,6 +26,7 @@ V4L2KernelExploit, KillProcess, ShareIntelligence, + OverloadPLC, ) __all__ = [ @@ -54,4 +55,12 @@ 'V4L2KernelExploit', 'KillProcess', 'ShareIntelligence', + 'OverloadPLC', + 'SecurityAwarenessTraining', + 'DeployHoneytoken', ] + +from .blue import SecurityAwarenessTraining +from .blue import DeployHoneytoken + +__all__.extend(['SecurityAwarenessTraining', 'DeployHoneytoken']) diff --git a/marl_cyborg/actions/blue/__init__.py b/netforge_rl/actions/blue/__init__.py similarity index 63% rename from marl_cyborg/actions/blue/__init__.py rename to netforge_rl/actions/blue/__init__.py index e797bf0..5f82275 100644 --- a/marl_cyborg/actions/blue/__init__.py +++ b/netforge_rl/actions/blue/__init__.py @@ -4,9 +4,17 @@ Remove, RestoreFromBackup, ConfigureACL, + SecurityAwarenessTraining, ) from .analysis import Monitor, Analyze -from .deception import DeployDecoy, DecoyApache, DecoySSHD, DecoyTomcat, Misinform +from .deception import ( + DeployDecoy, + DecoyApache, + DecoySSHD, + DecoyTomcat, + Misinform, + DeployHoneytoken, +) __all__ = [ 'IsolateHost', @@ -21,4 +29,6 @@ 'DecoyTomcat', 'Misinform', 'ConfigureACL', + 'SecurityAwarenessTraining', + 'DeployHoneytoken', ] diff --git a/marl_cyborg/actions/blue/analysis.py b/netforge_rl/actions/blue/analysis.py similarity index 95% rename from marl_cyborg/actions/blue/analysis.py rename to netforge_rl/actions/blue/analysis.py index 6ddf60b..13b8f94 100644 --- a/marl_cyborg/actions/blue/analysis.py +++ b/netforge_rl/actions/blue/analysis.py @@ -1,6 +1,8 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +@action_registry.register('blue_operator', 2) class Monitor(BaseAction): """Deploys active traffic analysis scanning on a specific subnet or host. @@ -64,6 +66,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_operator', 3) class Analyze(BaseAction): """Executes a forensic deep scan of a specific host for malware indicators diff --git a/marl_cyborg/actions/blue/deception.py b/netforge_rl/actions/blue/deception.py similarity index 79% rename from marl_cyborg/actions/blue/deception.py rename to netforge_rl/actions/blue/deception.py index 3873220..545d6fc 100644 --- a/marl_cyborg/actions/blue/deception.py +++ b/netforge_rl/actions/blue/deception.py @@ -1,6 +1,8 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +@action_registry.register('blue_commander', 0) class DeployDecoy(BaseAction): """Deploys a generic high-interaction honeypot/decoy service to a target @@ -46,6 +48,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_commander', 1) class DecoyApache(BaseAction): """Deploys a specifically profiled Apache Web Server (Port 80) honeypot. @@ -86,6 +89,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_commander', 2) class DecoySSHD(BaseAction): """Deploys a fake SSH daemon (Port 22) honeypot specifically designed to @@ -124,6 +128,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_commander', 3) class DecoyTomcat(BaseAction): """Deploys a fake Tomcat server (Port 8080) to deceive application port @@ -162,6 +167,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_commander', 4) class Misinform(BaseAction): """Injects false host telemetry or alters logging infrastructure to feed @@ -202,3 +208,33 @@ def execute(self, global_state) -> ActionEffect: 'alert': f'Misinformation campaign active on {self.target_ip}.' }, ) + + +@action_registry.register('blue_commander', 5) +class DeployHoneytoken(BaseAction): + """ + Injects fake, highly-monitored credentials into the memory space of a real host. + + If a Red agent successfully compromises this host and attempts to perform + post-exploitation (e.g., Pass-the-Hash, credential dumping), they ingest the + Honeytoken instead. This triggers an immediate, 100% confidence SIEM Alert + exposing the Red agent's exact location natively. + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__( + agent_id, target_ip=target_ip, cost=5, financial_cost=50, duration=1 + ) + + def validate(self, global_state) -> bool: + return self.target_ip in global_state.all_hosts + + def execute(self, global_state) -> ActionEffect: + return ActionEffect( + success=True, + state_deltas={f'hosts/{self.target_ip}/contains_honeytokens': True}, + observation_data={ + 'alert': f'Honeytokens actively deployed in RAM on {self.target_ip}.' + }, + eta=self.duration, + ) diff --git a/marl_cyborg/actions/blue/mitigation.py b/netforge_rl/actions/blue/mitigation.py similarity index 75% rename from marl_cyborg/actions/blue/mitigation.py rename to netforge_rl/actions/blue/mitigation.py index bc39cd5..e5aaa06 100644 --- a/marl_cyborg/actions/blue/mitigation.py +++ b/netforge_rl/actions/blue/mitigation.py @@ -1,6 +1,13 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +from netforge_rl.core.commands import ( + UpdateHostStatusCommand, + DropSessionCommand, + BlockPortCommand, +) +@action_registry.register('blue_operator', 0) class IsolateHost(BaseAction): """Disconnects a compromised host completely from the network @@ -41,11 +48,15 @@ def execute(self, global_state) -> ActionEffect: """ return ActionEffect( success=True, - state_deltas={f'hosts/{self.target_ip}/status': 'isolated'}, + state_deltas=[ + UpdateHostStatusCommand(self.target_ip, 'isolated'), + DropSessionCommand(self.target_ip), + ], observation_data={'alert': 'Host isolated securely.'}, ) +@action_registry.register('blue_operator', 1) class RestoreHost(BaseAction): """Re-establishes network connectivity for a previously isolated host. @@ -91,6 +102,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_operator', 4) class Remove(BaseAction): """Evicts unauthorized threat actors from a compromised element. @@ -137,6 +149,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_operator', 5) class RestoreFromBackup(BaseAction): """Executes a bare-metal imaging recovery to purge advanced persistent @@ -186,6 +199,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('blue_operator', 6) class ConfigureACL(BaseAction): """ Dynamically modifies the implicit routing Firewall to block specific port @@ -211,11 +225,51 @@ def execute(self, global_state) -> ActionEffect: """ Calculates the physics delta to apply the firewall block rule. """ - safe_subnet = self.target_ip.replace('/', '_slash_') return ActionEffect( success=True, - state_deltas={f'firewall/block/{safe_subnet}/{self.port}': 'True'}, + state_deltas=[BlockPortCommand(self.target_ip, self.port)], observation_data={ 'alert': f'ACL configured: Drop Port {self.port} to {self.target_ip}' }, ) + + +@action_registry.register('blue_operator', 7) +class SecurityAwarenessTraining(BaseAction): + """ + Deploys rapid, intensive anti-phishing training to a targeted subnet. + + Temporarily slashes the `human_vulnerability_score` of all users in the subset, + drastically lowering the success rate of Red Team SpearPhishing campaigns. + Costs significant Financial budget due to operational lost time. + """ + + def __init__(self, agent_id: str, target_subnet: str): + super().__init__( + agent_id, target_ip=target_subnet, cost=2, financial_cost=2000, duration=3 + ) + + def validate(self, global_state) -> bool: + return self.target_ip in global_state.subnets + + def execute(self, global_state) -> ActionEffect: + subnet = global_state.subnets.get(self.target_ip) + if not subnet: + return ActionEffect(success=False, state_deltas={}, observation_data={}) + + deltas = {} + for host in subnet.hosts.values(): + if hasattr(host, 'human_vulnerability_score'): + current_score = host.human_vulnerability_score + # Slash vulnerability by 80% + new_score = round(current_score * 0.2, 2) + deltas[f'hosts/{host.ip}/human_vulnerability_score'] = new_score + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={ + 'alert': f'Security Awareness Training completed on {self.target_ip}. Vulnerability drastically lowered.' + }, + eta=self.duration, + ) diff --git a/marl_cyborg/actions/network/ip_fragmentation.py b/netforge_rl/actions/network/ip_fragmentation.py similarity index 93% rename from marl_cyborg/actions/network/ip_fragmentation.py rename to netforge_rl/actions/network/ip_fragmentation.py index 22a37e9..38c0f1b 100644 --- a/marl_cyborg/actions/network/ip_fragmentation.py +++ b/netforge_rl/actions/network/ip_fragmentation.py @@ -1,9 +1,9 @@ from typing import TYPE_CHECKING -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect if TYPE_CHECKING: - from marl_cyborg.core.state import GlobalNetworkState + from netforge_rl.core.state import GlobalNetworkState class IPFragmentationAction(BaseAction): diff --git a/marl_cyborg/actions/red/__init__.py b/netforge_rl/actions/red/__init__.py similarity index 85% rename from marl_cyborg/actions/red/__init__.py rename to netforge_rl/actions/red/__init__.py index 6924d08..5e754e1 100644 --- a/marl_cyborg/actions/red/__init__.py +++ b/netforge_rl/actions/red/__init__.py @@ -8,6 +8,7 @@ from .privilege_escalation import PrivilegeEscalate, JuicyPotato, V4L2KernelExploit from .impact import Impact, KillProcess from .coordination import ShareIntelligence +from .kinetic import OverloadPLC __all__ = [ 'NetworkScan', @@ -23,4 +24,8 @@ 'Impact', 'KillProcess', 'ShareIntelligence', + 'OverloadPLC', + 'SpearPhishing', ] + +from .social_engineering import SpearPhishing diff --git a/marl_cyborg/actions/red/coordination.py b/netforge_rl/actions/red/coordination.py similarity index 93% rename from marl_cyborg/actions/red/coordination.py rename to netforge_rl/actions/red/coordination.py index 5253e62..6c4b15f 100644 --- a/marl_cyborg/actions/red/coordination.py +++ b/netforge_rl/actions/red/coordination.py @@ -1,6 +1,8 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +@action_registry.register('red_commander', 3) class ShareIntelligence(BaseAction): """Explicitly shares the current agent's 'Fog of War' knowledge graph with diff --git a/marl_cyborg/actions/red/exploits.py b/netforge_rl/actions/red/exploits.py similarity index 74% rename from marl_cyborg/actions/red/exploits.py rename to netforge_rl/actions/red/exploits.py index bb7bd9a..786eee0 100644 --- a/marl_cyborg/actions/red/exploits.py +++ b/netforge_rl/actions/red/exploits.py @@ -1,6 +1,12 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +from netforge_rl.core.commands import ( + UpdateHostPrivilegeCommand, + EstablishSessionCommand, +) +@action_registry.register('red_operator', 0) class ExploitRemoteService(BaseAction): """Attempts to weaponize a generic remote code execution vulnerability on a @@ -16,7 +22,13 @@ class ExploitRemoteService(BaseAction): """ def __init__(self, agent_id: str, target_ip: str, port: int = 80): - super().__init__(agent_id, target_ip=target_ip) + super().__init__( + agent_id, + target_ip=target_ip, + cost=5, + duration=5, + required_prior_state='DiscoverNetworkServices', + ) self.port = port def validate(self, global_state) -> bool: @@ -30,53 +42,54 @@ def validate(self, global_state) -> bool: Returns: bool: True if physically routable, False if blocked by the physics engine. """ + if not super().validate(global_state): + return False return global_state.can_route_to(self.target_ip) def execute(self, global_state) -> ActionEffect: - """Executes the payload impact mathematical deltas on the host's - - privilege table. Fails if no inherent vulnerabilities are found. - - Args: - global_state (GlobalNetworkState): State snapshot prior to temporal resolution. - - Returns: - ActionEffect: A structured delta mapping dictating a compromise to 'User'. - """ import random host = global_state.all_hosts.get(self.target_ip) - if host and host.vulnerabilities: - roll = random.random() - if roll < 0.15: - return ActionEffect( - success=False, - state_deltas={}, - observation_data={'exploit': 'failed silently'}, - ) - elif roll < 0.25: - return ActionEffect( - success=False, - state_deltas={f'hosts/{self.target_ip}/status': 'kernel_panic'}, - observation_data={'exploit': 'failed - kernel panic'}, - ) + if not host or not host.vulnerabilities: + return ActionEffect(success=False, state_deltas=[], observation_data={}) + + # CVSS-Weighted Stochastics (1.0 = 100% success on 10.0 CVSS, 0.2 = 20% on 2.0 CVSS) + cvss = getattr(host, 'cvss_score', 5.0) # Default average vulnerability logic + probability_of_success = cvss / 10.0 + if host.decoy == 'active' or random.random() > probability_of_success: return ActionEffect( - success=True, - state_deltas={ - f'hosts/{self.target_ip}/privilege': 'User', - f'hosts/{self.target_ip}/compromised_by': self.agent_id, + success=False, + state_deltas=[], + observation_data={ + 'failed_exploit': self.target_ip, + 'reason': 'stochastic_cvss_failure', }, - observation_data={'exploit': 'success'}, ) + # Build OOP Delta List + deltas = [ + UpdateHostPrivilegeCommand( + self.target_ip, 'User', compromised_by=self.agent_id + ), + EstablishSessionCommand(self.agent_id, self.target_ip, port=self.port), + ] + + obs_data = { + 'exploit': self.target_ip, + 'status': 'User_Access_Gained', + 'active_session_established': True, + } + return ActionEffect( - success=False, - state_deltas={}, - observation_data={'exploit': 'failed - target lacks vulnerabilities'}, + success=True, + state_deltas=deltas, + observation_data=obs_data, + eta=self.duration, ) +@action_registry.register('red_operator', 3) class ExploitBlueKeep(BaseAction): """Executes the CVE-2019-0708 (BlueKeep) vulnerability against Remote @@ -90,7 +103,13 @@ class ExploitBlueKeep(BaseAction): """ def __init__(self, agent_id: str, target_ip: str): - super().__init__(agent_id, target_ip=target_ip) + super().__init__( + agent_id, + target_ip=target_ip, + cost=3, + duration=4, + required_prior_state='DiscoverNetworkServices', + ) def validate(self, global_state) -> bool: """Verifies logical network accessibility traversing through DMZs. @@ -101,6 +120,8 @@ def validate(self, global_state) -> bool: Returns: bool: True if Port 3389 routing is valid. """ + if not super().validate(global_state): + return False return global_state.can_route_to(self.target_ip) def execute(self, global_state) -> ActionEffect: @@ -150,6 +171,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_operator', 4) class ExploitEternalBlue(BaseAction): """Executes the MS17-010 (EternalBlue) exploit targeting poorly configured @@ -163,7 +185,13 @@ class ExploitEternalBlue(BaseAction): """ def __init__(self, agent_id: str, target_ip: str): - super().__init__(agent_id, target_ip=target_ip) + super().__init__( + agent_id, + target_ip=target_ip, + cost=4, + duration=6, + required_prior_state='DiscoverNetworkServices', + ) def validate(self, global_state) -> bool: """Ensures target accessibility within standard MARL routing @@ -176,6 +204,8 @@ def validate(self, global_state) -> bool: Returns: bool: Evaluation boolean for execution clearance. """ + if not super().validate(global_state): + return False return global_state.can_route_to(self.target_ip) def execute(self, global_state) -> ActionEffect: @@ -225,6 +255,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_operator', 5) class ExploitHTTP_RFI(BaseAction): """Simulates a Remote File Inclusion (RFI) web application attack vector @@ -239,10 +270,18 @@ class ExploitHTTP_RFI(BaseAction): """ def __init__(self, agent_id: str, target_ip: str): - super().__init__(agent_id, target_ip=target_ip) + super().__init__( + agent_id, + target_ip=target_ip, + cost=3, + duration=3, + required_prior_state='DiscoverNetworkServices', + ) def validate(self, global_state) -> bool: """Requires valid routing to the web interface.""" + if not super().validate(global_state): + return False return global_state.can_route_to(self.target_ip) def execute(self, global_state) -> ActionEffect: diff --git a/marl_cyborg/actions/red/impact.py b/netforge_rl/actions/red/impact.py similarity index 64% rename from marl_cyborg/actions/red/impact.py rename to netforge_rl/actions/red/impact.py index e6f63cf..604a730 100644 --- a/marl_cyborg/actions/red/impact.py +++ b/netforge_rl/actions/red/impact.py @@ -1,6 +1,8 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +@action_registry.register('red_operator', 2) class Impact(BaseAction): """Executes the final localized objective of the Cyber Kill Chain (e.g., @@ -46,6 +48,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_operator', 8) class KillProcess(BaseAction): """ Terminates a specific process (e.g., EDR sensor) on a compromised host. @@ -77,3 +80,35 @@ def execute(self, global_state) -> ActionEffect: state_deltas={f'hosts/{self.target_ip}/edr_active': False}, observation_data={'kill_process': 'EDR blinded'}, ) + + +@action_registry.register('red_operator', 10) +class ExfiltrateData(BaseAction): + """Exfiltrates sensitive data out of a compromised node. + + Generates enormous network traffic traversing the egress point. + Susceptible to Volumetric SIEM detection based on dynamic SNR thresholds. + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__(agent_id, target_ip=target_ip, duration=3) + + def validate(self, global_state) -> bool: + host = global_state.all_hosts.get(self.target_ip) + if not host or host.privilege not in ['User', 'Root']: + return False + return global_state.can_route_to(self.target_ip) + + def execute(self, global_state) -> ActionEffect: + from netforge_rl.core.commands import ConsumeBandwidthCommand + + host = global_state.all_hosts.get(self.target_ip) + target_subnet = host.subnet_cidr if host else 'unknown' + + deltas = [ConsumeBandwidthCommand(target_subnet, amount=500)] + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={'action': 'exfiltrated_data_chunk'}, + ) diff --git a/netforge_rl/actions/red/kinetic.py b/netforge_rl/actions/red/kinetic.py new file mode 100644 index 0000000..f70bb2e --- /dev/null +++ b/netforge_rl/actions/red/kinetic.py @@ -0,0 +1,76 @@ +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry + + +@action_registry.register('red_operator', 20) +class OverloadPLC(BaseAction): + """Initiates a devastating Kinetic Impact on a compromised Cyber-Physical OT Node. + + If the target PLC is compromised to Root level and belongs to the Operational Technology + infrastructure, this action radically forces hardware temperatures past catastrophic thresholds. + + Args: + agent_id (str): Reference string for the agent executing the attack. + target_ip (str): IP address belonging to a PLC_Firmware host inside the OT_Subnet. + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__( + agent_id, + target_ip=target_ip, + cost=20, # High Energy + financial_cost=1000, # High Financial Requirement + duration=10, + required_prior_state='DiscoverNetworkServices', + ) + + def validate(self, global_state) -> bool: + """Ensures the target exists, is routeable, and is ACTUALLY an OT device.""" + if not super().validate(global_state): + return False + + host = global_state.all_hosts.get(self.target_ip) + if not host: + return False + + # Target must be physically OT Infrastructure + if host.os != 'PLC_Firmware' or host.subnet_cidr != '10.0.99.0/24': + return False + + # Red must have maximum execution privileges over the mechanical interface + if host.privilege != 'Root': + return False + + return global_state.can_route_to(self.target_ip) + + def execute(self, global_state) -> ActionEffect: + import random + + host = global_state.all_hosts.get(self.target_ip) + if not host: + return ActionEffect(success=False, state_deltas={}, observation_data={}) + + current_temperature = getattr(host, 'temperature', 50.0) + + # Inject rapid thermal escalation +90 to +150 delta + spike = random.uniform(90.0, 150.0) + new_temp = current_temperature + spike + + deltas = { + f'hosts/{self.target_ip}/temperature': new_temp, + f'hosts/{self.target_ip}/system_integrity': 'kinetic_destruction', + } + + # Stuxnet-level alert severity for the Blue Team SIEM + obs_data = { + 'action': 'overload_plc', + 'status': 'kinetic_impact_achieved', + 'terminal_temperature': new_temp, + } + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data=obs_data, + eta=self.duration, + ) diff --git a/marl_cyborg/actions/red/privilege_escalation.py b/netforge_rl/actions/red/privilege_escalation.py similarity index 73% rename from marl_cyborg/actions/red/privilege_escalation.py rename to netforge_rl/actions/red/privilege_escalation.py index 1b5906b..9e195d0 100644 --- a/marl_cyborg/actions/red/privilege_escalation.py +++ b/netforge_rl/actions/red/privilege_escalation.py @@ -1,6 +1,8 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +@action_registry.register('red_operator', 1) class PrivilegeEscalate(BaseAction): """Executes a generic local privilege escalation exploit on a compromised @@ -50,6 +52,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_operator', 6) class JuicyPotato(BaseAction): """Simulates the JuicyPotato local privilege escalation vector leveraging @@ -103,6 +106,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_operator', 7) class V4L2KernelExploit(BaseAction): """Executes a specific kernel-level vulnerability via Video4Linux (V4L2) on @@ -156,3 +160,51 @@ def execute(self, global_state) -> ActionEffect: }, observation_data={'privilege': 'V4L2 Kernel escalated'}, ) + + +@action_registry.register('red_operator', 9) +class PassTheHash(BaseAction): + """Executes a lateral movement attack bypassing authentication using + Kerberos / NTLM hashes extracted from a Domain Controller. + + Args: + agent_id (str): Reference to the executing Red operator. + target_ip (str): Target IPv4 string (can be un-exploited if DC is cracked). + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__(agent_id, target_ip=target_ip, cost=1) + + def validate(self, global_state) -> bool: + """Validates if the agent has previously Rooted ANY Domain Controller.""" + has_dc_hash = False + for host in global_state.all_hosts.values(): + if host.is_domain_controller and host.privilege in ['Root', 'SYSTEM']: + if host.compromised_by == self.agent_id: + has_dc_hash = True + break + + if not has_dc_hash: + return False + + return global_state.can_route_to(self.target_ip) + + def execute(self, global_state) -> ActionEffect: + """Applies instantaneous SYSTEM access based on Golden Ticket leverage. + + Returns: + ActionEffect: Elevated root control unconditionally on target node. + """ + from netforge_rl.core.commands import UpdateHostPrivilegeCommand + + deltas = [ + UpdateHostPrivilegeCommand( + self.target_ip, 'Root', compromised_by=self.agent_id + ) + ] + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={'privilege': 'Pass-The-Hash lateral pivot successful.'}, + ) diff --git a/marl_cyborg/actions/red/reconnaissance.py b/netforge_rl/actions/red/reconnaissance.py similarity index 90% rename from marl_cyborg/actions/red/reconnaissance.py rename to netforge_rl/actions/red/reconnaissance.py index ee96d2d..00ef012 100644 --- a/marl_cyborg/actions/red/reconnaissance.py +++ b/netforge_rl/actions/red/reconnaissance.py @@ -1,6 +1,8 @@ -from marl_cyborg.core.action import BaseAction, ActionEffect +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.registry import action_registry +@action_registry.register('red_commander', 0) class NetworkScan(BaseAction): """Executes a wide network scan across a specified subnet to map active IP @@ -49,6 +51,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_commander', 1) class DiscoverRemoteSystems(BaseAction): """Executes a targeted Ping Sweep against a subnet to explicitly identify @@ -108,6 +111,7 @@ def execute(self, global_state) -> ActionEffect: ) +@action_registry.register('red_commander', 2) class DiscoverNetworkServices(BaseAction): """Executes an intrusive port scan against a specific host to enumerate @@ -121,7 +125,7 @@ class DiscoverNetworkServices(BaseAction): """ def __init__(self, agent_id: str, target_ip: str): - super().__init__(agent_id, target_ip=target_ip, cost=2) + super().__init__(agent_id, target_ip=target_ip, cost=2, duration=3) def validate(self, global_state) -> bool: """Confirms target host is active and packet routing is unblocked by @@ -161,8 +165,11 @@ def execute(self, global_state) -> ActionEffect: obs_data['os'] = host.os obs_data['vulnerabilities'] = host.vulnerabilities - # Update knowledge that we scanned this host - knowledge_deltas = {f'knowledge/{self.agent_id}/{self.target_ip}': 'True'} + # Update knowledge that we scanned this host and add to history + knowledge_deltas = { + f'knowledge/{self.agent_id}/{self.target_ip}': 'True', + f'history/{self.agent_id}/DiscoverNetworkServices:{self.target_ip}': 'add', + } return ActionEffect( success=True, state_deltas=knowledge_deltas, observation_data=obs_data diff --git a/netforge_rl/actions/red/social_engineering.py b/netforge_rl/actions/red/social_engineering.py new file mode 100644 index 0000000..1f763b9 --- /dev/null +++ b/netforge_rl/actions/red/social_engineering.py @@ -0,0 +1,80 @@ +import random +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.commands import ( + EstablishSessionCommand, + UpdateHostPrivilegeCommand, +) +from netforge_rl.core.registry import action_registry + + +@action_registry.register('red_operator', 21) +class SpearPhishing(BaseAction): + """Executes a targeted Social Engineering campaign against a Corporate End-User. + + Unlike standard Exploits, SpearPhishing leverages email protocols and bypasses + perimeter firewalls and DMZ routing constraints entirely. Its success probability + is purely dictated by the `human_vulnerability_score` of the human operator + assigned to the generated Endpoint, simulating clicks on malicious attachments. + + Args: + agent_id (str): Reference ID of the Red operating unit. + target_ip (str): IP address of the target User Node (typically Corporate/Secure subnet). + """ + + def __init__(self, agent_id: str, target_ip: str): + super().__init__( + agent_id, + target_ip=target_ip, + cost=2, + financial_cost=50, # Costs minor operational budget to purchase domain infrastructure + duration=15, # High duration (waiting for users to organically check email) + required_prior_state=None, # Can be shot blindly without structural discovery + ) + + def validate(self, global_state) -> bool: + """Overrides parent zone constraints to simulate out-of-band email protocol delivery.""" + if not self.target_ip or self.target_ip not in global_state.all_hosts: + return False + + host = global_state.all_hosts[self.target_ip] + # Nobody reads emails on PLCs or dedicated Servers; restricted to Windows Endpoints here + if 'Windows' not in getattr(host, 'os', ''): + return False + + return True + + def execute(self, global_state) -> ActionEffect: + host = global_state.all_hosts.get(self.target_ip) + + # Pull the phishability score generated procedurally + phish_chance = getattr(host, 'human_vulnerability_score', 0.1) + + if random.random() > phish_chance: + return ActionEffect( + success=False, + state_deltas=[], + observation_data={ + 'phishing': 'failed', + 'reason': 'user reported suspicious email', + }, + ) + + # Build OOP Delta List granting User-level reverse shell from the clicked attachment + deltas = [ + UpdateHostPrivilegeCommand( + self.target_ip, 'User', compromised_by=self.agent_id + ), + EstablishSessionCommand( + self.agent_id, self.target_ip, port=443 + ), # Emulate C2 over HTTPS + ] + + return ActionEffect( + success=True, + state_deltas=deltas, + observation_data={ + 'phishing': 'success', + 'status': 'C2 Session Established via user execution', + }, + eta=self.duration, + ) diff --git a/marl_cyborg/agents/__init__.py b/netforge_rl/agents/__init__.py similarity index 100% rename from marl_cyborg/agents/__init__.py rename to netforge_rl/agents/__init__.py diff --git a/marl_cyborg/agents/b_line.py b/netforge_rl/agents/b_line.py similarity index 98% rename from marl_cyborg/agents/b_line.py rename to netforge_rl/agents/b_line.py index f3d0d13..b28c15e 100644 --- a/marl_cyborg/agents/b_line.py +++ b/netforge_rl/agents/b_line.py @@ -20,7 +20,7 @@ def __init__(self, agent_id: str): self.step_count = 0 def get_action(self, observation: np.ndarray, global_state) -> Any: - from marl_cyborg.actions import ( + from netforge_rl.actions import ( DiscoverRemoteSystems, DiscoverNetworkServices, ExploitRemoteService, diff --git a/netforge_rl/agents/green_agent.py b/netforge_rl/agents/green_agent.py new file mode 100644 index 0000000..ccae9fb --- /dev/null +++ b/netforge_rl/agents/green_agent.py @@ -0,0 +1,74 @@ +import random +from typing import Any, Dict + + +class GreenAgent: + """Simulates benign corporate users to generate behavioral background noise + and false-positive alerts, masking Red Agent activity. + + It operates on a Day/Night cycle across the simulated business hours. + """ + + def __init__(self, agent_id: str = 'green_agent_0'): + self.agent_id = agent_id + + def generate_noise(self, current_tick: int, global_state: Any) -> Dict[str, Any]: + """Generates random telemetry alerts based on the current tick's position + within the day/night cycle. + + Args: + current_tick (int): The current chronological tick of the environment. + global_state (GlobalNetworkState): The architecture state to extract subnets/hosts. + + Returns: + Dict[str, Any]: A dictionary representing telemetry generated by this tick. + """ + # Day: Ticks 0 to 100, Night: Ticks 101 to 149 + cycle_position = current_tick % 150 + is_day = cycle_position <= 100 + + noise_logs = [] + hosts = list(global_state.all_hosts.values()) + if not hosts: + return {'alerts': []} + + # Higher activity and more false positives during the day + probability_of_noise = 0.8 if is_day else 0.1 + probability_of_false_positive = 0.05 if is_day else 0.01 + + if random.random() < probability_of_noise: + # Generate benign background traffic + source = random.choice(hosts) + target = random.choice(hosts) + if source.ip != target.ip: + noise_logs.append( + { + 'type': 'benign_traffic', + 'source': source.ip, + 'target': target.ip, + 'protocol': random.choice(['TCP', 'UDP', 'HTTP', 'DNS']), + 'severity': 0, + } + ) + + if random.random() < probability_of_false_positive: + # Generate a false positive anomaly that could trip Blue's SIEM + target = random.choice(hosts) + noise_logs.append( + { + 'type': 'anomaly', + 'source': 'unknown_external', + 'target': target.ip, + 'signature': random.choice( + [ + 'Failed_Login_Spike', + 'Malformed_Packet', + 'Suspicious_User_Agent', + ] + ), + 'severity': random.randint(1, 4), + 'false_positive': True, + } + ) + + return {'alerts': noise_logs} diff --git a/netforge_rl/core/action.py b/netforge_rl/core/action.py new file mode 100644 index 0000000..eb60ac7 --- /dev/null +++ b/netforge_rl/core/action.py @@ -0,0 +1,95 @@ +from abc import ABC, abstractmethod +from typing import Dict, Any, Optional, TYPE_CHECKING, Union, List + +if TYPE_CHECKING: + from netforge_rl.core.state import GlobalNetworkState + from netforge_rl.core.commands import IStateDeltaCommand + + +class ActionEffect: + """Encapsulates the resulting state changes from an action for conflict + + resolution. + """ + + def __init__( + self, + success: bool, + state_deltas: Union[Dict[str, Any], List['IStateDeltaCommand']], + observation_data: Dict[str, Any], + eta: int = 0, + ): + self.success = success + self.state_deltas = state_deltas + self.observation_data = observation_data + self.eta = eta + + +class BaseAction(ABC): + """Modular Base Action for the MARL CybORG Environment. + + All highly specific network attacks (Layer 2 - Layer 7) inherit from this class. + """ + + def __init__( + self, + agent_id: str, + target_ip: Optional[str] = None, + source_ip: Optional[str] = None, + cost: int = 1, + financial_cost: int = 0, + compute_cost: int = 0, + duration: int = 1, + required_prior_state: Optional[str] = None, + ): + self.agent_id = agent_id + self.target_ip = target_ip + self.source_ip = source_ip + self.cost = cost + self.financial_cost = financial_cost + self.compute_cost = compute_cost + self.duration = duration + self.required_prior_state = required_prior_state + + def validate(self, global_state: 'GlobalNetworkState') -> bool: + """Checks if the action is physically possible in the current network + state (e.g., is there a route, are preconditions met). + """ + if self.target_ip and self.target_ip not in global_state.all_hosts: + return False + + if self.required_prior_state: + # Check Action History state logic + agent_history = global_state.action_history.get(self.agent_id, set()) + expected_record = f'{self.required_prior_state}:{self.target_ip}' + if expected_record not in agent_history: + return False + + if self.target_ip: + host = global_state.all_hosts[self.target_ip] + # Simple declarative Zone constraints example + if 'red' in self.agent_id.lower() and host.subnet_cidr == '10.0.1.0/24': + # Secure Data targets cannot be touched without pivoting via DMZ or Internal User privileges first + has_dmz = any( + h.privilege in ['User', 'Root'] + for h in global_state.all_hosts.values() + if h.subnet_cidr == '192.168.1.0/24' + ) + has_internal = any( + h.privilege in ['User', 'Root'] + for h in global_state.all_hosts.values() + if h.subnet_cidr == '10.0.0.0/24' + ) + if not (has_dmz or has_internal): + return False + + return True + + @abstractmethod + def execute(self, global_state: 'GlobalNetworkState') -> ActionEffect: + """Computes the theoretical effect of the action. + + Note: State is NOT mutated directly here. Mutations are returned via ActionEffect + to allow the Environment to resolve simultaneous multi-agent collisions. + """ + pass diff --git a/marl_cyborg/core/agent_interface.py b/netforge_rl/core/agent_interface.py similarity index 86% rename from marl_cyborg/core/agent_interface.py rename to netforge_rl/core/agent_interface.py index 262c325..016063e 100644 --- a/marl_cyborg/core/agent_interface.py +++ b/netforge_rl/core/agent_interface.py @@ -1,7 +1,7 @@ from abc import ABC, abstractmethod from typing import Optional -from marl_cyborg.core.observation import BaseObservation -from marl_cyborg.core.action import BaseAction +from netforge_rl.core.observation import BaseObservation +from netforge_rl.core.action import BaseAction class AgentInterface(ABC): diff --git a/netforge_rl/core/commands.py b/netforge_rl/core/commands.py new file mode 100644 index 0000000..82f45f4 --- /dev/null +++ b/netforge_rl/core/commands.py @@ -0,0 +1,190 @@ +from abc import ABC, abstractmethod +from typing import Any, Optional + + +class IStateDeltaCommand(ABC): + """Abstract Command interface for Object-Oriented state mutation. + Allows for decoupled physics application dynamically processed by the Resolve engine. + """ + + @abstractmethod + def execute(self, global_state: Any): + pass + + @property + @abstractmethod + def target_ip(self) -> Optional[str]: + """Exposes the primary target IP so the Action Resolver can detect temporal collisions.""" + pass + + +class UpdateKnowledgeCommand(IStateDeltaCommand): + def __init__(self, agent_id: str, ip: str, value: Any = True): + self.agent_id = agent_id + self._target_ip = ip + self.value = value + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + global_state.update_knowledge(self.agent_id, self.target_ip) + + +class UpdateHostPrivilegeCommand(IStateDeltaCommand): + def __init__(self, ip: str, privilege: str, compromised_by: Optional[str] = None): + self._target_ip = ip + self.privilege = privilege + self.compromised_by = compromised_by + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + if self._target_ip in global_state.all_hosts: + host = global_state.all_hosts[self._target_ip] + host.privilege = self.privilege + if self.compromised_by: + host.compromised_by = self.compromised_by + + +class UpdateHostStatusCommand(IStateDeltaCommand): + def __init__(self, ip: str, status: str): + self._target_ip = ip + self.status = status + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + if self._target_ip in global_state.all_hosts: + global_state.all_hosts[self._target_ip].status = self.status + + +class UpdateServiceCommand(IStateDeltaCommand): + def __init__(self, ip: str, service: str, action: str = 'remove'): + self._target_ip = ip + self.service = service + self.action = action + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + if self._target_ip in global_state.all_hosts: + host = global_state.all_hosts[self._target_ip] + if self.action == 'remove' and self.service in host.services: + host.services.remove(self.service) + elif self.action == 'add' and self.service not in host.services: + host.services.append(self.service) + + +class BlockPortCommand(IStateDeltaCommand): + def __init__(self, subnet: str, port: int): + self.subnet = subnet + self.port = port + + @property + def target_ip(self) -> Optional[str]: + return None # Targets a subnet firewall, not a single node + + def execute(self, global_state: Any): + if 'global' not in global_state.firewalls: + # We import here to avoid circular dependencies if needed depending on global state structure + from netforge_rl.core.state import Firewall + + global_state.firewalls['global'] = Firewall('global') + global_state.firewalls['global'].block_port(self.subnet, self.port) + + +class AddHistoryCommand(IStateDeltaCommand): + def __init__(self, agent_id: str, record: str): + self.agent_id = agent_id + self.record = record + + @property + def target_ip(self) -> Optional[str]: + return None # Targeting agent logic + + def execute(self, global_state: Any): + if self.agent_id not in global_state.action_history: + global_state.action_history[self.agent_id] = set() + global_state.action_history[self.agent_id].add(self.record) + + +class UpdateDecoyCommand(IStateDeltaCommand): + def __init__(self, ip: str, decoy_type: str): + self._target_ip = ip + self.decoy_type = decoy_type + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + if self._target_ip in global_state.all_hosts: + global_state.all_hosts[self._target_ip].decoy = self.decoy_type + + +class EstablishSessionCommand(IStateDeltaCommand): + def __init__(self, agent_id: str, ip: str, port: int): + self.agent_id = agent_id + self._target_ip = ip + self.port = port + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + if self.agent_id not in global_state.active_sessions: + global_state.active_sessions[self.agent_id] = [] + global_state.active_sessions[self.agent_id].append( + {'ip': self._target_ip, 'port': self.port} + ) + + +class DropSessionCommand(IStateDeltaCommand): + def __init__(self, ip: str): + self._target_ip = ip + + @property + def target_ip(self) -> Optional[str]: + return self._target_ip + + def execute(self, global_state: Any): + for agent_id, sessions in global_state.active_sessions.items(): + global_state.active_sessions[agent_id] = [ + s for s in sessions if s['ip'] != self._target_ip + ] + + +class ConsumeBandwidthCommand(IStateDeltaCommand): + def __init__(self, subnet: str, amount: int): + self.subnet = subnet + self.amount = amount + + @property + def target_ip(self) -> Optional[str]: + return None # Targets a subnet-wide telemetry pipe, not a single node + + def execute(self, global_state: Any): + if self.subnet not in global_state.subnet_bandwidth: + global_state.subnet_bandwidth[self.subnet] = 0 + + global_state.subnet_bandwidth[self.subnet] += self.amount + + # Volumetric SIEM Trigger Rule + # If any subnet spikes above 1000 units in a single tick, generate a SIEM log. + if global_state.subnet_bandwidth[self.subnet] > 1000: + volumetric_alert = { + 'type': 'volumetric_anomaly', + 'subnet': self.subnet, + 'severity': 'High', + } + if volumetric_alert not in global_state.siem_log_buffer: + global_state.siem_log_buffer.append(volumetric_alert) diff --git a/marl_cyborg/core/observation.py b/netforge_rl/core/observation.py similarity index 65% rename from marl_cyborg/core/observation.py rename to netforge_rl/core/observation.py index 03430a3..728422a 100644 --- a/marl_cyborg/core/observation.py +++ b/netforge_rl/core/observation.py @@ -23,6 +23,9 @@ def __init__(self, agent_id: str): # Tracks anomalies like 802.11 Deauths, Fragmented IP packets, etc. self.network_telemetry = {} + # SIEM Logs + self.siem_alerts = [] + def update_from_state(self, global_state: Any, action_effects: List[Any]): """Filters the global state down to only what is observable by this @@ -40,17 +43,37 @@ def update_from_state(self, global_state: Any, action_effects: List[Any]): for ip in known_ips: if ip in global_state.all_hosts: host = global_state.all_hosts[ip] - self.visible_hosts[ip] = { - 'state': 'compromised' - if host.privilege in ['User', 'Root'] - else 'clean', - 'status': host.status, - 'decoy': host.decoy, # For Blue Team sensor logic - } - - if 'commander' in self.agent_id.lower(): + + if 'blue' in self.agent_id.lower(): + # Strict POMDP: Blue cannot see physical truth vectors. + # They must rely on SIEM telemetry alone for detection. + self.visible_hosts[ip] = { + 'state': 'unknown', + 'status': host.status, + 'decoy': host.decoy, + } + else: + # Red Team directly monitors nodes they root. + self.visible_hosts[ip] = { + 'state': 'compromised' + if host.privilege in ['User', 'Root'] + else 'clean', + 'status': host.status, + 'decoy': 'unknown', + } + + if 'commander' in self.agent_id.lower() or 'blue' in self.agent_id.lower(): + # Pull SIEM logs that have arrived (arrival_tick <= current_tick) + if hasattr(global_state, 'siem_log_buffer'): + for log in global_state.siem_log_buffer: + if log.get('arrival_tick', 0) <= getattr( + global_state, 'current_tick', 0 + ): + self.siem_alerts.append(log) + self.network_telemetry['global_alert_level'] = np.random.uniform(0, 1) self.network_telemetry['total_isolated_subnets'] = np.random.randint(0, 5) + self.network_telemetry['active_alerts'] = len(self.siem_alerts) if 'operator' in self.agent_id.lower(): self.objective_vector[2] = 1.0 @@ -76,6 +99,12 @@ def to_numpy(self, max_size: int = 256) -> np.ndarray: ) # Normalized idx += 1 + if 'active_alerts' in self.network_telemetry and idx < max_size: + vector[idx] = float( + min(self.network_telemetry['active_alerts'] / 20.0, 1.0) + ) + idx += 1 + for val in self.objective_vector: if idx < max_size: vector[idx] = val diff --git a/netforge_rl/core/physics.py b/netforge_rl/core/physics.py new file mode 100644 index 0000000..9ba2bb8 --- /dev/null +++ b/netforge_rl/core/physics.py @@ -0,0 +1,68 @@ +from typing import Dict +from netforge_rl.core.action import ActionEffect + + +class ConflictResolutionEngine: + """Strategy pattern engine defining the physical constraints of action collisions. + Mathematically resolves simultaneous temporal collisions. + """ + + @staticmethod + def resolve(effects: Dict[str, ActionEffect]) -> Dict[str, ActionEffect]: + """Core physics engine. + + Priority: Blue Defensive actions generally supersede Red Offensive actions + on the exact same network node if executed in the exact same elapsed fractional tick. + """ + red_agents = [a for a in effects if 'red' in a.lower()] + blue_agents = [a for a in effects if 'blue' in a.lower()] + + # 1. Compile all Blue defensive targets + blue_defended_nodes = {} + for blue_id in blue_agents: + eff = effects[blue_id] + if eff.success: + if isinstance(eff.state_deltas, dict): + for delta_key in eff.state_deltas.keys(): + if 'hosts/' in delta_key: + target_ip = delta_key.split('/')[1] + blue_defended_nodes[target_ip] = True + elif isinstance(eff.state_deltas, list): + for delta_obj in eff.state_deltas: + if getattr(delta_obj, 'target_ip', None): + blue_defended_nodes[delta_obj.target_ip] = True + + # 2. Evaluate Red attacks against the compiled simultaneous defenses + for red_id in red_agents: + red_eff = effects[red_id] + if not red_eff.success: + continue + + collision_detected = False + + # Check dictionary deltas + if isinstance(red_eff.state_deltas, dict): + for delta_key in list(red_eff.state_deltas.keys()): + if 'hosts/' in delta_key: + target_ip = delta_key.split('/')[1] + if target_ip in blue_defended_nodes: + collision_detected = True + break + # Check command object deltas + elif isinstance(red_eff.state_deltas, list): + for delta_obj in red_eff.state_deltas: + if getattr(delta_obj, 'target_ip', None) in blue_defended_nodes: + collision_detected = True + break + + if collision_detected: + # Nullify Red's attack effect entirely and alert the network telemetry + effects[red_id].success = False + effects[red_id].state_deltas = ( + [] if isinstance(red_eff.state_deltas, list) else {} + ) + effects[red_id].observation_data['alert'] = ( + 'TEMPORAL_COLLISION_DEFENSE_SUPREMACY' + ) + + return effects diff --git a/netforge_rl/core/registry.py b/netforge_rl/core/registry.py new file mode 100644 index 0000000..ded6f4a --- /dev/null +++ b/netforge_rl/core/registry.py @@ -0,0 +1,98 @@ +from typing import Dict, Type, Optional, Callable +import inspect + + +class ActionRegistry: + """A Factory Registry for dynamically tracking and instantiating + BaseAction subclasses without monolithic if/else blocks. + + Adheres strictly to the Open-Closed Principle. + """ + + def __init__(self): + # Maps (team, action_group_id) -> ActionClass + self._actions: Dict[str, Dict[int, Type]] = { + 'red': {}, + 'red_commander': {}, + 'blue': {}, + 'blue_commander': {}, + } + + def register(self, team: str, group_id: int) -> Callable: + """Class decorator for registering an Action.""" + + def decorator(cls): + if team not in self._actions: + self._actions[team] = {} + self._actions[team][group_id] = cls + return cls + + return decorator + + def get_action_class(self, agent_id: str, group_id: int) -> Optional[Type]: + """Retrieves the class constructor for a specific integer offset.""" + if 'red' in agent_id.lower(): + team = 'red_commander' if 'commander' in agent_id.lower() else 'red' + else: + team = 'blue_commander' if 'commander' in agent_id.lower() else 'blue' + + return self._actions.get(team, {}).get(group_id) + + def instantiate_action( + self, agent_id: str, action_data: object, target_ips: list + ) -> Optional[object]: + """Factory method to resolve the generic action payload to an instance. + + Supports legacy integer decoding or advanced Hierarchical MultiDiscrete + arrays: [action_type_id, target_ip_index]. + """ + if not target_ips: + target_ips = ['127.0.0.1'] + + if ( + isinstance(action_data, (list, tuple)) + or type(action_data).__name__ == 'ndarray' + ): + # Hierarchical MultiDiscrete format + action_type_id = int(action_data[0]) + target_index = int(action_data[1]) + target_ip = target_ips[target_index % len(target_ips)] + else: + # Legacy PettingZoo flat discrete space math + action_int = int(action_data) + target_ip = target_ips[action_int % len(target_ips)] + action_group = action_int // len(target_ips) + + if 'red' in agent_id.lower(): + mod = 4 if 'commander' in agent_id.lower() else 11 + else: + mod = 5 if 'commander' in agent_id.lower() else 7 + + action_type_id = action_group % mod + + ActionCls = self.get_action_class(agent_id, action_type_id) + if not ActionCls: + return None + + # Pass required kwargs dynamically based on the action archetype + # Determine accepted arguments dynamically + sig = inspect.signature(ActionCls.__init__) + params = sig.parameters + + kwargs = {'agent_id': agent_id} + if 'target_ip' in params: + kwargs['target_ip'] = target_ip + elif 'target_subnet' in params: + # Approximate subnet from target_ip for actions requiring Subnets + parts = target_ip.split('.') + kwargs['target_subnet'] = f'{parts[0]}.{parts[1]}.{parts[2]}.0/24' + elif 'target_agent_id' in params: + # Map target_agent_id randomly or conventionally for Coordination actions + kwargs['target_agent_id'] = ( + 'red_operator' if agent_id == 'red_commander' else 'red_commander' + ) + + return ActionCls(**kwargs) + + +action_registry = ActionRegistry() diff --git a/marl_cyborg/core/state.py b/netforge_rl/core/state.py similarity index 77% rename from marl_cyborg/core/state.py rename to netforge_rl/core/state.py index 65cec5e..0458575 100644 --- a/marl_cyborg/core/state.py +++ b/netforge_rl/core/state.py @@ -1,4 +1,4 @@ -from typing import Dict, Set +from typing import Dict, Set, Any class Host: @@ -14,6 +14,13 @@ def __init__(self, ip: str, hostname: str, subnet_cidr: str): self.os: str = 'Unknown' # OS profile assigned by NetworkGenerator self.services: list = [] # Running services (SSH, SMB, etc.) self.vulnerabilities: list = [] # CVEs present on this host + self.is_domain_controller: bool = False # Allows Pass-the-Hash if Rooted + self.human_vulnerability_score: float = ( + 0.5 # Phishability indicator (0.0 to 1.0) + ) + self.contains_honeytokens: bool = ( + False # Triggers 100% confidence active deception traps + ) def __repr__(self): return ( @@ -58,9 +65,19 @@ def __init__(self): self.agent_knowledge: Dict[str, Set[str]] = {} # Tracks remaining energy/budget for temporal action constraints self.agent_energy: Dict[str, int] = {} + # Advanced Attack Economics Constraints + self.agent_funds: Dict[str, int] = {} + self.agent_compute: Dict[str, int] = {} + self.business_downtime_score: float = 0.0 + # Tracks asynchronous execution locks (ETA system) self.agent_locked_until: Dict[str, int] = {} + self.action_history: Dict[str, set] = {} self.pending_effects: list = [] + self.siem_log_buffer: list = [] + self.current_tick: int = 0 + self.active_sessions: Dict[str, list] = {} + self.subnet_bandwidth: Dict[str, int] = {} def update_knowledge(self, agent_id: str, ip: str): """Adds an IP address to the agent's knowledge graph.""" @@ -80,12 +97,25 @@ def register_host(self, host: Host): if host.subnet_cidr in self.subnets: self.subnets[host.subnet_cidr].add_host(host) - def apply_delta(self, delta_key: str, delta_value: str): - """Dynamically mutates the network graph based on dot-notation paths. + def apply_delta(self, delta_key: Any, delta_value: Any = None): + """Dynamically mutates the network graph. - Example: apply_delta("hosts/10.0.0.5/status", "isolated") - Example: apply_delta("knowledge/red_agent_0/10.0.0.5", "True") + Now supports standard OOP `IStateDeltaCommand` objects executing their + own state mutations, while retaining legacy string-path parsing for compatibility. """ + # Command Pattern Standard Execution + if hasattr(delta_key, 'execute') and callable(getattr(delta_key, 'execute')): + delta_key.execute(self) + return + + # Legacy String parsing (Deprecation Path) + if not isinstance(delta_key, str): + from netforge_rl.core.commands import IStateDeltaCommand + + if isinstance(delta_key, IStateDeltaCommand): + delta_key.execute(self) + return + parts = delta_key.split('/') if parts[0] == 'hosts' and len(parts) == 3: ip = parts[1] @@ -109,6 +139,13 @@ def apply_delta(self, delta_key: str, delta_value: str): self.firewalls['global'] = Firewall('global') self.firewalls['global'].block_port(subnet, port) + elif parts[0] == 'history' and len(parts) == 3: + agent_id = parts[1] + record = parts[2] + if agent_id not in self.action_history: + self.action_history[agent_id] = set() + self.action_history[agent_id].add(record) + def can_route_to(self, target_ip: str, port: int = None) -> bool: """Evaluates complex network topology rules for routing reachability and explicit firewall port blocks. diff --git a/netforge_rl/environment/__init__.py b/netforge_rl/environment/__init__.py new file mode 100644 index 0000000..06d267d --- /dev/null +++ b/netforge_rl/environment/__init__.py @@ -0,0 +1,4 @@ +from .base_env import BaseNetForgeRLEnv +from .parallel_env import NetForgeRLEnv + +__all__ = ['BaseNetForgeRLEnv', 'NetForgeRLEnv'] diff --git a/marl_cyborg/environment/base_env.py b/netforge_rl/environment/base_env.py similarity index 81% rename from marl_cyborg/environment/base_env.py rename to netforge_rl/environment/base_env.py index 189fe01..07ca0fc 100644 --- a/marl_cyborg/environment/base_env.py +++ b/netforge_rl/environment/base_env.py @@ -4,7 +4,7 @@ from typing import Dict, Tuple, Any -class BaseMarlCyborg(ParallelEnv, abc.ABC): +class BaseNetForgeRLEnv(ParallelEnv, abc.ABC): """Abstract Base Class for all Continuous-Time MARL environments in CybORG. This guarantees that future environments (e.g., custom network @@ -43,11 +43,3 @@ def step( Must strictly return: (observations, rewards, terminations, truncations, infos) """ pass - - @abc.abstractmethod - def _resolve_conflicts(self, intended_effects: Dict[str, Any]) -> Dict[str, Any]: - """Sub-classes must implement their own conflict resolution metric for - - simultaneous collisions. - """ - pass diff --git a/netforge_rl/environment/parallel_env.py b/netforge_rl/environment/parallel_env.py new file mode 100644 index 0000000..d44eb96 --- /dev/null +++ b/netforge_rl/environment/parallel_env.py @@ -0,0 +1,424 @@ +from typing import Dict, Tuple +import numpy as np +import gymnasium as gym + +from netforge_rl.core.action import BaseAction, ActionEffect +from netforge_rl.core.observation import BaseObservation +from netforge_rl.core.registry import action_registry +from netforge_rl.core.physics import ConflictResolutionEngine +from netforge_rl.environment.base_env import BaseNetForgeRLEnv +from netforge_rl.topologies.network_generator import NetworkGenerator +from netforge_rl.agents.green_agent import GreenAgent + + +class NetForgeRLEnv(BaseNetForgeRLEnv): + """MARL Environment for CybORG. + + Follows the PettingZoo Parallel API standard for simultaneous Multi- + Agent execution and relies exclusively on Gymnasium spaces natively. + """ + + metadata = {'render_modes': ['ansi'], 'name': 'netforge_rl_v3'} + + def __init__(self, scenario_config: dict): + # Default to procedural generation if no specific architecture config is provided + topology_path = ( + scenario_config.get('topology_path') if scenario_config else None + ) + self.network_generator = NetworkGenerator(config_path=topology_path) + + scenario_type = ( + scenario_config.get('scenario_type', 'ransomware') + if scenario_config + else 'ransomware' + ) + self.log_latency = ( + scenario_config.get('log_latency', 2) if scenario_config else 2 + ) + self.green_agent = GreenAgent() + self.possible_agents = [ + 'red_commander', + 'red_operator', + 'blue_commander', + 'blue_operator', + ] + self.agents = self.possible_agents[:] + + if scenario_type.lower() == 'ransomware': + from netforge_rl.scenarios.ransomware import RansomwareScenario + + self.scenario = RansomwareScenario(self.agents) + else: + from netforge_rl.scenarios.apt_espionage import AptEspionageScenario + + self.scenario = AptEspionageScenario(self.agents) + + self.global_state = self.network_generator.generate() + self.resolution_engine = ConflictResolutionEngine() + + # Native Gymnasium Spaces for PettingZoo API + RLlib Mapping + self.observation_spaces = { + agent: gym.spaces.Dict( + { + 'obs': gym.spaces.Box( + low=-1.0, high=1.0, shape=(256,), dtype=np.float32 + ), + 'action_mask': gym.spaces.Box( + low=0, high=1, shape=(62,), dtype=np.int8 + ), # 12 action types + 50 IPs + } + ) + for agent in self.possible_agents + } + self.action_spaces = { + agent: gym.spaces.MultiDiscrete( + [12, 50] + ) # [Action Type (max 12), Target IP Index (max 50 padded)] + for agent in self.possible_agents + } + self.max_ticks = 1000 + self.current_tick = 0 + self.event_queue = [] + + def reset( + self, seed=None, options=None + ) -> Tuple[Dict[str, np.ndarray], Dict[str, dict]]: + """Resets the network state to initial configuration natively + + (Gymnasium style + PettingZoo). + """ + self.global_state = self.network_generator.generate(seed=seed) + self.agents = self.possible_agents[:] + self.global_state.agent_energy = {agent: 50 for agent in self.agents} + self.global_state.agent_funds = { + agent: 10000 if 'blue' in agent else 5000 for agent in self.agents + } + self.global_state.agent_compute = {agent: 1000 for agent in self.agents} + self.global_state.business_downtime_score = 0.0 + observations = {} + for agent_id in self.agents: + obs = BaseObservation(agent_id) + obs.update_from_state(self.global_state, []) + observations[agent_id] = { + 'obs': obs.to_numpy(max_size=256), + 'action_mask': self.action_mask(agent_id), + } + self.current_tick = 0 + self.event_queue = [] + + return observations, {agent: {} for agent in self.agents} + + def observation_space(self, agent): + return self.observation_spaces[agent] + + def action_space(self, agent): + return self.action_spaces[agent] + + def action_mask(self, agent: str) -> np.ndarray: + """Returns a binary mask denoting valid and distinct action integers for the agent, + pruning out computationally redundant modulo duplicates. + """ + # RLlib explicitly requires MultiDiscrete action masks to be concatenated flat boolean layers. + # Action space: [12 types, 50 IPs]. Therefore Mask shape = (62,) + mask = np.zeros(62, dtype=np.int8) + + # 1. Action Type Dimension (0-11) + if 'red' in agent.lower(): + valid_action_types = 4 if 'commander' in agent.lower() else 9 + else: + valid_action_types = 5 if 'commander' in agent.lower() else 7 + mask[:valid_action_types] = 1 + + # 2. Target IP Dimension (12-61) + target_ips = sorted(list(self.global_state.all_hosts.keys())) + num_targets = min(len(target_ips), 50) + mask[12 : 12 + num_targets] = 1 + + return mask + + def step( + self, agent_actions: Dict[str, int] + ) -> Tuple[ + Dict[str, BaseObservation], + Dict[str, float], + Dict[str, bool], + Dict[str, bool], + Dict[str, dict], + ]: + """ + Simultaneous Step Execution Logic: + + 1. PROCESS NEW ACTIONS: Validate budgets and enqueue async events. + 2. INTERRUPTION LOGIC: Immediate cancel operations for specific defensive tasks. + 3. ADVANCE TIME: `current_tick` progresses by 1. + 4. RESOLVE MATURE EVENTS: Apply ActionEffects that reach `completion_tick`. + 5. OBSERVATION: Agents receive POMDP updates every tick. + """ + intended_effects = {} + + # 1. PROCESS NEW ACTIONS + blue_active_actions_count = sum( + 1 for event in self.event_queue if 'blue' in event['agent'].lower() + ) + + for agent, action_int in agent_actions.items(): + # Validate temporal locks + if self.current_tick < self.global_state.agent_locked_until.get(agent, 0): + continue + + if isinstance(action_int, BaseAction): + action = action_int + else: + target_ips = sorted(list(self.global_state.all_hosts.keys())) + action = action_registry.instantiate_action( + agent, action_int, target_ips + ) + if action is None: + continue # Invalid action/unmapped action bounds + + # SOC Budget Check (Max 2 active defensive actions) + if 'blue' in agent.lower(): + if blue_active_actions_count >= 2: + continue # SOC is busy, silently ignore + blue_active_actions_count += 1 + + # Validate temporal energy constraints + if self.global_state.agent_energy.get(agent, 0) < action.cost: + continue + + # Expend energy and validate state + self.global_state.agent_energy[agent] -= action.cost + + if action.validate(self.global_state): + eta = getattr(action, 'duration', 1) + completion_tick = self.current_tick + eta + + # Generate intended effect (though state might shift by completion time) + effect = action.execute(self.global_state) + + self.global_state.agent_locked_until[agent] = completion_tick + self.event_queue.append( + { + 'completion_tick': completion_tick, + 'agent': agent, + 'action': action, + 'effect': effect, + 'target_ip': getattr(action, 'target_ip', None), + } + ) + + # 2. INTERRUPTION LOGIC (e.g., IsolateHost Immediately Cancels Ongoing Attacks) + for event in list(self.event_queue): + if ( + type(event['action']).__name__ == 'IsolateHost' + and event['completion_tick'] > self.current_tick + ): + # Isolate is queued or starting now; interrupt Red + target_to_isolate = event['target_ip'] + for red_event in list(self.event_queue): + if ( + 'red' in red_event['agent'].lower() + and red_event['target_ip'] == target_to_isolate + ): + if red_event in self.event_queue: + self.event_queue.remove(red_event) + # Unlock Red agent since their attack was disrupted + self.global_state.agent_locked_until[red_event['agent']] = ( + self.current_tick + ) + + # 3. ADVANCE TIME + self.current_tick += 1 + self.global_state.current_tick = self.current_tick + self.global_state.subnet_bandwidth.clear() + + # GENERATE BACKGROUND NOISE & DELAYED ALERTS + noise_data = self.green_agent.generate_noise( + self.current_tick, self.global_state + ) + for anomaly in noise_data.get('alerts', []): + anomaly['arrival_tick'] = self.current_tick + self.log_latency + self.global_state.siem_log_buffer.append(anomaly) + + # 4. RESOLVE MATURE EVENTS + remaining_events = [] + for event in self.event_queue: + if self.current_tick >= event['completion_tick']: + intended_effects[event['agent']] = event['effect'] + else: + remaining_events.append(event) + self.event_queue = remaining_events + + resolved_effects = self.resolution_engine.resolve(intended_effects) + + self._apply_state_deltas(resolved_effects) + + # Generate True Positive telemetry from attacks that hit SIEM + for res_agent, res_effect in resolved_effects.items(): + if 'red' in res_agent and res_effect.success: + target_ip = res_effect.observation_data.get('exploit', 'unknown') + + # Active Deception intercept + host = self.global_state.all_hosts.get(target_ip) + is_honeytoken_trap = host and host.contains_honeytokens + + signature = ( + 'HONEYTOKEN_TRIGGERED' + if is_honeytoken_trap + else 'RED_ACTION_DETECTED' + ) + severity = 10 if is_honeytoken_trap else 5 + log_delay = 0 if is_honeytoken_trap else self.log_latency + + self.global_state.siem_log_buffer.append( + { + 'type': 'anomaly', + 'source': res_agent, + 'target': target_ip, + 'signature': signature, + 'severity': severity, + 'false_positive': False, + 'arrival_tick': self.current_tick + log_delay, + } + ) + + observations = {} + rewards = {} + terminate = self.scenario.check_termination(self.global_state) + + # Trigger dynamic topology mutations mid-episode + if self.current_tick % 40 == 0: + self.global_state.reallocate_dhcp() + + is_truncated = self.current_tick >= self.max_ticks + truncate = {agent: is_truncated for agent in self.agents} + + for agent in self.agents: + obs = BaseObservation(agent) + obs.update_from_state(self.global_state, resolved_effects) + + obs_array = obs.to_numpy(max_size=256) + if 'operator' in agent: + commander_id = agent.replace('operator', 'commander') + if commander_id in agent_actions: + cmd_action = agent_actions[commander_id] + # Normalize the MultiDiscrete action to a float between 0.0 and 1.0 + cmd_val = ( + (float(cmd_action[0]) / 12.0) + if getattr(cmd_action, '__iter__', False) + and not isinstance(cmd_action, BaseAction) + else 1.0 + ) + obs_array[0] = cmd_val + + observations[agent] = { + 'obs': obs_array, + 'action_mask': self.action_mask(agent), + } + # Reward shaping applied here natively factoring in immediate action outcomes + agent_effect = resolved_effects.get(agent) + rewards[agent] = self._calculate_reward( + agent, self.global_state, agent_effect + ) + + self.agents = [ + agent + for agent in self.agents + if not terminate[agent] and not truncate[agent] + ] + + # ── Build info dicts with security metrics for callbacks ── + infos = self._extract_agent_infos(observations, resolved_effects) + + return observations, rewards, terminate, truncate, infos + + def render(self): + """Standard PettingZoo GUI logging render hook.""" + pass + + def _decode_action(self, agent_id: str, action_int: int) -> BaseAction: + target_ips = sorted(list(self.global_state.all_hosts.keys())) + return action_registry.instantiate_action(agent_id, action_int, target_ips) + + def _apply_state_deltas(self, effects: Dict[str, ActionEffect]): + """Applies validated deltas to the GlobalNetworkState. + + Only called AFTER temporal collisions have been mathematically resolved. + """ + for agent_id, effect in effects.items(): + if effect.success: + if isinstance(effect.state_deltas, dict): + for delta_key, delta_val in effect.state_deltas.items(): + self.global_state.apply_delta(delta_key, delta_val) + elif isinstance(effect.state_deltas, list): + for delta_cmd in effect.state_deltas: + self.global_state.apply_delta(delta_cmd) + + def _calculate_reward( + self, agent_id: str, state, effect: ActionEffect = None + ) -> float: + """Delegates reward logic directly to the localized Scenario module.""" + return self.scenario.calculate_reward(agent_id, state, effect) + + def _extract_agent_infos(self, observations: dict, resolved_effects: dict) -> dict: + """Extracts security metrics for TensorBoard and CSV logging callbacks. + + Args: + observations: Dictionary of agent observations for this step. + resolved_effects: Dictionary of resolved action effects. + + Returns: + Dictionary mapping agent_id to an info dictionary with security metrics. + """ + infos = {} + for agent in list(observations.keys()): + agent_effect = resolved_effects.get(agent) + info: dict = {} + + # Count security-relevant events from this step + false_positives = 0 + successful_exploits = 0 + hosts_isolated = 0 + services_restored = 0 + + if agent_effect and agent_effect.success: + for delta_key, delta_val in agent_effect.state_deltas.items(): + if 'status' in delta_key and delta_val == 'isolated': + hosts_isolated += 1 + # Check if the isolated host was actually compromised + parts = delta_key.split('/') + if len(parts) >= 2: + ip = parts[1] + host = self.global_state.all_hosts.get(ip) + if host and host.compromised_by == 'None': + false_positives += 1 # Isolated a clean host + elif 'privilege' in delta_key and delta_val in ('User', 'Root'): + successful_exploits += 1 + elif 'status' in delta_key and delta_val == 'online': + services_restored += 1 + + info['false_positives'] = float(false_positives) + info['successful_exploits'] = float(successful_exploits) + info['hosts_isolated'] = float(hosts_isolated) + info['services_restored'] = float(services_restored) + + # Extra context for analysis + info['agent_energy'] = float(self.global_state.agent_energy.get(agent, 0)) + info['compromised_hosts'] = float( + sum( + 1 + for h in self.global_state.all_hosts.values() + if h.compromised_by != 'None' + ) + ) + info['isolated_hosts'] = float( + sum( + 1 + for h in self.global_state.all_hosts.values() + if h.status == 'isolated' + ) + ) + + infos[agent] = info + + return infos diff --git a/marl_cyborg/environment/pcap_synthesizer.py b/netforge_rl/environment/pcap_synthesizer.py similarity index 99% rename from marl_cyborg/environment/pcap_synthesizer.py rename to netforge_rl/environment/pcap_synthesizer.py index a44a92d..ded8c16 100644 --- a/marl_cyborg/environment/pcap_synthesizer.py +++ b/netforge_rl/environment/pcap_synthesizer.py @@ -22,7 +22,7 @@ class PCAPSynthesizer: - """Translates abstract RL actions (marl_cyborg_v3) into modeled Scapy + """Translates abstract RL actions (netforge_rl_v3) into modeled Scapy packets for offline IDS ML model training. diff --git a/netforge_rl/models/recurrent_mask_model.py b/netforge_rl/models/recurrent_mask_model.py new file mode 100644 index 0000000..5f1b46d --- /dev/null +++ b/netforge_rl/models/recurrent_mask_model.py @@ -0,0 +1,95 @@ +import torch +from torch import nn + +from ray.rllib.models.torch.recurrent_net import RecurrentNetwork as TorchRNN +from ray.rllib.utils.annotations import override +from ray.rllib.utils.typing import ModelConfigDict, TensorType +from typing import List, Tuple + + +class MaskedLSTMModel(TorchRNN, nn.Module): + """ + A custom PyTorch model integrating native RLlib LSTM cells with strict Action Masking. + + We subclass TorchRNN to allow Ray to handle complex `seq_lens` padding and tensor + BPTT dimension tracking natively. We extract the mask out of the flattened array manually. + """ + + def __init__( + self, + obs_space, + action_space, + num_outputs: int, + model_config: ModelConfigDict, + name: str, + ): + nn.Module.__init__(self) + super().__init__(obs_space, action_space, num_outputs, model_config, name) + + self.cell_size = model_config.get('custom_model_config', {}).get( + 'lstm_cell_size', 128 + ) + + # 1. Feature Extractor (Dense Layers) + # Input size is 256 sliced from the flattened 318 Dict space + self.fc1 = nn.Linear(256, 128) + self.fc2 = nn.Linear(128, 128) + + # 2. LSTM Memory Unit + self.lstm = nn.LSTM( + input_size=128, + hidden_size=self.cell_size, + batch_first=True, + ) + + # 3. Action Type & Logit Masking Arrays + self.action_branch = nn.Linear(self.cell_size, num_outputs) + self.value_branch = nn.Linear(self.cell_size, 1) + + self._cur_value = None + + @override(TorchRNN) + def forward_rnn( + self, inputs: TensorType, state: List[TensorType], seq_lens: TensorType + ) -> Tuple[TensorType, List[TensorType]]: + # Ray flatly concatenates spaces in alphanumeric order. + # action_mask Box(62) + # obs Box(256) + # Therefore: action_mask is [:62], obs is [62:] + action_mask = inputs[:, :, :62] + obs = inputs[:, :, 62:] + + # 1. Core Embeddings over Observation Sequence + x = nn.functional.relu(self.fc1(obs)) + x = nn.functional.relu(self.fc2(x)) + + # 2. Evaluate Temporal Memory + h_in, c_in = state[0].unsqueeze(0), state[1].unsqueeze(0) + x, (h_out, c_out) = self.lstm(x, (h_in, c_in)) + + # 3. Finalize Output Logit Distribution Branches + logits = self.action_branch(x) + self._cur_value = torch.reshape(self.value_branch(x), [-1]) + + # 4. Apply Action Mask dynamically over the sequence batch + masked_logits = torch.where( + action_mask == 0.0, + torch.tensor(-1e10, device=logits.device, dtype=logits.dtype), + logits, + ) + + return masked_logits, [h_out.squeeze(0), c_out.squeeze(0)] + + @override(TorchRNN) + def value_function(self) -> TensorType: + assert self._cur_value is not None, ( + 'Evaluate forward_rnn() before value_function() call.' + ) + return self._cur_value + + @override(TorchRNN) + def get_initial_state(self) -> List[TensorType]: + return [ + torch.zeros(self.cell_size, dtype=torch.float32), + torch.zeros(self.cell_size, dtype=torch.float32), + ] diff --git a/marl_cyborg/scenarios/apt_espionage.py b/netforge_rl/scenarios/apt_espionage.py similarity index 94% rename from marl_cyborg/scenarios/apt_espionage.py rename to netforge_rl/scenarios/apt_espionage.py index da2dea1..48ba6a0 100644 --- a/marl_cyborg/scenarios/apt_espionage.py +++ b/netforge_rl/scenarios/apt_espionage.py @@ -1,9 +1,9 @@ from typing import TYPE_CHECKING, Dict -from marl_cyborg.scenarios.base_scenario import BaseScenario +from netforge_rl.scenarios.base_scenario import BaseScenario if TYPE_CHECKING: - from marl_cyborg.core.state import GlobalNetworkState - from marl_cyborg.core.action import ActionEffect + from netforge_rl.core.state import GlobalNetworkState + from netforge_rl.core.action import ActionEffect class AptEspionageScenario(BaseScenario): diff --git a/marl_cyborg/scenarios/base_scenario.py b/netforge_rl/scenarios/base_scenario.py similarity index 88% rename from marl_cyborg/scenarios/base_scenario.py rename to netforge_rl/scenarios/base_scenario.py index fd9ef8e..911d0ac 100644 --- a/marl_cyborg/scenarios/base_scenario.py +++ b/netforge_rl/scenarios/base_scenario.py @@ -2,8 +2,8 @@ from typing import TYPE_CHECKING, Dict if TYPE_CHECKING: - from marl_cyborg.core.state import GlobalNetworkState - from marl_cyborg.core.action import ActionEffect + from netforge_rl.core.state import GlobalNetworkState + from netforge_rl.core.action import ActionEffect class BaseScenario(ABC): diff --git a/marl_cyborg/scenarios/ransomware.py b/netforge_rl/scenarios/ransomware.py similarity index 82% rename from marl_cyborg/scenarios/ransomware.py rename to netforge_rl/scenarios/ransomware.py index 49b907a..a16f436 100644 --- a/marl_cyborg/scenarios/ransomware.py +++ b/netforge_rl/scenarios/ransomware.py @@ -1,9 +1,9 @@ from typing import TYPE_CHECKING, Dict -from marl_cyborg.scenarios.base_scenario import BaseScenario +from netforge_rl.scenarios.base_scenario import BaseScenario if TYPE_CHECKING: - from marl_cyborg.core.state import GlobalNetworkState - from marl_cyborg.core.action import ActionEffect + from netforge_rl.core.state import GlobalNetworkState + from netforge_rl.core.action import ActionEffect class RansomwareScenario(BaseScenario): @@ -72,6 +72,13 @@ def _red_reward( elif 'compromised_by' in delta_key and delta_val != 'None': reward += 2.0 + # OT / SCADA Catastrophic Win + elif ( + 'system_integrity' in delta_key + and delta_val == 'kinetic_destruction' + ): + reward += 10000.0 + # ── Observation-based rewards ───────────────────────── if effect.observation_data: obs = effect.observation_data @@ -135,6 +142,13 @@ def _blue_reward( elif 'decoy' in delta_key and delta_val != 'inactive': reward += 1.0 # Proactive defense + # Failed to prevent Kinetic Destruction of SCADA framework + elif ( + 'system_integrity' in delta_key + and delta_val == 'kinetic_destruction' + ): + reward -= 10000.0 + # ── Per-step network health (small, bounded) ────────── # Only count the RATIO of healthy hosts to avoid scale issues total_hosts = max(len(global_state.all_hosts), 1) @@ -155,6 +169,17 @@ def _blue_reward( if compromised > 0: reward -= (compromised / total_hosts) * 2.0 # 0.0 to -2.0 + # ── Business Downtime Penalty (Economics) ───────────── + total_isolated = sum( + 1 + for h in global_state.all_hosts.values() + if h.status == 'isolated' and h.subnet_cidr != '169.254.0.0/16' + ) + if total_isolated > 0: + business_loss = (total_isolated / total_hosts) * 5.0 + global_state.business_downtime_score += business_loss + reward -= business_loss + # ── Failed action penalty ───────────────────────────── if effect and not effect.success: reward -= 0.1 @@ -177,4 +202,11 @@ def check_termination(self, global_state: 'GlobalNetworkState') -> Dict[str, boo ): return {agent: True for agent in self.agents} + # Terminal Fail-State for Blue: A PLC has been physically destroyed + if any( + getattr(h, 'system_integrity', 'clean') == 'kinetic_destruction' + for h in global_state.all_hosts.values() + ): + return {agent: True for agent in self.agents} + return terminate diff --git a/netforge_rl/topologies/network_generator.py b/netforge_rl/topologies/network_generator.py new file mode 100644 index 0000000..03f3010 --- /dev/null +++ b/netforge_rl/topologies/network_generator.py @@ -0,0 +1,180 @@ +import random +import yaml +from pathlib import Path +from typing import Optional +from netforge_rl.core.state import GlobalNetworkState, Subnet, Host + + +class NetworkGenerator: + """Procedurally generates or loads dynamic network topologies for MARL + + training. + + Prevents agents from overfitting to a static 10-node architecture. + """ + + def __init__(self, config_path: Optional[str] = None): + self.config_path = config_path + + def generate(self, seed: Optional[int] = None) -> GlobalNetworkState: + """Generates the architecture. + + If a config path was provided, loads deterministically. + Otherwise, procedurally generates a randomized topology. + """ + if seed is not None: + random.seed(seed) + + if self.config_path and Path(self.config_path).exists(): + return self._load_from_yaml(self.config_path) + + return self._generate_procedural() + + def _generate_procedural(self) -> GlobalNetworkState: + """Creates a randomized network using NetworkX hierarchical patterns. + + Enforces a constant size of 50 hosts for Neural Network dimension consistency. + Active topology spans 15-30 nodes; the rest are instantiated as inactive padding. + """ + import networkx as nx + + state = GlobalNetworkState() + G = nx.DiGraph() + + # Generate hierarchy parameters + num_subnets = random.randint(3, 4) + subnet_names = ['DMZ', 'Corporate', 'Secure', 'Guest'][:num_subnets] + base_ips = ['192.168.1', '10.0.0', '10.0.1', '172.16.0'][:num_subnets] + + # 25% Chance to spawn a critical Cyber-Physical OT Subnet + if random.random() < 0.25: + subnet_names.append('OT_Subnet') + base_ips.append('10.0.99') + + active_hosts = [] + domain_controllers = [] + + # Build Subnets and distribute hosts + for i, name in enumerate(subnet_names): + cidr = f'{base_ips[i]}.0/24' + subnet = Subnet(cidr=cidr, name=name) + state.add_subnet(subnet) + + # Weight more hosts into Corp and Secure zones + num_hosts = ( + random.randint(3, 8) + if name in ['Corporate', 'Secure'] + else random.randint(2, 5) + ) + + for j in range(1, num_hosts + 1): + host_ip = f'{base_ips[i]}.{j * random.randint(1, 3)}' + + # Check for duplicates due to random gap intervals + while host_ip in [h.ip for h in active_hosts]: + host_ip = f'{base_ips[i]}.{j * random.randint(1, 10)}' + + host = Host(ip=host_ip, hostname=f'{name}_Node_{j}', subnet_cidr=cidr) + + # Assign Decoys vs Real Systems + if random.random() < 0.15 and name != 'OT_Subnet': + host.decoy = random.choice(['Apache', 'SSHD', 'Tomcat', 'active']) + else: + if name == 'OT_Subnet': + chosen_os = 'PLC_Firmware' + chosen_services = ['Modbus', 'S7Comm'] + potential_cves = ['CVE-2010-2772', 'Stuxnet_0day'] + setattr(host, 'temperature', float(random.randint(40, 60))) + setattr(host, 'pressure', float(random.randint(90, 110))) + else: + profiles = [ + ( + 'Windows_Server_2016', + ['SMB', 'IIS'], + ['MS17-010', 'CVE-2021-44228'], + ), + ( + 'Windows_10', + ['RDP', 'SMB'], + ['CVE-2019-0708', 'MS17-010'], + ), + ( + 'Linux_Ubuntu', + ['SSH', 'Apache'], + ['CVE-2021-44228', 'V4L2'], + ), + ('Linux_CentOS', ['SSH', 'Tomcat'], ['CVE-2021-44228']), + ] + chosen_os, chosen_services, potential_cves = random.choice( + profiles + ) + + host.os = chosen_os + host.services = chosen_services + host.cvss_score = round(random.uniform(3.5, 9.8), 1) + + # Human error dynamics: Linux admins fall for phishing less often than generalized Windows Corporate users + base_phish = ( + random.uniform(0.1, 0.4) + if 'Linux' in chosen_os + else random.uniform(0.3, 0.9) + ) + host.human_vulnerability_score = round(base_phish, 2) + + num_vulns = random.randint(0, min(2, len(potential_cves))) + host.vulnerabilities = random.sample(potential_cves, num_vulns) + + # Designate Domain Controllers only in Corp or Secure Windows servers + if 'Windows' in chosen_os and name in ['Corporate', 'Secure']: + if random.random() < 0.3: + domain_controllers.append(host) + + active_hosts.append(host) + state.register_host(host) + G.add_node(host.ip, type=name) + + # Assure at least 1 Domain Controller exists + if domain_controllers: + random.choice(domain_controllers).is_domain_controller = True + else: + # Force upgrade a random Windows host + win_hosts = [h for h in active_hosts if 'Windows' in h.os] + if win_hosts: + random.choice(win_hosts).is_domain_controller = True + + # Fill strictly to 50 nodes for Neural Network shape constant + padding_needed = 50 - len(state.all_hosts) + for p in range(padding_needed): + pad_ip = f'169.254.0.{p + 1}' + pad_host = Host( + ip=pad_ip, hostname=f'Pad_Node_{p}', subnet_cidr='169.254.0.0/16' + ) + pad_host.status = 'isolated' # Native Action Masking bounds + state.register_host(pad_host) + + self._configure_procedural_vision(state) + return state + + def _configure_procedural_vision(self, state: GlobalNetworkState): + """Builds fog-of-war vision depending on the layout.""" + # Red baseline starts in DMZ + for host in state.all_hosts.values(): + if host.subnet_cidr == '192.168.1.0/24' and host.status != 'isolated': + state.update_knowledge('red_commander', host.ip) + state.update_knowledge('red_operator', host.ip) + break + + # Blue knows all active topology natively but is blind to zero-padded isolated objects + for host in state.all_hosts.values(): + if host.status != 'isolated': + state.update_knowledge('blue_commander', host.ip) + state.update_knowledge('blue_operator', host.ip) + + def _load_from_yaml(self, path: str) -> GlobalNetworkState: + """Loads a deterministic graph from a YAML configuration.""" + with open(path, 'r') as f: + _ = yaml.safe_load(f) + + # Implementation left for future expansion if YAML is required. + # Defaults to procedural if parsing fails. + return self._generate_procedural() diff --git a/pyproject.toml b/pyproject.toml index 92b9245..3332c22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,7 +3,7 @@ requires = ["setuptools>=61.0.0", "wheel"] build-backend = "setuptools.build_meta" [project] -name = "marl_cyborg" +name = "netforge_rl" version = "3.0.0" description = "Multi-Agent Cybersecurity Simulator based on CybORG" authors = [