-
Notifications
You must be signed in to change notification settings - Fork 0
INTEGRATION_GUIDE
Nick edited this page Mar 10, 2026
·
1 revision
PATAS Integration Guide
1.1. Install dependencies:
poetry install
# or
pip install -r requirements.txt

1.2. Configuration:
cp .env.example .env
# Edit .env with your settings

1.3. Start API server:
poetry run uvicorn app.api.main:app --host 0.0.0.0 --port 8000
# or
poetry run patas-api

1.4. Verification:
curl http://localhost:8000/api/v1/health

2.1. Ingest messages:
import requests
messages = [
{
"id": "msg_001",
"text": "Your message text",
"is_spam": True,
"meta": {"sender": "user123", "source": "chat456"}
}
]
response = requests.post(
"http://localhost:8000/api/v1/messages/ingest",
json={"messages": messages}
)2.2. Pattern mining:
response = requests.post(
"http://localhost:8000/api/v1/patterns/mine",
json={"days": 7, "use_llm": False, "min_spam_count": 10}
)2.3. Get rules:
response = requests.get(
"http://localhost:8000/api/v1/rules",
params={"status": "active", "include_evaluation": True}
)
rules = response.json()2.4. Export rules:
response = requests.get(
"http://localhost:8000/api/v1/rules/export",
params={"backend": "sql"}
)
sql_rules = response.text3.1. SQL rules:
# Get SQL rules
response = requests.get(
"http://localhost:8000/api/v1/rules/export",
params={"backend": "sql"}
)
sql_rules = response.text
# Apply to your database
import sqlite3
conn = sqlite3.connect("your_database.db")
cursor = conn.cursor()
for rule_sql in sql_rules.split(";"):
rule_sql = rule_sql.strip()
if rule_sql:
cursor.execute(rule_sql)
results = cursor.fetchall()
# Process results3.2. ROL rules:
response = requests.get(
"http://localhost:8000/api/v1/rules/export",
params={"backend": "rol"}
)
rol_rules = response.json()
# Apply ROL rules in your system
for rule in rol_rules["rules"]:
# Your application logic
passfor large datasets use batch processing:
def process_large_dataset(messages_batches):
# 1. Ingest in batches
for batch in messages_batches:
requests.post(
"http://localhost:8000/api/v1/messages/ingest",
json={"messages": batch}
)
# 2. Mine patterns
requests.post(
"http://localhost:8000/api/v1/patterns/mine",
json={"days": 7}
)
# 3. Evaluate and promote
requests.post("http://localhost:8000/api/v1/rules/eval-shadow")
requests.post("http://localhost:8000/api/v1/rules/promote")
# 4. Export
response = requests.get(
"http://localhost:8000/api/v1/rules/export",
params={"backend": "sql"}
)
return response.textfor continuous processing:
import schedule
import time
def daily_pattern_mining():
# Ingest new messages
requests.post("http://localhost:8000/api/v1/messages/ingest", json={...})
# Mine patterns
requests.post("http://localhost:8000/api/v1/patterns/mine", json={"days": 1})
# Evaluate and promote
requests.post("http://localhost:8000/api/v1/rules/eval-shadow")
requests.post("http://localhost:8000/api/v1/rules/promote")
# Export and apply in system
export_and_apply_rules()
# Start daily
schedule.every().day.at("02:00").do(daily_pattern_mining)
while True:
schedule.run_pending()
time.sleep(60)for real-time integration:
from queue import Queue
import threading
message_queue = Queue()
def ingest_worker():
while True:
messages = []
# Collect batch from queue
while len(messages) < 1000 and not message_queue.empty():
messages.append(message_queue.get())
if messages:
requests.post(
"http://localhost:8000/api/v1/messages/ingest",
json={"messages": messages}
)
# Start worker
threading.Thread(target=ingest_worker, daemon=True).start()
# Adding messages to queue
def on_new_message(message):
message_queue.put(message)Recommendations:
- /api/v1/analyze: up to 10,000 messages per request
- /api/v1/messages/ingest: up to 5,000 messages at a time
- Use pagination for large datasets
import asyncio
import aiohttp
async def analyze_batch_async(session, messages):
async with session.post(
"http://localhost:8000/api/v1/analyze",
json={"messages": messages, "run_mining": True}
) as response:
return await response.json()
async def process_multiple_batches(messages_batches):
async with aiohttp.ClientSession() as session:
tasks = [
analyze_batch_async(session, batch)
for batch in messages_batches
]
results = await asyncio.gather(*tasks)
return resultsfrom functools import lru_cache
import requests
@lru_cache(maxsize=100)
def get_active_rules():
response = requests.get(
"http://localhost:8000/api/v1/rules",
params={"status": "active"}
)
return response.json()import requests
import time
def monitor_health():
while True:
try:
response = requests.get(
"http://localhost:8000/api/v1/health",
timeout=5
)
if response.json()["status"] == "ok":
print("PATAS is healthy")
else:
print("PATAS is unhealthy")
except Exception as e:
print(f"Health check failed: {e}")
time.sleep(60)
# Start Monitoringа
monitor_health()def monitor_rule_metrics():
response = requests.get(
"http://localhost:8000/api/v1/rules",
params={"status": "active", "include_evaluation": True}
)
rules = response.json()
for rule in rules:
eval_data = rule.get("evaluation")
if eval_data:
precision = eval_data.get("precision", 0)
ham_hits = eval_data.get("ham_hits", 0)
# Alert if precision drops
if precision < 0.90:
send_alert(f"Rule {rule['id']} precision is {precision:.2%}")
# Alert if ham hits increase
if ham_hits > 10:
send_alert(f"Rule {rule['id']} has {ham_hits} ham hits")if prometheus_client is installed, PATAS exports metrics:
# Metrics available at /metrics endpoint (if configured)
# or via the prometheus_client registry

Best practices:
- Use batch endpoints for large datasets
- Add retry logic for production
- Monitor rule metrics regularly
- Use the conservative profile for production auto-actions
- Test rules in shadow mode before applying them
- Log all operations for debugging
- Use async requests for many concurrent calls
Solution:
- Check metrics: GET /api/v1/rules?include_evaluation=true
- Check AGGRESSIVENESS_PROFILE (too strict = fewer promotions)
- Ensure rules were evaluated: POST /api/v1/rules/eval-shadow
Solution:
- Reduce PATTERN_MINING_CHUNK_SIZE
- Disable LLM if not needed: use_llm=false
- Use lower-level endpoints for large datasets
Solution:
- Check OPENAI_API_KEY
- Check LLM_PROVIDER (must be "openai", not "none")
- Check network access
- API Reference — Complete API Documentation
- Examples — Practical Examples
- Configuration Guide — settings