Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,27 @@ Modern cyber attacks are increasingly fast and automated, making traditional hon

The core philosophy recognizes that in asymmetric cyber warfare, defenders often cannot prevent initial compromise but can control the attacker's subsequent actions. By implementing strategic delays and misdirection, Azazel creates opportunities for detection, analysis, and response.

### Developer notes and helper API

The `TrafficControlEngine` exposes two helpers to make testing and development easier:

- `TrafficControlEngine.set_subprocess_runner(runner_callable)`
- Inject a custom subprocess runner in tests to simulate `tc`/`nft` outputs without running system commands.
- The runner should accept `(cmd, **kwargs)` and return an object with attributes `returncode`, `stdout`, and `stderr` (a `subprocess.CompletedProcess` is ideal).
- Example usage in tests:

```py
from azazel_pi.core.enforcer.traffic_control import get_traffic_control_engine, make_completed_process

engine = get_traffic_control_engine()
engine.set_subprocess_runner(lambda cmd, **kw: make_completed_process(cmd, 0, stdout='ok'))
```

- `make_completed_process(cmd, returncode=0, stdout='', stderr='')`
- Convenience factory (available at module level in `traffic_control.py`) to produce CompletedProcess-like objects for tests.

These APIs make it simple to unit-test enforcer behavior without requiring root or modifying the host network stack.

## What's New

### Enhanced AI Integration (v3) - November 2024
Expand Down
37 changes: 37 additions & 0 deletions README_ja.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,40 @@
### 開発者向けメモとテスト用ヘルパー API

`TrafficControlEngine` はテストや開発を容易にするためのヘルパーを公開しています。特にネットワーク周りの `tc` / `nft` コマンドを実行する箇所をモックし、root 権限や実際のネットワークスタックに依存せずに単体テストできるようになっています。
- `TrafficControlEngine.set_subprocess_runner(runner_callable)`
- テストでカスタムのサブプロセスランナーを注入して、`tc`/`nft` の出力を模擬できます。
- `runner_callable` は `(cmd, **kwargs)` を受け取り、`returncode` / `stdout` / `stderr` 属性を持つオブジェクト(`subprocess.CompletedProcess` が最適)を返すようにしてください。
- 例(テスト内での利用):

```py
from azazel_pi.core.enforcer.traffic_control import TrafficControlEngine, make_completed_process
from tests.utils.fake_subprocess import FakeSubprocess

fake = FakeSubprocess()
fake.when("tc qdisc show").then_stdout("")
fake.when("nft -a add rule").then_stdout("added rule handle 123")

# クラス属性に注入してインスタンス化すると __init__ 時のセットアップ呼び出しもモックされます
TrafficControlEngine._subprocess_runner = fake
engine = TrafficControlEngine(config_path="runtime/test_azazel.yaml")
try:
ok = engine.apply_dnat_redirect("10.0.0.5", dest_port=2222)
assert ok
rules = engine.get_active_rules()
assert "10.0.0.5" in rules
finally:
# テスト後にクラス属性をクリーンアップ
try:
delattr(TrafficControlEngine, "_subprocess_runner")
except Exception:
pass
```

- `make_completed_process(cmd, returncode=0, stdout='', stderr='')`
- テスト用の `subprocess.CompletedProcess` 相当を簡単に作るためのファクトリ関数です。
- モジュール (`azazel_pi.core.enforcer.traffic_control`) のトップレベルに公開されています。

これらの API により、root 権限や実行環境に依存しないユニットテストが容易になります。テスト用の軽量ユーティリティ `tests/utils/fake_subprocess.py` も用意しており、コマンドの単純な部分文字列マッチングにより、期待する stdout/stderr を返すことができます。
# AZ-01X Azazel-Pi - The Cyber Scapegoat Gateway

[English](README.md) | 日本語
Expand Down
226 changes: 226 additions & 0 deletions azazel_pi/core/async_ai.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Asynchronous Deep AI evaluation worker.

This module provides a simple background queue to run costly Ollama
evaluations asynchronously. Call `enqueue(alert_data, context)` to
schedule a deep analysis. Results are logged and optionally sent via
Mattermost notifications when configured.
"""
from __future__ import annotations

import logging
import threading
import random
import time
from queue import Queue
from typing import Any, Dict, Optional
from pathlib import Path

# Lazy resolver for the AI evaluator to avoid importing heavy deps at module
# import time; `_worker` swaps in the real factory on first use, and tests may
# monkeypatch this name directly.
get_ai_evaluator = None
# NOTE(review): `notify_config` is used below via `.get(...)`, so it must be a
# dict-like object exported by the package, not a plain submodule — confirm.
from . import notify_config
try:
    from .notify import MattermostNotifier
except Exception:
    # Notifications are optional; when the notifier cannot be imported,
    # the worker simply skips the follow-up message step.
    MattermostNotifier = None

logger = logging.getLogger(__name__)

# Work queue consumed by the single background worker thread.
_queue: "Queue[Dict[str, Any]]" = Queue()
# Guards against spawning more than one worker thread (see start()).
_started = False

# Sampling/rate defaults read once at import time from configuration.
_DEFAULT_SAMPLE_RATE = float(notify_config.get("ai", {}).get("deep_sample_rate", 1.0))
_DEFAULT_MAX_PER_MIN = int(notify_config.get("ai", {}).get("deep_max_per_min", 60) or 60)

# Simple token-bucket rate limiter state (wall clock via time.time()).
_tokens = _DEFAULT_MAX_PER_MIN
_last_refill = time.time()
_rate_lock = threading.Lock()


def _allow_enqueue() -> bool:
    """Decide whether to allow an enqueue, via sampling plus a token bucket.

    Thread-safe; the bucket refills to ``_DEFAULT_MAX_PER_MIN`` at most once
    per wall-clock minute.

    Returns:
        bool: True when the caller may enqueue a deep-analysis request.
    """
    # Probabilistic sampling: with rate r, roughly a fraction r of calls pass.
    # (random.random() cannot raise, so no defensive try/except is needed.)
    if random.random() > _DEFAULT_SAMPLE_RATE:
        return False

    global _tokens, _last_refill
    with _rate_lock:
        try:
            current = int(time.time())
            # Refill the bucket at most once per minute.
            if current - int(_last_refill) >= 60:
                _tokens = _DEFAULT_MAX_PER_MIN
                _last_refill = current
            if _tokens <= 0:
                return False
            _tokens -= 1
            return True
        except Exception:
            # Fail open: a broken rate limiter must not block analysis work.
            return True


def _worker() -> None:
    """Background loop: pull alerts off the queue and run deep AI analysis.

    Runs forever in a daemon thread until a ``None`` sentinel is dequeued
    (queued by ``shutdown()``). For each alert it (1) resolves the evaluator
    lazily, (2) evaluates with bounded retries, (3) optionally appends the
    result to a decisions log, and (4) optionally sends a Mattermost
    follow-up. Failures are logged and never propagate out of the loop.
    """
    logger.info("Async AI worker started")
    while True:
        item = _queue.get()
        if item is None:
            # Shutdown sentinel: exit the loop.
            break
        alert = item.get("alert") or {}
        context = item.get("context") or {}
        try:
            # resolve evaluator lazily; tests may monkeypatch `get_ai_evaluator` in this module
            evaluator = None
            try:
                if callable(get_ai_evaluator):
                    evaluator = get_ai_evaluator()
                else:
                    # lazy import (avoid importing requests during test collection)
                    from .ai_evaluator import get_ai_evaluator as _g
                    # cache the resolved factory for subsequent iterations
                    globals()['get_ai_evaluator'] = _g
                    evaluator = _g()
            except Exception:
                logger.warning("No AI evaluator available for deep analysis")
                continue

            sig_safe = str(alert.get('signature') or '')
            src_safe = str(alert.get('src_ip') or '')
            logger.info(f"Running deep AI analysis for {src_safe} {sig_safe}")
            # evaluator may be flaky; retry a small number of times with backoff
            max_eval_retries = int(notify_config.get("ai", {}).get("deep_eval_retries", 2) or 2)
            eval_attempt = 0
            result = None
            while eval_attempt <= max_eval_retries:
                try:
                    result = evaluator.evaluate_threat(alert)
                    break
                except Exception:
                    eval_attempt += 1
                    # exponential backoff: 0.5s, 1s, 2s, ...
                    wait = 0.5 * (2 ** (eval_attempt - 1))
                    logger.exception(f"Deep eval attempt {eval_attempt} failed, retrying in {wait}s")
                    time.sleep(wait)
            if result is None:
                logger.error("Deep AI evaluation failed after retries")
                continue
            logger.info(f"Deep AI result: {result}")

            # Persist deep result to decisions.log if provided in context
            decisions_path = context.get("decisions_log")
            if decisions_path:
                p = Path(decisions_path)
                p.parent.mkdir(parents=True, exist_ok=True)
                entry = {
                    "event": alert.get("signature", "deep_ai"),
                    # fall back to mapping the 1-based risk level onto a 0-100 scale
                    "score": result.get("score") or ((result.get("risk", 1) - 1) * 25),
                    "classification": result.get("category"),
                    "timestamp": alert.get("timestamp"),
                    "deep_ai": result,
                    "note": "deep_followup",
                }
                import json

                # write with retries to tolerate transient FS/permission issues
                max_persist_retries = int(notify_config.get("ai", {}).get("deep_persist_retries", 3) or 3)
                attempt = 0
                while attempt <= max_persist_retries:
                    try:
                        with p.open("a", encoding="utf-8") as fh:
                            fh.write(json.dumps(entry, sort_keys=True, ensure_ascii=False))
                            fh.write("\n")
                            fh.flush()
                        break
                    except Exception:
                        attempt += 1
                        wait = 0.25 * (2 ** (attempt - 1))
                        logger.exception(f"Failed to persist deep AI result (attempt {attempt}), retrying in {wait}s")
                        time.sleep(wait)
                else:
                    # while/else: reached only when every retry failed (no break)
                    logger.error("Giving up persisting deep AI result after retries")

            # Send a follow-up Mattermost notification if possible (with retries)
            if MattermostNotifier is not None:
                notifier = None
                try:
                    notifier = MattermostNotifier()
                except Exception:
                    logger.exception("Failed to construct MattermostNotifier")
                if notifier is not None:
                    text = (
                        f"🔎 Deep AI analysis result for {alert.get('src_ip')} - "
                        f"risk={result.get('risk')} category={result.get('category')}\n{result.get('reason')}"
                    )
                    payload = {
                        "timestamp": alert.get("timestamp"),
                        "signature": "🔎 Deep AI Analysis",
                        "severity": 3,
                        "src_ip": alert.get("src_ip"),
                        "details": text,
                    }
                    # de-duplication key: one follow-up per (src_ip, signature prefix)
                    sig_short = (str(alert.get('signature') or '') )[:50]
                    key = f"deep:{str(alert.get('src_ip') or '')}:{sig_short}"
                    max_notify_retries = int(notify_config.get("notify", {}).get("notify_retries", 2) or 2)
                    ntry = 0
                    while ntry <= max_notify_retries:
                        try:
                            notifier.notify(payload, key=key)
                            break
                        except Exception:
                            ntry += 1
                            wait = 0.5 * (2 ** (ntry - 1))
                            logger.exception(f"Deep notify attempt {ntry} failed, retrying in {wait}s")
                            time.sleep(wait)
                    else:
                        logger.error("Giving up deep notify after retries")

        except Exception:
            logger.exception("Async AI worker encountered an error during evaluation")
        finally:
            # Mark the item done even on failure so Queue.join() cannot hang.
            try:
                _queue.task_done()
            except Exception:
                pass


def start() -> None:
    """Start the background worker thread (idempotent).

    Sets ``_started`` *before* spawning the thread to narrow the
    check-then-act window in which two concurrent callers could each
    start a worker; the flag is rolled back if the spawn fails so a
    later call can retry.
    """
    global _started
    if _started:
        return
    _started = True
    try:
        t = threading.Thread(target=_worker, daemon=True, name="azazel-async-ai")
        t.start()
    except Exception:
        # Spawn failed: allow a future start() to try again.
        _started = False
        raise


def enqueue(alert: Dict[str, Any], context: Optional[Dict[str, Any]] = None) -> None:
    """Schedule a deep AI evaluation for the given alert.

    Returns immediately; the evaluation itself happens on the background
    worker thread, which is started on demand.
    """
    start()
    # Sampling / rate limiting decides whether this alert gets deep analysis;
    # a failing limiter must never drop work, so default to allowing it.
    try:
        allowed = _allow_enqueue()
    except Exception:
        allowed = True
    if allowed:
        _queue.put({"alert": alert, "context": context or {}})
    else:
        logger.info("Async AI enqueue skipped by sampling/rate limit")


def shutdown() -> None:
    """Gracefully stop the worker (for tests).

    Queues a ``None`` sentinel behind any pending items; the worker exits
    after draining them. (``Queue.put`` on an unbounded queue does not
    block.) Also resets ``_started`` so a subsequent ``start()`` /
    ``enqueue()`` can spawn a fresh worker — without this, items enqueued
    after shutdown would sit in the queue forever with no consumer.
    """
    global _started
    _queue.put(None)
    _started = False
Loading
Loading