|
1 | | -import EventEmitter from "node:events"; |
2 | 1 | import { describe, it, beforeAll, expect } from "vitest"; |
3 | | -import { PubSub } from "@google-cloud/pubsub"; |
4 | 2 | import { firestore } from "./utils"; |
5 | 3 | import { GeoPoint } from "firebase-admin/firestore"; |
6 | 4 | const RUN_ID = String(process.env.RUN_ID); |
7 | 5 |
|
8 | | -const pubsub = new PubSub({ projectId: "cf3-integration-tests-v2-qa" }); |
9 | | -const emitter = new EventEmitter(); |
10 | | - |
11 | 6 | function waitForEvent<T = unknown>( |
12 | 7 | event: string, |
13 | 8 | trigger: () => Promise<void>, |
14 | 9 | timeoutMs: number = 60_000 |
15 | 10 | ): Promise<T> { |
16 | 11 | return new Promise<T>((resolve, reject) => { |
17 | | - emitter.on(event, (data: T) => { |
18 | | - emitter.off(event, resolve); |
19 | | - resolve(data); |
| 12 | + let timer: NodeJS.Timeout | null = null; |
| 13 | + |
| 14 | + const unsubscribe = firestore.collection(`${RUN_ID}_snapshots`).doc(event).onSnapshot(snapshot => { |
| 15 | + if (snapshot.exists) { |
| 16 | + console.log("snapshot", snapshot.data()); |
| 17 | + if (timer) clearTimeout(timer); |
| 18 | + unsubscribe(); |
| 19 | + resolve(snapshot.data() as T); |
| 20 | + } |
20 | 21 | }); |
21 | 22 |
|
22 | | - setTimeout(() => { |
23 | | - emitter.off(event, resolve); |
24 | | - reject(new Error("Timeout waiting for event: " + event)); |
| 23 | + timer = setTimeout(() => { |
| 24 | + unsubscribe(); |
| 25 | + reject(new Error(`Timeout waiting for event "${event}" after ${timeoutMs}ms`)); |
25 | 26 | }, timeoutMs); |
26 | 27 |
|
27 | | - trigger().catch(reject); |
| 28 | + trigger().then().catch(reject); |
28 | 29 | }); |
29 | 30 | } |
30 | 31 |
|
31 | | -beforeAll(async () => { |
32 | | - const topic = pubsub.topic('vitest'); |
33 | | - const subscription = topic.subscription('vitest-sub'); |
34 | | - |
35 | | - subscription.on("message", (message) => { |
36 | | - console.log("message", message.data.toString()); |
37 | | - const data = message.data.length ? JSON.parse(message.data.toString()) : null; |
38 | | - message.ack(); |
39 | | - |
40 | | - if (!("event" in data)) { |
41 | | - throw new Error("Invalid event data: " + JSON.stringify(data)); |
42 | | - } |
43 | | - |
44 | | - emitter.emit(data.event, data.data); |
45 | | - }); |
46 | | - |
47 | | - subscription.on("error", (error) => { |
48 | | - console.error("Pubsub error", error); |
49 | | - process.exit(1); |
50 | | - }); |
51 | | -}); |
52 | | - |
53 | 32 | describe("firestore.v2", () => { |
54 | 33 | describe("onDocumentCreated", () => { |
55 | 34 | let data: any; |
56 | 35 |
|
57 | 36 | beforeAll(async () => { |
58 | 37 | data = await waitForEvent("onDocumentCreated", async () => { |
| 38 | + console.log("triggering event", RUN_ID); |
59 | 39 | await firestore |
60 | 40 | .collection(RUN_ID) |
61 | 41 | .doc("onDocumentCreated") |
62 | 42 | .set({ |
63 | 43 | foo: "bar", |
64 | 44 | timestamp: new Date(), |
65 | 45 | geopoint: new GeoPoint(10, 20), |
| 46 | + }).then(() => { |
| 47 | + console.log("event triggered", RUN_ID); |
66 | 48 | }); |
67 | 49 | }); |
| 50 | + console.log("data", data); |
68 | 51 | }); |
69 | 52 |
|
70 | 53 | it("should be a CloudEvent", () => { |
|
0 commit comments