-
Notifications
You must be signed in to change notification settings - Fork 0
EXAMPLES
Nick edited this page Mar 10, 2026
·
1 revision
Practical examples of using the PATAS API, based on real code.
Python:
import requests

# Prepare messages: each entry has an id, the message text, an is_spam label,
# and a meta dict naming the sender and source.
messages = [
    {
        "id": "msg_001",
        "text": "Buy now! http://spam.com",
        "is_spam": True,
        "meta": {"sender": "user123", "source": "chat456"},
    },
    {
        "id": "msg_002",
        "text": "Click here: http://spam.com",
        "is_spam": True,
        "meta": {"sender": "user456", "source": "chat789"},
    },
]

# Send request. requests has no default timeout, so without one a stalled
# server would block this script indefinitely.
response = requests.post(
    "http://localhost:8000/api/v1/analyze",
    json={
        "messages": messages,
        "run_mining": True,
        "run_evaluation": True,
        "export_backend": "sql",
    },
    timeout=30,
)
# Stop on 4xx/5xx here rather than indexing into an error payload below.
response.raise_for_status()
result = response.json()

# Process results
print(f"Discovered {len(result['patterns'])} patterns")
print(f"Generated {len(result['rules'])} rules")

# Output SQL queries
for pattern in result["patterns"]:
    print(f"\nPattern: {pattern['description']}")
    print(f"Group size: {pattern['group_size']}")
    print(f"SQL: {pattern['sql_query']}")
# Export rules
if result.get("export"):
    print(f"\nExported SQL Rules:\n{result['export']}")
JavaScript:
// Messages to analyze: id, text, spam label, and sender/source metadata.
const messages = [
  {
    id: 'msg_001',
    text: 'Buy now! http://spam.com',
    is_spam: true,
    meta: { sender: 'user123', source: 'chat456' }
  }
];

const response = await fetch('http://localhost:8000/api/v1/analyze', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    messages,
    run_mining: true,
    run_evaluation: true,
    export_backend: 'sql'
  })
});

// fetch() rejects only on network failure; 4xx/5xx responses still resolve,
// so the status has to be checked before the body is trusted.
if (!response.ok) {
  throw new Error(`PATAS analyze request failed: ${response.status} ${response.statusText}`);
}

const result = await response.json();
console.log(`Discovered ${result.patterns.length} patterns`);
result.patterns.forEach(pattern => {
console.log(`Pattern: ${pattern.description}`);
console.log(`SQL: ${pattern.sql_query}`);
});
For large datasets, use lower-level endpoints:
Python:
import requests
from datetime import datetime, timedelta
BASE_URL = "http://localhost:8000/api/v1"
# 1. Ingest messages in batches
def ingest_batch(messages_batch):
    """Upload one batch of messages to the ingest endpoint.

    Args:
        messages_batch: Messages to upload; sent as the "messages" field of
            the JSON request body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within 30 seconds.
    """
    response = requests.post(
        f"{BASE_URL}/messages/ingest",
        json={"messages": messages_batch},
        timeout=30,
    )
    # Surface rejected batches instead of returning an error payload as if it
    # were a successful result.
    response.raise_for_status()
    return response.json()
# 2. Run pattern mining
def mine_patterns(days=7, use_llm=True, min_spam_count=10):
    """Trigger pattern mining and return the server's reply.

    Args:
        days: Value sent as the "days" field of the request.
        use_llm: Value sent as the "use_llm" field (defaults to True, the
            value this example previously hard-coded).
        min_spam_count: Value sent as the "min_spam_count" field (defaults to
            10, the value this example previously hard-coded).

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within 60 seconds.
    """
    response = requests.post(
        f"{BASE_URL}/patterns/mine",
        json={"days": days, "use_llm": use_llm, "min_spam_count": min_spam_count},
        # NOTE(review): 60s is the upper end of the 30-60s range this page
        # recommends; confirm it is long enough for mining runs.
        timeout=60,
    )
    response.raise_for_status()
    return response.json()
# 3. Evaluate shadow rules
def evaluate_rules(days=7):
    """Trigger shadow-rule evaluation and return the server's reply.

    Args:
        days: Value sent as the "days" field of the request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within 60 seconds.
    """
    response = requests.post(
        f"{BASE_URL}/rules/eval-shadow",
        json={"days": days},
        # NOTE(review): 60s is the upper end of the 30-60s range this page
        # recommends; confirm it is long enough for evaluation runs.
        timeout=60,
    )
    response.raise_for_status()
    return response.json()
# 4. Promote rules
def promote_rules():
    """Call the rule-promotion endpoint and return the server's reply.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within 30 seconds.
    """
    response = requests.post(f"{BASE_URL}/rules/promote", timeout=30)
    # A failed promotion must not be reported as a successful one.
    response.raise_for_status()
    return response.json()
# 5. Export active rules
def export_rules(backend="sql"):
    """Fetch the rule export for the given backend.

    Args:
        backend: Export backend name, sent as the "backend" query parameter.

    Returns:
        The raw response text when backend is "sql"; otherwise the decoded
        JSON body.

    Raises:
        requests.exceptions.HTTPError: If the server answers with 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within 30 seconds.
    """
    response = requests.get(
        f"{BASE_URL}/rules/export",
        params={"backend": backend},
        timeout=30,
    )
    # Without this check an error page would be returned to the caller as SQL.
    response.raise_for_status()
    if backend == "sql":
        return response.text
    return response.json()
# Full workflow
def full_workflow(messages_batches):
# Ingest
for batch in messages_batches:
ingest_batch(batch)
# Mine patterns
mining_result = mine_patterns(days=7)
print(f"Mined {mining_result['patterns_created']} patterns")
# Evaluate
eval_result = evaluate_rules(days=7)
print(f"Evaluated {eval_result['evaluated_count']} rules")
# Promote
promote_result = promote_rules()
print(f"Promoted {promote_result['promoted_count']} rules")
# Export
sql_rules = export_rules("sql")
print(f"Exported {len(sql_rules.split(';'))} SQL rules")
return sql_rulesPython:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import time
def create_session_with_retry():
    """Build a requests Session whose HTTP and HTTPS adapters share a retry policy.

    The policy allows a total of 3 retries with a backoff factor of 1 and
    lists 429, 500, 502, 503 and 504 as statuses that force a retry.

    NOTE(review): urllib3's Retry applies status-based retries only to its
    allowed methods, which by default do not include POST - confirm whether
    POST calls made through this session are expected to be retried here.

    Returns:
        The configured requests.Session.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
        )
    )
    session = requests.Session()
    # Same adapter instance for both schemes, as in a single shared policy.
    for scheme in ("http://", "https://"):
        session.mount(scheme, retrying_adapter)
    return session
def analyze_with_retry(session, messages, max_retries=3):
    """POST messages to the analyze endpoint, retrying transient failures.

    Network errors, 429 and 5xx responses are retried with exponential
    backoff (1s, 2s, 4s, ...). Other client errors are not retried, because
    resending the same request cannot succeed.

    Args:
        session: Object exposing a requests-style post() method.
        messages: Messages sent as the "messages" field of the JSON body.
        max_retries: Maximum number of attempts; must be at least 1.

    Returns:
        The decoded JSON body of the first successful response.

    Raises:
        ValueError: If max_retries is below 1, or if the server answers 400
            (the original HTTPError is chained as the cause).
        requests.exceptions.HTTPError: For non-retryable statuses, or when a
            retryable status persists through the last attempt.
        requests.exceptions.RequestException: When a network-level failure
            persists through the last attempt.
    """
    if max_retries < 1:
        # Previously the loop simply did not run and None was returned.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = session.post(
                "http://localhost:8000/api/v1/analyze",
                json={"messages": messages, "run_mining": True},
                timeout=30,
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            status_code = e.response.status_code
            if status_code == 400:
                # Bad request - don't retry
                try:
                    error_detail = e.response.json().get("detail", "Unknown error")
                except (ValueError, AttributeError):
                    # The error body was not a JSON object; fall back to raw text.
                    error_detail = e.response.text or "Unknown error"
                raise ValueError(f"Bad request: {error_detail}") from e
            retryable = status_code == 429 or status_code >= 500
            if not retryable or is_last_attempt:
                raise
            # Retry on server errors
            wait_time = 2 ** attempt
            print(f"Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
        except requests.exceptions.RequestException:
            if is_last_attempt:
                raise
            wait_time = 2 ** attempt
            print(f"Network error, retrying in {wait_time} seconds...")
            time.sleep(wait_time)
# Usage
session = create_session_with_retry()
result = analyze_with_retry(session, messages)
Python (asyncio + aiohttp):
import asyncio
import aiohttp
async def analyze_batch_async(session, messages):
    """POST one batch to the analyze endpoint and return its decoded JSON body.

    Args:
        session: Client session whose post() is used as an async context
            manager (an aiohttp.ClientSession in the usage shown on this page).
        messages: Messages sent as the "messages" field of the JSON body.

    Returns:
        The decoded JSON body of the response.

    Raises:
        Whatever response.raise_for_status() raises for an error status.
    """
    url = "http://localhost:8000/api/v1/analyze"
    payload = {"messages": messages, "run_mining": True}
    async with session.post(url, json=payload) as response:
        response.raise_for_status()
        return await response.json()
async def process_multiple_batches(messages_batches):
    """Analyze every batch concurrently over one shared session and print a summary.

    Failures are collected rather than raised: gather() is called with
    return_exceptions=True, so one failing batch does not cancel the rest.

    Args:
        messages_batches: Iterable of message batches, one request per batch.
    """
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(analyze_batch_async(session, batch) for batch in messages_batches),
            return_exceptions=True,
        )

        # Process results
        for i, result in enumerate(results):
            if not isinstance(result, Exception):
                print(f"Batch {i}: {len(result['patterns'])} patterns")
                continue
            print(f"Batch {i} failed: {result}")
# Usage
asyncio.run(process_multiple_batches(messages_batches))
Python:
import requests
def get_rules_with_metrics(status="active"):
    """Fetch up to 100 rules with the given status, including evaluation data.

    Args:
        status: Rule status, sent as the "status" query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the server answers with 4xx/5xx.
        requests.exceptions.Timeout: If no response arrives within 30 seconds.
    """
    response = requests.get(
        "http://localhost:8000/api/v1/rules",
        params={
            "status": status,
            "include_evaluation": True,
            "limit": 100,
        },
        timeout=30,
    )
    # Do not hand an error payload to callers that expect a list of rules.
    response.raise_for_status()
    return response.json()
def filter_high_precision_rules(rules, min_precision=0.95):
    """Return the rules whose evaluated precision is at least min_precision.

    Rules with no (or empty) "evaluation" entry are dropped. A precision that
    is missing or null is treated as 0 instead of raising on comparison.

    Args:
        rules: Iterable of rule dicts, each optionally carrying an
            "evaluation" dict with a "precision" value.
        min_precision: Inclusive lower bound on precision.

    Returns:
        A new list with the qualifying rules, in their original order.
    """
    selected = []
    for rule in rules:
        evaluation = rule.get("evaluation")
        if not evaluation:
            continue
        # `or 0` covers a key that is present but null, which .get()'s
        # default does not.
        precision = evaluation.get("precision") or 0
        if precision >= min_precision:
            selected.append(rule)
    return selected
def analyze_rule_performance(rules):
    """Print precision, coverage and hit counts for every evaluated rule.

    Rules with no (or empty) "evaluation" entry are skipped. Metrics that are
    missing or null are printed as 0 instead of raising in the percent format.

    Args:
        rules: Iterable of rule dicts with an "id" and an optional
            "evaluation" dict.
    """
    for rule in rules:
        eval_data = rule.get("evaluation")
        if not eval_data:
            continue
        # `or 0` covers keys that are present but null; formatting None with
        # ".2%" raises TypeError.
        precision = eval_data.get("precision") or 0
        coverage = eval_data.get("coverage") or 0
        hits_total = eval_data.get("hits_total") or 0
        ham_hits = eval_data.get("ham_hits") or 0
        print(f"Rule {rule['id']}:")
        print(f" Precision: {precision:.2%}")
        print(f" Coverage: {coverage:.2%}")
        print(f" Hits: {hits_total}")
        print(f" Ham hits: {ham_hits}")
        print()
# Usage
rules = get_rules_with_metrics("active")
high_precision = filter_high_precision_rules(rules, min_precision=0.95)
analyze_rule_performance(high_precision)
Python (SQLite):
import sqlite3
import requests
def apply_patas_rules_to_db(db_path, sql_rules):
    """Run exported PATAS SQL rules against a local SQLite database.

    Each non-empty statement is executed and the number of rows it returns is
    printed. A statement that SQLite rejects is reported and skipped.

    Args:
        db_path: Path of the SQLite database file.
        sql_rules: Semicolon-separated SQL text to run. Pass None to download
            the current export from the PATAS API instead.

    Raises:
        requests.exceptions.HTTPError: If the export request returns 4xx/5xx.
        requests.exceptions.Timeout: If the export request exceeds 30 seconds.
    """
    if sql_rules is None:
        # Get rules from PATAS
        response = requests.get(
            "http://localhost:8000/api/v1/rules/export",
            params={"backend": "sql"},
            timeout=30,
        )
        # Never feed an error page to the database as if it were SQL.
        response.raise_for_status()
        sql_rules = response.text

    # NOTE(review): this executes SQL text supplied by the caller or the API
    # as-is; only point it at a trusted PATAS instance.
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Execute each SQL query
        # NOTE(review): a plain split breaks statements that contain ";"
        # inside a string literal - confirm exported rules never do.
        for rule_sql in sql_rules.split(";"):
            rule_sql = rule_sql.strip()
            if not rule_sql:
                continue
            try:
                cursor.execute(rule_sql)
                results = cursor.fetchall()
                print(f"Rule matched {len(results)} messages")
            except sqlite3.Error as e:
                print(f"Error executing rule: {e}")
    finally:
        # Close even when the loop raises, so the file handle is not leaked.
        conn.close()
# Usage
apply_patas_rules_to_db("messages.db", None)
Python:
import requests
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def analyze_with_logging(messages):
    """POST messages to the analyze endpoint, logging duration and result sizes.

    On success the elapsed time, the pattern count, the rule count and, when
    present, the server-reported timings are logged at INFO level. Any failure
    is logged at ERROR level and re-raised unchanged.

    Args:
        messages: Messages sent as the "messages" field of the JSON body.

    Returns:
        The decoded JSON body of the response.
    """
    start_time = datetime.now()
    payload = {"messages": messages, "run_mining": True}
    try:
        response = requests.post(
            "http://localhost:8000/api/v1/analyze",
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        result = response.json()

        elapsed = (datetime.now() - start_time).total_seconds()
        logger.info(f"Analysis completed in {elapsed:.2f}s")
        logger.info(f"Patterns: {len(result['patterns'])}")
        logger.info(f"Rules: {len(result['rules'])}")

        # Timings are optional in the reply; log them only when present.
        timings = result.get("meta", {}).get("timings")
        if timings:
            logger.info(f"Timings: {timings}")
        return result
    except requests.exceptions.RequestException as e:
        logger.error(f"Request failed: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
# Usage
result = analyze_with_logging(messages)
Bash:
# PATAS CLI walkthrough: ingest, mine, evaluate and promote from the shell,
# followed by a Telegram demo and a rule explanation.
# NOTE(review): the positional arguments below are not explained on this
# page - confirm their meaning and order against each subcommand's help.
# Ingest logs
patas ingest-logs api 7 1000
# Mine patterns
patas mine-patterns 7 true
# Evaluate rules
patas eval-rules 7 10
# Promote rules
patas promote-rules
# Demo for Telegram
# Reads sample.jsonl and writes to ./demo_report, per the flags below.
patas demo-telegram --profile=conservative --input=sample.jsonl --out=./demo_report
# Explain rule
patas explain-rule --id=123 --max-examples=5
- Batch size: Use batches of up to 10,000 messages for /api/v1/analyze
- Retry logic: Always add retry logic in production
- Error handling: Check response status and handle errors
- Timeout: Set reasonable timeout (30-60 seconds)
- Monitoring: Log metrics and timing
- Async: Use async for multiple requests
- API Reference — Complete Documentation
- Integration Guide — Integration Guide
- Configuration Guide — Configuration settings