Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions ooniapi/services/ooniprobe/src/ooniprobe/prio.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,20 @@
```
"""

from functools import lru_cache
import random
from typing import Annotated, Dict, List, Tuple
import logging
from typing import List, Tuple

import sqlalchemy as sa
from clickhouse_driver import Client as Clickhouse

from fastapi import Depends
from pydantic import BaseModel

from .common.clickhouse_utils import query_click
from .common.metrics import timer
from .dependencies import ClickhouseDep

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -118,7 +124,7 @@ def fetch_prioritization_rules(clickhouse_db: Clickhouse, cc: str) -> tuple:
def generate_test_list(
clickhouse: Clickhouse,
country_code: str,
category_codes: List,
category_codes: List[str] | None,
probe_asn: int,
limit: int,
debug: bool,
Expand Down Expand Up @@ -151,7 +157,69 @@ def generate_test_list(
out.append(i)
if len(out) >= limit:
break

if debug:
return out, entries, prio_rules
return out, (), ()


class CTZ(BaseModel):
    """A single citizenlab test-list entry held in the failover cache."""

    # URL to be tested by the probe
    url: str
    # citizenlab category code (e.g. "NEWS") — used as the grouping key
    # in the failover cache; TODO confirm exact category vocabulary
    category_code: str


def failover_fetch_citizenlab_data(clickhouse: Clickhouse) -> Dict[str, List[CTZ]]:
    """
    Fetch the global citizenlab test list from the database, grouped by
    category code.

    Used only once at startup to populate the in-memory failover cache
    consumed when the prioritized test-list query fails.

    :param clickhouse: clickhouse client used for the query
    :return: mapping of category_code -> list of CTZ entries
    """
    log.info("Started failover_fetch_citizenlab_data")

    # cc = 'ZZ' selects the global (country-independent) list.
    sql = """SELECT category_code, url
    FROM citizenlab
    WHERE cc = 'ZZ'
    """

    out: Dict[str, List[CTZ]] = {}
    rows = query_click(clickhouse, sql, {}, query_prio=1)
    for row in rows:
        catcode = row["category_code"]
        entry = CTZ(url=row["url"], category_code=catcode)
        out.setdefault(catcode, []).append(entry)

    # Lazy %-args instead of eager string formatting; counts categories,
    # not URLs.
    log.info("Fetch done: %d", len(out))
    return out


@lru_cache
def failover_test_lists_cache(clickhouse: ClickhouseDep):
    # Cached per clickhouse-client argument (lru_cache keys on it), so the
    # citizenlab failover data is fetched from the DB only on first use.
    # NOTE(review): the cache is unbounded and keeps the client object alive
    # for the process lifetime — fine for a singleton dependency; confirm
    # ClickhouseDep resolves to a stable, hashable client instance.
    return failover_fetch_citizenlab_data(clickhouse)


# FastAPI dependency alias: injects the cached failover mapping
# (category_code -> [CTZ, ...]) into route handlers.
FailoverTestListDep = Annotated[
    Dict[str, List[CTZ]], Depends(failover_test_lists_cache)
]


def failover_generate_test_list(
    failover_test_items: "Dict[str, List[CTZ]]",
    category_codes: "List[str] | None",
    limit: int,
) -> "List[Dict[str, str]]":
    """
    Build a test list from the in-memory failover cache, without any
    database interaction.

    :param failover_test_items: mapping of category code to cached entries
    :param category_codes: categories to draw from; None or empty means all
    :param limit: maximum number of URLs to return; clamped to [0, total]
    :return: list of dicts with category_code, url and a placeholder
        country_code of "XX" (the country is unknown in failover mode)
    """
    if not category_codes:
        category_codes = list(failover_test_items)

    # Flatten the selected categories; unknown codes are silently skipped.
    candidates: list = []
    for catcode in category_codes:
        candidates.extend(failover_test_items.get(catcode, ()))

    # Clamp so random.sample never raises: it requires 0 <= k <= len(pop).
    # The original crashed with ValueError on a negative limit.
    k = max(0, min(limit, len(candidates)))
    selected = random.sample(candidates, k=k)
    return [
        dict(category_code=entry.category_code, url=entry.url, country_code="XX")
        for entry in selected
    ]
14 changes: 10 additions & 4 deletions ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ async def receive_measurement(
data = zstd.decompress(data)
ratio = len(data) / len(data)
log.debug(f"Zstd compression ratio {ratio}")
except Exception as e:
except Exception:
log.info("Failed zstd decompression")
error("Incorrect format")

Expand Down Expand Up @@ -219,7 +219,7 @@ async def receive_measurement(
log.error(
f"[Try {t+1}/{N_RETRIES}] Error trying to send measurement to the fastpath. Error: {exc}"
)
sleep_time = random.uniform(0, min(3, 0.3 * 2 ** t))
sleep_time = random.uniform(0, min(3, 0.3 * 2**t))
await asyncio.sleep(sleep_time)

Metrics.SEND_FASTPATH_FAILURE.inc()
Expand Down Expand Up @@ -271,8 +271,14 @@ def compare_probe_msmt_cc_asn(
if db_probe_cc == cc and db_asn == asn:
Metrics.PROBE_CC_ASN_MATCH.inc()
elif db_probe_cc != cc:
Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="cc", reported=cc, detected=db_probe_cc).inc()
log.error(f"db_cc != cc: {db_probe_cc} != {cc}")
Metrics.PROBE_CC_ASN_NO_MATCH.labels(
mismatch="cc", reported=cc, detected=db_probe_cc
).inc()
elif db_asn != asn:
Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="asn", reported=asn, detected=db_asn).inc()
log.error(f"db_asn != asn: {db_asn} != {asn}")
Metrics.PROBE_CC_ASN_NO_MATCH.labels(
mismatch="asn", reported=asn, detected=db_asn
).inc()
except Exception:
pass
110 changes: 102 additions & 8 deletions ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import logging
import random
import time
from datetime import datetime, timezone, timedelta
from typing import List, Optional, Any, Dict, Tuple
import time
from typing import Annotated, List, Optional, Any, Dict, Tuple
import random

import geoip2
import geoip2.errors
from fastapi import APIRouter, HTTPException, Response, Request
from fastapi import APIRouter, HTTPException, Query, Response, Request
from prometheus_client import Counter, Info, Gauge
from pydantic import Field, IPvAnyAddress

Expand All @@ -26,7 +26,8 @@
lookup_probe_cc,
lookup_probe_network,
)
from ...prio import generate_test_list
from ...common.utils import setcacheresponse
from ...prio import FailoverTestListDep, failover_generate_test_list, generate_test_list

router = APIRouter(prefix="/v1")

Expand Down Expand Up @@ -67,6 +68,10 @@ class Metrics:
"geoip_asn_differs", "There's a mismatch between reported ASN and observed ASN"
)

TEST_LIST_URLS_COUNT = Gauge(
"test_list_urls_count", "How many urls were generated for a test list"
)


class ProbeLogin(BaseModel):
# Allow None username and password
Expand Down Expand Up @@ -595,6 +600,96 @@ def random_web_test_helpers(th_list: List[str]) -> List[Dict]:
return out


class TestListUrlsMeta(BaseModel):
    """Pagination metadata for the /test-list/urls response."""

    # number of entries in `results`
    count: int
    # legacy pagination fields: the endpoint serves a single page, so the
    # caller fills these with placeholder values
    current_page: int
    limit: int
    next_url: str
    pages: int


class TestListUrlsResult(BaseModel):
    """A single prioritized test-list URL entry."""

    # citizenlab category code, e.g. "NEWS"
    category_code: str
    # two-letter country code; "XX" when generated by the failover path
    country_code: str
    url: str


class TestListUrlsResponse(BaseModel):
    """
    URL test list: pagination metadata plus the prioritized URL entries.
    """

    metadata: TestListUrlsMeta
    results: List[TestListUrlsResult]


@router.get("/test-list/urls")
def list_test_urls(
    clickhouse: ClickhouseDep,
    failover_test_items: FailoverTestListDep,
    response: Response,
    category_codes: Annotated[
        str | None,
        Query(
            description="Comma separated list of URL categories, all uppercase",
            # Anchored: an unanchored "[A-Z,]*" matches the empty span of
            # ANY string, making the validation vacuous.
            pattern=r"^[A-Z,]*$",
        ),
    ] = None,
    country_code: Annotated[
        str,
        Query(
            description="Two letter, uppercase country code",
            min_length=2,
            max_length=2,
            alias="probe_cc",
        ),
    ] = "ZZ",
    limit: Annotated[
        int, Query(description="Maximum number of URLs to return", le=9999)
    ] = -1,
    debug: Annotated[
        bool,
        Query(
            description="Include measurement counts and priority",
        ),
    ] = False,
) -> TestListUrlsResponse | Dict[str, Any]:
    """
    Generate a prioritized test URL list for a probe.

    Falls back to the in-memory citizenlab cache when the prioritized
    database query fails; returns an empty, uncached dict when the
    request parameters cannot be processed.
    """
    try:
        country_code = country_code.upper()
        category_codes_list = category_codes.split(",") if category_codes else None
        # -1 means "no limit requested": serve up to the hard cap.
        if limit == -1:
            limit = 9999
    except Exception as e:
        log.error(e, exc_info=True)
        setnocacheresponse(response)
        return {}

    try:
        # probe_asn=0: ASN-specific prioritization is not used here.
        test_items, _1, _2 = generate_test_list(
            clickhouse, country_code, category_codes_list, 0, limit, debug
        )
    except Exception as e:
        log.error(e, exc_info=True)
        # failover_generate_test_list runs without any database interaction
        test_items = failover_generate_test_list(
            failover_test_items, category_codes_list, limit
        )

    # TODO: remove current_page / next_url / pages ?
    Metrics.TEST_LIST_URLS_COUNT.set(len(test_items))
    out = TestListUrlsResponse(
        metadata=TestListUrlsMeta(
            count=len(test_items), current_page=-1, limit=-1, next_url="", pages=1
        ),
        results=[TestListUrlsResult(**item) for item in test_items],
    )
    setcacheresponse("1s", response)
    return out


class GeoLookupResult(BaseModel):
cc: str = Field(description="Country Code")
asn: str = Field(description="Autonomous System Number (ASN)")
Expand All @@ -606,7 +701,7 @@ class GeoLookupRequest(BaseModel):


class GeoLookupResponse(BaseModel):
    # response format version: integer, currently always 1
    v: int = Field(description="response format version", default=1)
    geolocation: Dict[IPvAnyAddress, GeoLookupResult] = Field(description="Dict of IP addresses to GeoLookupResult")


Expand Down Expand Up @@ -641,7 +736,6 @@ class CollectorEntry(BaseModel):
front: Optional[str] = Field(default=None, description="Fronted domain")
type: Optional[str] = Field(default=None, description="Type of collector")


@router.get("/collectors", tags=["ooniprobe"])
def list_collectors(
settings: SettingsDep,
Expand Down Expand Up @@ -669,7 +763,7 @@ def list_tor_targets(
) -> Dict[str, TorTarget]:

token = request.headers.get("Authorization")
if token == None:
if token is None:
# XXX not actually validated
pass

Expand Down
13 changes: 6 additions & 7 deletions ooniapi/services/ooniprobe/src/ooniprobe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,18 @@

import itertools
import logging
from typing import List, TypedDict, Tuple
import io

from fastapi import Request
from mypy_boto3_s3 import S3Client
from sqlalchemy.orm import Session
import pem
from base64 import b64encode
from datetime import datetime, timezone
from os import urandom
from typing import List, TypedDict, Tuple

import httpx
import pem
from fastapi import Request
from mypy_boto3_s3 import S3Client
from sqlalchemy.orm import Session

from .common.config import Settings
from .dependencies import CCReaderDep, ASNReaderDep
Expand Down Expand Up @@ -148,7 +149,6 @@ def lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, st
resp.autonomous_system_organization or "0",
)


def get_first_ip(headers: str) -> str:
"""
parse the first ip from a comma-separated list of ips encoded as a string
Expand All @@ -159,7 +159,6 @@ def get_first_ip(headers: str) -> str:
"""
return headers.partition(',')[0]


def read_file(s3_client : S3Client, bucket: str, file : str) -> str:
"""
Reads the content of `file` within `bucket` into a string
Expand Down
Loading
Loading