diff --git a/spec/v2/providers/pubsub.spec.ts b/spec/v2/providers/pubsub.spec.ts index d498b1b42..59e6c5313 100644 --- a/spec/v2/providers/pubsub.spec.ts +++ b/spec/v2/providers/pubsub.spec.ts @@ -170,6 +170,45 @@ describe("onMessagePublished", () => { expect(json).to.deep.equal({ hello: "world" }); }); + it("should construct a CloudEvent with the correct context and message", async () => { + const publishTime = new Date().toISOString(); + const messagePayload = { + messageId: "uuid", + data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), + publishTime, + }; + const data: pubsub.MessagePublishedData = { + message: messagePayload as any, + subscription: "projects/aProject/subscriptions/aSubscription", + }; + const event: CloudEvent = { + specversion: "1.0", + id: "uuid", + time: publishTime, + type: "google.cloud.pubsub.topic.v1.messagePublished", + source: "//pubsub.googleapis.com/projects/aProject/topics/topic", + data, + }; + + let destructuredMessage: pubsub.Message; + let context: any; + const func = pubsub.onMessagePublished("topic", (e) => { + ({ message: destructuredMessage, context } = e as any); + }); + + await func(event); + + expect(destructuredMessage.json).to.deep.equal({ hello: "world" }); + expect(context).to.exist; + expect(context.eventId).to.equal("uuid"); + expect(context.timestamp).to.equal(publishTime); + expect(context.eventType).to.equal("google.cloud.pubsub.topic.v1.messagePublished"); + expect(context.resource).to.deep.equal({ + service: "pubsub.googleapis.com", + name: "projects/aProject/topics/topic", + }); + }); + // These tests pass if the transpiler works it("allows desirable syntax", () => { pubsub.onMessagePublished( @@ -193,4 +232,40 @@ describe("onMessagePublished", () => { (event: CloudEvent) => undefined ); }); + + + + + + + it("should use 'unknown-project' as fallback for resource name", async () => { + delete process.env.GCLOUD_PROJECT; + const publishTime = new Date().toISOString(); + const message = { + messageId: "uuid", + data: Buffer.from(JSON.stringify({ hello: "world" })).toString("base64"), + publishTime, + }; + const data: pubsub.MessagePublishedData = { + message: message as any, + subscription: "projects/aProject/subscriptions/aSubscription", + }; + const event: CloudEvent = { + specversion: "1.0", + id: "uuid", + time: publishTime, + type: "google.cloud.pubsub.topic.v1.messagePublished", + source: "//pubsub.googleapis.com/topics/topic", // Malformed source + data, + }; + + let receivedEvent: CloudEvent>; + const func = pubsub.onMessagePublished("topic", (e) => { + receivedEvent = e; + }); + + await func(event); + + expect(receivedEvent.context.resource.name).to.equal("projects/unknown-project/topics/topic"); + }); }); diff --git a/src/v2/core.ts b/src/v2/core.ts index 26987637d..a898577c1 100644 --- a/src/v2/core.ts +++ b/src/v2/core.ts @@ -25,6 +25,7 @@ * @packageDocumentation */ +import type { EventContext } from "../v1/cloud-functions"; import { Change } from "../common/change"; import { ManifestEndpoint } from "../runtime/manifest"; @@ -91,6 +92,13 @@ export interface CloudEvent { /** Information about this specific event. */ data: T; + + /** V1- compatible context of this event. + * + * This getter is added at runtime for V1 compatibility. + * May be undefined it not set by a provider + */ + readonly context?: EventContext; } /** diff --git a/src/v2/providers/pubsub.ts b/src/v2/providers/pubsub.ts index 5ae982185..476907289 100644 --- a/src/v2/providers/pubsub.ts +++ b/src/v2/providers/pubsub.ts @@ -34,6 +34,7 @@ import { Expression } from "../../params"; import * as options from "../options"; import { SecretParam } from "../../params/types"; import { withInit } from "../../common/onInit"; +import type { EventContext, Resource } from "../../v1/cloud-functions"; /** * Google Cloud Pub/Sub is a globally distributed message bus that automatically scales as you need it. @@ -304,7 +305,9 @@ export function onMessagePublished( subscription: string; }; messagePublishedData.message = new Message(messagePublishedData.message); - return wrapTraceContext(withInit(handler))(raw as CloudEvent>); + const event = raw as CloudEvent>; + attachPubSubContext(event, topic); + return wrapTraceContext(withInit(handler))(event); }; func.run = handler; @@ -353,3 +356,70 @@ export function onMessagePublished( return func; } + +/** + * @internal + * + * Adds a v1-style context to the event. + * + * @param event - The event to add the context to. + * @param topic - The topic the event is for. + */ +function attachPubSubContext(event: CloudEvent>, topic: string) { + if ("context" in event && event.context) { + throw new Error("Unexpected context in event."); + } + + const resourceName = getResourceName(event, topic); + const resource: Resource = { + + service: "pubsub.googleapis.com", + name: resourceName ?? "", + + }; + + const context: EventContext = { + eventId: event.id, + timestamp: event.time, + resource, + eventType: "google.cloud.pubsub.topic.v1.messagePublished", + params: {} + + }; + + Object.defineProperty(event, "context", { + + get: () => context, + enumerable: false, + configurable: false, + + }); + + Object.defineProperty(event, "message", { + get: () => (event.data as MessagePublishedData).message, + enumerable: false, + configurable: false, + }); +} + +/** + * @internal + * + * Extracts the resource name from the event source. + * + * @param event - The event to extract the resource name from. + * @param topic - The topic the event is for. + * @returns The resource name. + */ +function getResourceName(event: CloudEvent>, topic: string) { + const match = event.source?.match(/projects\/([^/]+)\/topics\/([^/]+)/); + const project = match?.[1]; + const topicName = match?.[2] ?? topic; + + if (!project) { + return `projects/unknown-project/topics/${topicName}`; + } + + return `projects/${project}/topics/${topicName}`; + +} \ No newline at end of file