Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
592 changes: 379 additions & 213 deletions README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion kuberoast/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
__all__ = []
__version__ = "0.4.0"
__all__ = ["__version__"]
338 changes: 338 additions & 0 deletions kuberoast/attackpaths/sa_chains.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,338 @@
"""Multi-hop ServiceAccount → ServiceAccount escalation paths.

The existing `rbac_escalation.py` flags principals that have risky 1-hop
capabilities (e.g. "this SA can bind RoleBindings"). Real attackers chain
those primitives: SA1 can create pods → SA1 launches pod as SA2 → SA2
has cluster-admin via existing binding → done.

This module builds a directed escalation graph over principals and
finds shortest paths from any non-admin principal to a target with
cluster-wide write power. Each path becomes one finding with the
intermediate hops spelled out and a per-hop PoC.

Edges considered:

* SA-A can `create pods` in NS where SA-B exists → SA-A can mint a pod
with serviceAccountName: SA-B and inherit its identity.
* SA-A can `create pods/exec` on a pod running as SA-B → exec into it,
read /var/run/secrets/kubernetes.io/serviceaccount/token, become SA-B.
* SA-A can `update`/`patch` deployments/statefulsets/daemonsets/jobs in
NS where SA-B is referenced → mutate template.spec.serviceAccountName,
wait for rollout, inherit SA-B.
* SA-A can `create`/`update` RoleBindings/ClusterRoleBindings → bind
self (or any subject) to any Role, including cluster-admin.
* SA-A can `escalate` on roles → grant self extra permissions in any
existing Role (no need for cluster-admin).
* SA-A can `impersonate` users/groups/serviceaccounts → directly act as
target.
* SA-A can `create serviceaccounts/token` → mint a bound token for any
SA they can name.

Targets we consider terminal:
* cluster-admin via direct binding
* any principal with verbs * resources * (effective cluster-admin)
* any principal with `update nodes` (can move pods, label-spoof for IRSA)
* any principal with `get secrets *` (read every secret in cluster)
"""
from collections import defaultdict, deque
from typing import Dict, List, Set, Tuple, Optional, DefaultDict
from ..utils.findings import Finding


def _principal_id(kind: str, name: str, namespace: Optional[str] = None) -> str:
if kind == "ServiceAccount" and namespace:
return f"sa:{namespace}:{name}"
return f"{kind.lower()}:{name}"


def _has_perm(perms: Set[Tuple[str, str]], verb: str, resource: str) -> bool:
return (
(verb, resource) in perms
or (verb, "*") in perms
or ("*", resource) in perms
or ("*", "*") in perms
)


def _is_terminal_admin(perms: Set[Tuple[str, str]]) -> Tuple[bool, str]:
"""Return (is_admin, reason)."""
if ("*", "*") in perms:
return True, "wildcard verbs on wildcard resources (effective cluster-admin)"
if _has_perm(perms, "get", "secrets") and _has_perm(perms, "list", "secrets"):
# Reading every secret in cluster is game-over-grade.
return True, "can read all secrets (credential exfiltration enables full cluster takeover)"
return False, ""


def _build_principal_perms(roles, croles, rbs, crbs) -> Tuple[
DefaultDict[str, Set[Tuple[str, str]]],
DefaultDict[str, Set[str]],
DefaultDict[str, Set[str]],
]:
"""Return (perms_per_principal, role_keys_per_principal, role_namespaces_per_principal)."""
role_rules: Dict[str, List] = {}
for r in roles:
role_rules[f"Role/{r.metadata.namespace}/{r.metadata.name}"] = list(r.rules or [])
for cr in croles:
role_rules[f"ClusterRole/{cr.metadata.name}"] = list(cr.rules or [])

principal_roles: DefaultDict[str, Set[str]] = defaultdict(set)
principal_namespaces: DefaultDict[str, Set[str]] = defaultdict(set)

def add_binding(b, is_crb=False):
rr = b.role_ref
if is_crb or rr.kind == "ClusterRole":
role_key = f"ClusterRole/{rr.name}"
binding_ns = None
else:
binding_ns = b.metadata.namespace
role_key = f"Role/{binding_ns}/{rr.name}"
for s in (b.subjects or []):
ns = getattr(s, "namespace", None)
pid = _principal_id(s.kind, s.name, ns)
principal_roles[pid].add(role_key)
# track which namespaces a principal has any binding in
if binding_ns:
principal_namespaces[pid].add(binding_ns)
elif ns:
principal_namespaces[pid].add(ns)

for b in rbs:
add_binding(b, is_crb=False)
for b in crbs:
add_binding(b, is_crb=True)

principal_perms: DefaultDict[str, Set[Tuple[str, str]]] = defaultdict(set)
for p, rset in principal_roles.items():
for rk in rset:
for rule in role_rules.get(rk, []):
verbs = set(rule.verbs or [])
resources = set(rule.resources or [])
for v in verbs:
for res in resources:
principal_perms[p].add((v, res))

return principal_perms, principal_roles, principal_namespaces


def _build_edges(
principal_perms: DefaultDict[str, Set[Tuple[str, str]]],
sa_to_pods: Dict[str, Set[Tuple[str, str]]],
all_principals: Set[str],
) -> DefaultDict[str, List[Tuple[str, str, str]]]:
"""Build edges principal → (target_principal, reason, poc_command).

Edges represent: "principal A can become principal B via this primitive."
For broad primitives like 'can create rolebindings' we add an edge to
every other principal because A can always bind to ANY role.
"""
edges: DefaultDict[str, List[Tuple[str, str, str]]] = defaultdict(list)

sa_principals = [p for p in all_principals if p.startswith("sa:")]

for principal, perms in principal_perms.items():
# 1. create pods → become any SA in the same namespace
if _has_perm(perms, "create", "pods"):
ns = principal.split(":", 2)[1] if principal.startswith("sa:") else None
for sa in sa_principals:
if sa == principal:
continue
sa_ns = sa.split(":", 2)[1]
if ns and sa_ns != ns:
continue
edges[principal].append((
sa,
"create a pod with serviceAccountName set to target SA",
f"kubectl run pwn --image=busybox --serviceaccount={sa.split(':')[-1]} -n {sa_ns} -- sleep 999\n"
f"kubectl exec -n {sa_ns} pwn -- cat /var/run/secrets/kubernetes.io/serviceaccount/token",
))

# 2. exec into pods → become the SA that runs the target pod
if _has_perm(perms, "create", "pods/exec") or _has_perm(perms, "get", "pods/exec"):
for sa, pods in sa_to_pods.items():
if sa == principal:
continue
for ns, pod_name in pods:
edges[principal].append((
sa,
f"exec into pod {ns}/{pod_name} which runs as target SA",
f"kubectl exec -n {ns} {pod_name} -- cat /var/run/secrets/kubernetes.io/serviceaccount/token",
))
break # one example pod per SA is enough

# 3. update workload templates → swap serviceAccountName, wait for rollout
if _has_perm(perms, "update", "deployments") or _has_perm(perms, "patch", "deployments"):
for sa in sa_principals:
if sa == principal:
continue
ns = sa.split(":", 2)[1]
sa_name = sa.split(":")[-1]
edges[principal].append((
sa,
f"patch any Deployment in {ns} to use serviceAccountName={sa_name}",
f"kubectl patch deployment <name> -n {ns} -p '{{\"spec\":{{\"template\":{{\"spec\":{{\"serviceAccountName\":\"{sa_name}\"}}}}}}}}'",
))

# 4. create/update RoleBindings → bind self to any role (including cluster-admin)
if (
_has_perm(perms, "create", "rolebindings")
or _has_perm(perms, "create", "clusterrolebindings")
or _has_perm(perms, "update", "rolebindings")
or _has_perm(perms, "update", "clusterrolebindings")
):
# synthetic "cluster-admin" target
edges[principal].append((
"ClusterRole/cluster-admin",
"bind self to cluster-admin via (Cluster)RoleBinding write",
"kubectl create clusterrolebinding pwn --clusterrole=cluster-admin --serviceaccount=<ns>:<sa>",
))

# 4b. CSR cert-minting → issue a system:masters client cert
can_create_csr = _has_perm(perms, "create", "certificatesigningrequests")
can_approve_csr = (
_has_perm(perms, "update", "certificatesigningrequests/approval")
or _has_perm(perms, "patch", "certificatesigningrequests/approval")
or _has_perm(perms, "approve", "signers")
)
if can_create_csr and can_approve_csr:
edges[principal].append((
"ClusterRole/cluster-admin",
"mint a system:masters client certificate via the CSR API",
"openssl req -new -nodes -keyout k -subj '/O=system:masters/CN=pwn' -out r; "
"kubectl create csr ... signerName=kubernetes.io/kube-apiserver-client; "
"kubectl certificate approve pwn",
))

# 5. escalate verb → grant self any verbs on existing roles
if _has_perm(perms, "escalate", "roles") or _has_perm(perms, "escalate", "clusterroles"):
edges[principal].append((
"ClusterRole/cluster-admin",
"use escalate verb to elevate existing role with cluster-admin verbs",
"kubectl patch clusterrole <existing> --type=json -p='[{\"op\":\"add\",\"path\":\"/rules/-\",\"value\":{\"apiGroups\":[\"*\"],\"resources\":[\"*\"],\"verbs\":[\"*\"]}}]'",
))

# 6. impersonate → directly act as target
if _has_perm(perms, "impersonate", "users") or _has_perm(perms, "impersonate", "serviceaccounts") or _has_perm(perms, "impersonate", "groups"):
edges[principal].append((
"*:system:masters",
"impersonate system:masters group (cluster-admin)",
"kubectl --as=system:admin --as-group=system:masters get pods --all-namespaces",
))

# 7. token request abuse — mint token for any SA
if _has_perm(perms, "create", "serviceaccounts/token"):
for sa in sa_principals:
if sa == principal:
continue
ns = sa.split(":", 2)[1]
sa_name = sa.split(":")[-1]
edges[principal].append((
sa,
f"request bound token for {sa} via TokenRequest API",
f"kubectl create token {sa_name} -n {ns}",
))

return edges


def _bfs_paths(edges, start: str, terminals: Set[str], max_depth: int = 4) -> Optional[List[Tuple[str, str, str]]]:
"""Return shortest path from start to any terminal as list of (next_principal, reason, poc)."""
if start in terminals:
return []
visited = {start}
queue = deque([(start, [])]) # (current, path_so_far)
while queue:
current, path = queue.popleft()
if len(path) >= max_depth:
continue
for (nxt, reason, poc) in edges.get(current, []):
if nxt in visited:
continue
new_path = path + [(nxt, reason, poc)]
if nxt in terminals or nxt == "ClusterRole/cluster-admin" or nxt == "*:system:masters":
return new_path
visited.add(nxt)
queue.append((nxt, new_path))
return None


def analyze_sa_chains(roles, croles, rbs, crbs, pods) -> List[Finding]:
"""Discover multi-hop escalation chains and emit one finding per chain."""
findings: List[Finding] = []

principal_perms, _principal_roles, _principal_ns = _build_principal_perms(roles, croles, rbs, crbs)

# SA → pods running it (each pod recorded as (namespace, pod_name))
sa_to_pods: Dict[str, Set[Tuple[str, str]]] = defaultdict(set)
for pod in pods:
ns = pod.metadata.namespace
pname = pod.metadata.name
sa = (pod.spec.service_account_name or "default") if pod.spec else "default"
sa_to_pods[f"sa:{ns}:{sa}"].add((ns, pname))

all_principals = set(principal_perms.keys()) | set(sa_to_pods.keys())

# Find terminal principals (those with effective admin)
terminals: Set[str] = set()
terminal_reasons: Dict[str, str] = {}
for p, perms in principal_perms.items():
is_term, why = _is_terminal_admin(perms)
if is_term:
terminals.add(p)
terminal_reasons[p] = why

edges = _build_edges(principal_perms, sa_to_pods, all_principals)

# For each non-terminal principal, BFS for a chain to a terminal
for start in all_principals:
if start in terminals:
continue
# Skip system principals that are *expected* to have admin reach
if start.startswith("sa:kube-system:") or start in ("user:system:admin", "group:system:masters"):
continue
path = _bfs_paths(edges, start, terminals, max_depth=4)
if path is None or not path:
continue
# Build a human-readable chain
hops = [f"{start}"]
steps = []
for (next_p, reason, poc) in path:
hops.append(next_p)
steps.append(f" - {reason}\n PoC: {poc}")
terminal = path[-1][0]
terminal_reason = terminal_reasons.get(terminal, "cluster-admin via binding")
hop_count = len(path)
sev = "critical" if hop_count <= 2 else "high"
chain_str = " → ".join(hops)
steps_str = "\n".join(steps)

# Use a finding ID that's deterministic per start principal so re-runs are stable
findings.append(Finding(
id="AP-CHAIN",
title=f"Escalation chain ({hop_count} hop): {start} → cluster-admin",
description=(
f"Principal {start} can reach cluster-admin in {hop_count} step(s).\n\n"
f"Chain: {chain_str}\n\n"
f"Steps:\n{steps_str}\n\n"
f"Terminal capability: {terminal_reason}"
),
severity=sev,
category="AttackPath",
resource=start,
exploitability="trivial" if hop_count == 1 else "easy" if hop_count == 2 else "moderate",
exploit=steps[0].strip() if steps else None,
impact=f"Full cluster compromise via {hop_count}-hop escalation. Pods running this principal effectively run as cluster-admin.",
attack_techniques=["T1078.004", "T1098.003"],
remediation=(
"Break the chain at the earliest hop. Common fixes: remove `create pods` and "
"`pods/exec` from non-admin SAs; never grant `bind`/`escalate`/`impersonate`; "
"deny `update` on workload templates that reference privileged SAs."
),
references=[
"https://attack.mitre.org/techniques/T1098/003/",
"https://kubernetes.io/docs/reference/access-authn-authz/rbac/",
"https://raesene.github.io/blog/2021/01/16/Getting-Into-A-Bind-with-Kubernetes/",
],
))

return findings
Loading