diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5f6cbdf..4020ca0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,3 +32,7 @@ jobs: run: | sudo apt-get update && sudo apt-get install -y shellcheck find scripts -name "*.sh" -print0 | xargs -0 -I{} shellcheck {} + - name: Allowlist dry-run + run: | + python scripts/resolve_allowlist.py --dry-run --output build/generated/lockdown_allowlist.nft + test -s build/generated/lockdown_allowlist.nft diff --git a/.gitignore b/.gitignore index 6a030bf..ffc750c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *.log __pycache__/ dist/ +build/ diff --git a/README.md b/README.md index 34258be..d127cc1 100644 --- a/README.md +++ b/README.md @@ -63,10 +63,23 @@ These principles converge in Azazel’s design: **defense is not about passive p OpenCanary等を利用し、攻撃者を観察ではなく誘導・拘束。正規ユーザーには影響を与えずに隔離。 *Leverages tools like OpenCanary to mislead and isolate attackers—not merely observe them—without affecting legitimate users.* -- **可搬型設計 / Portable Deployment** - 軽量構成でRaspberry Piに最適化。災害対応や一時的な現場展開にも対応。 +- **可搬型設計 / Portable Deployment** + 軽量構成でRaspberry Piに最適化。災害対応や一時的な現場展開にも対応。 *Lightweight and optimized for Raspberry Pi, enabling easy deployment in disaster recovery or temporary field operations.* +## What's new + +- Mode-aware presets backed by `azazel.yaml` apply delay/shape/block actions as + the daemon transitions between portal, shield, and lockdown. +- Vector remap normalization now emits a unified schema (ts/node/src/dst/proto + fields) validated via unit tests. +- QoS plans derive HTB class rate/ceil values per profile using + `configs/tc/classes.htb`. +- Lockdown tooling ships with a templated nftables ruleset and an + `resolve_allowlist.py` utility that resolves medical FQDNs into CIDRs. +- CI enforces schema validation, pytest, shellcheck, and allowlist generation + to ensure release tags remain deployable on clean systems. + --- ## 使用技術 / Stack diff --git a/azazel_core/__init__.py b/azazel_core/__init__.py index 1dfd874..a09abd1 100644 --- a/azazel_core/__init__.py +++ b/azazel_core/__init__.py @@ -1,10 +1,11 @@ """Core modules for the Azazel SOC/NOC controller.""" -from .state_machine import StateMachine, State, Transition +from .state_machine import Event, StateMachine, State, Transition from .scorer import ScoreEvaluator from .config import AzazelConfig __all__ = [ + "Event", "StateMachine", "State", "Transition", diff --git a/azazel_core/qos/apply.py b/azazel_core/qos/apply.py index 4d35299..f2aea0c 100644 --- a/azazel_core/qos/apply.py +++ b/azazel_core/qos/apply.py @@ -1,34 +1,84 @@ """Render QoS classifier results to actionable plans.""" from __future__ import annotations +import re from dataclasses import dataclass -from typing import Dict, Iterable, List +from pathlib import Path +from typing import Any, Dict, List -from ..actions import ActionResult + +CLASS_PATTERN = re.compile( + r"^class\s+(?P[A-Za-z0-9_-]+)\s+prio\s+(?P\d+)\s+share\s+(?P[0-9.]+)" +) + + +@dataclass(frozen=True) +class HTBClass: + """Represents a class definition from classes.htb.""" + + name: str + priority: int + share: float + + +def _parse_classes(path: str | Path) -> List[HTBClass]: + classes: List[HTBClass] = [] + for line in Path(path).read_text().splitlines(): + stripped = line.strip() + if not stripped or stripped.startswith("#"): + continue + match = CLASS_PATTERN.match(stripped) + if match: + classes.append( + HTBClass( + name=match.group("name"), + priority=int(match.group("priority")), + share=float(match.group("share")), + ) + ) + if not classes: + raise ValueError(f"No HTB classes defined in {path}") + return classes @dataclass class QoSPlan: - """Container for actions derived from QoS policy.""" + """Container for tc class calculations.""" - commands: List[ActionResult] + profile: str + uplink_kbps: int + classes: Dict[str, Dict[str, int]] @classmethod - def from_matches(cls, matches: Iterable[str]) -> "QoSPlan": - commands: List[ActionResult] = [] - for match in matches: - commands.append( - ActionResult( - command="tc class add", - parameters={"class": match}, - ) - ) - return cls(commands=commands) + def from_profile( + cls, + profiles: Dict[str, Dict[str, Any]], + profile_name: str, + classes_path: str | Path, + ) -> "QoSPlan": + try: + profile = profiles[profile_name] + except KeyError as exc: # pragma: no cover - defensive guard + raise KeyError(f"Unknown profile: {profile_name}") from exc + + uplink = int(profile.get("uplink_kbps", 0) or 0) + if uplink <= 0: + raise ValueError(f"Profile {profile_name} must define uplink_kbps > 0") + + classes = _parse_classes(classes_path) + plan: Dict[str, Dict[str, int]] = {} + for entry in classes: + rate = max(1, int(round(uplink * (entry.share / 100.0)))) + plan[entry.name] = { + "priority": entry.priority, + "rate_kbps": rate, + "ceil_kbps": uplink, + } + return cls(profile=profile_name, uplink_kbps=uplink, classes=plan) - def as_dict(self) -> Dict[str, List[Dict[str, str]]]: + def as_dict(self) -> Dict[str, Any]: return { - "commands": [ - {"command": result.command, **result.parameters} - for result in self.commands - ] + "profile": self.profile, + "uplink_kbps": self.uplink_kbps, + "classes": self.classes, } diff --git a/azazel_core/state_machine.py b/azazel_core/state_machine.py index 1925fcc..6b06c3c 100644 --- a/azazel_core/state_machine.py +++ b/azazel_core/state_machine.py @@ -1,8 +1,16 @@ """Light-weight state machine driving Azazel defensive posture changes.""" from __future__ import annotations +import time +from collections import deque from dataclasses import dataclass, field -from typing import Callable, Dict, List, Optional +from pathlib import Path +from typing import Any, Callable, Deque, Dict, List, Optional + +import yaml + + +CONFIG_PATH = Path(__file__).resolve().parents[1] / "configs" / "azazel.yaml" @dataclass(frozen=True) @@ -33,10 +41,13 @@ class Transition: @dataclass class StateMachine: - """Simple but testable state machine implementation.""" + """Mode-aware state machine with YAML-backed presets.""" initial_state: State transitions: List[Transition] = field(default_factory=list) + config_path: str | Path | None = None + window_size: int = 5 + clock: Callable[[], float] = field(default=time.monotonic, repr=False) current_state: State = field(init=False) def __post_init__(self) -> None: @@ -44,7 +55,13 @@ def __post_init__(self) -> None: self._transition_map: Dict[str, List[Transition]] = {} for transition in self.transitions: self.add_transition(transition) + self._config_cache: Dict[str, Any] | None = None + self._score_window: Deque[int] = deque(maxlen=max(self.window_size, 1)) + self._unlock_until: Dict[str, float] = {} + # ------------------------------------------------------------------ + # Transition helpers + # ------------------------------------------------------------------ def add_transition(self, transition: Transition) -> None: """Register a new transition.""" @@ -58,6 +75,7 @@ def dispatch(self, event: Event) -> State: if transition.condition(event): previous = self.current_state self.current_state = transition.target + self._handle_transition(previous, self.current_state) if transition.action: transition.action(previous, self.current_state, event) return self.current_state @@ -67,6 +85,8 @@ def reset(self) -> None: """Reset the state machine to its initial state.""" self.current_state = self.initial_state + self._score_window.clear() + self._unlock_until.clear() def summary(self) -> Dict[str, str]: """Return a serializable summary of the state machine.""" @@ -75,3 +95,129 @@ def summary(self) -> Dict[str, str]: "state": self.current_state.name, "description": self.current_state.description, } + + # ------------------------------------------------------------------ + # Configuration helpers + # ------------------------------------------------------------------ + def _resolve_config_path(self) -> Path: + if self.config_path is not None: + return Path(self.config_path) + return CONFIG_PATH + + def _load_config(self) -> Dict[str, Any]: + if self._config_cache is None: + path = self._resolve_config_path() + data = yaml.safe_load(path.read_text()) + if not isinstance(data, dict): + raise ValueError("Configuration root must be a mapping") + self._config_cache = data + return self._config_cache + + def reload_config(self) -> None: + """Force re-reading of the YAML configuration.""" + + self._config_cache = None + + def get_thresholds(self) -> Dict[str, Any]: + """Return shield/lockdown thresholds and unlock windows.""" + + config = self._load_config() + thresholds = config.get("thresholds", {}) + unlock = thresholds.get("unlock_wait_secs", {}) + return { + "t1": int(thresholds.get("t1_shield", 0) or 0), + "t2": int(thresholds.get("t2_lockdown", 0) or 0), + "unlock_wait_secs": { + "shield": int(unlock.get("shield", 0) or 0), + "portal": int(unlock.get("portal", 0) or 0), + }, + } + + def get_actions_preset(self) -> Dict[str, Any]: + """Return the action plan preset for the current mode.""" + + config = self._load_config() + actions = config.get("actions", {}) + preset = actions.get(self.current_state.name, {}) + shape = preset.get("shape_kbps") + return { + "delay_ms": int(preset.get("delay_ms", 0) or 0), + "shape_kbps": int(shape) if shape not in (None, "", False) else None, + "block": bool(preset.get("block", False)), + } + + # ------------------------------------------------------------------ + # Score window evaluation + # ------------------------------------------------------------------ + def evaluate_window(self, severity: int) -> Dict[str, Any]: + """Append a severity score and compute moving average decisions.""" + + self._score_window.append(max(int(severity), 0)) + average = sum(self._score_window) / len(self._score_window) + thresholds = self.get_thresholds() + desired_mode = "portal" + if average >= thresholds["t2"]: + desired_mode = "lockdown" + elif average >= thresholds["t1"]: + desired_mode = "shield" + return {"average": average, "desired_mode": desired_mode} + + def apply_score(self, severity: int) -> Dict[str, Any]: + """Evaluate the score window and transition to the appropriate mode.""" + + evaluation = self.evaluate_window(severity) + desired_mode = evaluation["desired_mode"] + now = self.clock() + target_mode = desired_mode + if desired_mode == "portal": + target_mode = self._target_for_portal(now) + elif desired_mode == "shield": + target_mode = self._target_for_shield(now) + + if target_mode != self.current_state.name: + self.dispatch(Event(name=target_mode, severity=severity)) + + evaluation.update({ + "target_mode": target_mode, + "applied_mode": self.current_state.name, + }) + return evaluation + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + def _handle_transition(self, previous: State, current: State) -> None: + thresholds = self.get_thresholds() + unlocks = thresholds.get("unlock_wait_secs", {}) + now = self.clock() + if current.name == "lockdown": + wait_shield = unlocks.get("shield", 0) + if wait_shield: + self._unlock_until["shield"] = now + wait_shield + elif current.name == "shield": + wait_portal = unlocks.get("portal", 0) + if wait_portal: + self._unlock_until["portal"] = now + wait_portal + self._unlock_until.pop("shield", None) + elif current.name == "portal": + self._unlock_until.clear() + + def _target_for_shield(self, now: float) -> str: + if self.current_state.name == "lockdown": + unlock_at = self._unlock_until.get("shield", 0.0) + if now < unlock_at: + return "lockdown" + return "shield" + + def _target_for_portal(self, now: float) -> str: + if self.current_state.name == "lockdown": + unlock_at = self._unlock_until.get("shield", 0.0) + if now < unlock_at: + return "lockdown" + # Step-down path: lockdown -> shield before portal. + return "shield" + if self.current_state.name == "shield": + unlock_at = self._unlock_until.get("portal", 0.0) + if now < unlock_at: + return "shield" + return "portal" diff --git a/azctl/cli.py b/azctl/cli.py index 9309c66..9ab98af 100644 --- a/azctl/cli.py +++ b/azctl/cli.py @@ -11,22 +11,51 @@ def build_machine() -> StateMachine: - idle = State(name="idle", description="Nominal operations") + portal = State(name="portal", description="Nominal operations") shield = State(name="shield", description="Heightened monitoring") + lockdown = State(name="lockdown", description="Full containment mode") - machine = StateMachine(initial_state=idle) + machine = StateMachine(initial_state=portal) machine.add_transition( Transition( - source=idle, + source=portal, target=shield, - condition=lambda event: event.name == "escalate", + condition=lambda event: event.name == "shield", + ) + ) + machine.add_transition( + Transition( + source=portal, + target=lockdown, + condition=lambda event: event.name == "lockdown", + ) + ) + machine.add_transition( + Transition( + source=shield, + target=portal, + condition=lambda event: event.name == "portal", ) ) machine.add_transition( Transition( source=shield, - target=idle, - condition=lambda event: event.name == "recover", + target=lockdown, + condition=lambda event: event.name == "lockdown", + ) + ) + machine.add_transition( + Transition( + source=lockdown, + target=shield, + condition=lambda event: event.name == "shield", + ) + ) + machine.add_transition( + Transition( + source=lockdown, + target=portal, + condition=lambda event: event.name == "portal", ) ) return machine diff --git a/azctl/daemon.py b/azctl/daemon.py index 0269210..f3d2784 100644 --- a/azctl/daemon.py +++ b/azctl/daemon.py @@ -1,8 +1,10 @@ """Runtime daemon glue for Azazel.""" from __future__ import annotations -from dataclasses import dataclass -from typing import Iterable +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Iterable, List from azazel_core import ScoreEvaluator, StateMachine from azazel_core.state_machine import Event @@ -12,11 +14,37 @@ class AzazelDaemon: machine: StateMachine scorer: ScoreEvaluator + decisions_log: Path = field(default_factory=lambda: Path("decisions.log")) def process_events(self, events: Iterable[Event]) -> None: - score = self.scorer.evaluate(events) - classification = self.scorer.classify(score) - if classification in {"elevated", "critical"}: - self.machine.dispatch(Event(name="escalate", severity=score)) - else: - self.machine.dispatch(Event(name="recover", severity=0)) + entries: List[dict] = [] + for event in events: + score = self.scorer.evaluate([event]) + classification = self.scorer.classify(score) + evaluation = self.machine.apply_score(score) + actions = self.machine.get_actions_preset() + entries.append( + { + "event": event.name, + "score": score, + "classification": classification, + "average": evaluation["average"], + "desired_mode": evaluation["desired_mode"], + "target_mode": evaluation["target_mode"], + "mode": evaluation["applied_mode"], + "actions": actions, + } + ) + + if entries: + self._append_decisions(entries) + + # ------------------------------------------------------------------ + # Persistence helpers + # ------------------------------------------------------------------ + def _append_decisions(self, entries: List[dict]) -> None: + self.decisions_log.parent.mkdir(parents=True, exist_ok=True) + with self.decisions_log.open("a", encoding="utf-8") as handle: + for entry in entries: + handle.write(json.dumps(entry, sort_keys=True)) + handle.write("\n") diff --git a/configs/azazel.schema.json b/configs/azazel.schema.json index c2e591a..7f14857 100644 --- a/configs/azazel.schema.json +++ b/configs/azazel.schema.json @@ -32,11 +32,41 @@ }, "soc": { "type": "object", - "required": ["suricata_ruleset", "canary_services"] + "required": ["suricata_ruleset", "canary_services"], + "properties": { + "suricata_ruleset": {"type": "string"}, + "canary_services": { + "type": "array", + "items": {"type": "string"} + } + } + }, + "actions": { + "type": "object", + "required": ["portal", "shield", "lockdown"], + "additionalProperties": false, + "properties": { + "portal": {"$ref": "#/definitions/actionPreset"}, + "shield": {"$ref": "#/definitions/actionPreset"}, + "lockdown": {"$ref": "#/definitions/actionPreset"} + } }, "thresholds": { "type": "object", - "required": ["t1_shield", "t2_lockdown", "unlock_wait_secs"] + "required": ["t1_shield", "t2_lockdown", "unlock_wait_secs"], + "additionalProperties": false, + "properties": { + "t1_shield": {"type": "integer", "minimum": 0}, + "t2_lockdown": {"type": "integer", "minimum": 0}, + "unlock_wait_secs": { + "type": "object", + "required": ["shield", "portal"], + "properties": { + "shield": {"type": "integer", "minimum": 0}, + "portal": {"type": "integer", "minimum": 0} + } + } + } }, "notify": { "type": "object", @@ -50,5 +80,68 @@ "type": "object", "required": ["pii_minimize"] } + }, + "definitions": { + "actionPreset": { + "type": "object", + "required": ["delay_ms", "shape_kbps", "block"], + "additionalProperties": false, + "properties": { + "delay_ms": {"type": "integer", "minimum": 0}, + "shape_kbps": {"type": ["integer", "null"], "minimum": 0}, + "block": {"type": "boolean"} + } + }, + "normalizedEvent": { + "type": "object", + "required": [ + "ts", + "node", + "event", + "src", + "dst", + "proto", + "sig_id", + "score", + "severity", + "actions", + "mode", + "qos_class", + "evidence_ref" + ], + "additionalProperties": false, + "properties": { + "ts": {"type": "string"}, + "node": {"type": ["string", "null"]}, + "event": {"type": ["string", "null"]}, + "src": { + "type": "object", + "required": ["ip", "port"], + "properties": { + "ip": {"type": ["string", "null"]}, + "port": {"type": ["integer", "null"]} + } + }, + "dst": { + "type": "object", + "required": ["ip", "port"], + "properties": { + "ip": {"type": ["string", "null"]}, + "port": {"type": ["integer", "null"]} + } + }, + "proto": {"type": ["string", "null"]}, + "sig_id": {"type": ["integer", "null"]}, + "score": {"type": ["integer", "null"]}, + "severity": {"type": ["string", "null"]}, + "actions": { + "type": "array", + "items": {"type": "string"} + }, + "mode": {"type": ["string", "null"]}, + "qos_class": {"type": ["string", "null"]}, + "evidence_ref": {"type": ["string", "null"]} + } + } } } diff --git a/configs/azazel.yaml b/configs/azazel.yaml index 86ed260..0cad5e8 100644 --- a/configs/azazel.yaml +++ b/configs/azazel.yaml @@ -12,6 +12,10 @@ qos: soc: suricata_ruleset: balanced canary_services: ["ssh", "http", "pgsql"] +actions: + portal: { delay_ms: 100, shape_kbps: null, block: false } + shield: { delay_ms: 200, shape_kbps: 128, block: false } + lockdown: { delay_ms: 300, shape_kbps: 64, block: true } thresholds: t1_shield: 50 t2_lockdown: 80 diff --git a/configs/nftables/lockdown.nft b/configs/nftables/lockdown.nft index 7cf4370..c665cc2 100644 --- a/configs/nftables/lockdown.nft +++ b/configs/nftables/lockdown.nft @@ -1,7 +1,17 @@ +# Azazel lockdown template +# +# Load the generated allowlist and drop all other outbound traffic when +# lockdown mode is active. The allowlist should define the sets +# `medical_allow_v4` and `medical_allow_v6` via resolve_allowlist.py. + table inet azazel_lockdown { - chain input { - type filter hook input priority 0; - policy drop; - ip saddr 192.168.0.0/16 accept - } + include "/build/generated/lockdown_allowlist.nft" + + chain lockdown { + type filter hook forward priority 0; + policy drop; + + ip saddr @medical_allow_v4 accept + ip6 saddr @medical_allow_v6 accept + } } diff --git a/configs/tc/classes.htb b/configs/tc/classes.htb index 95a70db..972924d 100644 --- a/configs/tc/classes.htb +++ b/configs/tc/classes.htb @@ -1,5 +1,14 @@ -# HTB classes for Azazel QoS -class htb 1:1 root rate 50mbit ceil 50mbit -class htb 1:10 parent 1:1 rate 1mbit ceil 5mbit prio 0 -class htb 1:20 parent 1:1 rate 512kbit ceil 2mbit prio 1 -class htb 1:30 parent 1:1 rate 256kbit ceil 1mbit prio 2 +# Azazel HTB class design template +# +# Syntax: class prio share +# The share column represents the minimum guaranteed percentage +# of the configured profile uplink_kbps. +class medical prio 1 share 40 +class ops prio 2 share 25 +class public prio 3 share 25 +class suspect prio 4 share 10 + +# Profile guidance (rate/ceil are computed automatically by azazel_core.qos.apply): +# - sat: uplink ≈ 2000 kbps -> medical rate ≈ 800 kbps, ceil 2000 kbps +# - lte: uplink ≈ 5000 kbps -> medical rate ≈ 2000 kbps, ceil 5000 kbps +# - fiber: uplink ≈ 50000 kbps -> medical rate ≈ 20000 kbps, ceil 50000 kbps diff --git a/configs/vector/vector.toml b/configs/vector/vector.toml index 28f71fc..3919401 100644 --- a/configs/vector/vector.toml +++ b/configs/vector/vector.toml @@ -3,7 +3,129 @@ type = "file" include = ["/var/log/azazel/*.log"] ignore_older_secs = 86400 +[transforms.norm] +type = "remap" +inputs = ["azazel"] +drop_on_error = false +source = ''' +normalized = {} +normalized.ts = format_timestamp!(now(), format: "%+") +if exists(.timestamp) { + normalized.ts = to_string!(.timestamp) +} else if exists(.time) { + normalized.ts = to_string!(.time) +} + +normalized.node = null +if exists(.host) { + normalized.node = to_string!(.host) +} else if exists(.agent.hostname) { + normalized.node = to_string!(.agent.hostname) +} else if exists(.node) { + normalized.node = to_string!(.node) +} + +normalized.event = null +if exists(.event_type) { + normalized.event = to_string!(.event_type) +} else if exists(.alert.signature) { + normalized.event = to_string!(.alert.signature) +} else if exists(.service) { + normalized.event = to_string!(.service) +} else if exists(.message) { + normalized.event = to_string!(.message) +} + +normalized.src = {"ip": null, "port": null} +if exists(.src_ip) { + normalized.src.ip = to_string!(.src_ip) +} else if exists(.src.ip) { + normalized.src.ip = to_string!(.src.ip) +} else if exists(.source.address) { + normalized.src.ip = to_string!(.source.address) +} + +if exists(.src_port) { + normalized.src.port = to_int!(.src_port) +} else if exists(.src.port) { + normalized.src.port = to_int!(.src.port) +} else if exists(.source.port) { + normalized.src.port = to_int!(.source.port) +} + +normalized.dst = {"ip": null, "port": null} +if exists(.dest_ip) { + normalized.dst.ip = to_string!(.dest_ip) +} else if exists(.dest.ip) { + normalized.dst.ip = to_string!(.dest.ip) +} else if exists(.destination.address) { + normalized.dst.ip = to_string!(.destination.address) +} + +if exists(.dest_port) { + normalized.dst.port = to_int!(.dest_port) +} else if exists(.dest.port) { + normalized.dst.port = to_int!(.dest.port) +} else if exists(.destination.port) { + normalized.dst.port = to_int!(.destination.port) +} + +normalized.proto = null +if exists(.proto) { + normalized.proto = to_string!(.proto) +} else if exists(.protocol) { + normalized.proto = to_string!(.protocol) +} else if exists(.transport) { + normalized.proto = to_string!(.transport) +} + +normalized.sig_id = null +if exists(.alert.signature_id) { + normalized.sig_id = to_int!(.alert.signature_id) +} else if exists(.signature_id) { + normalized.sig_id = to_int!(.signature_id) +} + +normalized.score = null +if exists(.score) { + normalized.score = to_int!(.score) +} + +normalized.severity = null +if exists(.severity) { + normalized.severity = to_string!(.severity) +} else if exists(.alert.severity) { + normalized.severity = to_string!(.alert.severity) +} + +normalized.actions = [] +if exists(.actions) && is_array(.actions) { + normalized.actions = map(.actions, |value| to_string!(value)) +} + +normalized.mode = null +if exists(.mode) { + normalized.mode = to_string!(.mode) +} + +normalized.qos_class = null +if exists(.qos_class) { + normalized.qos_class = to_string!(.qos_class) +} + +normalized.evidence_ref = null +if exists(.evidence_ref) { + normalized.evidence_ref = to_string!(.evidence_ref) +} else if exists(.file) { + normalized.evidence_ref = to_string!(.file) +} else if exists(.logfile) { + normalized.evidence_ref = to_string!(.logfile) +} + +. = normalized +''' + [sinks.console] type = "console" -inputs = ["azazel"] +inputs = ["norm"] encoding.codec = "json" diff --git a/docs/API_REFERENCE.md b/docs/API_REFERENCE.md index a5f6e83..b8499b6 100644 --- a/docs/API_REFERENCE.md +++ b/docs/API_REFERENCE.md @@ -9,11 +9,17 @@ the behaviour during testing. - `State(name: str, description: str = "")` - `Event(name: str, severity: int = 0)` - `Transition(source, target, condition, action=None)` -- `StateMachine(initial_state)` provides: +- `StateMachine(initial_state, config_path=None, window_size=5)` provides: - `add_transition(transition)` – register a new transition. - `dispatch(event)` – evaluate transitions from the current state. - - `reset()` – return to the initial state. + - `reset()` – return to the initial state and clear score history. - `summary()` – dictionary suitable for API responses. + - `get_thresholds()` – read shield/lockdown thresholds and unlock timers + from `azazel.yaml`. + - `get_actions_preset()` – fetch the delay/shape/block preset for the + current mode. + - `apply_score(severity)` – update the moving-average score window, + transition to the correct mode, and return evaluation metadata. ## `azazel_core.scorer` @@ -39,9 +45,10 @@ handler `add_health_route(version)` returns a `HealthResponse` dataclass. ## `azctl.cli` -`build_machine()` wires the idle and shield states. `load_events(path)` loads -YAML describing synthetic events. `main(argv)` powers the systemd service by -feeding events into `AzazelDaemon`, which applies score-based decisions. +`build_machine()` wires the portal/shield/lockdown states. `load_events(path)` +loads YAML describing synthetic events. `main(argv)` powers the systemd service +by feeding events into `AzazelDaemon`, which applies score-based decisions and +writes `decisions.log` entries containing the chosen mode and action presets. ## Scripts @@ -49,3 +56,5 @@ feeding events into `AzazelDaemon`, which applies score-based decisions. - `scripts/nft_apply.sh` and `scripts/tc_reset.sh` manage enforcement tools. - `scripts/sanity_check.sh` prints warnings if dependent services are inactive. - `scripts/rollback.sh` removes installed assets. +- `scripts/resolve_allowlist.py` resolves medical FQDNs to CIDRs and writes the + lockdown nftables allowlist used by the generated template. diff --git a/docs/OPERATIONS.md b/docs/OPERATIONS.md index 8d4b2f4..8289865 100644 --- a/docs/OPERATIONS.md +++ b/docs/OPERATIONS.md @@ -43,6 +43,23 @@ into `/etc/azazel`, installs systemd units, and enables the aggregate ``` 3. Reload services: `sudo systemctl restart azctl.target`. +### Mode presets + +The controller maintains three defensive modes. Each mode applies a preset of +delay, traffic shaping, and block behaviour sourced from `azazel.yaml`: + +| Mode | Delay (ms) | Shape (kbps) | Block | +|----------|-----------:|-------------:|:-----:| +| portal | 100 | – | No | +| shield | 200 | 128 | No | +| lockdown | 300 | 64 | Yes | + +Transitions to stricter modes occur when the moving average of recent scores +exceeds the configured thresholds. Unlock timers enforce a cooling-off period +before the daemon steps down to a less restrictive mode. When lockdown is +entered in the field the supervising `azctl/daemon` should apply +`nft -f configs/nftables/lockdown.nft` after updating the generated allowlist. + ## 4. Health checks Use `scripts/sanity_check.sh` to confirm Suricata, Vector, and OpenCanary are diff --git a/scripts/resolve_allowlist.py b/scripts/resolve_allowlist.py new file mode 100755 index 0000000..c4957d9 --- /dev/null +++ b/scripts/resolve_allowlist.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +"""Resolve medical FQDN allowlist entries into nftables CIDRs.""" +from __future__ import annotations + +import argparse +import ipaddress +import socket +from pathlib import Path +from typing import Iterable, List, Set + +import yaml + + +DEFAULT_CONFIG = Path("configs/azazel.yaml") +DEFAULT_OUTPUT = Path("build/generated/lockdown_allowlist.nft") + + +def load_config(path: Path) -> dict: + data = yaml.safe_load(path.read_text()) + if not isinstance(data, dict): # pragma: no cover - defensive guard + raise ValueError("Configuration root must be a mapping") + return data + + +def resolve_fqdns(fqdns: Iterable[str]) -> Set[ipaddress._BaseAddress]: + results: Set[ipaddress._BaseAddress] = set() + for fqdn in fqdns: + try: + infos = socket.getaddrinfo(fqdn, None) + except socket.gaierror: + continue + for info in infos: + ip_str = info[4][0] + try: + results.add(ipaddress.ip_address(ip_str)) + except ValueError: + continue + return results + + +def render_allowlist(cidrs: Iterable[str]) -> str: + ipv4: List[str] = [] + ipv6: List[str] = [] + for item in sorted(set(cidrs)): + network = ipaddress.ip_network(item, strict=False) + if network.version == 4: + ipv4.append(str(network)) + else: + ipv6.append(str(network)) + + def format_set(name: str, addr_type: str, values: List[str]) -> str: + if values: + elements_line = f" elements = {{ {', '.join(values)} }}" + else: + elements_line = " elements = {}" + return ( + f"set {name} {{\n" + f" type {addr_type}\n" + f" flags interval\n" + f"{elements_line}\n" + f"}}" + ) + + parts = [ + "# Generated by resolve_allowlist.py", + format_set("medical_allow_v4", "ipv4_addr", ipv4), + format_set("medical_allow_v6", "ipv6_addr", ipv6), + ] + return "\n\n".join(parts) + + +def build_cidrs(config: dict) -> List[str]: + qos = config.get("qos", {}) + medical = qos.get("medical", {}) + fqdns = medical.get("dest_fqdns", []) or [] + cidrs: Set[str] = set(medical.get("dest_cidrs", []) or []) + + resolved = resolve_fqdns(fqdns) + for ip in resolved: + prefix = 32 if ip.version == 4 else 128 + cidrs.add(f"{ip}/{prefix}") + return sorted(cidrs) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG, help="Path to azazel.yaml") + parser.add_argument( + "--output", + type=Path, + default=DEFAULT_OUTPUT, + help="Destination nftables allowlist file", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Print the generated allowlist while still refreshing the output file", + ) + args = parser.parse_args() + + config = load_config(args.config) + cidrs = build_cidrs(config) + output_path = args.output + + if not cidrs: + if output_path.exists(): + print("No CIDRs resolved; keeping existing allowlist") + return 0 + raise SystemExit("No CIDRs available for lockdown allowlist") + + content = render_allowlist(cidrs) + if args.dry_run: + print(content) + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(content + "\n", encoding="utf-8") + return 0 + + +if __name__ == "__main__": # pragma: no cover - CLI entry point + raise SystemExit(main()) diff --git a/tests/unit/test_cli.py b/tests/unit/test_cli.py index d3027b9..583f92e 100644 --- a/tests/unit/test_cli.py +++ b/tests/unit/test_cli.py @@ -11,6 +11,13 @@ def test_cli_build_machine(tmp_path: Path): config.write_text(yaml.safe_dump(data)) machine = cli.build_machine() - daemon = cli.AzazelDaemon(machine=machine, scorer=cli.ScoreEvaluator()) + daemon = cli.AzazelDaemon( + machine=machine, + scorer=cli.ScoreEvaluator(), + decisions_log=tmp_path / "decisions.log", + ) daemon.process_events(cli.load_events(str(config))) - assert machine.current_state.name == "shield" + assert machine.current_state.name == "lockdown" + log_lines = (tmp_path / "decisions.log").read_text().strip().splitlines() + assert log_lines + assert "\"actions\"" in log_lines[0] diff --git a/tests/unit/test_qos.py b/tests/unit/test_qos.py index 977c2f2..f0b873a 100644 --- a/tests/unit/test_qos.py +++ b/tests/unit/test_qos.py @@ -1,3 +1,5 @@ +from pathlib import Path + from azazel_core.qos import QoSPlan, TrafficClassifier @@ -11,7 +13,16 @@ def test_classifier_and_plan(): bucket = classifier.match("203.0.113.10", 80) assert bucket == "medical" - plan = QoSPlan.from_matches([bucket, classifier.match("198.51.100.5", 22)]) - commands = plan.as_dict()["commands"] - assert commands[0]["class"] == "medical" - assert commands[1]["class"] == "ops" + profiles = { + "lte": {"uplink_kbps": 5000}, + "sat": {"uplink_kbps": 2000}, + } + plan = QoSPlan.from_profile(profiles, "lte", Path("configs/tc/classes.htb")) + data = plan.as_dict() + assert data["profile"] == "lte" + assert data["uplink_kbps"] == 5000 + classes = data["classes"] + assert classes["medical"]["rate_kbps"] == 2000 + assert classes["medical"]["ceil_kbps"] == 5000 + assert classes["ops"]["rate_kbps"] == 1250 + assert classes["suspect"]["priority"] == 4 diff --git a/tests/unit/test_state_machine.py b/tests/unit/test_state_machine.py index 2a50d1b..ac61daa 100644 --- a/tests/unit/test_state_machine.py +++ b/tests/unit/test_state_machine.py @@ -1,21 +1,111 @@ +from dataclasses import dataclass + from azazel_core.state_machine import Event, State, StateMachine, Transition -def test_state_machine_transitions(): - idle = State(name="idle") +@dataclass +class FakeClock: + value: float = 0.0 + + def __call__(self) -> float: + return self.value + + +def build_machine(clock: FakeClock) -> StateMachine: + portal = State(name="portal") shield = State(name="shield") - machine = StateMachine(initial_state=idle) + lockdown = State(name="lockdown") + machine = StateMachine(initial_state=portal, clock=clock) + machine.add_transition( + Transition( + source=portal, + target=shield, + condition=lambda event: event.name == "shield", + ) + ) + machine.add_transition( + Transition( + source=portal, + target=lockdown, + condition=lambda event: event.name == "lockdown", + ) + ) machine.add_transition( - Transition(source=idle, target=shield, condition=lambda event: event.name == "escalate") + Transition( + source=shield, + target=portal, + condition=lambda event: event.name == "portal", + ) ) machine.add_transition( - Transition(source=shield, target=idle, condition=lambda event: event.name == "recover") + Transition( + source=shield, + target=lockdown, + condition=lambda event: event.name == "lockdown", + ) ) + machine.add_transition( + Transition( + source=lockdown, + target=shield, + condition=lambda event: event.name == "shield", + ) + ) + machine.add_transition( + Transition( + source=lockdown, + target=portal, + condition=lambda event: event.name == "portal", + ) + ) + return machine + + +def test_state_machine_transitions(): + clock = FakeClock() + machine = build_machine(clock) + + assert machine.current_state.name == "portal" + machine.dispatch(Event(name="shield", severity=55)) + assert machine.current_state.name == "shield" + machine.dispatch(Event(name="lockdown", severity=90)) + assert machine.current_state.name == "lockdown" + machine.dispatch(Event(name="portal", severity=0)) + assert machine.current_state.name == "portal" + + +def test_state_machine_presets_and_unlocks(): + clock = FakeClock() + machine = build_machine(clock) + + # Start from portal – presets come from configs/azazel.yaml + actions = machine.get_actions_preset() + assert actions["delay_ms"] == 100 + assert actions["shape_kbps"] is None + assert actions["block"] is False + + # Escalate to lockdown based on high score + result = machine.apply_score(90) + assert result["desired_mode"] == "lockdown" + assert machine.current_state.name == "lockdown" + actions = machine.get_actions_preset() + assert actions["delay_ms"] == 300 + assert actions["shape_kbps"] == 64 + assert actions["block"] is True + + # Average drops below threshold but unlock wait keeps us in lockdown + result = machine.apply_score(0) + assert result["target_mode"] == "lockdown" + assert machine.current_state.name == "lockdown" + + # Advance clock past the shield unlock window and drop severity + clock.value += 601 + result = machine.apply_score(0) + assert result["target_mode"] == "shield" + assert machine.current_state.name == "shield" - assert machine.current_state == idle - machine.dispatch(Event(name="noop")) - assert machine.current_state == idle - machine.dispatch(Event(name="escalate")) - assert machine.current_state == shield - machine.dispatch(Event(name="recover")) - assert machine.current_state == idle + # Move clock forward for portal unlock and continue low severity + clock.value += 1800 + result = machine.apply_score(0) + assert result["target_mode"] == "portal" + assert machine.current_state.name == "portal" diff --git a/tests/unit/test_vector_schema.py b/tests/unit/test_vector_schema.py new file mode 100644 index 0000000..136cb72 --- /dev/null +++ b/tests/unit/test_vector_schema.py @@ -0,0 +1,101 @@ +import json +from pathlib import Path + +import jsonschema + + +SCHEMA = json.loads(Path("configs/azazel.schema.json").read_text()) +EVENT_SCHEMA = SCHEMA["definitions"]["normalizedEvent"] + + +def normalize_event(event: dict) -> dict: + def first_of(*keys): + for key in keys: + parts = key.split(".") + cursor = event + for part in parts: + if isinstance(cursor, dict) and part in cursor: + cursor = cursor[part] + else: + break + else: + return cursor + return None + + def as_int(value): + if value is None: + return None + try: + return int(value) + except (TypeError, ValueError): + return None + + def as_str(value): + if value is None: + return None + return str(value) + + normalized = { + "ts": as_str(first_of("timestamp", "time")) or "1970-01-01T00:00:00Z", + "node": as_str(first_of("host", "agent.hostname", "node")), + "event": as_str(first_of("event_type", "alert.signature", "service", "message")), + "src": { + "ip": as_str(first_of("src_ip", "src.ip", "source.address")), + "port": as_int(first_of("src_port", "src.port", "source.port")), + }, + "dst": { + "ip": as_str(first_of("dest_ip", "dest.ip", "destination.address")), + "port": as_int(first_of("dest_port", "dest.port", "destination.port")), + }, + "proto": as_str(first_of("proto", "protocol", "transport")), + "sig_id": as_int(first_of("alert.signature_id", "signature_id")), + "score": as_int(first_of("score")), + "severity": as_str(first_of("severity", "alert.severity")), + "actions": [as_str(item) for item in event.get("actions", []) if as_str(item) is not None], + "mode": as_str(event.get("mode")), + "qos_class": as_str(event.get("qos_class")), + "evidence_ref": as_str( + first_of("evidence_ref", "file", "logfile") + ), + } + # Ensure arrays default to empty list and nested dict keys exist + if not normalized["actions"]: + normalized["actions"] = [] + if normalized["src"]["port"] is None: + normalized["src"]["port"] = None + if normalized["dst"]["port"] is None: + normalized["dst"]["port"] = None + return normalized + + +def test_suricata_event_schema(): + event = { + "timestamp": "2024-03-14T10:00:00Z", + "host": "sensor-1", + "event_type": "alert", + "src_ip": "192.0.2.5", + "src_port": 12345, + "dest_ip": "203.0.113.8", + "dest_port": 80, + "proto": "TCP", + "score": 88, + "mode": "shield", + "qos_class": "medical", + "evidence_ref": "eve.json", + "alert": {"signature_id": 2100001, "severity": "high"}, + "actions": ["delay"], + } + normalized = normalize_event(event) + jsonschema.validate(normalized, EVENT_SCHEMA) + + +def test_canary_event_schema(): + event = { + "timestamp": "2024-03-14T10:01:00Z", + "service": "ssh", + "src_ip": "198.51.100.20", + "username": "root", + "logfile": "/var/log/opencanary.log", + } + normalized = normalize_event(event) + jsonschema.validate(normalized, EVENT_SCHEMA)