Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions examples/registration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ Services can configure keepalive behavior:
| `SERVICEKIT_ORCHESTRATOR_URL` | (required) | Orchestrator registration endpoint |
| `SERVICEKIT_HOST` | auto-detected | Service hostname (override auto-detection) |
| `SERVICEKIT_PORT` | 8000 | Service port |
| `SERVICEKIT_REGISTRATION_KEY` | (optional) | Service key for authenticated registration |

### Builder Configuration

Expand All @@ -218,9 +219,56 @@ Services can configure keepalive behavior:
retry_delay=2.0,
fail_on_error=False, # Don't abort on failure
timeout=10.0,
service_key=None, # Service key for authentication
service_key_env="SERVICEKIT_REGISTRATION_KEY",
)
```

### Service Key Authentication

When registering with an orchestrator that requires authentication, you can configure a service key that will be sent as an `X-Service-Key` header with all registration requests (register, keepalive pings, deregister).

**Using environment variable (recommended for production):**

```yaml
services:
my-service:
image: my-service:latest
environment:
SERVICEKIT_ORCHESTRATOR_URL: http://orchestrator:9000/services/$register
SERVICEKIT_REGISTRATION_KEY: ${REGISTRATION_SECRET}
```

**Using direct parameter (for testing):**

```python
app = (
BaseServiceBuilder(info=ServiceInfo(display_name="My Service"))
.with_registration(
orchestrator_url="http://orchestrator:9000/services/$register",
service_key="my-secret-key",
)
.build()
)
```

**Using custom environment variable name:**

```python
app = (
BaseServiceBuilder(info=ServiceInfo(display_name="My Service"))
.with_registration(
service_key_env="MY_APP_REGISTRATION_KEY",
)
.build()
)
```

The service key is included in:
- Initial registration (`POST /services/$register`)
- Keepalive pings (`PUT /services/{id}/$ping`)
- Deregistration (`DELETE /services/{id}`)

## Examples

### Basic Registration (main.py)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "servicekit"
version = "0.6.0"
version = "0.7.0"
description = "Async SQLAlchemy framework with FastAPI integration - reusable foundation for building data services"
readme = "README.md"
authors = [{ name = "Morten Hansen", email = "morten@winterop.com" }]
Expand Down
51 changes: 47 additions & 4 deletions src/servicekit/api/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@
_keepalive_task: asyncio.Task | None = None
_service_id: str | None = None
_ping_url: str | None = None
_service_key: str | None = None


def _resolve_service_key(service_key: str | None, service_key_env: str) -> str | None:
"""Resolve service key from parameter or environment variable."""
if service_key:
return service_key
return os.getenv(service_key_env)


async def register_service(
Expand All @@ -31,8 +39,17 @@ async def register_service(
retry_delay: float = 2.0,
fail_on_error: bool = False,
timeout: float = 10.0,
service_key: str | None = None,
service_key_env: str = "SERVICEKIT_REGISTRATION_KEY",
) -> dict[str, Any] | None:
"""Register service with orchestrator for service discovery and return registration info."""
# Resolve service key for authentication
resolved_service_key = _resolve_service_key(service_key, service_key_env)

# Store globally for keepalive
global _service_key
_service_key = resolved_service_key

# Resolve orchestrator URL
resolved_orchestrator_url = orchestrator_url or os.getenv(orchestrator_url_env)
if not resolved_orchestrator_url:
Expand Down Expand Up @@ -114,12 +131,18 @@ async def register_service(
# Registration with retry logic
last_error: Exception | None = None

# Build headers with optional service key
headers: dict[str, str] = {}
if resolved_service_key:
headers["X-Service-Key"] = resolved_service_key

for attempt in range(1, max_retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
resolved_orchestrator_url,
json=payload,
headers=headers if headers else None,
)
response.raise_for_status()

Expand Down Expand Up @@ -187,16 +210,21 @@ async def register_service(
return None


async def _keepalive_loop(ping_url: str, interval: float, timeout: float) -> None:
async def _keepalive_loop(ping_url: str, interval: float, timeout: float, service_key: str | None) -> None:
"""Background task to periodically ping the orchestrator."""
logger.info("keepalive.started", ping_url=ping_url, interval_seconds=interval)

# Build headers with optional service key
headers: dict[str, str] = {}
if service_key:
headers["X-Service-Key"] = service_key

while True:
try:
await asyncio.sleep(interval)

async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.put(ping_url)
response = await client.put(ping_url, headers=headers if headers else None)
response.raise_for_status()

response_data = response.json()
Expand Down Expand Up @@ -226,6 +254,8 @@ async def start_keepalive(
ping_url: str,
interval: float = 10.0,
timeout: float = 10.0,
service_key: str | None = None,
service_key_env: str = "SERVICEKIT_REGISTRATION_KEY",
) -> None:
"""Start background keepalive task to ping orchestrator."""
global _keepalive_task
Expand All @@ -234,7 +264,10 @@ async def start_keepalive(
logger.warning("keepalive.already_running")
return

_keepalive_task = asyncio.create_task(_keepalive_loop(ping_url, interval, timeout))
# Resolve service key (use global from registration if not provided)
resolved_service_key = _resolve_service_key(service_key, service_key_env) or _service_key

_keepalive_task = asyncio.create_task(_keepalive_loop(ping_url, interval, timeout, resolved_service_key))
logger.info("keepalive.task_started", ping_url=ping_url, interval_seconds=interval)


Expand All @@ -257,6 +290,8 @@ async def deregister_service(
service_id: str,
orchestrator_url: str,
timeout: float = 10.0,
service_key: str | None = None,
service_key_env: str = "SERVICEKIT_REGISTRATION_KEY",
) -> None:
"""Explicitly deregister service from orchestrator."""
# Build deregister URL from orchestrator base URL
Expand All @@ -265,9 +300,17 @@ async def deregister_service(
base_url = orchestrator_url.replace("/$register", "")
deregister_url = f"{base_url}/{service_id}"

# Resolve service key (use global from registration if not provided)
resolved_service_key = _resolve_service_key(service_key, service_key_env) or _service_key

# Build headers with optional service key
headers: dict[str, str] = {}
if resolved_service_key:
headers["X-Service-Key"] = resolved_service_key

try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.delete(deregister_url)
response = await client.delete(deregister_url, headers=headers if headers else None)
response.raise_for_status()

logger.info(
Expand Down
12 changes: 12 additions & 0 deletions src/servicekit/api/service_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class _RegistrationOptions:
enable_keepalive: bool
keepalive_interval: float
auto_deregister: bool
service_key: str | None
service_key_env: str


class ServiceInfo(BaseModel):
Expand Down Expand Up @@ -321,6 +323,8 @@ def with_registration(
enable_keepalive: bool = True,
keepalive_interval: float = 10.0,
auto_deregister: bool = True,
service_key: str | None = None,
service_key_env: str = "SERVICEKIT_REGISTRATION_KEY",
) -> Self:
"""Enable service registration with orchestrator for service discovery."""
self._registration_options = _RegistrationOptions(
Expand All @@ -337,6 +341,8 @@ def with_registration(
enable_keepalive=enable_keepalive,
keepalive_interval=keepalive_interval,
auto_deregister=auto_deregister,
service_key=service_key,
service_key_env=service_key_env,
)
return self

Expand Down Expand Up @@ -666,6 +672,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
retry_delay=registration_options.retry_delay,
fail_on_error=registration_options.fail_on_error,
timeout=registration_options.timeout,
service_key=registration_options.service_key,
service_key_env=registration_options.service_key_env,
)

# Start keepalive if registration succeeded and enabled
Expand All @@ -676,6 +684,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
ping_url=ping_url,
interval=registration_options.keepalive_interval,
timeout=registration_options.timeout,
service_key=registration_options.service_key,
service_key_env=registration_options.service_key_env,
)

try:
Expand All @@ -698,6 +708,8 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
service_id=service_id,
orchestrator_url=orchestrator_url,
timeout=registration_options.timeout,
service_key=registration_options.service_key,
service_key_env=registration_options.service_key_env,
)

for hook in shutdown_hooks:
Expand Down
Loading