Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ jobs:
run: |
sudo apt-get update && sudo apt-get install -y shellcheck
find scripts -name "*.sh" -print0 | xargs -0 -I{} shellcheck {}
- name: Allowlist dry-run
run: |
python scripts/resolve_allowlist.py --dry-run --output build/generated/lockdown_allowlist.nft
test -s build/generated/lockdown_allowlist.nft
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*.log
__pycache__/
dist/
build/
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,23 @@ These principles converge in Azazel’s design: **defense is not about passive p
OpenCanary等を利用し、攻撃者を観察ではなく誘導・拘束。正規ユーザーには影響を与えずに隔離。
*Leverages tools like OpenCanary to mislead and isolate attackers—not merely observe them—without affecting legitimate users.*

- **可搬型設計 / Portable Deployment**
軽量構成でRaspberry Piに最適化。災害対応や一時的な現場展開にも対応。
- **可搬型設計 / Portable Deployment**
軽量構成でRaspberry Piに最適化。災害対応や一時的な現場展開にも対応。
*Lightweight and optimized for Raspberry Pi, enabling easy deployment in disaster recovery or temporary field operations.*

## What's new

- Mode-aware presets backed by `azazel.yaml` apply delay/shape/block actions as
the daemon transitions between portal, shield, and lockdown.
- Vector remap normalization now emits a unified schema (ts/node/src/dst/proto
fields) validated via unit tests.
- QoS plans derive HTB class rate/ceil values per profile using
`configs/tc/classes.htb`.
- Lockdown tooling ships with a templated nftables ruleset and an
`resolve_allowlist.py` utility that resolves medical FQDNs into CIDRs.
- CI enforces schema validation, pytest, shellcheck, and allowlist generation
to ensure release tags remain deployable on clean systems.

---

## 使用技術 / Stack
Expand Down
3 changes: 2 additions & 1 deletion azazel_core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Core modules for the Azazel SOC/NOC controller."""

from .state_machine import StateMachine, State, Transition
from .state_machine import Event, StateMachine, State, Transition
from .scorer import ScoreEvaluator
from .config import AzazelConfig

__all__ = [
"Event",
"StateMachine",
"State",
"Transition",
Expand Down
88 changes: 69 additions & 19 deletions azazel_core/qos/apply.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,84 @@
"""Render QoS classifier results to actionable plans."""
from __future__ import annotations

import re
from dataclasses import dataclass
from typing import Dict, Iterable, List
from pathlib import Path
from typing import Any, Dict, List

from ..actions import ActionResult

CLASS_PATTERN = re.compile(
r"^class\s+(?P<name>[A-Za-z0-9_-]+)\s+prio\s+(?P<priority>\d+)\s+share\s+(?P<share>[0-9.]+)"
)


@dataclass(frozen=True)
class HTBClass:
"""Represents a class definition from classes.htb."""

name: str
priority: int
share: float


def _parse_classes(path: str | Path) -> List[HTBClass]:
classes: List[HTBClass] = []
for line in Path(path).read_text().splitlines():
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
match = CLASS_PATTERN.match(stripped)
if match:
classes.append(
HTBClass(
name=match.group("name"),
priority=int(match.group("priority")),
share=float(match.group("share")),
)
)
if not classes:
raise ValueError(f"No HTB classes defined in {path}")
return classes


@dataclass
class QoSPlan:
"""Container for actions derived from QoS policy."""
"""Container for tc class calculations."""

commands: List[ActionResult]
profile: str
uplink_kbps: int
classes: Dict[str, Dict[str, int]]

@classmethod
def from_matches(cls, matches: Iterable[str]) -> "QoSPlan":
commands: List[ActionResult] = []
for match in matches:
commands.append(
ActionResult(
command="tc class add",
parameters={"class": match},
)
)
return cls(commands=commands)
def from_profile(
cls,
profiles: Dict[str, Dict[str, Any]],
profile_name: str,
classes_path: str | Path,
) -> "QoSPlan":
try:
profile = profiles[profile_name]
except KeyError as exc: # pragma: no cover - defensive guard
raise KeyError(f"Unknown profile: {profile_name}") from exc

uplink = int(profile.get("uplink_kbps", 0) or 0)
if uplink <= 0:
raise ValueError(f"Profile {profile_name} must define uplink_kbps > 0")

classes = _parse_classes(classes_path)
plan: Dict[str, Dict[str, int]] = {}
for entry in classes:
rate = max(1, int(round(uplink * (entry.share / 100.0))))
plan[entry.name] = {
"priority": entry.priority,
"rate_kbps": rate,
"ceil_kbps": uplink,
}
return cls(profile=profile_name, uplink_kbps=uplink, classes=plan)

def as_dict(self) -> Dict[str, List[Dict[str, str]]]:
def as_dict(self) -> Dict[str, Any]:
return {
"commands": [
{"command": result.command, **result.parameters}
for result in self.commands
]
"profile": self.profile,
"uplink_kbps": self.uplink_kbps,
"classes": self.classes,
}
150 changes: 148 additions & 2 deletions azazel_core/state_machine.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
"""Light-weight state machine driving Azazel defensive posture changes."""
from __future__ import annotations

import time
from collections import deque
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional
from pathlib import Path
from typing import Any, Callable, Deque, Dict, List, Optional

import yaml


CONFIG_PATH = Path(__file__).resolve().parents[1] / "configs" / "azazel.yaml"


@dataclass(frozen=True)
Expand Down Expand Up @@ -33,18 +41,27 @@ class Transition:

@dataclass
class StateMachine:
"""Simple but testable state machine implementation."""
"""Mode-aware state machine with YAML-backed presets."""

initial_state: State
transitions: List[Transition] = field(default_factory=list)
config_path: str | Path | None = None
window_size: int = 5
clock: Callable[[], float] = field(default=time.monotonic, repr=False)
current_state: State = field(init=False)

def __post_init__(self) -> None:
self.current_state = self.initial_state
self._transition_map: Dict[str, List[Transition]] = {}
for transition in self.transitions:
self.add_transition(transition)
self._config_cache: Dict[str, Any] | None = None
self._score_window: Deque[int] = deque(maxlen=max(self.window_size, 1))
self._unlock_until: Dict[str, float] = {}

# ------------------------------------------------------------------
# Transition helpers
# ------------------------------------------------------------------
def add_transition(self, transition: Transition) -> None:
"""Register a new transition."""

Expand All @@ -58,6 +75,7 @@ def dispatch(self, event: Event) -> State:
if transition.condition(event):
previous = self.current_state
self.current_state = transition.target
self._handle_transition(previous, self.current_state)
if transition.action:
transition.action(previous, self.current_state, event)
return self.current_state
Expand All @@ -67,6 +85,8 @@ def reset(self) -> None:
"""Reset the state machine to its initial state."""

self.current_state = self.initial_state
self._score_window.clear()
self._unlock_until.clear()

def summary(self) -> Dict[str, str]:
"""Return a serializable summary of the state machine."""
Expand All @@ -75,3 +95,129 @@ def summary(self) -> Dict[str, str]:
"state": self.current_state.name,
"description": self.current_state.description,
}

# ------------------------------------------------------------------
# Configuration helpers
# ------------------------------------------------------------------
def _resolve_config_path(self) -> Path:
if self.config_path is not None:
return Path(self.config_path)
return CONFIG_PATH

def _load_config(self) -> Dict[str, Any]:
if self._config_cache is None:
path = self._resolve_config_path()
data = yaml.safe_load(path.read_text())
if not isinstance(data, dict):
raise ValueError("Configuration root must be a mapping")
self._config_cache = data
return self._config_cache

def reload_config(self) -> None:
"""Force re-reading of the YAML configuration."""

self._config_cache = None

def get_thresholds(self) -> Dict[str, Any]:
"""Return shield/lockdown thresholds and unlock windows."""

config = self._load_config()
thresholds = config.get("thresholds", {})
unlock = thresholds.get("unlock_wait_secs", {})
return {
"t1": int(thresholds.get("t1_shield", 0) or 0),
"t2": int(thresholds.get("t2_lockdown", 0) or 0),
"unlock_wait_secs": {
"shield": int(unlock.get("shield", 0) or 0),
"portal": int(unlock.get("portal", 0) or 0),
},
}

def get_actions_preset(self) -> Dict[str, Any]:
"""Return the action plan preset for the current mode."""

config = self._load_config()
actions = config.get("actions", {})
preset = actions.get(self.current_state.name, {})
shape = preset.get("shape_kbps")
return {
"delay_ms": int(preset.get("delay_ms", 0) or 0),
"shape_kbps": int(shape) if shape not in (None, "", False) else None,
"block": bool(preset.get("block", False)),
}

# ------------------------------------------------------------------
# Score window evaluation
# ------------------------------------------------------------------
def evaluate_window(self, severity: int) -> Dict[str, Any]:
"""Append a severity score and compute moving average decisions."""

self._score_window.append(max(int(severity), 0))
average = sum(self._score_window) / len(self._score_window)
thresholds = self.get_thresholds()
desired_mode = "portal"
if average >= thresholds["t2"]:
desired_mode = "lockdown"
elif average >= thresholds["t1"]:
desired_mode = "shield"
return {"average": average, "desired_mode": desired_mode}

def apply_score(self, severity: int) -> Dict[str, Any]:
"""Evaluate the score window and transition to the appropriate mode."""

evaluation = self.evaluate_window(severity)
desired_mode = evaluation["desired_mode"]
now = self.clock()
target_mode = desired_mode
if desired_mode == "portal":
target_mode = self._target_for_portal(now)
elif desired_mode == "shield":
target_mode = self._target_for_shield(now)

if target_mode != self.current_state.name:
self.dispatch(Event(name=target_mode, severity=severity))

evaluation.update({
"target_mode": target_mode,
"applied_mode": self.current_state.name,
})
return evaluation

# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _handle_transition(self, previous: State, current: State) -> None:
thresholds = self.get_thresholds()
unlocks = thresholds.get("unlock_wait_secs", {})
now = self.clock()
if current.name == "lockdown":
wait_shield = unlocks.get("shield", 0)
if wait_shield:
self._unlock_until["shield"] = now + wait_shield
elif current.name == "shield":
wait_portal = unlocks.get("portal", 0)
if wait_portal:
self._unlock_until["portal"] = now + wait_portal
self._unlock_until.pop("shield", None)
elif current.name == "portal":
self._unlock_until.clear()

def _target_for_shield(self, now: float) -> str:
if self.current_state.name == "lockdown":
unlock_at = self._unlock_until.get("shield", 0.0)
if now < unlock_at:
return "lockdown"
return "shield"

def _target_for_portal(self, now: float) -> str:
if self.current_state.name == "lockdown":
unlock_at = self._unlock_until.get("shield", 0.0)
if now < unlock_at:
return "lockdown"
# Step-down path: lockdown -> shield before portal.
return "shield"
if self.current_state.name == "shield":
unlock_at = self._unlock_until.get("portal", 0.0)
if now < unlock_at:
return "shield"
return "portal"
Loading