Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions features/F47-advanced-rbac/REPORT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
id: F47-advanced-rbac
type: report
summary: "Implemented Role-Based Access Control (Admin, Operator, Viewer)"
status: completed
validations:
- unit_tests: "tests/unit/test_auth.py: 100% pass"
- integration_tests: "tests/integration/test_cli_auth_rbac_admin.py: 100% pass"
- lint: "ruff check passed"
- typecheck: "mypy passed"
security_review:
verdict: approved
risks:
- risk: "Privilege escalation via token issue"
mitigation: "Only 'auth:manage' permission (admin) can issue tokens."
- risk: "Role confusion"
mitigation: "Explicit Role enum and strict Pydantic validation."
- risk: "Inconsistent enforcement"
mitigation: "Centralized _resolve_principal_id check in CLI entry points."
notes: "The implementation relies on the filesystem for persistence (auth-registry.json). File permissions (0600) are enforced by AuthRegistryStore."
---

# Feature Report: F47 - Advanced RBAC

## Contexto

The AIgnt OS CLI previously operated with a flat authorization model (all valid tokens had full access). As the system scales to support different user personas (viewers vs operators vs admins), a more granular permission system was required.

## Mudanças Realizadas

1. **Auth Model**: Introduced `AuthRole` (admin, operator, viewer) and `Permission` sets.
2. **Registry**: Updated `AuthRegistryStore` to persist roles alongside principal IDs.
3. **CLI Enforcement**: Added `_resolve_principal_id(permission=...)` checks to all sensitive commands.
- `runs submit`, `runs stop` -> `run:write`
- `runs list`, `runs show`, `runs watch`, `runs follow` -> `run:read`
- `runtime *` -> `runtime:manage`
- `auth issue`, `auth disable` -> `auth:manage`
4. **CLI UX**: Added `--role` flag to `auth issue` and `auth init` to create principals with specific roles.

## Validação

- **Unit Tests**: Verified `is_authorized` logic and `AuthRegistryStore` persistence.
- **Integration Tests**: Verified CLI behavior for different roles (viewer cannot submit run, operator cannot issue tokens).
- **Regression**: Verified existing auth flows remain functional (backward compatibility for tokens without explicit role in old registries, though new registry structure enforces it).

## Próximos Passos

- Consider moving to a database-backed auth store for higher concurrency/security in future.
- Add "service account" concept if non-human actors need specific subsets of permissions beyond standard roles.
53 changes: 53 additions & 0 deletions features/F47-advanced-rbac/SPEC.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
---
id: F47-advanced-rbac
type: feature
summary: "Implementação de controle de acesso baseado em papéis (RBAC) para comandos da CLI"
inputs:
- "Flag --role no comando aignt auth issue"
- "Configuração de papéis e permissões no AuthRegistryStore"
outputs:
- "Tokens vinculados a papéis específicos (admin, operator, viewer)"
- "Erro de permissão (403/Forbidden) ao tentar executar comando não autorizado"
acceptance_criteria:
- "Deve ser possível emitir token com papel específico via CLI (ex: --role viewer)"
- "Token com papel 'viewer' deve conseguir listar/ver runs mas falhar ao tentar submit ou stop"
- "Token com papel 'operator' deve conseguir gerenciar runs e runtime"
- "Token com papel 'admin' deve ter acesso irrestrito"
- "O default para novos tokens deve ser 'admin' para compatibilidade (ou 'operator'? Melhor manter admin por enquanto para não quebrar fluxo existente)"
- "A verificação de permissão deve ocorrer antes da execução da lógica do comando"
non_goals:
- "Interface gráfica para gestão de papéis"
- "Papéis customizados definidos pelo usuário (apenas fixos no código por enquanto)"
- "Hierarquia complexa de herança de permissões"
---

## Contexto

Atualmente, a autenticação no AIgnt OS é binária: ou o cliente possui um token válido e tem acesso total (sujeito apenas a restrições de `initiated_by` para operações de runtime), ou não tem acesso. Com a evolução para um modelo multi-usuário ou multi-agente, é necessário limitar o raio de ação de certos tokens (ex: um dashboard de visualização não deve poder parar o runtime).

## Objetivo

Implementar um sistema de RBAC (Role-Based Access Control) nativo na CLI.

### Mudanças Principais

1. **Modelo de Dados**:
- Estender `AuthRegistryStore` para armazenar o `role` associado a cada `token_hash`.
- Definir enumeração de `AuthRole` (`ADMIN`, `OPERATOR`, `VIEWER`).
- Mapear `AuthRole` para lista de `Permission` (strings como `run:read`, `run:write`, `runtime:manage`, `auth:manage`).

2. **CLI de Auth**:
- Adicionar opção `--role` ao comando `aignt auth issue`.

3. **Enforcement**:
- Atualizar `Authenticator` ou `RequireAuth` para validar permissões exigidas por cada comando.
- Decorar comandos da CLI com `@require_permission(...)` ou similar.

4. **Roles Iniciais**:
- `viewer`: Acesso a `runs list`, `runs show`, `doctor`, `version`.
- `operator`: Acesso a `viewer` + `runs submit`, `runtime start/stop/run`.
- `admin`: Acesso total, incluindo `auth manage`.

### Compatibilidade

- Tokens existentes (sem role definido no JSON) serão tratados como `admin` para não quebrar ambientes em uso.
63 changes: 45 additions & 18 deletions src/aignt_os/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,26 @@
if TYPE_CHECKING:
from aignt_os.config import AppSettings

Role = Literal["viewer", "operator"]
Permission = Literal["runs.submit", "runtime.manage"]

Role = Literal["admin", "operator", "viewer"]
Permission = Literal[
"run:read",
"run:write",
"runtime:manage",
"auth:manage",
]

ROLE_PERMISSIONS: dict[Role, frozenset[Permission]] = {
"viewer": frozenset(),
"operator": frozenset({"runs.submit", "runtime.manage"}),
"admin": frozenset(
{
"run:read",
"run:write",
"runtime:manage",
"auth:manage",
}
),
"operator": frozenset({"run:read", "run:write", "runtime:manage"}),
"viewer": frozenset({"run:read"}),
}


Expand Down Expand Up @@ -57,6 +71,7 @@ class AuthenticatedPrincipal(BaseModel):

principal_id: str = Field(min_length=1)
roles: tuple[Role, ...]
permissions: frozenset[str] = Field(default_factory=frozenset)


@runtime_checkable
Expand Down Expand Up @@ -115,7 +130,7 @@ def write_registry(self, registry: AuthRegistry) -> None:
os.replace(temporary_path, self.path)
os.chmod(self.path, STATE_FILE_MODE)

def initialize_registry(self, *, principal_id: str, role: Role = "operator") -> IssuedAuthToken:
def initialize_registry(self, *, principal_id: str, role: Role = "admin") -> IssuedAuthToken:
if self.path.exists():
raise AuthConfigurationError("Auth registry is already configured.")

Expand Down Expand Up @@ -147,8 +162,11 @@ def issue_token(self, *, principal_id: str, role: Role | None = None) -> IssuedA
resolved_role: Role
if principal is None:
if role is None:
raise ValueError("Role is required when issuing a token for a new principal.")
resolved_role = role
# Default to admin for backward compatibility / ease of use
resolved_role = "admin"
else:
resolved_role = role

registry.principals.append(
AuthPrincipal(principal_id=principal_id, roles=[resolved_role])
)
Expand Down Expand Up @@ -189,22 +207,34 @@ def authenticate(self, token: str) -> AuthenticatedPrincipal | None:
registry = self.load_registry()
token_hash = hash_token(normalized_token)

principal_roles = {
principal.principal_id: principal.roles for principal in registry.principals
}
for token_record in registry.tokens:
if token_record.disabled:
continue
# Constant time comparison
if not hmac.compare_digest(token_record.token_sha256, token_hash):
continue

roles = principal_roles.get(token_record.principal_id)
if roles is None:
# Find principal for this token
principal_record = next(
(p for p in registry.principals if p.principal_id == token_record.principal_id),
None,
)

if principal_record is None:
raise AuthConfigurationError("Auth registry references an unknown principal.")

# Resolve permissions
permissions: set[str] = set()
for role in principal_record.roles:
# Handle legacy or unknown roles gracefully by ignoring them
# or treating them as having no permissions if not in map
if role in ROLE_PERMISSIONS:
permissions.update(ROLE_PERMISSIONS[role])

return AuthenticatedPrincipal(
principal_id=token_record.principal_id,
roles=tuple(roles),
roles=tuple(principal_record.roles),
permissions=frozenset(permissions),
)

return None
Expand Down Expand Up @@ -238,11 +268,8 @@ def hash_token(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()


def is_authorized(principal: AuthenticatedPrincipal, *, permission: Permission) -> bool:
allowed_permissions: set[Permission] = set()
for role in principal.roles:
allowed_permissions.update(ROLE_PERMISSIONS.get(role, frozenset()))
return permission in allowed_permissions
def is_authorized(principal: AuthenticatedPrincipal, *, permission: str) -> bool:
return permission in principal.permissions


def get_auth_provider(settings: AppSettings) -> AuthProvider:
Expand Down
42 changes: 34 additions & 8 deletions src/aignt_os/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ def _auth_registry_store() -> AuthRegistryStore:

def _validate_role(role: str) -> Role:
normalized = role.strip().lower()
if normalized not in {"viewer", "operator"}:
raise usage_error("role must be one of: viewer, operator.")
if normalized not in {"admin", "operator", "viewer"}:
raise usage_error("role must be one of: admin, operator, viewer.")
return normalized # type: ignore[return-value]


Expand Down Expand Up @@ -435,7 +435,7 @@ def _resolve_principal_id(
@auth_app.command("init")
def auth_init(
principal_id: Annotated[str, typer.Option("--principal-id")] = "",
role: Annotated[str, typer.Option("--role")] = "operator",
role: Annotated[str, typer.Option("--role")] = "admin",
) -> None:
try:
store = _auth_registry_store()
Expand All @@ -461,8 +461,13 @@ def auth_init(
def auth_issue(
principal_id: Annotated[str, typer.Option("--principal-id")] = "",
role: Annotated[str | None, typer.Option("--role")] = None,
auth_token: Annotated[
str | None,
typer.Option("--auth-token", envvar="AIGNT_OS_AUTH_TOKEN"),
] = None,
) -> None:
try:
_resolve_principal_id(permission="auth:manage", auth_token=auth_token)
store = _auth_registry_store()
issued_token = store.issue_token(
principal_id=principal_id.strip(),
Expand All @@ -485,8 +490,13 @@ def auth_issue(
@auth_app.command("disable")
def auth_disable(
token_id: Annotated[str, typer.Option("--token-id")] = "",
auth_token: Annotated[
str | None,
typer.Option("--auth-token", envvar="AIGNT_OS_AUTH_TOKEN"),
] = None,
) -> None:
try:
_resolve_principal_id(permission="auth:manage", auth_token=auth_token)
_auth_registry_store().disable_token(token_id=token_id.strip())
except CLIError as exc:
exit_for_cli_error(exc)
Expand All @@ -509,7 +519,7 @@ def runtime_start(
] = None,
) -> None:
try:
principal_id = _resolve_principal_id(permission="runtime.manage", auth_token=auth_token)
principal_id = _resolve_principal_id(permission="runtime:manage", auth_token=auth_token)
service = _runtime_service()
state = service.start(started_by=principal_id)
except CLIError as exc:
Expand Down Expand Up @@ -547,7 +557,7 @@ def runtime_run(
] = None,
) -> None:
try:
principal_id = _resolve_principal_id(permission="runtime.manage", auth_token=auth_token)
principal_id = _resolve_principal_id(permission="runtime:manage", auth_token=auth_token)
except CLIError as exc:
exit_for_cli_error(exc)

Expand Down Expand Up @@ -600,7 +610,7 @@ def runtime_stop(
] = None,
) -> None:
try:
principal_id = _resolve_principal_id(permission="runtime.manage", auth_token=auth_token)
principal_id = _resolve_principal_id(permission="runtime:manage", auth_token=auth_token)
service = _runtime_service()
state = service.status()
if (
Expand All @@ -626,13 +636,18 @@ def runtime_stop(
def watch(
run_id: str = typer.Argument(..., help="ID of the run to monitor"),
refresh: float = typer.Option(1.0, help="Refresh interval in seconds"),
auth_token: Annotated[
str | None,
typer.Option("--auth-token", envvar="AIGNT_OS_AUTH_TOKEN"),
] = None,
) -> None:
"""
Monitor a run in real-time using a TUI dashboard.
"""
from aignt_os.cli.dashboard import RunDashboard

try:
_resolve_principal_id(permission="run:read", auth_token=auth_token)
repo = _run_repository()
if not repo.get_run(run_id):
typer.echo(f"Error: Run {run_id} not found.", err=True)
Expand All @@ -648,8 +663,14 @@ def watch(


@runs_app.command("list")
def runs_list() -> None:
def runs_list(
auth_token: Annotated[
str | None,
typer.Option("--auth-token", envvar="AIGNT_OS_AUTH_TOKEN"),
] = None,
) -> None:
try:
_resolve_principal_id(permission="run:read", auth_token=auth_token)
repository = _run_repository()
runs = repository.list_runs()
except CLIError as exc:
Expand Down Expand Up @@ -682,7 +703,7 @@ def runs_submit(
] = None,
) -> None:
try:
principal_id = _resolve_principal_id(permission="runs.submit", auth_token=auth_token)
principal_id = _resolve_principal_id(permission="run:write", auth_token=auth_token)
dispatch_service = (
_dispatch_service(initiated_by=principal_id)
if principal_id is not None
Expand Down Expand Up @@ -715,8 +736,13 @@ def runs_submit(
def runs_show(
run_id: str,
preview: Annotated[str | None, typer.Option("--preview")] = None,
auth_token: Annotated[
str | None,
typer.Option("--auth-token", envvar="AIGNT_OS_AUTH_TOKEN"),
] = None,
) -> None:
try:
_resolve_principal_id(permission="run:read", auth_token=auth_token)
repository = _run_repository()
artifact_store = _artifact_store()
run = repository.get_run(run_id)
Expand Down
Loading
Loading