44 changes: 44 additions & 0 deletions .github/workflows/trigger-infra.yml
@@ -0,0 +1,44 @@
# =============================================================================
# Trigger Infra Deployment on Push
# =============================================================================
# This workflow triggers the infra repo deployment when changes are pushed
# to the main branch of worker.
#
# Required Secret:
# INFRA_DISPATCH_TOKEN - A GitHub Personal Access Token (PAT) with:
# - repo scope (or fine-grained: contents:write on Luxia-AI/infra)
# Create this secret in: Settings > Secrets and variables > Actions
# =============================================================================

name: Trigger Infra Deploy

on:
push:
branches:
- main

jobs:
trigger-deploy:
name: Trigger Infra Workflow
runs-on: ubuntu-latest

steps:
- name: Log dispatch info
run: |
echo "=========================================="
echo "Triggering infra deployment"
echo "=========================================="
echo "Service: worker"
echo "Commit: ${{ github.sha }}"
echo "Ref: ${{ github.ref }}"
echo "=========================================="

- name: Dispatch to infra repo
run: |
curl -X POST \
-H "Accept: application/vnd.github+json" \
-H "Authorization: Bearer ${{ secrets.INFRA_DISPATCH_TOKEN }}" \
-H "X-GitHub-Api-Version: 2022-11-28" \
https://api.github.com/repos/Luxia-AI/infra/dispatches \
-d '{"event_type":"service-updated","client_payload":{"service":"worker","sha":"${{ github.sha }}"}}'
echo "Dispatch sent successfully"
23 changes: 23 additions & 0 deletions app/constants/config.py
@@ -3,6 +3,29 @@
Centralized settings for models, domains, API endpoints, and pipeline thresholds.
"""

from enum import Enum

# ============================================================================
# VALIDATION & VERDICT STATE ENUMS
# ============================================================================


class ValidationState(Enum):
"""Evidence validation state for domain trust resolution."""

TRUSTED = "trusted"
UNTRUSTED = "untrusted"
PENDING_DOMAIN_TRUST = "pending_domain_trust"


class VerdictState(Enum):
"""Verdict state: confirms confidence level and domain trust timing."""

CONFIRMED = "confirmed" # Domain was trusted at verdict time
PROVISIONAL = "provisional" # Domain approval pending; verdict may change
REVOKED = "revoked" # Domain trust was removed after verdict


# ============================================================================
# PIPELINE THRESHOLDS & SETTINGS
# ============================================================================
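The intended interplay between the two enums is not shown elsewhere in this diff. A plausible reading, sketched below, is that a verdict issued against a `PENDING_DOMAIN_TRUST` source starts as `PROVISIONAL` and is later promoted or revoked once the domain is approved or rejected. The `reconcile_verdict` helper is hypothetical, not part of this PR:

```python
# Hypothetical helper (not in this PR): resolve a provisional verdict
# once the evidence domain's trust status is decided.
from app.constants.config import ValidationState, VerdictState


def reconcile_verdict(validation: ValidationState, verdict: VerdictState) -> VerdictState:
    """Promote or revoke a PROVISIONAL verdict after domain trust resolution."""
    if verdict is VerdictState.PROVISIONAL:
        if validation is ValidationState.TRUSTED:
            return VerdictState.CONFIRMED  # domain approved; verdict stands
        if validation is ValidationState.UNTRUSTED:
            return VerdictState.REVOKED  # domain rejected; verdict withdrawn
    return verdict  # domain still pending, or verdict already final
```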
39 changes: 39 additions & 0 deletions app/core/config.py
@@ -21,7 +21,46 @@ class Settings(BaseSettings):
REDIS_URL: str = Field(default="redis://localhost:6379", description="Redis connection URL")
LOG_DB_PATH: str = Field(default="logs.db", description="SQLite database path for persistent logs")

# Kafka Configuration
KAFKA_BOOTSTRAP: str = Field(default="kafka:29092", description="Kafka bootstrap servers")
KAFKA_SECURITY_PROTOCOL: str = Field(
default="PLAINTEXT",
description="Kafka security protocol (PLAINTEXT or SASL_SSL for Azure Event Hubs)",
)
KAFKA_SASL_MECHANISM: str = Field(default="PLAIN", description="Kafka SASL mechanism")
KAFKA_SASL_USERNAME: str = Field(
default="",
description="Kafka SASL username ($ConnectionString for Azure Event Hubs)",
)
KAFKA_SASL_PASSWORD: str = Field(
default="",
description="Kafka SASL password (connection string for Azure Event Hubs)",
)

model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(env_file=".env", extra="ignore")

def get_kafka_config(self) -> dict:
"""Build Kafka client configuration with optional SASL/SSL for Azure Event Hubs."""
import ssl

config = {
"bootstrap_servers": self.KAFKA_BOOTSTRAP,
}

# Azure Event Hubs requires SASL_SSL
if self.KAFKA_SECURITY_PROTOCOL == "SASL_SSL":
ssl_context = ssl.create_default_context()
config.update(
{
"security_protocol": "SASL_SSL",
"sasl_mechanism": self.KAFKA_SASL_MECHANISM,
"sasl_plain_username": self.KAFKA_SASL_USERNAME,
"sasl_plain_password": self.KAFKA_SASL_PASSWORD,
"ssl_context": ssl_context,
}
)

return config


settings = Settings()
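Since `Settings` loads from `.env`, pointing the worker at Azure Event Hubs comes down to environment values like the following (namespace and key are placeholders; Event Hubs' Kafka endpoint listens on port 9093 and authenticates with SASL PLAIN using the literal username `$ConnectionString`, as the field descriptions above note):

```dotenv
# Example .env for Azure Event Hubs (placeholder values).
KAFKA_BOOTSTRAP=mynamespace.servicebus.windows.net:9093
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=$ConnectionString
KAFKA_SASL_PASSWORD=Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=worker;SharedAccessKey=<key>
```

With the local defaults (`kafka:29092`, `PLAINTEXT`) none of the SASL fields are needed, and `get_kafka_config()` returns only `bootstrap_servers`.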
89 changes: 85 additions & 4 deletions app/main.py
@@ -1,3 +1,7 @@
import asyncio
import json

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI

from app.core.config import settings
@@ -10,13 +14,57 @@

logger = get_logger(__name__)

-# Global LogManager instance
+# Global instances
_log_manager: LogManager | None = None
_kafka_consumer: AIOKafkaConsumer | None = None
_kafka_producer: AIOKafkaProducer | None = None


async def process_jobs():
"""Background task to process jobs from Kafka."""
global _kafka_consumer, _kafka_producer
if not _kafka_consumer or not _kafka_producer:
logger.error("Kafka consumer or producer not initialized")
return

try:
async for message in _kafka_consumer:
job_data = message.value
job_id = job_data.get("job_id")
logger.info(f"Received job: {job_id}")

# Send started update
await _kafka_producer.send(
"jobs.results",
{
"job_id": job_id,
"status": "processing",
"timestamp": asyncio.get_event_loop().time(),
},
)

# Simulate processing (replace with actual processing)
await asyncio.sleep(5) # Simulate work

# Send completed update
await _kafka_producer.send(
"jobs.results",
{
"job_id": job_id,
"status": "completed",
"results": {"message": "Job completed"},
"timestamp": asyncio.get_event_loop().time(),
},
)
logger.info(f"Completed job: {job_id}")

except Exception as e:
logger.error(f"Error processing jobs: {e}")


async def startup_event() -> None:
"""Initialize logging system on app startup."""
global _log_manager # noqa: F841
"""Initialize logging system and Kafka consumer on app startup."""
global _log_manager, _kafka_consumer, _kafka_producer

try:
# Create LogManager with Redis + SQLite
@@ -34,8 +82,33 @@ async def startup_event() -> None:

logger.info(f"[Main] LogManager initialized with Redis={settings.REDIS_URL}, DB={settings.LOG_DB_PATH}")

# Get Kafka configuration (supports SASL/SSL for Azure Event Hubs)
kafka_config = settings.get_kafka_config()

# Initialize Kafka producer
_kafka_producer = AIOKafkaProducer(
**kafka_config,
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
await _kafka_producer.start()

# Initialize Kafka consumer for jobs
_kafka_consumer = AIOKafkaConsumer(
"jobs.general", # For now, consume from general
**kafka_config,
group_id="worker-general-group",
auto_offset_reset="latest",
value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)
await _kafka_consumer.start()

# Start background job processor
asyncio.create_task(process_jobs())

logger.info(f"[Main] Kafka producer and consumer started (bootstrap={settings.KAFKA_BOOTSTRAP})")

except Exception as e:
logger.error(f"[Main] Failed to initialize LogManager: {e}")
logger.error(f"[Main] Failed to initialize: {e}")


async def shutdown_event() -> None:
@@ -44,6 +117,14 @@ async def shutdown_event() -> None:
await _log_manager.stop()
logger.info("[Main] LogManager stopped")

if _kafka_consumer:
await _kafka_consumer.stop()
logger.info("[Main] Kafka consumer stopped")

if _kafka_producer:
await _kafka_producer.stop()
logger.info("[Main] Kafka producer stopped")


app = FastAPI(title="Luxia Worker Service", version="1.0.0")
