From 8f1dcda7fd1fd994bb8bfb103a22fd23499be878 Mon Sep 17 00:00:00 2001 From: Varun Shetty Date: Thu, 4 Dec 2025 18:57:35 +0000 Subject: [PATCH 1/3] Implement V1 compat API for PubSub --- spec/v2/providers/pubsub.spec.ts | 133 +++++++++++++++++++++++++++++++ src/v2/core.ts | 8 ++ src/v2/providers/pubsub.ts | 68 +++++++++++++++- 3 files changed, 208 insertions(+), 1 deletion(-) diff --git a/spec/v2/providers/pubsub.spec.ts b/spec/v2/providers/pubsub.spec.ts index d498b1b42..aa87187e0 100644 --- a/spec/v2/providers/pubsub.spec.ts +++ b/spec/v2/providers/pubsub.spec.ts @@ -170,6 +170,40 @@ describe("onMessagePublished", () => { expect(json).to.deep.equal({ hello: "world" }); }); + it("should construct a CloudEvent with the correct context", async () => { + const publishTime = new Date().toISOString(); + const message = { + messageId: "uuid", + data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), + publishTime, + }; + const data: pubsub.MessagePublishedData = { + message: message as any, + subscription: "projects/aProject/subscriptions/aSubscription", + }; + const event: CloudEvent = { + specversion: "1.0", + id: "uuid", + time: publishTime, + type: "google.cloud.pubsub.topic.v1.messagePublished", + source: "//pubsub.googleapis.com/projects/aProject/topics/topic", + data, + }; + + let receivedEvent: CloudEvent>; + const func = pubsub.onMessagePublished("topic", (e) => { + receivedEvent = e; + }); + + await func(event); + + expect(receivedEvent.id).to.equal("uuid"); + expect(receivedEvent.time).to.equal(publishTime); + expect(receivedEvent.type).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); + expect(receivedEvent.source).to.equal("//pubsub.googleapis.com/projects/aProject/topics/topic"); + expect(receivedEvent.data.message.json).to.deep.equal({ hello: "world" }); + }); + // These tests pass if the transpiler works it("allows desirable syntax", () => { pubsub.onMessagePublished( @@ -193,4 +227,103 @@ describe("onMessagePublished", () => { (event: CloudEvent) => undefined ); }); + + it("should not modify a CloudEvent that already has a context", async () => { + const publishTime = new Date().toISOString(); + const message = { + messageId: "uuid", + data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), + publishTime, + }; + const data: pubsub.MessagePublishedData = { + message: message as any, + subscription: "projects/aProject/subscriptions/aSubscription", + }; + const existingContext = { + eventId: "custom-id", + timestamp: publishTime, + eventType: "custom.type", + resource: "custom/resource", + params: {}, + }; + const event: CloudEvent = { + specversion: "1.0", + id: "uuid", + time: publishTime, + type: "google.cloud.pubsub.topic.v1.messagePublished", + source: "//pubsub.googleapis.com/projects/aProject/topics/topic", + data, + context: existingContext as any, + }; + + let receivedEvent: CloudEvent>; + const func = pubsub.onMessagePublished("topic", (e) => { + receivedEvent = e; + }); + + await func(event); + + expect(receivedEvent.context).to.deep.equal(existingContext); + }); + + it("should use GCLOUD_PROJECT as fallback for resource name", async () => { + const publishTime = new Date().toISOString(); + const message = { + messageId: "uuid", + data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), + publishTime, + }; + const data: pubsub.MessagePublishedData = { + message: message as any, + subscription: "projects/aProject/subscriptions/aSubscription", + }; + const event: CloudEvent = { + specversion: "1.0", + id: "uuid", + time: publishTime, + type: "google.cloud.pubsub.topic.v1.messagePublished", + source: "//pubsub.googleapis.com/topics/topic", // Malformed source + data, + }; + + let receivedEvent: CloudEvent>; + const func = pubsub.onMessagePublished("topic", (e) => { + receivedEvent = e; + }); + + await func(event); + + expect(receivedEvent.context.resource.name).to.equal("projects/aProject/topics/topic"); + }); + + it("should use 'unknown-project' as fallback for resource name", async () => { + delete process.env.GCLOUD_PROJECT; + const publishTime = new Date().toISOString(); + const message = { + messageId: "uuid", + data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), + publishTime, + }; + const data: pubsub.MessagePublishedData = { + message: message as any, + subscription: "projects/aProject/subscriptions/aSubscription", + }; + const event: CloudEvent = { + specversion: "1.0", + id: "uuid", + time: publishTime, + type: "google.cloud.pubsub.topic.v1.messagePublished", + source: "//pubsub.googleapis.com/topics/topic", // Malformed source + data, + }; + + let receivedEvent: CloudEvent>; + const func = pubsub.onMessagePublished("topic", (e) => { + receivedEvent = e; + }); + + await func(event); + + expect(receivedEvent.context.resource.name).to.equal("project/unknown-project/topics/topic"); + }); }); diff --git a/src/v2/core.ts b/src/v2/core.ts index 26987637d..a898577c1 100644 --- a/src/v2/core.ts +++ b/src/v2/core.ts @@ -25,6 +25,7 @@ * @packageDocumentation */ +import type { EventContext } from "../v1/cloud-functions"; import { Change } from "../common/change"; import { ManifestEndpoint } from "../runtime/manifest"; @@ -91,6 +92,13 @@ export interface CloudEvent { /** Information about this specific event. */ data: T; + + /** V1- compatible context of this event. + * + * This getter is added at runtime for V1 compatibility. + * May be undefined it not set by a provider + */ + readonly context?: EventContext; } /** diff --git a/src/v2/providers/pubsub.ts b/src/v2/providers/pubsub.ts index 5ae982185..d8c0878ab 100644 --- a/src/v2/providers/pubsub.ts +++ b/src/v2/providers/pubsub.ts @@ -34,6 +34,7 @@ import { Expression } from "../../params"; import * as options from "../options"; import { SecretParam } from "../../params/types"; import { withInit } from "../../common/onInit"; +import type { EventContext, Resource } from "../../v1/cloud-functions"; /** * Google Cloud Pub/Sub is a globally distributed message bus that automatically scales as you need it. @@ -304,7 +305,9 @@ export function onMessagePublished( subscription: string; }; messagePublishedData.message = new Message(messagePublishedData.message); - return wrapTraceContext(withInit(handler))(raw as CloudEvent>); + const event = raw as CloudEvent>; + attachPubSubContext(event, topic); + return wrapTraceContext(withInit(handler))(event); }; func.run = handler; @@ -353,3 +356,66 @@ export function onMessagePublished( return func; } + +/** + * @internal + * + * Adds a v1-style context to the event. + * + * @param event - The event to add the context to. + * @param topic - The topic the event is for. + */ +function attachPubSubContext(event: CloudEvent>, topic: string) { + if ("context" in event && event.context) { + return; + } + + const resourceName = getResourceName(event, topic); + const resource: Resource = { + + service: "pubsub.googleapis.com", + name: resourceName ?? "", + + }; + + const context: EventContext = { + eventId: event.id, + timestamp: event.time, + resource, + eventType: "google.cloud.pubsub.topic.v1.messagePublished", + params: {} + + }; + + Object.defineProperty(event, "context", { + + get: () => context, + enumerable: false, + configurable: false, + + }); +} + +/** + * @internal + * + * Extracts the resource name from the event source. + * + * @param event - The event to extract the resource name from. + * @param topic - The topic the event is for. + * @returns The resource name. + */ +function getResourceName(event: CloudEvent>, topic: string) { + const match = event.source?.match(/projects\/([^/]+)\/topics\/([^/]+)/); + const project = + match?.[1] ?? process.env.GCLOUD_PROJECT ?? process.env.GCLOUD_PROJECT_ID ?? process.env.PROJECT_ID; + + const topicName = match?.[2] ?? topic; + + if (!project) { + return `project/unknown-project/topics/${topicName}`; + } + + return `projects/${project}/topics/${topicName}`; + +} \ No newline at end of file From 30e6e107e3c033ec17016d6da41e0b20d170054e Mon Sep 17 00:00:00 2001 From: Varun Shetty Date: Thu, 4 Dec 2025 19:40:15 +0000 Subject: [PATCH 2/3] Addressing gemini review comments --- spec/v2/providers/pubsub.spec.ts | 13 ++++++++++++- src/v2/providers/pubsub.ts | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/spec/v2/providers/pubsub.spec.ts b/spec/v2/providers/pubsub.spec.ts index aa87187e0..0c8e4cf6e 100644 --- a/spec/v2/providers/pubsub.spec.ts +++ b/spec/v2/providers/pubsub.spec.ts @@ -202,6 +202,14 @@ describe("onMessagePublished", () => { expect(receivedEvent.type).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); expect(receivedEvent.source).to.equal("//pubsub.googleapis.com/projects/aProject/topics/topic"); expect(receivedEvent.data.message.json).to.deep.equal({ hello: "world" }); + expect(receivedEvent.context).to.exist; + expect(receivedEvent.context.eventId).to.equal("uuid"); + expect(receivedEvent.context.timestamp).to.equal(publishTime); + expect(receivedEvent.context.eventType).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); + expect(receivedEvent.context.resource).to.deep.equal({ + service: "pubsub.googleapis.com", + name: "projects/aProject/topics/topic", + }); }); // These tests pass if the transpiler works @@ -228,6 +236,7 @@ describe("onMessagePublished", () => { ); }); + //Test case to ensure Idempotency. makes things dont break if there is already context present it("should not modify a CloudEvent that already has a context", async () => { const publishTime = new Date().toISOString(); const message = { @@ -266,6 +275,8 @@ describe("onMessagePublished", () => { expect(receivedEvent.context).to.deep.equal(existingContext); }); + + //Test case to ensure GCLOUD_PROJECT is used as fallback for resource name it("should use GCLOUD_PROJECT as fallback for resource name", async () => { const publishTime = new Date().toISOString(); const message = { @@ -324,6 +335,6 @@ describe("onMessagePublished", () => { await func(event); - expect(receivedEvent.context.resource.name).to.equal("project/unknown-project/topics/topic"); + expect(receivedEvent.context.resource.name).to.equal("projects/unknown-project/topics/topic"); }); }); diff --git a/src/v2/providers/pubsub.ts b/src/v2/providers/pubsub.ts index d8c0878ab..2455ee1d6 100644 --- a/src/v2/providers/pubsub.ts +++ b/src/v2/providers/pubsub.ts @@ -413,7 +413,7 @@ function getResourceName(event: CloudEvent>, topic: st const topicName = match?.[2] ?? topic; if (!project) { - return `project/unknown-project/topics/${topicName}`; + return `projects/unknown-project/topics/${topicName}`; } return `projects/${project}/topics/${topicName}`; From 7222aad86d7fe096af3fab21b6512b2e0c34e80a Mon Sep 17 00:00:00 2001 From: Varun Shetty Date: Fri, 5 Dec 2025 23:08:34 +0000 Subject: [PATCH 3/3] Addressing the PR comments --- spec/v2/providers/pubsub.spec.ts | 93 +++++--------------------------- src/v2/providers/pubsub.ts | 12 +++-- 2 files changed, 20 insertions(+), 85 deletions(-) diff --git a/spec/v2/providers/pubsub.spec.ts b/spec/v2/providers/pubsub.spec.ts index 0c8e4cf6e..59e6c5313 100644 --- a/spec/v2/providers/pubsub.spec.ts +++ b/spec/v2/providers/pubsub.spec.ts @@ -170,15 +170,15 @@ describe("onMessagePublished", () => { expect(json).to.deep.equal({ hello: "world" }); }); - it("should construct a CloudEvent with the correct context", async () => { + it("should construct a CloudEvent with the correct context and message", async () => { const publishTime = new Date().toISOString(); - const message = { + const messagePayload = { messageId: "uuid", data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), publishTime, }; const data: pubsub.MessagePublishedData = { - message: message as any, + message: messagePayload as any, subscription: "projects/aProject/subscriptions/aSubscription", }; const event: CloudEvent = { @@ -190,23 +190,20 @@ describe("onMessagePublished", () => { data, }; - let receivedEvent: CloudEvent>; + let destructuredMessage: pubsub.Message; + let context: any; const func = pubsub.onMessagePublished("topic", (e) => { - receivedEvent = e; + ({ message: destructuredMessage, context } = e as any); }); await func(event); - expect(receivedEvent.id).to.equal("uuid"); - expect(receivedEvent.time).to.equal(publishTime); - expect(receivedEvent.type).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); - expect(receivedEvent.source).to.equal("//pubsub.googleapis.com/projects/aProject/topics/topic"); - expect(receivedEvent.data.message.json).to.deep.equal({ hello: "world" }); - expect(receivedEvent.context).to.exist; - expect(receivedEvent.context.eventId).to.equal("uuid"); - expect(receivedEvent.context.timestamp).to.equal(publishTime); - expect(receivedEvent.context.eventType).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); - expect(receivedEvent.context.resource).to.deep.equal({ + expect(destructuredMessage.json).to.deep.equal({ hello: "world" }); + expect(context).to.exist; + expect(context.eventId).to.equal("uuid"); + expect(context.timestamp).to.equal(publishTime); + expect(context.eventType).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); + expect(context.resource).to.deep.equal({ service: "pubsub.googleapis.com", name: "projects/aProject/topics/topic", }); @@ -236,77 +233,11 @@ describe("onMessagePublished", () => { ); }); - //Test case to ensure Idempotency. makes things dont break if there is already context present - it("should not modify a CloudEvent that already has a context", async () => { - const publishTime = new Date().toISOString(); - const message = { - messageId: "uuid", - data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), - publishTime, - }; - const data: pubsub.MessagePublishedData = { - message: message as any, - subscription: "projects/aProject/subscriptions/aSubscription", - }; - const existingContext = { - eventId: "custom-id", - timestamp: publishTime, - eventType: "custom.type", - resource: "custom/resource", - params: {}, - }; - const event: CloudEvent = { - specversion: "1.0", - id: "uuid", - time: publishTime, - type: "google.cloud.pubsub.topic.v1.messagePublished", - source: "//pubsub.googleapis.com/projects/aProject/topics/topic", - data, - context: existingContext as any, - }; - let receivedEvent: CloudEvent>; - const func = pubsub.onMessagePublished("topic", (e) => { - receivedEvent = e; - }); - await func(event); - expect(receivedEvent.context).to.deep.equal(existingContext); - }); - //Test case to ensure GCLOUD_PROJECT is used as fallback for resource name - it("should use GCLOUD_PROJECT as fallback for resource name", async () => { - const publishTime = new Date().toISOString(); - const message = { - messageId: "uuid", - data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), - publishTime, - }; - const data: pubsub.MessagePublishedData = { - message: message as any, - subscription: "projects/aProject/subscriptions/aSubscription", - }; - const event: CloudEvent = { - specversion: "1.0", - id: "uuid", - time: publishTime, - type: "google.cloud.pubsub.topic.v1.messagePublished", - source: "//pubsub.googleapis.com/topics/topic", // Malformed source - data, - }; - - let receivedEvent: CloudEvent>; - const func = pubsub.onMessagePublished("topic", (e) => { - receivedEvent = e; - }); - - await func(event); - - expect(receivedEvent.context.resource.name).to.equal("projects/aProject/topics/topic"); - }); - it("should use 'unknown-project' as fallback for resource name", async () => { delete process.env.GCLOUD_PROJECT; const publishTime = new Date().toISOString(); diff --git a/src/v2/providers/pubsub.ts b/src/v2/providers/pubsub.ts index 2455ee1d6..476907289 100644 --- a/src/v2/providers/pubsub.ts +++ b/src/v2/providers/pubsub.ts @@ -367,7 +367,7 @@ export function onMessagePublished( */ function attachPubSubContext(event: CloudEvent>, topic: string) { if ("context" in event && event.context) { - return; + throw new Error("Unexpected context in event."); } const resourceName = getResourceName(event, topic); @@ -394,6 +394,12 @@ function attachPubSubContext(event: CloudEvent>, topi configurable: false, }); + + Object.defineProperty(event, "message", { + get: () => (event.data as MessagePublishedData).message, + enumerable: false, + configurable: false, + }); } /** @@ -407,9 +413,7 @@ function attachPubSubContext(event: CloudEvent>, topi */ function getResourceName(event: CloudEvent>, topic: string) { const match = event.source?.match(/projects\/([^/]+)\/topics\/([^/]+)/); - const project = - match?.[1] ?? process.env.GCLOUD_PROJECT ?? process.env.GCLOUD_PROJECT_ID ?? process.env.PROJECT_ID; - + const project = match?.[1]; const topicName = match?.[2] ?? topic; if (!project) {