-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_query_engine_injection.py
More file actions
153 lines (127 loc) · 5.79 KB
/
Copy pathtest_query_engine_injection.py
File metadata and controls
153 lines (127 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
from datetime import UTC, datetime
from unittest.mock import Mock, patch
import pytest
import sqlglot
from sqlglot import exp
from src.serving.backends.clickhouse_backend import ClickHouseBackend
from src.serving.semantic_layer.catalog import DataCatalog
from src.serving.semantic_layer.query_engine import QueryEngine
# SQL-injection payloads fed to every parametrized test in this module.
# Each entry is passed verbatim as an entity id.
ATTACK_VECTORS: list[str] = [
    # Close the string literal, stack a DROP, comment out the remainder.
    "'; DROP TABLE orders_v2; --",
    # Boolean tautology appended to the filter.
    "' OR '1'='1",
    # Stacked DELETE guarded by a tautology.
    "'; DELETE FROM users WHERE '1'='1",
    # Leading backslash in front of the quote (escape-handling probe).
    "\\'; DROP TABLE orders_v2; --",
    # UNION-based read of another table.
    "ORD' UNION SELECT * FROM api_keys --",
    # Close a parenthesis, then stack an ATTACH statement.
    "'); ATTACH 'evil.db' AS evil; --",
    # Embedded NUL byte ahead of the quote.
    "ORD\x00'; DROP TABLE --",
    # Boolean condition built on a subquery against another table.
    "ORD' AND (SELECT COUNT(*) FROM api_keys) > 0 --",
]
@pytest.mark.parametrize("payload", ATTACK_VECTORS)
def test_get_entity_passes_entity_id_as_query_param(payload: str) -> None:
    """Check that ``get_entity`` binds the entity id instead of inlining it.

    The engine is wired to a mocked backend named ``"duckdb"`` whose
    ``execute`` returns no rows. For every attack payload the test expects a
    single ``execute(sql, params)`` call where ``sql`` carries a ``?``
    placeholder for ``order_id``, the payload text is absent from ``sql``,
    and ``params`` is exactly ``[payload]``.
    """
    # Tenant router stub: reports no tenant config and no schema override.
    router = Mock(
        **{
            "has_config.return_value": False,
            "get_duckdb_schema.return_value": None,
        }
    )

    # ``name`` has to be assigned after construction: Mock(name=...) would
    # only label the mock instead of setting the attribute.
    backend = Mock(**{"execute.return_value": []})
    backend.name = "duckdb"

    engine = QueryEngine(catalog=DataCatalog(), db_path=":memory:")
    engine._tenant_router = router
    engine._backend = backend
    engine._backend_name = backend.name

    result = engine.get_entity("order", payload)

    # Empty result set from the backend -> no entity.
    assert result is None

    backend.execute.assert_called_once()
    positional = backend.execute.call_args.args
    assert len(positional) == 2
    sql, params = positional

    assert 'WHERE "order_id" = ?' in sql
    assert payload not in sql
    assert params == [payload]
@pytest.mark.parametrize("payload", ATTACK_VECTORS)
def test_get_entity_at_passes_history_filters_as_query_params(payload: str) -> None:
    """Check that ``get_entity_at`` binds entity type, entity id and anchor.

    The mocked ``"duckdb"`` backend reports the columns ``entity_id``,
    ``entity_data``, ``entity_type`` and ``processed_at`` and returns one
    history row. For every attack payload the test expects one
    ``execute(sql, params)`` call whose SQL uses ``?`` placeholders for all
    three filters and whose params are ``["order", payload, expected_anchor]``.
    """
    # Tenant router stub: reports no tenant config and no schema override.
    router = Mock(
        **{
            "has_config.return_value": False,
            "get_duckdb_schema.return_value": None,
        }
    )

    history_row = {
        "entity_data": "{}",
        "event_time": datetime(2026, 4, 1, 12, 0, tzinfo=UTC),
    }
    backend = Mock()
    backend.name = "duckdb"
    backend.table_columns.return_value = {
        "entity_id",
        "entity_data",
        "entity_type",
        "processed_at",
    }
    backend.execute.return_value = [history_row]

    engine = QueryEngine(catalog=DataCatalog(), db_path=":memory:")
    engine._tenant_router = router
    engine._backend = backend
    engine._backend_name = backend.name

    as_of = datetime(2026, 4, 1, 15, 30, tzinfo=UTC)
    # Expected bound value: ``as_of`` shifted to the machine's current local
    # offset (UTC as a fallback) and then made naive.
    local_tz = datetime.now().astimezone().tzinfo or UTC
    expected_anchor = as_of.astimezone(local_tz).replace(tzinfo=None)

    result = engine.get_entity_at("order", payload, as_of=as_of)

    assert result is not None

    backend.execute.assert_called_once()
    positional = backend.execute.call_args.args
    assert len(positional) == 2
    sql, params = positional

    assert "entity_type = ?" in sql
    assert "entity_id = ?" in sql
    assert "CAST(? AS TIMESTAMP)" in sql
    assert payload not in sql
    assert params == ["order", payload, expected_anchor]
def test_get_metric_passes_as_of_anchor_as_query_params() -> None:
    """Check that ``get_metric`` binds the ``as_of`` anchor rather than using NOW().

    With a mocked ``"duckdb"`` backend whose ``scalar`` returns ``12.5``, the
    test expects one ``scalar(sql, params)`` call where the SQL contains
    exactly two ``CAST(? AS TIMESTAMP)`` placeholders, never mentions
    ``NOW()``, and receives the anchor twice as params.
    """
    # Tenant router stub: reports no tenant config and no schema override.
    router = Mock(
        **{
            "has_config.return_value": False,
            "get_duckdb_schema.return_value": None,
        }
    )

    backend = Mock(**{"scalar.return_value": 12.5})
    backend.name = "duckdb"

    engine = QueryEngine(catalog=DataCatalog(), db_path=":memory:")
    engine._tenant_router = router
    engine._backend = backend
    engine._backend_name = backend.name

    as_of = datetime(2026, 4, 1, 15, 30, tzinfo=UTC)
    # Expected bound value: ``as_of`` shifted to the machine's current local
    # offset (UTC as a fallback) and then made naive.
    local_tz = datetime.now().astimezone().tzinfo or UTC
    expected_anchor = as_of.astimezone(local_tz).replace(tzinfo=None)

    result = engine.get_metric("revenue", window="24h", as_of=as_of)

    assert result == {"value": 12.5, "unit": "USD"}

    backend.scalar.assert_called_once()
    positional = backend.scalar.call_args.args
    assert len(positional) == 2
    sql, params = positional

    assert sql.count("CAST(? AS TIMESTAMP)") == 2
    assert "NOW()" not in sql
    assert params == [expected_anchor, expected_anchor]
class _EmptyClickHouseResponse:
"""Minimal urlopen() context-manager stand-in returning an empty result."""
def __enter__(self):
return self
def __exit__(self, *_: object) -> bool:
return False
def read(self) -> bytes:
return b'{"data":[]}'
@pytest.mark.parametrize("payload", ATTACK_VECTORS)
def test_get_entity_clickhouse_path_keeps_payload_inert(payload: str) -> None:
    """A-3: verify the ClickHouse (non-DuckDB) path cannot be broken out of.

    On this backend parameters are not bound: the engine inlines the value
    via `_quote_literal` and `ClickHouseBackend._translate_sql` re-escapes
    it. The test captures the SQL that would go out over HTTP and requires it
    to parse as exactly one SELECT containing no OR, UNION, DROP, DELETE,
    INSERT or ALTER node, i.e. nothing escaped from the string literal.
    Complements the DuckDB parameter-binding tests in this module.
    """
    # Tenant router stub: reports no tenant config and no schema override.
    router = Mock(
        **{
            "has_config.return_value": False,
            "get_duckdb_schema.return_value": None,
        }
    )

    ch_backend = ClickHouseBackend(host="ch", port=8123, user="u", password="p", database="db")

    engine = QueryEngine(catalog=DataCatalog(), db_path=":memory:")
    engine._tenant_router = router
    engine._backend = ch_backend
    # A backend name other than the duckdb one makes use_query_params False,
    # which selects the inline-literal path.
    engine._backend_name = ch_backend.name

    sent: list[str] = []

    def _capture_request(req, timeout=None):  # noqa: ARG001
        # Record the request body instead of doing any network I/O.
        sent.append(req.data.decode("utf-8"))
        return _EmptyClickHouseResponse()

    with patch(
        "src.serving.backends.clickhouse_backend.urlopen",
        side_effect=_capture_request,
    ):
        result = engine.get_entity("order", payload)

    assert result is None
    assert len(sent) == 1, f"expected exactly one query, got {sent!r}"

    parsed = sqlglot.parse(sent[0], dialect="clickhouse")
    statements = [node for node in parsed if node is not None]
    assert len(statements) == 1, f"payload split the statement: {sent[0]!r}"

    stmt = statements[0]
    assert isinstance(stmt, exp.Select)

    # Any of these node types in the tree means the payload was interpreted
    # as SQL rather than staying inside the literal.
    forbidden = (exp.Or, exp.Union, exp.Drop, exp.Delete, exp.Insert, exp.Alter)
    injected = list(stmt.find_all(*forbidden))
    assert not injected, f"injection leaked {[type(n).__name__ for n in injected]}: {sent[0]!r}"