From 56d9beeec62cb3732afceef568687cc904f83484 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 24 Feb 2026 10:34:29 +0000 Subject: [PATCH 1/2] Capture submitting user into QuerySummary Adds username to the QuerySummary entity so the user who submitted a query is persisted to the history database. The username is extracted from the JWT via VyneUser.username, which works for both UI sessions (logged-in user) and saved endpoint invocations (JWT-identified user). Unauthenticated mode is handled gracefully by leaving username null. Changes: - Add username field to QueryStartEvent and propagate through captureQueryStart - Add username field to QuerySummary JPA entity (column: username) - Update QueryLifecycleEventObserver to carry username and pass it to captureQueryStart - Update QueryService to pass vyneUser?.username at both the normal path (via QueryLifecycleEventObserver) and the compilation-exception path - Update PersistingQueryEventConsumer and ShutdownDecorator captureQueryStart overrides - Update QueryResultEventMapper to map username from QueryStartEvent to QuerySummary - Add Flyway migration V20260224 to add nullable username VARCHAR(255) column https://claude.ai/code/session_01K9g4NeErui15PAMCfuzno4 --- .../orbitalhq/history/db/PersistingQueryEventConsumer.kt | 6 ++++-- .../orbitalhq/query/runtime/core/QueryEventObserver.kt | 7 +++++-- .../com/orbitalhq/query/runtime/core/QueryService.kt | 5 +++-- .../executor/analytics/AnalyticsEventWriterProvider.kt | 5 +++-- .../main/java/com/orbitalhq/query/QueryEventObserver.kt | 9 ++++++--- .../java/com/orbitalhq/query/history/QuerySummary.kt | 4 +++- .../java/com/orbitalhq/history/QueryResultEventMapper.kt | 3 ++- .../V20260224__add_username_to_query_summary.sql | 1 + 8 files changed, 27 insertions(+), 13 deletions(-) create mode 100644 vyne-history-core/src/main/resources/db/migration/V20260224__add_username_to_query_summary.sql diff --git a/history-persistence/src/main/java/com/orbitalhq/history/db/PersistingQueryEventConsumer.kt b/history-persistence/src/main/java/com/orbitalhq/history/db/PersistingQueryEventConsumer.kt index 415473b9b..5f7c4db64 100644 --- a/history-persistence/src/main/java/com/orbitalhq/history/db/PersistingQueryEventConsumer.kt +++ b/history-persistence/src/main/java/com/orbitalhq/history/db/PersistingQueryEventConsumer.kt @@ -87,7 +87,8 @@ class PersistingQueryEventConsumer( query: Query?, clientQueryId: String, message: String, - anonymousTypes: Set + anonymousTypes: Set, + username: String? ) { handleEvent( QueryStartEvent( @@ -102,7 +103,8 @@ class PersistingQueryEventConsumer( persistRemoteCallResponses = config.persistRemoteCallResponses, persistRemoteCallMetadata = config.persistRemoteCallMetadata, persistTraceEvents = config.persistTraceEvents, - persistErrors = config.persistErrors + persistErrors = config.persistErrors, + username = username ) ) } diff --git a/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryEventObserver.kt b/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryEventObserver.kt index 486d1893e..1f154eef2 100644 --- a/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryEventObserver.kt +++ b/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryEventObserver.kt @@ -36,6 +36,7 @@ import java.time.Instant class QueryLifecycleEventObserver( private val consumer: QueryEventConsumer, private val activeQueryMonitor: ActiveQueryMonitor?, + private val username: String? = null ) { companion object { private val logger = KotlinLogging.logger {} @@ -64,7 +65,8 @@ class QueryLifecycleEventObserver( queryId = queryResult.queryId, clientQueryId = queryResult.clientQueryId ?: queryResult.queryId, timestamp = queryStartTime, - anonymousTypes = queryResult.anonymousTypes + anonymousTypes = queryResult.anonymousTypes, + username = username ) return queryResult.copy( @@ -149,7 +151,8 @@ class QueryLifecycleEventObserver( queryId = queryResult.queryId, clientQueryId = queryResult.clientQueryId ?: queryResult.queryId, timestamp = queryStartTime, - anonymousTypes = queryResult.anonymousTypes + anonymousTypes = queryResult.anonymousTypes, + username = username ) return queryResult.copy( diff --git a/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryService.kt b/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryService.kt index fedd0e294..d457f5065 100644 --- a/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryService.kt +++ b/query-node-core/src/main/java/com/orbitalhq/query/runtime/core/QueryService.kt @@ -583,7 +583,8 @@ class QueryService( query = null, clientQueryId = clientQueryId ?: "", message = "", - anonymousTypes = emptySet() + anonymousTypes = emptySet(), + username = vyneUser?.username ) val failedSearchResponse = FailedSearchResponse( message = e.message!!, // Message contains the error messages from the compiler @@ -610,7 +611,7 @@ class QueryService( } catch (e: Exception) { FailedSearchResponse(e.message!!, null, queryId = queryId) } - QueryLifecycleEventObserver(historyWriterEventConsumer, activeQueryMonitor) + QueryLifecycleEventObserver(historyWriterEventConsumer, activeQueryMonitor, username = vyneUser?.username) .responseWithQueryHistoryListener(query, response) to queryOptions } } diff --git a/query-node-native/src/main/java/com/orbitalhq/query/runtime/executor/analytics/AnalyticsEventWriterProvider.kt b/query-node-native/src/main/java/com/orbitalhq/query/runtime/executor/analytics/AnalyticsEventWriterProvider.kt index 41fe4b1fe..cbcab0e5b 100644 --- a/query-node-native/src/main/java/com/orbitalhq/query/runtime/executor/analytics/AnalyticsEventWriterProvider.kt +++ b/query-node-native/src/main/java/com/orbitalhq/query/runtime/executor/analytics/AnalyticsEventWriterProvider.kt @@ -49,8 +49,9 @@ class ShutdownDecorator(private val delegate: QueryEventConsumer, val onShutdown query: Query?, clientQueryId: String, message: String, - anonymousTypes: Set - ) = delegate.captureQueryStart(queryId, timestamp, taxiQuery, query, clientQueryId, message, anonymousTypes) + anonymousTypes: Set, + username: String? + ) = delegate.captureQueryStart(queryId, timestamp, taxiQuery, query, clientQueryId, message, anonymousTypes, username) override fun recordResult(operation: OperationResult, queryId: String) = delegate.recordResult(operation, queryId) diff --git a/taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryEventObserver.kt b/taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryEventObserver.kt index 9ce725bac..5c31a9114 100644 --- a/taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryEventObserver.kt +++ b/taxiql-query-engine/src/main/java/com/orbitalhq/query/QueryEventObserver.kt @@ -16,7 +16,8 @@ interface QueryEventConsumer : RemoteCallOperationResultHandler { query: Query?, clientQueryId: String, message: String, - anonymousTypes: Set + anonymousTypes: Set, + username: String? = null ) { handleEvent( QueryStartEvent( @@ -26,7 +27,8 @@ interface QueryEventConsumer : RemoteCallOperationResultHandler { query = query, clientQueryId = clientQueryId, message = message, - anonymousTypes = anonymousTypes + anonymousTypes = anonymousTypes, + username = username ) ) } @@ -130,6 +132,7 @@ data class QueryStartEvent( val persistRemoteCallResponses: Boolean? = null, val persistRemoteCallMetadata: Boolean? = null, val persistTraceEvents: Boolean? = null, - val persistErrors: Boolean? = null + val persistErrors: Boolean? = null, + val username: String? = null ) : QueryEvent(isTerminalEvent = false) diff --git a/taxiql-query-engine/src/main/java/com/orbitalhq/query/history/QuerySummary.kt b/taxiql-query-engine/src/main/java/com/orbitalhq/query/history/QuerySummary.kt index eab26b993..c4a192db9 100644 --- a/taxiql-query-engine/src/main/java/com/orbitalhq/query/history/QuerySummary.kt +++ b/taxiql-query-engine/src/main/java/com/orbitalhq/query/history/QuerySummary.kt @@ -69,7 +69,9 @@ data class QuerySummary( @Column(name = "persist_trace_events") val persistTraceEvents: Boolean? = null, @Column(name = "persist_errors") - val persistErrors: Boolean? = null + val persistErrors: Boolean? = null, + @Column(name = "username") + val username: String? = null ) : VyneHistoryRecord() { @Transient var durationMs = endTime?.let { Duration.between(startTime, endTime).toMillis() } diff --git a/vyne-history-core/src/main/java/com/orbitalhq/history/QueryResultEventMapper.kt b/vyne-history-core/src/main/java/com/orbitalhq/history/QueryResultEventMapper.kt index 80b139bd4..f7504f869 100644 --- a/vyne-history-core/src/main/java/com/orbitalhq/history/QueryResultEventMapper.kt +++ b/vyne-history-core/src/main/java/com/orbitalhq/history/QueryResultEventMapper.kt @@ -99,7 +99,8 @@ object QueryResultEventMapper { persistRemoteCallResponses = event.persistRemoteCallResponses, persistRemoteCallMetadata = event.persistRemoteCallMetadata, persistTraceEvents = event.persistTraceEvents, - persistErrors = event.persistErrors + persistErrors = event.persistErrors, + username = event.username ) } } diff --git a/vyne-history-core/src/main/resources/db/migration/V20260224__add_username_to_query_summary.sql b/vyne-history-core/src/main/resources/db/migration/V20260224__add_username_to_query_summary.sql new file mode 100644 index 000000000..7b7982171 --- /dev/null +++ b/vyne-history-core/src/main/resources/db/migration/V20260224__add_username_to_query_summary.sql @@ -0,0 +1 @@ +ALTER TABLE query_summary ADD COLUMN username VARCHAR(255); From f642c028998cb3a9170f4677b212258e50fd0545 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 24 Feb 2026 11:32:17 +0000 Subject: [PATCH 2/2] test: add tests for username capture in query history Unit tests in QueryLifecycleEventObserverTest verify that username is forwarded to QueryStartEvent (with and without auth). Integration tests in QuerySummaryOnlyPersistenceTest verify end-to-end that the username from a JWT preferred_username claim is persisted to the QUERY_SUMMARY table, and is null when the query is submitted unauthenticated. Uses the existing JWSBuilder/RsaJwk infrastructure already present in BaseQueryServiceTest, with a local helper that sets preferred_username for a predictable assertion target. https://claude.ai/code/session_01K9g4NeErui15PAMCfuzno4 --- .../core/QueryLifecycleEventObserverTest.kt | 59 +++++++++++++ .../db/QuerySummaryOnlyPersistenceTest.kt | 85 +++++++++++++++++++ 2 files changed, 144 insertions(+) diff --git a/query-node-core/src/test/java/com/orbitalhq/query/runtime/core/QueryLifecycleEventObserverTest.kt b/query-node-core/src/test/java/com/orbitalhq/query/runtime/core/QueryLifecycleEventObserverTest.kt index 1b0f560aa..b8f2a76e9 100644 --- a/query-node-core/src/test/java/com/orbitalhq/query/runtime/core/QueryLifecycleEventObserverTest.kt +++ b/query-node-core/src/test/java/com/orbitalhq/query/runtime/core/QueryLifecycleEventObserverTest.kt @@ -8,6 +8,7 @@ import com.orbitalhq.models.json.right import com.orbitalhq.query.QueryEvent import com.orbitalhq.query.QueryEventConsumer import com.orbitalhq.query.QueryResult +import com.orbitalhq.query.QueryStartEvent import com.orbitalhq.query.StreamingQueryCancelledEvent import com.orbitalhq.query.TaxiQlQueryResultEvent import com.orbitalhq.testVyne @@ -15,9 +16,12 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.test.runTest import mu.KotlinLogging +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Test import reactor.core.publisher.Flux import reactor.core.publisher.Sinks @@ -51,6 +55,61 @@ class QueryLifecycleEventObserverTest { operation streamClients():Stream } """.trimIndent() + @Test + fun `username is included in QueryStartEvent when observer is constructed with a username`() = runTest { + val capturedEvents = mutableListOf() + val capturingConsumer = object : QueryEventConsumer { + override fun handleEvent(event: QueryEvent) { + capturedEvents.add(event) + } + override fun recordResult(operation: OperationResult, queryId: String) {} + } + + val (vyne, stub) = testVyne(taxiDef) + val typedInstance = TypedInstance.from( + vyne.type("Client"), + mapOf("clientId" to "123", "clientName" to "Marty"), + vyne.schema + ) + stub.addResponseFlow("streamClients") { _, _ -> flowOf(typedInstance.right()) } + + val queryResult = vyne.query("stream { Client }") + + // captureQueryStart is called eagerly by responseWithQueryHistoryListener before the flow is consumed + QueryLifecycleEventObserver(capturingConsumer, null, username = "marty.mcfly") + .responseWithQueryHistoryListener("stream { Client }", queryResult) + + val startEvent = capturedEvents.filterIsInstance().firstOrNull() + assertEquals("marty.mcfly", startEvent?.username) + } + + @Test + fun `username is null in QueryStartEvent when observer has no authenticated user`() = runTest { + val capturedEvents = mutableListOf() + val capturingConsumer = object : QueryEventConsumer { + override fun handleEvent(event: QueryEvent) { + capturedEvents.add(event) + } + override fun recordResult(operation: OperationResult, queryId: String) {} + } + + val (vyne, stub) = testVyne(taxiDef) + val typedInstance = TypedInstance.from( + vyne.type("Client"), + mapOf("clientId" to "123", "clientName" to "Marty"), + vyne.schema + ) + stub.addResponseFlow("streamClients") { _, _ -> flowOf(typedInstance.right()) } + + val queryResult = vyne.query("stream { Client }") + + QueryLifecycleEventObserver(capturingConsumer, null, username = null) + .responseWithQueryHistoryListener("stream { Client }", queryResult) + + val startEvent = capturedEvents.filterIsInstance().firstOrNull() + assertNull(startEvent?.username) + } + @OptIn(ExperimentalCoroutinesApi::class) @Test fun `When streaming query is cancelled StreamingQueryCancelledEvent event is published`() = runTest { diff --git a/vyne-query-service/src/test/java/com/orbitalhq/queryService/history/db/QuerySummaryOnlyPersistenceTest.kt b/vyne-query-service/src/test/java/com/orbitalhq/queryService/history/db/QuerySummaryOnlyPersistenceTest.kt index cf3a9d24e..bb5ed4c25 100644 --- a/vyne-query-service/src/test/java/com/orbitalhq/queryService/history/db/QuerySummaryOnlyPersistenceTest.kt +++ b/vyne-query-service/src/test/java/com/orbitalhq/queryService/history/db/QuerySummaryOnlyPersistenceTest.kt @@ -26,6 +26,10 @@ import com.orbitalhq.schemas.fqn import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest +import org.jose4j.jwk.RsaJwkGenerator +import org.jose4j.jws.AlgorithmIdentifiers +import org.jose4j.jws.JsonWebSignature +import org.jose4j.jwt.JwtClaims import org.junit.Test import org.junit.runner.RunWith import org.springframework.beans.factory.annotation.Autowired @@ -34,6 +38,9 @@ import org.springframework.test.context.bean.override.mockito.MockitoBean import org.springframework.boot.testcontainers.service.connection.ServiceConnection import org.springframework.context.annotation.Import import org.springframework.http.MediaType +import org.springframework.security.core.Authentication +import org.springframework.security.oauth2.jwt.NimbusReactiveJwtDecoder +import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationToken import org.springframework.test.context.ActiveProfiles import org.springframework.test.context.junit4.SpringRunner import org.testcontainers.containers.PostgreSQLContainer @@ -110,6 +117,84 @@ class QuerySummaryOnlyPersistenceTest : BaseQueryServiceTest() { lateinit var schemaEditorService: SchemaEditorService + private fun authenticationWithPreferredUsername(preferredUsername: String): Authentication { + val rsaJsonWebKey = RsaJwkGenerator.generateJwk(2048) + rsaJsonWebKey.apply { + keyId = UUID.randomUUID().toString() + algorithm = AlgorithmIdentifiers.RSA_USING_SHA256 + use = "sig" + } + val claims = JwtClaims().apply { + jwtId = UUID.randomUUID().toString() + issuer = "https://test.example.com" + subject = UUID.randomUUID().toString() + setExpirationTimeMinutesInTheFuture(10F) + setIssuedAtToNow() + setClaim("preferred_username", preferredUsername) + } + val jwt = JsonWebSignature().apply { + payload = claims.toJson() + key = rsaJsonWebKey.privateKey + algorithmHeaderValue = rsaJsonWebKey.algorithm + keyIdHeaderValue = rsaJsonWebKey.keyId + setHeader("typ", "JWT") + }.compactSerialization + return JwtAuthenticationToken( + NimbusReactiveJwtDecoder.withPublicKey(rsaJsonWebKey.getRsaPublicKey()).build().decode(jwt).block() + ) + } + + @Test + fun `username is persisted in query summary when query is submitted with authentication`() { + setupTestService(historyDbWriter) + val id = UUID.randomUUID().toString() + val auth = authenticationWithPreferredUsername("marty.mcfly") + + runTest { + val turbine = queryService.submitVyneQlQueryStreamingResponse( + "find { Order[] } as Report[]", + auth = auth, + clientQueryId = id + ).testIn(this) + + val first = turbine.awaitItem() + first.should.not.be.`null` + turbine.awaitComplete() + } + + Awaitility.await().atMost(com.jayway.awaitility.Duration.TEN_SECONDS).until { + queryHistoryRecordRepository.findByClientQueryId(id)?.endTime != null + } + + val historyRecord = queryHistoryRecordRepository.findByClientQueryId(id)!! + historyRecord.username.should.equal("marty.mcfly") + } + + @Test + fun `username is null in query summary when no authentication is provided`() { + setupTestService(historyDbWriter) + val id = UUID.randomUUID().toString() + + runTest { + val turbine = queryService.submitVyneQlQueryStreamingResponse( + "find { Order[] } as Report[]", + auth = null, + clientQueryId = id + ).testIn(this) + + val first = turbine.awaitItem() + first.should.not.be.`null` + turbine.awaitComplete() + } + + Awaitility.await().atMost(com.jayway.awaitility.Duration.TEN_SECONDS).until { + queryHistoryRecordRepository.findByClientQueryId(id)?.endTime != null + } + + val historyRecord = queryHistoryRecordRepository.findByClientQueryId(id)!! + historyRecord.username.should.be.`null` + } + @Test fun `Only Query Summary is persisted when vyne history persistResults is false for a taxiQl query`() { setupTestService(historyDbWriter)