Skip to content
Open
2 changes: 1 addition & 1 deletion tests/appsec/api_security/test_api_security_rc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

def get_schema(request: HttpResponse, address: str):
"""Get api security schema from spans"""
for _, _, span in interfaces.library.get_spans(request):
for _, _, span, _ in interfaces.library.get_spans(request):
meta = span.get("meta", {})
key = "_dd.appsec.s." + address
payload = meta.get(key)
Expand Down
2 changes: 1 addition & 1 deletion tests/appsec/api_security/test_apisec_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

def get_schema(request: HttpResponse, address: str):
"""Get api security schema from spans"""
for _, _, span in interfaces.library.get_spans(request):
for _, _, span, _ in interfaces.library.get_spans(request):
meta = span.get("meta", {})
payload = meta.get("_dd.appsec.s." + address)
if payload is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

def get_schema(request: HttpResponse, address: str):
"""Get api security schema from spans"""
for _, _, span in interfaces.library.get_spans(request):
for _, _, span, _ in interfaces.library.get_spans(request):
meta = span.get("meta", {})
key = "_dd.appsec.s." + address
payload = meta.get(key)
Expand Down
7 changes: 4 additions & 3 deletions tests/appsec/api_security/test_endpoint_fallback.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

def get_schema(request: HttpResponse, address: str):
"""Get api security schema from spans"""
for _, _, span in interfaces.library.get_spans(request):
for _, _, span, _ in interfaces.library.get_spans(request):
meta = span.get("meta", {})
payload = meta.get("_dd.appsec.s." + address)
if payload is not None:
Expand All @@ -26,8 +26,9 @@ def get_schema(request: HttpResponse, address: str):

def get_span_meta(request: HttpResponse, key: str):
"""Get a specific meta value from the root span"""
span = interfaces.library.get_root_span(request)
return span.get("meta", {}).get(key)
span, span_format = interfaces.library.get_root_span(request)
meta = interfaces.library.get_span_meta(span, span_format)
return meta.get(key)


@rfc("https://docs.google.com/document/d/1GnWwiaw6dkVtgn5f1wcHJETND_Svqd-sJl6FSVVuCkI")
Expand Down
4 changes: 2 additions & 2 deletions tests/appsec/api_security/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

def get_schema(request: HttpResponse, address: str):
"""Get api security schema from spans"""
span = interfaces.library.get_root_span(request)
meta = span.get("meta", {})
span, span_format = interfaces.library.get_root_span(request)
meta = interfaces.library.get_span_meta(span, span_format)
key = "_dd.appsec.s." + address
if key not in meta:
logger.info(f"Schema not found in span meta for {key}")
Expand Down
6 changes: 3 additions & 3 deletions tests/appsec/iast/test_vulnerability_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ def test_vulnerability_schema(self):
with open(schema_path, "r") as f:
schema = json.load(f)
validator = jsonschema.Draft7Validator(schema)
spans = [s for _, s in interfaces.library.get_root_spans()]
for span in spans:
meta = span.get("meta", {})
spans_with_format = list(interfaces.library.get_root_spans())
for _, span, span_format in spans_with_format:
meta = interfaces.library.get_span_meta(span, span_format)
if "_dd.iast.json" not in meta:
continue
iast_data = meta["_dd.iast.json"]
Expand Down
51 changes: 29 additions & 22 deletions tests/appsec/iast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ def _get_expectation(d: str | dict | None) -> str | None:


def _get_span_meta(request: HttpResponse):
span = interfaces.library.get_root_span(request)
meta = span.get("meta", {})
span, span_format = interfaces.library.get_root_span(request)
meta = interfaces.library.get_span_meta(span, span_format)
meta_struct = span.get("meta_struct", {})
return meta, meta_struct

Expand Down Expand Up @@ -56,8 +56,9 @@ def assert_iast_vulnerability(
def assert_metric(request: HttpResponse, metric: str, *, expected: bool) -> None:
spans_checked = 0
metric_available = False
for _, __, span in interfaces.library.get_spans(request):
if metric in span["metrics"]:
for _, __, span, span_format in interfaces.library.get_spans(request):
metrics = interfaces.library.get_span_metrics(span, span_format)
if metric in metrics:
metric_available = True
spans_checked += 1
assert spans_checked == 1
Expand All @@ -77,13 +78,13 @@ def _check_telemetry_response_from_agent():


def get_all_iast_events() -> list:
spans = [span[2] for span in interfaces.library.get_spans()]
assert spans, "No spans found"
spans_meta = [span.get("meta") for span in spans if span.get("meta")]
spans_meta_struct = [span.get("meta_struct") for span in spans if span.get("meta_struct")]
spans_with_format = [(span, span_format) for _, _, span, span_format in interfaces.library.get_spans()]
assert spans_with_format, "No spans found"
spans_meta = [interfaces.library.get_span_meta(span, span_format) for span, span_format in spans_with_format]
spans_meta_struct = [span.get("meta_struct") for span, _ in spans_with_format if span.get("meta_struct")]
assert spans_meta or spans_meta_struct, "No spans meta found"
iast_events = [meta.get("_dd.iast.json") for meta in spans_meta if meta.get("_dd.iast.json")]
iast_events += [metastruct.get("iast") for metastruct in spans_meta_struct if metastruct.get("iast")]
iast_events = [meta.get("_dd.iast.json") for meta in spans_meta if meta and meta.get("_dd.iast.json")]
iast_events += [metastruct.get("iast") for metastruct in spans_meta_struct if metastruct and metastruct.get("iast")]
assert iast_events, "No iast events found"

return iast_events
Expand Down Expand Up @@ -198,8 +199,8 @@ def assert_no_iast_event(request: HttpResponse, tested_vulnerability_type: str |


def validate_stack_traces(request: HttpResponse) -> None:
span = interfaces.library.get_root_span(request)
meta = span.get("meta", {})
span, span_format = interfaces.library.get_root_span(request)
meta = interfaces.library.get_span_meta(span, span_format)
meta_struct = span.get("meta_struct", {})
iast = meta.get("_dd.iast.json") or meta_struct.get("iast")
assert iast is not None, "No iast event in root span"
Expand Down Expand Up @@ -289,9 +290,12 @@ def validate_stack_traces(request: HttpResponse) -> None:
def validate_extended_location_data(
request: HttpResponse, vulnerability_type: str | None, *, is_expected_location_required: bool = True
) -> None:
span = interfaces.library.get_root_span(request)
iast = span.get("meta", {}).get("_dd.iast.json") or span.get("meta_struct", {}).get("iast")
assert iast, f"Expected at least one vulnerability in span {span.get('span_id')}"
span, span_format = interfaces.library.get_root_span(request)
meta = interfaces.library.get_span_meta(span, span_format)
meta_struct = span.get("meta_struct", {})
iast = meta.get("_dd.iast.json") or meta_struct.get("iast")
span_id = interfaces.library.get_span_span_id(span, span_format)
assert iast, f"Expected at least one vulnerability in span {span_id}"
assert iast["vulnerabilities"], f"Expected at least one vulnerability: {iast['vulnerabilities']}"

# Filter by vulnerability
Expand Down Expand Up @@ -321,8 +325,8 @@ def validate_extended_location_data(
if context.library.name not in ("python", "nodejs"):
assert all(field in location for field in ["class", "method"])
else:
assert "vulnerability" in span["meta_struct"]["_dd.stack"], "'vulnerability' not found in '_dd.stack'"
stack_traces = span["meta_struct"]["_dd.stack"]["vulnerability"]
assert "vulnerability" in meta_struct["_dd.stack"], "'vulnerability' not found in '_dd.stack'"
stack_traces = meta_struct["_dd.stack"]["vulnerability"]
assert stack_traces, "No vulnerability stack traces found"
stack_traces = [s for s in stack_traces if s.get("id") == stack_id]
assert stack_traces, f"No vulnerability stack trace found for id {stack_id}"
Expand Down Expand Up @@ -364,16 +368,19 @@ def _norm(s: str | None) -> str | None:


def get_hardcoded_vulnerabilities(vulnerability_type: str, request: HttpResponse | None = None) -> list:
spans = [s for _, s in interfaces.library.get_root_spans(request=request)]
assert spans, "No spans found"
spans_meta = [span.get("meta") for span in spans]
spans_with_format = [
(span, span_format) for _, span, span_format in interfaces.library.get_root_spans(request=request)
]
assert spans_with_format, "No spans found"
spans_meta = [interfaces.library.get_span_meta(span, span_format) for span, span_format in spans_with_format]
assert spans_meta, "No spans meta found"
iast_events = [meta.get("_dd.iast.json") for meta in spans_meta if meta.get("_dd.iast.json")]
iast_events = [meta.get("_dd.iast.json") for meta in spans_meta if meta and meta.get("_dd.iast.json")]
assert iast_events, "No iast events found"

vulnerabilities: list = []
for event in iast_events:
vulnerabilities.extend(event.get("vulnerabilities", []))
if event:
vulnerabilities.extend(event.get("vulnerabilities", []))

assert vulnerabilities, "No vulnerabilities found"

Expand Down
2 changes: 1 addition & 1 deletion tests/appsec/rasp/test_api10.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def test_api10_redirect(self):
assert self.r.status_code == 200
# interfaces.library.validate_one_span(self.r, validator=self.validate)
interfaces.library.validate_one_span(self.r, validator=self.validate_metric)
for _, _trace, span in interfaces.library.get_spans(request=self.r):
for _, _trace, span, _ in interfaces.library.get_spans(request=self.r):
meta = span.get("meta", {})
assert isinstance(meta.get("appsec.api.redirection.move_target", None), str), f"missing tag in {meta}"
assert "/redirect?totalRedirects=2" in meta["appsec.api.redirection.move_target"]
6 changes: 3 additions & 3 deletions tests/appsec/rasp/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ def validate_span_tags(
request: HttpResponse, expected_meta: Sequence[str] = (), expected_metrics: Sequence[str] = ()
) -> None:
"""Validate RASP span tags are added when an event is generated"""
span = interfaces.library.get_root_span(request)
meta = span["meta"]
span, span_format = interfaces.library.get_root_span(request)
meta = interfaces.library.get_span_meta(span, span_format)
for m in expected_meta:
assert m in meta, f"missing span meta tag `{m}` in {meta}"

metrics = span["metrics"]
metrics = interfaces.library.get_span_metrics(span, span_format)
for m in expected_metrics:
assert m in metrics, f"missing span metric tag `{m}` in {metrics}"

Expand Down
Loading
Loading