Skip to content

Commit 9220e48

Browse files
authored
Merge pull request #58 from XMOJ-Script-dev/codex/implement-websocket-notifications
2 parents 32fa730 + b7190c6 commit 9220e48

7 files changed

Lines changed: 551 additions & 8 deletions

File tree

Source/NotificationManager.ts

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
/*
2+
* Copyright (C) 2023-2025 XMOJ-bbs contributors
3+
* This file is part of XMOJ-bbs.
4+
* XMOJ-bbs is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* XMOJ-bbs is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with XMOJ-bbs. If not, see <https://www.gnu.org/licenses/>.
16+
*/
17+
18+
/**
19+
* Durable Object used to manage notification WebSocket sessions per user.
20+
*
21+
* This implementation uses the WebSocket Hibernation API via
22+
* `state.acceptWebSocket(...)` so idle websocket connections do not keep the DO
23+
* actively running.
24+
*/
25+
interface NotificationAttachment {
26+
userId: string;
27+
connectedAt: number;
28+
}
29+
30+
interface HibernationWebSocket extends WebSocket {
31+
serializeAttachment: (value: NotificationAttachment) => void;
32+
deserializeAttachment: () => NotificationAttachment | null;
33+
}
34+
35+
interface NotificationEnvironment {
36+
NOTIFICATION_PUSH_TOKEN?: string;
37+
}
38+
39+
export class NotificationManager {
40+
private readonly state: DurableObjectState;
41+
private readonly sessions: Map<string, Set<WebSocket>>;
42+
private readonly pushToken: string;
43+
private static readonly MAX_SESSIONS_PER_USER = 20;
44+
45+
constructor(state: DurableObjectState, env: NotificationEnvironment) {
46+
this.state = state;
47+
this.sessions = new Map<string, Set<WebSocket>>();
48+
this.pushToken = env.NOTIFICATION_PUSH_TOKEN || "";
49+
// `state.getWebSockets()` is synchronous in the current Cloudflare runtime.
50+
this.rebuildSessionIndex();
51+
}
52+
53+
/**
54+
* Rebuild in-memory session index from hibernated sockets on cold start.
55+
*/
56+
private rebuildSessionIndex(): void {
57+
for (const websocket of this.state.getWebSockets()) {
58+
const userId = this.getSocketUserId(websocket);
59+
if (!userId) {
60+
continue;
61+
}
62+
this.addSession(userId, websocket);
63+
}
64+
}
65+
66+
/**
67+
* Store a socket in the per-user set (supports multi-tab / multi-device).
68+
*
69+
* To avoid abuse, each user is capped at MAX_SESSIONS_PER_USER sockets. When
70+
* exceeded, the oldest socket is closed and removed.
71+
*/
72+
private addSession(userId: string, websocket: WebSocket): void {
73+
let userSessions = this.sessions.get(userId);
74+
if (!userSessions) {
75+
userSessions = new Set<WebSocket>();
76+
this.sessions.set(userId, userSessions);
77+
}
78+
79+
userSessions.add(websocket);
80+
while (userSessions.size > NotificationManager.MAX_SESSIONS_PER_USER) {
81+
const oldestSession = userSessions.values().next().value as WebSocket | undefined;
82+
if (!oldestSession) {
83+
break;
84+
}
85+
this.removeSession(userId, oldestSession);
86+
try {
87+
oldestSession.close(1008, "Too many websocket sessions");
88+
} catch (_) {
89+
// Best effort close.
90+
}
91+
}
92+
}
93+
94+
/**
95+
* Remove a socket from the in-memory index and cleanup empty user entries.
96+
*/
97+
private removeSession(userId: string, websocket: WebSocket): void {
98+
const userSessions = this.sessions.get(userId);
99+
if (!userSessions) {
100+
return;
101+
}
102+
103+
userSessions.delete(websocket);
104+
if (userSessions.size === 0) {
105+
this.sessions.delete(userId);
106+
}
107+
}
108+
109+
/**
110+
* Read the socket's bound user ID from hibernation attachment metadata.
111+
*/
112+
private getSocketUserId(websocket: WebSocket): string {
113+
try {
114+
const attachment = (websocket as HibernationWebSocket).deserializeAttachment();
115+
if (attachment && attachment.userId !== "") {
116+
return attachment.userId;
117+
}
118+
} catch (_) {
119+
// Ignore attachment parse failures and treat socket as anonymous.
120+
}
121+
return "";
122+
}
123+
124+
async fetch(request: Request): Promise<Response> {
125+
const url = new URL(request.url);
126+
127+
// Internal push channel from Process.ts.
128+
if (url.pathname === "/notify") {
129+
if (this.pushToken === "" || request.headers.get("X-Notification-Token") !== this.pushToken) {
130+
return new Response("Unauthorized", {status: 401});
131+
}
132+
133+
const body = await request.json() as { userId: string; notification: object };
134+
const userSessions = this.sessions.get(body.userId);
135+
if (userSessions) {
136+
const payload = JSON.stringify(body.notification);
137+
for (const websocket of userSessions) {
138+
if (websocket.readyState === 1) {
139+
websocket.send(payload);
140+
}
141+
}
142+
}
143+
return new Response("OK");
144+
}
145+
146+
const upgradeHeader = request.headers.get("Upgrade");
147+
if (upgradeHeader !== "websocket") {
148+
return new Response("Expected WebSocket", {status: 426});
149+
}
150+
151+
const userId = url.searchParams.get("userId");
152+
if (!userId) {
153+
return new Response("Missing userId", {status: 400});
154+
}
155+
156+
const pair = new WebSocketPair();
157+
const [client, server] = Object.values(pair);
158+
159+
// Hibernation API: allow DO to sleep while websocket is idle.
160+
this.state.acceptWebSocket(server);
161+
(server as HibernationWebSocket).serializeAttachment({
162+
userId,
163+
connectedAt: Date.now()
164+
});
165+
this.addSession(userId, server);
166+
167+
server.send(JSON.stringify({
168+
type: "connected",
169+
timestamp: Date.now()
170+
}));
171+
172+
return new Response(null, {status: 101, webSocket: client});
173+
}
174+
175+
webSocketMessage(websocket: WebSocket, message: string | ArrayBuffer): void {
176+
try {
177+
const parsedMessage = JSON.parse(typeof message === "string" ? message : new TextDecoder().decode(message));
178+
if (parsedMessage.type === "ping") {
179+
websocket.send(JSON.stringify({type: "pong"}));
180+
}
181+
} catch (_) {
182+
// Ignore malformed client messages to keep the connection alive.
183+
}
184+
}
185+
186+
webSocketClose(websocket: WebSocket): void {
187+
const userId = this.getSocketUserId(websocket);
188+
if (userId !== "") {
189+
this.removeSession(userId, websocket);
190+
}
191+
}
192+
193+
webSocketError(websocket: WebSocket): void {
194+
const userId = this.getSocketUserId(websocket);
195+
if (userId !== "") {
196+
this.removeSession(userId, websocket);
197+
}
198+
199+
try {
200+
websocket.close(1011, "Socket error");
201+
} catch (_) {
202+
// Socket may already be closed by runtime/client.
203+
}
204+
}
205+
}

Source/Process.ts

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import {CheerioAPI, load} from "cheerio";
2424
import * as sqlstring from 'sqlstring';
2525
// @ts-ignore
2626
import CryptoJS from "crypto-js";
27-
import {AnalyticsEngineDataset, D1Database, D1DatabaseSession, KVNamespace} from "@cloudflare/workers-types";
27+
import {AnalyticsEngineDataset, D1Database, D1DatabaseSession, DurableObjectNamespace, KVNamespace} from "@cloudflare/workers-types";
2828

2929
interface Environment {
3030
API_TOKEN: string;
@@ -36,6 +36,8 @@ interface Environment {
3636
DB: D1Database;
3737
logdb: AnalyticsEngineDataset;
3838
AI: any;
39+
NOTIFICATIONS: DurableObjectNamespace;
40+
NOTIFICATION_PUSH_TOKEN: string;
3941
}
4042

4143
// noinspection JSUnusedLocalSymbols
@@ -64,6 +66,8 @@ export class Process {
6466
private readonly RemoteIP: string;
6567
private XMOJDatabase: Database;
6668
private readonly logs: AnalyticsEngineDataset;
69+
private readonly notifications: DurableObjectNamespace;
70+
private readonly notificationPushToken: string;
6771
private RequestData: Request;
6872
private Fetch = async (RequestURL: URL): Promise<Response> => {
6973
Output.Log("Fetch: " + RequestURL.toString());
@@ -361,48 +365,116 @@ export class Process {
361365
return result;
362366
}
363367

368+
369+
/**
370+
* Push a non-critical realtime notification to the websocket Durable Object.
371+
* Failures are intentionally swallowed to avoid affecting main request flow.
372+
*/
373+
private pushNotification = async (userId: string, notification: object): Promise<void> => {
374+
try {
375+
const id = this.notifications.idFromName(userId);
376+
const stub = this.notifications.get(id);
377+
await stub.fetch(new Request("https://dummy/notify", {
378+
method: "POST",
379+
headers: {
380+
"X-Notification-Token": this.notificationPushToken
381+
},
382+
body: JSON.stringify({userId, notification})
383+
}));
384+
} catch (_) {
385+
// Non-critical path: mention persistence already succeeded.
386+
}
387+
};
388+
364389
private AddBBSMention = async (ToUserID: string, PostID: number, ReplyID: number): Promise<void> => {
365390
if (ToUserID === this.Username) {
366391
return;
367392
}
393+
const mentionTime = new Date().getTime();
394+
let mentionID = 0;
368395
if (ThrowErrorIfFailed(await this.XMOJDatabase.GetTableSize("bbs_mention", {
369396
to_user_id: ToUserID,
370397
post_id: PostID
371398
}))["TableSize"] === 0) {
372-
ThrowErrorIfFailed(await this.XMOJDatabase.Insert("bbs_mention", {
399+
const insertResult = ThrowErrorIfFailed(await this.XMOJDatabase.Insert("bbs_mention", {
373400
to_user_id: ToUserID,
374401
post_id: PostID,
375-
bbs_mention_time: new Date().getTime(),
402+
bbs_mention_time: mentionTime,
376403
reply_id: ReplyID
377404
}));
405+
mentionID = insertResult["InsertID"];
378406
} else {
379407
ThrowErrorIfFailed(await this.XMOJDatabase.Update("bbs_mention", {
380-
bbs_mention_time: new Date().getTime()
408+
bbs_mention_time: mentionTime
381409
}, {
382410
to_user_id: ToUserID,
383411
post_id: PostID,
384412
reply_id: ReplyID
385413
}));
414+
const mentionData = ThrowErrorIfFailed(await this.XMOJDatabase.Select("bbs_mention", ["bbs_mention_id"], {
415+
to_user_id: ToUserID,
416+
post_id: PostID,
417+
reply_id: ReplyID
418+
}));
419+
if (mentionData.toString() !== "") {
420+
mentionID = mentionData[0]["bbs_mention_id"];
421+
}
386422
}
423+
424+
const postData = ThrowErrorIfFailed(await this.XMOJDatabase.Select("bbs_post", ["title"], {post_id: PostID}));
425+
const postTitle = postData.toString() === "" ? "" : postData[0]["title"];
426+
const totalRepliesBefore = (await this.RawDatabase.prepare("SELECT COUNT(*) + 1 AS position FROM bbs_reply WHERE post_id = $1 AND reply_time < (SELECT reply_time FROM bbs_reply WHERE reply_id = $2)").bind(PostID, ReplyID).run())["results"][0]["position"];
427+
const pageNumber = Math.floor(Number(totalRepliesBefore) / 15) + 1;
428+
429+
await this.pushNotification(ToUserID, {
430+
type: "bbs_mention",
431+
data: {
432+
MentionID: mentionID,
433+
PostID,
434+
ReplyID,
435+
PostTitle: postTitle,
436+
MentionTime: mentionTime,
437+
PageNumber: pageNumber
438+
}
439+
});
387440
};
388441
private AddMailMention = async (FromUserID: string, ToUserID: string): Promise<void> => {
442+
const mentionTime = new Date().getTime();
443+
let mentionID = 0;
389444
if (ThrowErrorIfFailed(await this.XMOJDatabase.GetTableSize("short_message_mention", {
390445
from_user_id: FromUserID,
391446
to_user_id: ToUserID
392447
}))["TableSize"] === 0) {
393-
ThrowErrorIfFailed(await this.XMOJDatabase.Insert("short_message_mention", {
448+
const insertResult = ThrowErrorIfFailed(await this.XMOJDatabase.Insert("short_message_mention", {
394449
from_user_id: FromUserID,
395450
to_user_id: ToUserID,
396-
mail_mention_time: new Date().getTime()
451+
mail_mention_time: mentionTime
397452
}));
453+
mentionID = insertResult["InsertID"];
398454
} else {
399455
ThrowErrorIfFailed(await this.XMOJDatabase.Update("short_message_mention", {
400-
mail_mention_time: new Date().getTime()
456+
mail_mention_time: mentionTime
401457
}, {
402458
from_user_id: FromUserID,
403459
to_user_id: ToUserID
404460
}));
461+
const mentionData = ThrowErrorIfFailed(await this.XMOJDatabase.Select("short_message_mention", ["mail_mention_id"], {
462+
from_user_id: FromUserID,
463+
to_user_id: ToUserID
464+
}));
465+
if (mentionData.toString() !== "") {
466+
mentionID = mentionData[0]["mail_mention_id"];
467+
}
405468
}
469+
470+
await this.pushNotification(ToUserID, {
471+
type: "mail_mention",
472+
data: {
473+
MentionID: mentionID,
474+
FromUserID,
475+
MentionTime: mentionTime
476+
}
477+
});
406478
};
407479
private ProcessFunctions = {
408480
NewPost: async (Data: object): Promise<Result> => {
@@ -1473,6 +1545,8 @@ export class Process {
14731545
this.AI = Environment.AI;
14741546
this.kv = Environment.kv;
14751547
this.logs = Environment.logdb;
1548+
this.notifications = Environment.NOTIFICATIONS;
1549+
this.notificationPushToken = Environment.NOTIFICATION_PUSH_TOKEN;
14761550
this.CaptchaSecretKey = Environment.CaptchaSecretKey;
14771551
this.GithubImagePAT = Environment.GithubImagePAT;
14781552
this.ACCOUNT_ID = Environment.ACCOUNT_ID;

0 commit comments

Comments
 (0)