From 9ae32501347ef391eea1ddbd7cacbd691ea71666 Mon Sep 17 00:00:00 2001 From: Google Team Member Date: Mon, 6 Apr 2026 13:06:32 -0700 Subject: [PATCH] feat: Implement BigQuery auto-schema upgrade and view creation This change adds functionality to automatically upgrade the BigQuery table schema by adding new top-level fields and new nested fields within RECORD types. It also introduces a new utility to create per-event-type BigQuery views that unnest JSON fields for easier querying. The configuration now includes options to enable view creation and set a view prefix. PiperOrigin-RevId: 895463825 --- .../BigQueryAgentAnalyticsPlugin.java | 43 ++- .../agentanalytics/BigQueryLoggerConfig.java | 27 +- .../agentanalytics/BigQuerySchema.java | 8 + .../plugins/agentanalytics/BigQueryUtils.java | 286 ++++++++++++++++++ .../BigQueryAgentAnalyticsPluginTest.java | 126 ++++++++ 5 files changed, 472 insertions(+), 18 deletions(-) create mode 100644 core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java index cf7dad9df..3c673b140 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java @@ -16,6 +16,8 @@ package com.google.adk.plugins.agentanalytics; +import static com.google.adk.plugins.agentanalytics.BigQueryUtils.createAnalyticsViews; +import static com.google.adk.plugins.agentanalytics.BigQueryUtils.maybeUpgradeSchema; import static com.google.adk.plugins.agentanalytics.JsonFormatter.convertToJsonNode; import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate; import static com.google.adk.plugins.agentanalytics.JsonFormatter.toJavaObject; @@ -143,6 +145,8 @@ private static BigQuery createBigQuery(BigQueryLoggerConfig config) throws IOExc builder.setCredentials( GoogleCredentials.getApplicationDefault().createScoped(DEFAULT_AUTH_SCOPES)); } + builder = builder.setLocation(config.location()); + builder.setProjectId(config.projectId()); return builder.build().getService(); } @@ -172,28 +176,37 @@ private void ensureTableExists(BigQuery bigQuery, BigQueryLoggerConfig config) { tableDefinitionBuilder.setClustering( Clustering.newBuilder().setFields(config.clusteringFields()).build()); } - TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinitionBuilder.build()).build(); + TableInfo tableInfo = + TableInfo.newBuilder(tableId, tableDefinitionBuilder.build()) + .setLabels( + ImmutableMap.of( + BigQuerySchema.SCHEMA_VERSION_LABEL_KEY, BigQuerySchema.SCHEMA_VERSION)) + .build(); bigQuery.create(tableInfo); } else if (config.autoSchemaUpgrade()) { - // TODO(b/491851868): Implement auto-schema upgrade. - logger.info("BigQuery table already exists and auto-schema upgrade is enabled: " + tableId); - logger.info("Auto-schema upgrade is not implemented yet."); + maybeUpgradeSchema(bigQuery, table); } } catch (BigQueryException e) { - if (e.getMessage().contains("invalid_grant")) { - logger.log( - Level.SEVERE, - "Failed to authenticate with BigQuery. Please run 'gcloud auth application-default" - + " login' to refresh your credentials or provide valid credentials in" - + " BigQueryLoggerConfig.", - e); - } else { - logger.log( - Level.WARNING, "Failed to check or create/upgrade BigQuery table: " + tableId, e); - } + processBigQueryException(e, "Failed to check or create/upgrade BigQuery table: " + tableId); } catch (RuntimeException e) { logger.log(Level.WARNING, "Failed to check or create/upgrade BigQuery table: " + tableId, e); } + + try { + if (config.createViews()) { + var unused = executor.submit(() -> createAnalyticsViews(bigQuery, config)); + } + } catch (RuntimeException e) { + logger.log(Level.WARNING, "Failed to create/update BigQuery views for table: " + tableId, e); + } + } + + private void processBigQueryException(BigQueryException e, String logMessage) { + if (e.getMessage().contains("invalid_grant")) { + logger.log(Level.SEVERE, "Failed to authenticate with BigQuery.", e); + } else { + logger.log(Level.WARNING, logMessage, e); + } } protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) throws IOException { diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java index 73659b255..ccce8c3bc 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java @@ -43,6 +43,9 @@ public abstract class BigQueryLoggerConfig { // Max length for text content before truncation. public abstract int maxContentLength(); + // BigQuery location. + public abstract String location(); + // Project ID for the BigQuery table. public abstract String projectId(); @@ -93,9 +96,16 @@ public abstract class BigQueryLoggerConfig { // Automatically add new columns to existing tables when the plugin // schema evolves. Only additive changes are made (columns are never // dropped or altered). - // TODO(b/491852782): Implement auto-schema upgrade. public abstract boolean autoSchemaUpgrade(); + // Automatically create per-event-type BigQuery views that unnest + // JSON columns into typed, queryable columns. + public abstract boolean createViews(); + + // Prefix for auto-created per-event-type view names. + // Default "v" produces views like ``v_llm_request``. + public abstract String viewPrefix(); + @Nullable public abstract Credentials credentials(); @@ -105,6 +115,7 @@ public static Builder builder() { return new AutoValue_BigQueryLoggerConfig.Builder() .enabled(true) .maxContentLength(500 * 1024) + .location("us") // Default location. .datasetId("agent_analytics") .tableName("events") .clusteringFields(ImmutableList.of("event_type", "agent", "user_id")) @@ -118,8 +129,9 @@ public static Builder builder() { .customTags(ImmutableMap.of()) .eventAllowlist(ImmutableList.of()) .eventDenylist(ImmutableList.of()) - // TODO(b/491851868): Enable auto-schema upgrade once implemented. - .autoSchemaUpgrade(false); + .autoSchemaUpgrade(true) + .createViews(false) + .viewPrefix("v"); } /** Builder for {@link BigQueryLoggerConfig}. */ @@ -138,6 +150,9 @@ public abstract static class Builder { @CanIgnoreReturnValue public abstract Builder maxContentLength(int maxContentLength); + @CanIgnoreReturnValue + public abstract Builder location(String location); + @CanIgnoreReturnValue public abstract Builder projectId(String projectId); @@ -184,6 +199,12 @@ public abstract Builder contentFormatter( @CanIgnoreReturnValue public abstract Builder autoSchemaUpgrade(boolean autoSchemaUpgrade); + @CanIgnoreReturnValue + public abstract Builder createViews(boolean createViews); + + @CanIgnoreReturnValue + public abstract Builder viewPrefix(String viewPrefix); + @CanIgnoreReturnValue public abstract Builder credentials(Credentials credentials); diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQuerySchema.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQuerySchema.java index 81181a1e0..9a7e76f88 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQuerySchema.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQuerySchema.java @@ -45,6 +45,14 @@ public final class BigQuerySchema { private BigQuerySchema() {} + /** + * The version of the BigQuery schema. Each time the schema is changed(new fields are added), this + * should be incremented. + */ + static final String SCHEMA_VERSION = "1"; + + static final String SCHEMA_VERSION_LABEL_KEY = "adk_schema_version"; + private static final ImmutableMap> FIELD_TYPE_TO_ARROW_FIELD_METADATA = ImmutableMap.of( diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java new file mode 100644 index 000000000..f003e82ad --- /dev/null +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryUtils.java @@ -0,0 +1,286 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.plugins.agentanalytics; + +import static com.google.adk.plugins.agentanalytics.BigQuerySchema.SCHEMA_VERSION; +import static com.google.adk.plugins.agentanalytics.BigQuerySchema.SCHEMA_VERSION_LABEL_KEY; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static java.util.stream.Collectors.toCollection; + +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.Table; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** Utility for managing BigQuery schema upgrades and analytics views. */ +final class BigQueryUtils { + private static final Logger logger = Logger.getLogger(BigQueryUtils.class.getName()); + + private static final ImmutableList VIEW_COMMON_COLUMNS = + ImmutableList.of( + "timestamp", + "event_type", + "agent", + "session_id", + "invocation_id", + "user_id", + "trace_id", + "span_id", + "parent_span_id", + "status", + "error_message", + "is_truncated"); + + // Per-event-type column extractions. Each value is a list of ``"SQL_EXPR AS alias"`` strings that + // will be appended after the common columns in the view SELECT. + private static final ImmutableMap> EVENT_VIEW_DEFS = + ImmutableMap.>builder() + .put("USER_MESSAGE_RECEIVED", ImmutableList.of()) + .put( + "LLM_REQUEST", + ImmutableList.of( + "JSON_VALUE(attributes, '$.model') AS model", + "content AS request_content", + "JSON_QUERY(attributes, '$.llm_config') AS llm_config", + "JSON_QUERY(attributes, '$.tools') AS tools")) + .put( + "LLM_RESPONSE", + ImmutableList.of( + "JSON_QUERY(content, '$.response') AS response", + "CAST(JSON_VALUE(content, '$.usage.prompt') AS INT64) AS usage_prompt_tokens", + "CAST(JSON_VALUE(content, '$.usage.completion') AS INT64) AS" + + " usage_completion_tokens", + "CAST(JSON_VALUE(content, '$.usage.total') AS INT64) AS usage_total_tokens", + "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms", + "CAST(JSON_VALUE(latency_ms, '$.time_to_first_token_ms') AS INT64) AS ttft_ms", + "JSON_VALUE(attributes, '$.model_version') AS model_version", + "JSON_QUERY(attributes, '$.usage_metadata') AS usage_metadata")) + .put( + "LLM_ERROR", + ImmutableList.of("CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms")) + .put( + "TOOL_STARTING", + ImmutableList.of( + "JSON_VALUE(content, '$.tool') AS tool_name", + "JSON_QUERY(content, '$.args') AS tool_args", + "JSON_VALUE(content, '$.tool_origin') AS tool_origin")) + .put( + "TOOL_COMPLETED", + ImmutableList.of( + "JSON_VALUE(content, '$.tool') AS tool_name", + "JSON_QUERY(content, '$.result') AS tool_result", + "JSON_VALUE(content, '$.tool_origin') AS tool_origin", + "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms")) + .put( + "TOOL_ERROR", + ImmutableList.of( + "JSON_VALUE(content, '$.tool') AS tool_name", + "JSON_QUERY(content, '$.args') AS tool_args", + "JSON_VALUE(content, '$.tool_origin') AS tool_origin", + "CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms")) + .put( + "AGENT_STARTING", + ImmutableList.of("JSON_VALUE(content, '$.text_summary') AS agent_instruction")) + .put( + "AGENT_COMPLETED", + ImmutableList.of("CAST(JSON_VALUE(latency_ms, '$.total_ms') AS INT64) AS total_ms")) + .put("INVOCATION_STARTING", ImmutableList.of()) + .put("INVOCATION_COMPLETED", ImmutableList.of()) + .put( + "STATE_DELTA", + ImmutableList.of("JSON_QUERY(attributes, '$.state_delta') AS state_delta")) + .put( + "HITL_CREDENTIAL_REQUEST", + ImmutableList.of( + "JSON_VALUE(content, '$.tool') AS tool_name", + "JSON_QUERY(content, '$.args') AS tool_args")) + .put( + "HITL_CONFIRMATION_REQUEST", + ImmutableList.of( + "JSON_VALUE(content, '$.tool') AS tool_name", + "JSON_QUERY(content, '$.args') AS tool_args")) + .put( + "HITL_INPUT_REQUEST", + ImmutableList.of( + "JSON_VALUE(content, '$.tool') AS tool_name", + "JSON_QUERY(content, '$.args') AS tool_args")) + .buildOrThrow(); + + /** Creates and/or replaces the analytics views in BigQuery. */ + static void createAnalyticsViews(BigQuery bigQuery, BigQueryLoggerConfig config) { + for (Map.Entry> entry : EVENT_VIEW_DEFS.entrySet()) { + String eventType = entry.getKey(); + ImmutableList extraCols = entry.getValue(); + + String viewName = config.viewPrefix() + "_" + eventType.toLowerCase(Locale.ROOT); + ImmutableList allCols = + ImmutableList.builder().addAll(VIEW_COMMON_COLUMNS).addAll(extraCols).build(); + + String columns = String.join(",\n ", allCols); + String sql = + String.format( + "CREATE OR REPLACE VIEW `%s.%s.%s` AS\nSELECT\n %s\nFROM\n " + + "`%s.%s.%s` \nWHERE\n event_type = '%s'", + config.projectId(), + config.datasetId(), + viewName, + columns, + config.projectId(), + config.datasetId(), + config.tableName(), + eventType); + + try { + QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(sql).build(); + var unused = bigQuery.query(queryConfig); + } catch (BigQueryException | InterruptedException e) { + logger.log(Level.WARNING, "Failed to create or update view " + viewName, e); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } + } + + /** Adds missing columns to an existing table if the schema version has changed. */ + static void maybeUpgradeSchema(BigQuery bigQuery, Table existingTable) { + String storedVersion = + Optional.ofNullable(existingTable.getLabels()) + .map(labels -> labels.get(SCHEMA_VERSION_LABEL_KEY)) + .orElse(""); + + if (storedVersion.equals(SCHEMA_VERSION)) { + return; + } + + SchemaDiff diff = + schemaFieldsMatch( + existingTable.getDefinition().getSchema().getFields(), + BigQuerySchema.getEventsSchema().getFields()); + + if (!diff.newTopLevelFields().isEmpty() || !diff.updatedRecordFields().isEmpty()) { + ImmutableMap updatedFields = + diff.updatedRecordFields().stream().collect(toImmutableMap(Field::getName, f -> f)); + ImmutableSet updatedNames = updatedFields.keySet(); + + List mergedFields = new ArrayList<>(); + for (Field f : existingTable.getDefinition().getSchema().getFields()) { + if (updatedNames.contains(f.getName())) { + mergedFields.add(updatedFields.get(f.getName())); + } else { + mergedFields.add(f); + } + } + mergedFields.addAll(diff.newTopLevelFields()); + + logger.info( + String.format( + "Auto-upgrading table %s: new columns %s, updated RECORD fields %s", + existingTable.getTableId(), + diff.newTopLevelFields().stream().map(Field::getName).collect(toImmutableList()), + diff.updatedRecordFields().stream() + .map(Field::getName) + .collect(toCollection(ArrayList::new)))); + + try { + Map labels = + new HashMap<>(Optional.ofNullable(existingTable.getLabels()).orElse(ImmutableMap.of())); + labels.put(BigQuerySchema.SCHEMA_VERSION_LABEL_KEY, BigQuerySchema.SCHEMA_VERSION); + + Table updatedTable = + existingTable.toBuilder() + .setDefinition( + existingTable.getDefinition().toBuilder() + .setSchema(Schema.of(mergedFields)) + .build()) + .setLabels(labels) + .build(); + + var unused = bigQuery.update(updatedTable); + } catch (BigQueryException e) { + logger.log( + Level.WARNING, "Schema auto-upgrade failed for " + existingTable.getTableId(), e); + } + } + } + + private static SchemaDiff schemaFieldsMatch(FieldList existing, FieldList desired) { + ImmutableMap existingByName = + existing.stream().collect(toImmutableMap(Field::getName, f -> f)); + List newFields = new ArrayList<>(); + List updatedRecords = new ArrayList<>(); + + for (Field desiredField : desired) { + Field existingField = existingByName.get(desiredField.getName()); + if (existingField == null) { + newFields.add(desiredField); + } else if (desiredField.getType().getStandardType().equals(StandardSQLTypeName.STRUCT) + && existingField.getType().getStandardType().equals(StandardSQLTypeName.STRUCT) + && desiredField.getSubFields() != null) { + + SchemaDiff subDiff = + schemaFieldsMatch(existingField.getSubFields(), desiredField.getSubFields()); + + if (!subDiff.newTopLevelFields().isEmpty() || !subDiff.updatedRecordFields().isEmpty()) { + List mergedSub = new ArrayList<>(existingField.getSubFields()); + ImmutableSet subUpdatedNames = + subDiff.updatedRecordFields().stream().map(Field::getName).collect(toImmutableSet()); + + for (int i = 0; i < mergedSub.size(); i++) { + Field f = mergedSub.get(i); + if (subUpdatedNames.contains(f.getName())) { + mergedSub.set( + i, + subDiff.updatedRecordFields().stream() + .filter(u -> u.getName().equals(f.getName())) + .findFirst() + .get()); + } + } + mergedSub.addAll(subDiff.newTopLevelFields()); + updatedRecords.add( + existingField.toBuilder() + .setType(StandardSQLTypeName.STRUCT, FieldList.of(mergedSub)) + .build()); + } + } + } + return new SchemaDiff(ImmutableList.copyOf(newFields), ImmutableList.copyOf(updatedRecords)); + } + + private record SchemaDiff( + ImmutableList newTopLevelFields, ImmutableList updatedRecordFields) {} + + private BigQueryUtils() {} +} diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java index 04987ed3c..c7e35e3d6 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java @@ -16,11 +16,13 @@ package com.google.adk.plugins.agentanalytics; +import static com.google.common.collect.ImmutableList.toImmutableList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -43,7 +45,12 @@ import com.google.auth.Credentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.FieldList; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.StandardSQLTypeName; +import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; @@ -88,6 +95,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.junit.MockitoJUnit; import org.mockito.junit.MockitoRule; @@ -101,6 +109,7 @@ public class BigQueryAgentAnalyticsPluginTest { @Mock private StreamWriter mockWriter; @Mock private BigQueryWriteClient mockWriteClient; @Mock private InvocationContext mockInvocationContext; + @Captor private ArgumentCaptor> labelsCaptor; private BaseAgent fakeAgent; private BigQueryLoggerConfig config; @@ -641,6 +650,123 @@ protected TraceManager createTraceManager() { "attributes should not contain session_metadata", attributes.has("session_metadata")); } + @Test + public void maybeUpgradeSchema_addsNewTopLevelField() throws Exception { + Table mockTable = mock(Table.class); + when(mockTable.getTableId()).thenReturn(TableId.of("project", "dataset", "table")); + when(mockTable.getLabels()).thenReturn(ImmutableMap.of()); + + // Initial schema missing one field, e.g., 'is_truncated' + ImmutableList initialFields = + BigQuerySchema.getEventsSchema().getFields().stream() + .filter(f -> !f.getName().equals("is_truncated")) + .collect(toImmutableList()); + StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setSchema(com.google.cloud.bigquery.Schema.of(initialFields)) + .build(); + when(mockTable.getDefinition()).thenReturn(tableDefinition); + + Table.Builder mockTableBuilder = mock(Table.Builder.class); + when(mockTable.toBuilder()).thenReturn(mockTableBuilder); + when(mockTableBuilder.setDefinition(any(TableDefinition.class))).thenReturn(mockTableBuilder); + when(mockTableBuilder.setLabels(anyMap())).thenReturn(mockTableBuilder); + when(mockTableBuilder.build()).thenReturn(mockTable); + + BigQueryUtils.maybeUpgradeSchema(mockBigQuery, mockTable); + + ArgumentCaptor definitionCaptor = + ArgumentCaptor.forClass(StandardTableDefinition.class); + verify(mockTableBuilder).setDefinition(definitionCaptor.capture()); + com.google.cloud.bigquery.Schema updatedSchema = definitionCaptor.getValue().getSchema(); + assertNotNull(updatedSchema.getFields().get("is_truncated")); + + verify(mockTableBuilder).setLabels(labelsCaptor.capture()); + assertEquals( + BigQuerySchema.SCHEMA_VERSION, + labelsCaptor.getValue().get(BigQuerySchema.SCHEMA_VERSION_LABEL_KEY)); + + verify(mockBigQuery).update(any(Table.class)); + } + + @Test + public void maybeUpgradeSchema_addsNewNestedField() throws Exception { + Table mockTable = mock(Table.class); + when(mockTable.getTableId()).thenReturn(TableId.of("project", "dataset", "table")); + when(mockTable.getLabels()).thenReturn(ImmutableMap.of()); + + // Initial schema missing 'storage_mode' in 'content_parts' + ImmutableList initialFields = + BigQuerySchema.getEventsSchema().getFields().stream() + .map( + f -> { + if (f.getName().equals("content_parts")) { + ImmutableList subFields = + f.getSubFields().stream() + .filter(sf -> !sf.getName().equals("storage_mode")) + .collect(toImmutableList()); + return f.toBuilder() + .setType(StandardSQLTypeName.STRUCT, FieldList.of(subFields)) + .build(); + } + return f; + }) + .collect(toImmutableList()); + + StandardTableDefinition tableDefinition = + StandardTableDefinition.newBuilder() + .setSchema(com.google.cloud.bigquery.Schema.of(initialFields)) + .build(); + when(mockTable.getDefinition()).thenReturn(tableDefinition); + + Table.Builder mockTableBuilder = mock(Table.Builder.class); + when(mockTable.toBuilder()).thenReturn(mockTableBuilder); + when(mockTableBuilder.setDefinition(any(TableDefinition.class))).thenReturn(mockTableBuilder); + when(mockTableBuilder.setLabels(anyMap())).thenReturn(mockTableBuilder); + when(mockTableBuilder.build()).thenReturn(mockTable); + + BigQueryUtils.maybeUpgradeSchema(mockBigQuery, mockTable); + + ArgumentCaptor definitionCaptor = + ArgumentCaptor.forClass(StandardTableDefinition.class); + verify(mockTableBuilder).setDefinition(definitionCaptor.capture()); + com.google.cloud.bigquery.Field contentParts = + definitionCaptor.getValue().getSchema().getFields().get("content_parts"); + assertNotNull(contentParts.getSubFields().get("storage_mode")); + + verify(mockBigQuery).update(any(Table.class)); + } + + @Test + public void createAnalyticsViews_executesQueries() throws Exception { + BigQueryUtils.createAnalyticsViews(mockBigQuery, config); + + // Verify a few specific views are created + verify(mockBigQuery, atLeastOnce()).query(any(QueryJobConfiguration.class)); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(QueryJobConfiguration.class); + verify(mockBigQuery, atLeastOnce()).query(captor.capture()); + + ImmutableList queries = + captor.getAllValues().stream() + .map(QueryJobConfiguration::getQuery) + .collect(toImmutableList()); + + assertTrue( + queries.stream() + .anyMatch( + q -> + q.contains( + "CREATE OR REPLACE VIEW `project.dataset.v_user_message_received`"))); + assertTrue( + queries.stream() + .anyMatch(q -> q.contains("CREATE OR REPLACE VIEW `project.dataset.v_llm_request`"))); + assertTrue( + queries.stream() + .anyMatch(q -> q.contains("CREATE OR REPLACE VIEW `project.dataset.v_llm_response`"))); + } + private static class FakeAgent extends BaseAgent { FakeAgent(String name) { super(name, "description", null, null, null);