Skip to content

Commit eb1095a

Browse files
authored
feat: table-aware JSON column detection for dot-notation syntax (#57)
Previously, JSON path syntax (e.g., `containers[]`, `usage.cpu`) only worked for columns in the hardcoded DEFAULT_JSON_OBJECT_COLUMNS list (spec, status, labels, annotations, data, owner_references). This meant resource-specific JSON columns like `containers` in podmetrics weren't recognized. This change implements table-aware JSON column detection: - Add `ColumnDef::json()` constructor to explicitly mark JSON array/object columns - Update `get_core_resource_fields()` to use `json()` for 15+ array/object fields: rules, subjects, containers, usage, subsets, involved_object, source, etc. - Add `ResourceRegistry::get_json_columns()` and `get_json_columns_for_tables()` - Add `extract_table_names()` to parse FROM/JOIN clauses from SQL - Add `preprocess_sql_with_registry()` for table-aware preprocessing - Update `execute_sql()` to use the new preprocessing function Now queries like `SELECT containers[] FROM podmetrics` work correctly because: 1. Table name "podmetrics" is extracted from the query 2. Registry lookup finds containers has is_json_object=true 3. The `[]` syntax is recognized and converted to UNNEST(json_get_array(...)) This approach is table-aware - if another resource had a `containers` column that was a string, it wouldn't be incorrectly treated as JSON.
1 parent 794f2b6 commit eb1095a

3 files changed

Lines changed: 265 additions & 38 deletions

File tree

src/datafusion_integration/context.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use datafusion::prelude::SessionConfig;
1616
use crate::kubernetes::K8sClientPool;
1717
use crate::output::QueryResult;
1818

19-
use super::preprocess::{preprocess_sql, validate_read_only};
19+
use super::preprocess::{preprocess_sql_with_registry, validate_read_only};
2020
use super::provider::K8sTableProvider;
2121

2222
/// Table information with native types (for data layer)
@@ -95,8 +95,15 @@ impl K8sSessionContext {
9595

9696
/// Execute a SQL query and return the results as Arrow RecordBatches
9797
pub async fn execute_sql(&self, sql: &str) -> DFResult<Vec<RecordBatch>> {
98-
// Preprocess first (compiles PRQL to SQL if detected, fixes arrow precedence)
99-
let processed_sql = preprocess_sql(sql)
98+
// Get registry for table-aware JSON column detection
99+
let registry = self
100+
.pool
101+
.get_registry(None)
102+
.await
103+
.map_err(|e| datafusion::error::DataFusionError::Plan(e.to_string()))?;
104+
105+
// Preprocess with table-aware JSON columns (compiles PRQL, converts JSON paths)
106+
let processed_sql = preprocess_sql_with_registry(sql, &registry)
100107
.map_err(|e| datafusion::error::DataFusionError::Plan(e.to_string()))?;
101108

102109
// Validate the resulting SQL is read-only

src/datafusion_integration/preprocess.rs

Lines changed: 183 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,14 @@
5555
//! ```
5656
5757
use super::{json_path, prql};
58+
use crate::kubernetes::discovery::ResourceRegistry;
5859
use anyhow::Result;
5960
use datafusion::sql::sqlparser::ast::Statement;
6061
use datafusion::sql::sqlparser::dialect::PostgreSqlDialect;
6162
use datafusion::sql::sqlparser::parser::Parser;
63+
use datafusion::sql::sqlparser::tokenizer::{Token, Tokenizer};
6264
use regex::Regex;
65+
use std::collections::HashSet;
6366
use std::sync::LazyLock;
6467

6568
/// Regex to match arrows followed by comparison operators (left side)
@@ -120,7 +123,112 @@ fn fix_arrow_precedence(sql: &str) -> String {
120123
.into_owned()
121124
}
122125

123-
/// Preprocess a query for execution.
126+
/// Extract table names from a SQL query.
127+
///
128+
/// Uses DataFusion's tokenizer to find identifiers following FROM and JOIN keywords.
129+
/// This is a best-effort extraction - missing some tables is acceptable since we
130+
/// fall back to DEFAULT_JSON_OBJECT_COLUMNS for unrecognized columns.
131+
///
132+
/// # Examples
133+
///
134+
/// ```ignore
135+
/// extract_table_names("SELECT * FROM pods") // -> ["pods"]
136+
/// extract_table_names("SELECT * FROM pods p JOIN services s ON ...") // -> ["pods", "services"]
137+
/// extract_table_names("SELECT * FROM pods WHERE x IN (SELECT * FROM namespaces)") // -> ["pods", "namespaces"]
138+
/// ```
139+
fn extract_table_names(sql: &str) -> Vec<String> {
140+
let dialect = PostgreSqlDialect {};
141+
let tokens = match Tokenizer::new(&dialect, sql).tokenize() {
142+
Ok(t) => t,
143+
Err(_) => return vec![],
144+
};
145+
146+
let mut table_names = Vec::new();
147+
let mut i = 0;
148+
149+
while i < tokens.len() {
150+
// Look for FROM or JOIN keywords
151+
if let Token::Word(word) = &tokens[i] {
152+
let keyword = word.value.to_uppercase();
153+
if keyword == "FROM" || keyword == "JOIN" {
154+
// Skip whitespace and find the next identifier
155+
i += 1;
156+
while i < tokens.len() && matches!(tokens[i], Token::Whitespace(_)) {
157+
i += 1;
158+
}
159+
160+
// The next token should be a table name (identifier or word)
161+
if let Some(Token::Word(table_word)) = tokens.get(i) {
162+
// Skip keywords that might follow FROM (like SELECT in subqueries)
163+
let upper = table_word.value.to_uppercase();
164+
if !matches!(
165+
upper.as_str(),
166+
"SELECT" | "WITH" | "LATERAL" | "UNNEST" | "("
167+
) {
168+
table_names.push(table_word.value.to_lowercase());
169+
}
170+
}
171+
}
172+
}
173+
i += 1;
174+
}
175+
176+
table_names
177+
}
178+
179+
/// Build a set of JSON column names from the registry for the given tables.
180+
///
181+
/// Merges DEFAULT_JSON_OBJECT_COLUMNS with table-specific JSON columns.
182+
fn build_json_columns_for_tables(
183+
table_names: &[String],
184+
registry: &ResourceRegistry,
185+
) -> HashSet<String> {
186+
// Start with default columns (always available)
187+
let mut columns = json_path::build_json_columns_set(&[]);
188+
189+
// Add table-specific JSON columns
190+
columns.extend(registry.get_json_columns_for_tables(table_names));
191+
192+
columns
193+
}
194+
195+
/// Preprocess a SQL query with table-aware JSON column detection.
196+
///
197+
/// This is the primary preprocessing function when a ResourceRegistry is available.
198+
/// It extracts table names from the query and looks up their JSON columns for
199+
/// accurate dot-notation conversion.
200+
///
201+
/// # Arguments
202+
///
203+
/// * `sql` - The SQL or PRQL query to preprocess
204+
/// * `registry` - The resource registry containing table schemas
205+
///
206+
/// # Returns
207+
///
208+
/// The preprocessed SQL ready for execution
209+
pub fn preprocess_sql_with_registry(sql: &str, registry: &ResourceRegistry) -> Result<String> {
210+
// Step 1: Compile PRQL to SQL if detected
211+
let sql = if prql::is_prql(sql) {
212+
let prql_preprocessed = prql::preprocess_prql_json_paths(sql);
213+
prql::compile_prql(&prql_preprocessed)?
214+
} else {
215+
sql.to_string()
216+
};
217+
218+
// Step 2: Extract table names from the SQL
219+
let table_names = extract_table_names(&sql);
220+
221+
// Step 3: Build JSON columns set from registry for these tables
222+
let json_columns = build_json_columns_for_tables(&table_names, registry);
223+
224+
// Step 4: Convert JSON path syntax with table-aware columns
225+
let sql = json_path::preprocess_json_paths(&sql, Some(&json_columns));
226+
227+
// Step 5: Fix JSON arrow operator precedence
228+
Ok(fix_arrow_precedence(&sql))
229+
}
230+
231+
/// Preprocess a query for execution (without registry - uses defaults only).
124232
///
125233
/// This function handles:
126234
/// 1. **PRQL detection and compilation**: Queries starting with `from`, `let`, or `prql`
@@ -132,6 +240,9 @@ fn fix_arrow_precedence(sql: &str) -> String {
132240
/// 3. **JSON arrow precedence fix**: Wraps arrow expressions in parentheses when used
133241
/// with comparison operators to work around DataFusion parser precedence.
134242
///
243+
/// Note: This function only recognizes DEFAULT_JSON_OBJECT_COLUMNS (spec, status, labels, etc.).
244+
/// For table-aware JSON column detection, use `preprocess_sql_with_registry` instead.
245+
///
135246
/// # Examples
136247
///
137248
/// ```ignore
@@ -153,6 +264,7 @@ fn fix_arrow_precedence(sql: &str) -> String {
153264
/// preprocess_sql("SELECT * FROM pods WHERE labels->>'app' = 'nginx'")?;
154265
/// // Returns: SELECT * FROM pods WHERE (labels->>'app') = 'nginx'
155266
/// ```
267+
#[allow(dead_code)] // Used by tests and for backward compatibility
156268
pub fn preprocess_sql(sql: &str) -> Result<String> {
157269
// Step 1: Compile PRQL to SQL if detected
158270
let sql = if prql::is_prql(sql) {
@@ -709,4 +821,74 @@ mod tests {
709821
let result = preprocess_sql(sql).unwrap();
710822
assert_eq!(result, sql);
711823
}
824+
825+
// ==================== Table extraction tests ====================
826+
827+
#[test]
fn test_extract_table_names_simple() {
    // A single FROM clause yields exactly one table name.
    assert_eq!(extract_table_names("SELECT * FROM pods"), ["pods"]);
}
832+
833+
#[test]
fn test_extract_table_names_with_alias() {
    // A bare alias after the table name must not be mistaken for a table.
    assert_eq!(extract_table_names("SELECT * FROM pods p"), ["pods"]);
}
838+
839+
#[test]
fn test_extract_table_names_join() {
    // Both sides of a JOIN are discovered, in query order.
    let sql = "SELECT * FROM pods p JOIN services s ON p.name = s.name";
    assert_eq!(extract_table_names(sql), ["pods", "services"]);
}
844+
845+
#[test]
fn test_extract_table_names_multiple_joins() {
    // Chained JOINs contribute one table each.
    let sql = "SELECT * FROM pods p \
               JOIN services s ON p.name = s.name \
               JOIN deployments d ON d.name = p.name";
    assert_eq!(extract_table_names(sql), ["pods", "services", "deployments"]);
}
854+
855+
#[test]
fn test_extract_table_names_subquery() {
    // FROM clauses inside subqueries are discovered too.
    let sql = "SELECT * FROM pods WHERE namespace IN (SELECT name FROM namespaces)";
    assert_eq!(extract_table_names(sql), ["pods", "namespaces"]);
}
862+
863+
#[test]
fn test_extract_table_names_case_insensitive() {
    // Table names are normalized to lowercase.
    assert_eq!(extract_table_names("SELECT * FROM Pods"), ["pods"]);
}
868+
869+
#[test]
fn test_extract_table_names_left_join() {
    // LEFT precedes the JOIN keyword; both tables are still found.
    let tables = extract_table_names("SELECT * FROM pods LEFT JOIN services ON true");
    for expected in ["pods", "services"] {
        assert!(tables.iter().any(|t| t == expected), "missing {expected}");
    }
}
876+
877+
// ==================== Table-aware preprocessing tests ====================
878+
// Note: These tests need a registry, which requires more setup.
879+
// The integration is tested via execute_sql in context.rs.
880+
881+
#[test]
fn test_build_json_columns_includes_defaults() {
    // With no tables resolved, the default JSON columns must still be present.
    use crate::kubernetes::discovery::ResourceRegistry;
    let registry = ResourceRegistry::new();
    let columns = build_json_columns_for_tables(&[], &registry);

    for default_column in ["spec", "status", "labels", "annotations"] {
        assert!(columns.contains(default_column), "missing {default_column}");
    }
}
712894
}

0 commit comments

Comments
 (0)