Skip to content

Commit f857060

Browse files
authored
Merge pull request #163 from IFRCGo/feat/cql2-filtering
2 parents 771d81e + b4029a9 commit f857060

10 files changed

Lines changed: 383 additions & 138 deletions

File tree

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ repos:
55
- id: trailing-whitespace
66
- id: end-of-file-fixer
77
- id: check-yaml
8+
exclude: ^applications/.*/internal/
89
- id: check-case-conflict
910
- id: detect-private-key
1011

applications/argocd/staging/applications/montandon-eoapi/application.yaml

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@ spec:
105105
- path: applications/argocd/staging/applications/montandon-eoapi/internal/
106106
targetRevision: develop
107107
repoURL: https://github.com/IFRCGo/go-deploy.git
108+
helm:
109+
valuesObject:
110+
azure:
111+
clientID: 9b1f12a8-4ae9-4281-afa9-948451f77dce
112+
secretProviderClass:
113+
enabled: true
114+
keyvaultName: montandon-eoapi-staging
108115

109116
- repoURL: https://github.com/developmentseed/stac-auth-proxy.git
110117
targetRevision: v0.9.2
@@ -117,6 +124,8 @@ spec:
117124
OIDC_DISCOVERY_URL: "https://goadmin-stage.ifrc.org/o/.well-known/openid-configuration"
118125
OVERRIDE_HOST: "0"
119126
ROOT_PATH: "/stac"
127+
COLLECTIONS_FILTER_CLS: stac_auth_proxy.montandon_filters:CollectionsFilter
128+
ITEMS_FILTER_CLS: stac_auth_proxy.montandon_filters:ItemsFilter
120129
ingress:
121130
enabled: "true"
122131
host: "montandon-eoapi-stage.ifrc.org"
@@ -125,7 +134,15 @@ spec:
125134
enabled: "true"
126135
secretName: "montandon-eoapi-helm-secret-cert"
127136
replicaCount: 1
128-
137+
extraVolumes:
138+
- name: filters
139+
configMap:
140+
name: stac-auth-proxy-filters
141+
extraVolumeMounts:
142+
- name: filters
143+
mountPath: /app/src/stac_auth_proxy/montandon_filters.py
144+
subPath: montandon_filters.py
145+
readOnly: true
129146
destination:
130147
server: https://kubernetes.default.svc
131148
namespace: montandon-eoapi
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
apiVersion: v2
2+
name: montandon-eoapi-extra-manifests
3+
description: Montandon eoAPI extra manifests
4+
type: application
5+
6+
version: 0.1.0
7+
appVersion: "1.0"
Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
"""
2+
CQL2 filter factories.
3+
4+
These classes will be initialized at the startup of the STAC Auth Proxy service and will
5+
be called for each request to collections/items endpoints in order to generate CQL2
6+
filters based on the JWT permissions.
7+
8+
docs: https://developmentseed.org/stac-auth-proxy/user-guide/record-level-auth/
9+
"""
10+
11+
import asyncio
12+
import dataclasses
13+
import os
14+
import time
15+
import logging
16+
from typing import Any, Literal, Optional, Sequence
17+
18+
import httpx
19+
20+
logger = logging.getLogger(__name__)
21+
22+
# Base URL of the upstream STAC API, read once at import time. The module
# refuses to load without it (unset or empty) so a misconfigured deployment
# fails at startup.
UPSTREAM_URL = os.environ.get("UPSTREAM_URL")
if not UPSTREAM_URL:
    raise ValueError("Failed to retrieve upstream URL")
24+
25+
26+
def _cql2_string_literal(value: Any) -> str:
    """Render ``value`` as a single-quoted CQL2 text literal, doubling embedded quotes."""
    return "'" + str(value).replace("'", "''") + "'"


def cql2_in_query(
    variable: Literal["collection", "id"], collection_ids: Sequence[str]
) -> str:
    """
    Generate a CQL2 text predicate testing whether ``variable`` matches any of
    ``collection_ids``.

    Due to CQL2 syntax ambiguities around single element arrays with the "in"
    operator, a direct ``=`` comparison is used when there's only one permitted
    value.

    Args:
        variable: Name of the property to compare ("collection" or "id").
        collection_ids: Permitted values. Any sized iterable is accepted; a bare
            string is treated as a single value rather than iterated per character.

    Returns:
        ``"1=0"`` (match nothing) when no values are given, ``"<variable> = '<v>'"``
        for exactly one value, otherwise ``"<variable> IN ('<v1>','<v2>',...)"``.
    """
    if not collection_ids:
        return "1=0"

    # A lone string would otherwise be split into one "ID" per character.
    if isinstance(collection_ids, str):
        collection_ids = [collection_ids]

    # Quote explicitly instead of relying on repr(): repr() switches to double
    # quotes (a CQL2 identifier, not a string) when the value contains "'" and
    # doubles backslashes, both of which change the meaning of the filter.
    literals = [_cql2_string_literal(c_id) for c_id in collection_ids]

    if len(literals) == 1:
        return f"{variable} = {literals[0]}"

    return f"{variable} IN ({','.join(literals)})"
41+
42+
43+
@dataclasses.dataclass
class CollectionsFilter:
    """
    Callable that turns a request context into a CQL2 filter for collections.

    The filter is derived from the JWT payload found under ``context["payload"]``:
    no payload yields a match-nothing filter, a superuser payload yields a
    match-everything filter, and any other payload yields the public-collections
    filter OR-ed with the collection IDs listed in the JWT.
    """

    collections_claim: str = "collections"  # JWT claim with allowed collection IDs
    admin_claim: str = "superuser"  # JWT claim indicating superuser status
    public_collections_filter: str = "(private IS NULL OR private = false)"

    async def __call__(self, context: dict[str, Any]) -> str:
        """Return the CQL2 filter text for the request described by ``context``."""
        claims: Optional[dict[str, Any]] = context.get("payload")

        # No (or empty) payload means an anonymous request: nothing is visible.
        if not claims:
            logger.debug("Anonymous user, no collections permitted to be viewed")
            return "1=0"

        # Superusers are recognised by the admin claim holding the string "true".
        is_superuser = claims.get(self.admin_claim) == "true"
        if is_superuser:
            logger.debug(
                f"Superuser detected for sub {claims.get('sub')}, "
                "no filter applied for collections"
            )
            return "1=1"  # Always-true predicate: nothing is filtered out

        # Everyone else sees public collections plus those granted in the JWT.
        granted_ids = claims.get(self.collections_claim, [])
        granted_clause = cql2_in_query("id", granted_ids)
        return f"{self.public_collections_filter} OR {granted_clause}"
77+
78+
79+
@dataclasses.dataclass
class ItemsFilter:
    """
    CQL2 filter factory for items based on JWT permissions.

    Anonymous requests get a match-nothing filter and superusers a
    match-everything filter. Any other authenticated user is restricted to items
    whose collection is either public (looked up from the upstream API and cached
    for ``cache_ttl`` seconds) or listed in the JWT collections claim.
    """

    collections_claim: str = "collections"  # JWT claim with allowed collection IDs
    admin_claim: str = "superuser"  # JWT claim indicating superuser status
    public_collections_filter: str = "(private IS NULL OR private = false)"

    cache_ttl: int = 30  # TTL for caching public collections, in seconds
    _client: httpx.AsyncClient = dataclasses.field(
        init=False,
        repr=False,
        default_factory=lambda: httpx.AsyncClient(base_url=UPSTREAM_URL),
    )
    _public_collections_cache: Optional[list[str]] = dataclasses.field(
        init=False, default=None, repr=False
    )
    # Expiry deadline on the time.monotonic() clock; 0 means "nothing cached yet".
    _cache_expiry: float = dataclasses.field(init=False, default=0, repr=False)
    _cache_lock: asyncio.Lock = dataclasses.field(
        init=False, repr=False, default_factory=asyncio.Lock
    )

    @property
    def _cached_public_collections(self) -> Optional[list[str]]:
        """Return cached public collections if still valid, otherwise None."""
        # Monotonic clock: the TTL must not be affected by wall-clock adjustments.
        if time.monotonic() < self._cache_expiry:
            return self._public_collections_cache
        return None

    @_cached_public_collections.setter
    def _cached_public_collections(self, value: list[str]) -> None:
        """Set the cache with a new value and expiry time."""
        self._public_collections_cache = value
        self._cache_expiry = time.monotonic() + self.cache_ttl

    async def _fetch_public_collections_ids(self) -> list[str]:
        """
        Page through the upstream ``/collections`` endpoint and collect the IDs
        of every collection matching ``public_collections_filter``.

        Raises:
            httpx.HTTPError: If a request fails or returns an error status.
            ValueError, KeyError, TypeError: If a response body is not the
                expected JSON document.
        """
        # First request uses params dict
        url: Optional[str] = "/collections"
        params: Optional[dict[str, Any]] = {
            "filter": self.public_collections_filter,
            "limit": 100,
        }

        ids: list[str] = []
        while url:
            try:
                response = await self._client.get(url, params=params)
                response.raise_for_status()
                data = response.json()
            except httpx.HTTPError:
                logger.exception("Failed to fetch %r.", url)
                raise
            ids.extend(collection["id"] for collection in data["collections"])

            # Subsequent requests use the "next" link URL directly (already has params)
            url = next(
                (
                    link["href"]
                    for link in data.get("links") or []
                    if link.get("rel") == "next"
                ),
                None,
            )
            params = None  # Clear params after first request

        return ids

    async def _get_public_collections_ids(self) -> list[str]:
        """
        Retrieve IDs of public collections from the upstream API.
        Uses a lock to prevent concurrent requests from fetching the same data.

        Raises:
            Whatever ``_fetch_public_collections_ids`` raises; the cache is left
            untouched in that case.
        """
        # Return cached value if still valid (fast path without lock)
        if (cached := self._cached_public_collections) is not None:
            logger.debug("Using cached public collections")
            return cached

        # Acquire lock to prevent concurrent fetches
        async with self._cache_lock:
            # Double-check cache after acquiring lock
            # Another coroutine might have populated it while we waited
            if (cached := self._cached_public_collections) is not None:
                logger.debug("Using cached public collections (after lock)")
                return cached

            logger.debug("Fetching public collections from upstream API")
            ids = await self._fetch_public_collections_ids()

            # Update cache
            self._cached_public_collections = ids
            return ids

    async def __call__(self, context: dict[str, Any]) -> str:
        """Return the CQL2 filter text for the request described by ``context``."""
        jwt_payload: Optional[dict[str, Any]] = context.get("payload")

        # Anonymous: no data
        if not jwt_payload:
            logger.debug("Anonymous user, no items permitted to be viewed")
            return "1=0"

        # Superuser: all data
        # NOTE(review): only the string "true" is accepted here; a JSON boolean
        # true would not match. Confirm against what the token issuer emits.
        if jwt_payload.get(self.admin_claim) == "true":
            logger.debug(
                "Superuser detected for sub %s, no filter applied for items",
                jwt_payload.get("sub"),
            )
            return "1=1"

        # Authenticated user: allowed access to items in public collections.
        # Best effort: if the upstream lookup fails, fall back to JWT grants only.
        try:
            permitted_collections = set(await self._get_public_collections_ids())
        except httpx.HTTPError:
            logger.warning("Failed to fetch public collections.")
            permitted_collections = set()
        except (KeyError, TypeError, ValueError):
            # Upstream answered, but not with the expected JSON structure.
            logger.exception("Unexpected public collections response from upstream.")
            permitted_collections = set()

        # ...plus items in collections mentioned in the JWT. A null claim means
        # "no grants"; a lone string is one ID, not a sequence of characters.
        claimed = jwt_payload.get(self.collections_claim) or []
        if isinstance(claimed, str):
            claimed = [claimed]
        permitted_collections.update(claimed)

        # Sort so the generated filter is deterministic and a real sequence.
        return cql2_in_query("collection", sorted(permitted_collections, key=str))
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
{
2+
"$schema": "https://json-schema.org/draft/2019-09/schema",
3+
"$id": "https://example.com/stac/queryables",
4+
"type": "object",
5+
"title": "Queryables for Monty STAC API",
6+
"description": "Queryable names for the Monty STAC API",
7+
"properties": {
8+
"id": {
9+
"description": "Item identifier",
10+
"type": "string"
11+
},
12+
"collection": {
13+
"description": "Collection identifier",
14+
"type": "string"
15+
},
16+
"datetime": {
17+
"description": "Datetime",
18+
"type": "string",
19+
"format": "date-time"
20+
},
21+
"geometry": {
22+
"description": "Geometry",
23+
"type": "object"
24+
},
25+
"monty:episode_number": {
26+
"description": "The episode number of the event (deprecated)",
27+
"type": "integer"
28+
},
29+
"monty:country_codes": {
30+
"description": "The country codes of the countries affected by the event, hazard, impact or response",
31+
"type": "array",
32+
"items": {
33+
"type": "string",
34+
"pattern": "^([A-Z]{3})|AB9$"
35+
}
36+
},
37+
"monty:corr_id": {
38+
"description": "The unique identifier assigned by the Monty system to the reference event",
39+
"type": "string"
40+
},
41+
"monty:hazard_codes": {
42+
"description": "The hazard codes of the hazards affecting the event",
43+
"type": "array",
44+
"items": {
45+
"type": "string",
46+
"pattern": "^([A-Z]{2}(?:\\d{4}$){0,1})|([a-z]{3}-[a-z]{3}-[a-z]{3}-[a-z]{3})|([A-Z]{2})$"
47+
}
48+
},
49+
"roles": {
50+
"description": "The roles of the item",
51+
"type": "array",
52+
"items": {
53+
"type": "string",
54+
"enum": ["event", "reference", "source", "hazard", "impact", "response"]
55+
}
56+
},
57+
"monty:hazard_detail.cluster": {
58+
"description": "The cluster of the hazard (deprecated)",
59+
"type": "string"
60+
},
61+
"monty:hazard_detail.severity_value": {
62+
"description": "The estimated maximum hazard intensity/magnitude/severity value",
63+
"type": "number"
64+
},
65+
"monty:hazard_detail.severity_unit": {
66+
"description": "The unit of the severity value",
67+
"type": "string"
68+
},
69+
"monty:hazard_detail.estimate_type": {
70+
"description": "The type of the estimate",
71+
"type": "string",
72+
"enum": ["primary", "secondary", "modelled"]
73+
},
74+
"monty:impact_detail.category": {
75+
"description": "The category of impact",
76+
"type": "string",
77+
"enum": [
78+
"people", "crops", "women", "men", "children_0_4", "children_5_9",
79+
"children_10_14", "children_15_19", "adult_20_24", "adult_25_29",
80+
"adult_30_34", "adult_35_39", "adult_40_44", "adult_45_49",
81+
"adult_50_54", "adult_55_59", "adult_60_64", "elderly",
82+
"wheelchair_users", "roads", "railways", "vulnerable_employment",
83+
"buildings", "reconstruction_costs", "hospitals", "schools",
84+
"local_currency", "global_currency", "local_currency_adj",
85+
"global_currency_adj", "usd_uncertain", "cattle", "aid_general",
86+
"ifrc_contribution", "ifrc_requested", "alertscore", "households"
87+
]
88+
},
89+
"monty:impact_detail.type": {
90+
"description": "The estimated value type of the impact",
91+
"type": "string",
92+
"enum": [
93+
"unspecified", "unaffected", "damaged", "destroyed", "potentially_damaged",
94+
"affected_total", "affected_direct", "affected_indirect", "death",
95+
"missing", "injured", "evacuated", "relocated", "assisted",
96+
"shelter_emergency", "shelter_temporary", "shelter_longterm", "in_need",
97+
"targeted", "disrupted", "cost", "homeless", "displaced_internal",
98+
"displaced_external", "displaced_total", "alertscore", "potentially_affected",
99+
"highest_risk"
100+
]
101+
},
102+
"monty:impact_detail.value": {
103+
"description": "The estimated impact value",
104+
"type": "number"
105+
},
106+
"monty:impact_detail.unit": {
107+
"description": "The units of the impact estimate",
108+
"type": "string"
109+
},
110+
"monty:impact_detail.estimate_type": {
111+
"description": "The type of the estimate",
112+
"type": "string",
113+
"enum": ["primary", "secondary", "modelled"]
114+
},
115+
"monty:impact_detail.description": {
116+
"description": "The description of the impact",
117+
"type": "string"
118+
}
119+
},
120+
"additionalProperties": true
121+
}

0 commit comments

Comments
 (0)