Bugfix/Redis Pub Sub Streaming #5884

Draft
HenryHengZJ wants to merge 1 commit into main from bugfix/Redis-PubSub-Streaming

Conversation

@HenryHengZJ
Contributor

Redis Pub/Sub subscription leak causing SSE streaming failure in QUEUE mode

Summary

Fix critical Redis Pub/Sub subscription leak and multiple SSE streaming reliability issues that cause streaming to fail after hours of sustained usage in QUEUE mode.

Problem

When running Flowise in QUEUE mode, SSE streaming progressively degrades and eventually stops working entirely after approximately one day of usage. Events return empty, and only a fresh deployment resolves the issue temporarily.

Root Cause Analysis

Three compounding issues were identified:

1. Redis Pub/Sub channel subscriptions never unsubscribed (PRIMARY)

Each SSE streaming request subscribes the Redis subscriber client to a unique chatId channel via redisSubscriber.subscribe(chatId). When the request completes, sseStreamer.removeClient(chatId) cleans up the SSE client — but the Redis channel subscription persists forever. The subscribedChannels Set and the underlying Redis SUBSCRIBE list grow without bound.

Over ~24 hours with thousands of requests, the subscriber accumulates thousands of dead channel subscriptions. Redis enforces client-output-buffer-limit for pubsub clients (default: pubsub 32mb 8mb 60). When messages flowing to dead channels fill this buffer, Redis forcefully disconnects the subscriber client, causing all SSE streaming to fail globally.
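The leak pattern can be illustrated with a minimal, self-contained sketch (the class and names here are stand-ins for the real ioredis client in RedisEventSubscriber.ts, not the actual implementation):

```typescript
// Sketch of the leak: every streaming request subscribes to a unique chatId
// channel, and nothing in the request path ever unsubscribes.
class FakeRedisSubscriber {
    // Stands in for both the subscribedChannels Set and Redis's SUBSCRIBE list.
    channels = new Set<string>()

    subscribe(channel: string) {
        this.channels.add(channel)
    }
    // Before the fix, no unsubscribe() existed anywhere in the request path.
}

const redisSubscriber = new FakeRedisSubscriber()

// Simulate 1000 completed streaming requests, each with a unique chatId.
for (let i = 0; i < 1000; i++) {
    const chatId = `chat-${i}`
    redisSubscriber.subscribe(chatId)
    // ...stream completes, sseStreamer.removeClient(chatId) runs, but the
    // channel subscription survives the request.
}

console.log(redisSubscriber.channels.size) // 1000 — grows without bound
```

Every dead channel still counts against the subscriber's output buffer once messages flow, which is what eventually trips client-output-buffer-limit.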

2. No client disconnect detection

When SSE clients disconnect prematurely (browser close, network drop, ALB idle timeout at 2000s), the server never learns about it. There are no res.on('close') handlers, so both the SSE client object and Redis subscription leak until the server process restarts.

3. Unsafe writes to closed SSE connections

SSEStreamer.removeClient() calls response.write() followed by response.end(), then deletes the entry with delete this.clients[chatId]. If the client has already disconnected, response.write() throws, the delete line never executes, and the client object leaks permanently in the clients dictionary.
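The try/finally shape of the fix can be sketched as follows (the clients dictionary and response type are simplified stand-ins for the real SSEStreamer internals):

```typescript
// Sketch of the removeClient() fix: the delete must run even if write/end throw.
type FakeResponse = { write(data: string): void; end(): void }

const clients: Record<string, { response: FakeResponse }> = {}

function removeClient(chatId: string) {
    const client = clients[chatId]
    if (!client) return
    try {
        client.response.write('event: end\ndata: [DONE]\n\n')
        client.response.end()
    } catch {
        // Client already disconnected; write() throwing is expected here.
    } finally {
        delete clients[chatId] // always executes, so the entry can never leak
    }
}

// A response whose socket is already gone: write() throws.
clients['abc'] = {
    response: {
        write() {
            throw new Error('write after end')
        },
        end() {}
    }
}
removeClient('abc')
console.log('abc' in clients) // false — the entry was removed despite the throw
```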

Changes

packages/server/src/queue/RedisEventSubscriber.ts

  • Add unsubscribe(channel) method that calls this.redisSubscriber.unsubscribe(channel) and removes from subscribedChannels Set
  • Add getSubscriptionCount() for observability
  • Add startPeriodicCleanup() — every 60s, unsubscribes channels with no active SSE client as a safety net
  • Update disconnect() to clear the cleanup interval
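A hedged sketch of these RedisEventSubscriber additions (the real class wraps an ioredis client; here a Set stands in so the bookkeeping is testable in isolation, and the interval body is exposed as cleanupOnce() for clarity):

```typescript
// Sketch of unsubscribe(), getSubscriptionCount(), and the periodic cleanup.
class RedisEventSubscriberSketch {
    private subscribedChannels = new Set<string>()
    private cleanupInterval?: ReturnType<typeof setInterval>

    // hasActiveSseClient stands in for checking sseStreamer.clients[channel].
    constructor(private hasActiveSseClient: (channel: string) => boolean) {}

    subscribe(channel: string) {
        this.subscribedChannels.add(channel) // real code also calls redis.subscribe()
    }

    unsubscribe(channel: string) {
        this.subscribedChannels.delete(channel) // real code also calls redis.unsubscribe()
    }

    getSubscriptionCount(): number {
        return this.subscribedChannels.size
    }

    // One pass of the safety net: drop channels with no active SSE client.
    cleanupOnce(): number {
        const stale = Array.from(this.subscribedChannels).filter((ch) => !this.hasActiveSseClient(ch))
        for (const ch of stale) this.unsubscribe(ch)
        return stale.length
    }

    startPeriodicCleanup(intervalMs = 60_000) {
        this.cleanupInterval = setInterval(() => this.cleanupOnce(), intervalMs)
    }

    disconnect() {
        if (this.cleanupInterval) clearInterval(this.cleanupInterval)
    }
}

// Two subscriptions, only one of which still has a live SSE client.
const live = new Set(['chat-live'])
const sub = new RedisEventSubscriberSketch((ch) => live.has(ch))
sub.subscribe('chat-live')
sub.subscribe('chat-dead')
console.log(sub.cleanupOnce(), sub.getSubscriptionCount()) // 1 1
```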

packages/server/src/controllers/predictions/index.ts

  • Add res.on('close') handler to detect client disconnects (browser close, ALB timeout, network drop)
  • Call redisSubscriber.unsubscribe(chatId) in both the close handler and finally block to ensure cleanup
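The controller pattern above can be sketched like this; Express's res is modelled by a plain EventEmitter and the streamer/subscriber calls are shown as comments, so the names are illustrative rather than the actual controller code:

```typescript
import { EventEmitter } from 'node:events'

const cleanedUp: string[] = []

// Idempotent cleanup shared by the close handler and the finally block.
function cleanup(chatId: string) {
    if (cleanedUp.includes(chatId)) return
    cleanedUp.push(chatId)
    // real code: sseStreamer.removeClient(chatId)
    // real code: if (MODE === 'QUEUE') redisSubscriber.unsubscribe(chatId)
}

function createPrediction(res: EventEmitter, chatId: string) {
    // Detect premature disconnects (browser close, ALB timeout, network drop).
    res.on('close', () => cleanup(chatId))
    try {
        // ...build the chatflow and stream events to res...
    } finally {
        cleanup(chatId) // normal completion path
    }
}

const res = new EventEmitter()
createPrediction(res, 'chat-1')
res.emit('close') // a late close after completion is a no-op
console.log(cleanedUp) // ['chat-1'] — cleanup ran exactly once
```

Making cleanup idempotent is what lets both the close handler and the finally block call it without double-unsubscribing.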

packages/server/src/controllers/internal-predictions/index.ts

  • Same cleanup pattern as the external predictions controller

packages/server/src/utils/SSEStreamer.ts

  • Add safeWrite() helper that wraps response.write() in try/catch and auto-removes dead clients on failure
  • Wrap removeClient() in try/finally so delete this.clients[chatId] always executes even if response.write() throws
  • Same pattern for streamTTSAbortEvent() which also calls write+end+delete
  • Use safeWrite() in all streamXxxEvent methods
  • Add startHeartbeat() / stopHeartbeat() — sends :heartbeat\n\n (SSE comment) to all active clients every 30s to keep ALB connections alive and detect dead clients early
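The heartbeat mechanism can be sketched as below. A line starting with `:` is an SSE comment that EventSource parsers ignore, so it keeps intermediaries like ALB from idling the connection without emitting a visible event; the client bookkeeping here is a simplified stand-in, with the interval body exposed as beatOnce():

```typescript
type SseClient = { write(data: string): boolean }

class HeartbeatSketch {
    private timer?: ReturnType<typeof setInterval>

    constructor(private clients: Record<string, SseClient>) {}

    // One heartbeat pass; dead clients (write throws) are dropped immediately.
    beatOnce(): number {
        let sent = 0
        for (const chatId of Object.keys(this.clients)) {
            try {
                this.clients[chatId].write(':heartbeat\n\n') // SSE comment line
                sent++
            } catch {
                delete this.clients[chatId] // detected a dead client early
            }
        }
        return sent
    }

    startHeartbeat(intervalMs = 30_000) {
        this.timer = setInterval(() => this.beatOnce(), intervalMs)
    }

    stopHeartbeat() {
        if (this.timer) clearInterval(this.timer)
    }
}

const clients: Record<string, SseClient> = {
    alive: { write: () => true },
    dead: {
        write: () => {
            throw new Error('socket closed')
        }
    }
}
const hb = new HeartbeatSketch(clients)
console.log(hb.beatOnce(), Object.keys(clients)) // 1 [ 'alive' ]
```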

packages/server/src/queue/RedisEventPublisher.ts

  • Add safePublish() helper that checks isReady before publishing, awaits the publish call (was fire-and-forget), and uses logger instead of console.error
  • Logs a warning when Redis is disconnected instead of silently swallowing errors
  • All stream methods use safePublish — reduces boilerplate significantly
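A hedged sketch of the safePublish() shape (the publisher client and logger are stand-ins; the real helper lives in RedisEventPublisher.ts and uses the server's logger):

```typescript
type FakePublisher = { isReady: boolean; publish(channel: string, message: string): Promise<number> }

const warnings: string[] = []
const logger = {
    warn: (msg: string) => warnings.push(msg),
    error: (msg: string) => warnings.push(msg)
}

async function safePublish(client: FakePublisher, channel: string, payload: unknown): Promise<boolean> {
    if (!client.isReady) {
        // Warn instead of silently swallowing the event while Redis is down.
        logger.warn(`[RedisEventPublisher] Redis not ready, dropping event for ${channel}`)
        return false
    }
    try {
        // Awaited rather than fire-and-forget, so failures surface here.
        await client.publish(channel, JSON.stringify(payload))
        return true
    } catch (err) {
        logger.error(`[RedisEventPublisher] publish failed for ${channel}: ${err}`)
        return false
    }
}

// When Redis is down the event is dropped with a warning instead of throwing:
const down: FakePublisher = { isReady: false, publish: async () => 0 }
const up: FakePublisher = { isReady: true, publish: async () => 1 }
safePublish(down, 'chat-1', { event: 'token' }) // resolves false, logs a warning
```

Each streamXxx method then reduces to a one-line safePublish call, which is where the boilerplate reduction comes from.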

packages/server/src/index.ts

  • Start SSE heartbeat after SSEStreamer initialization
  • Start periodic subscription cleanup after Redis subscriber connects
  • Stop heartbeat during graceful shutdown

Test plan

  • Deploy to staging and verify SSE streaming works end-to-end for both internal and external prediction endpoints
  • Verify heartbeat comments (:heartbeat) appear in SSE stream every 30s when inspecting the raw event stream
  • Monitor [RedisEventSubscriber] log lines in CloudWatch — subscription count should stay bounded proportional to concurrent active streams
  • Confirm periodic cleanup logs (Periodic cleanup: removed N stale subscriptions) appear when stale subscriptions exist
  • Monitor Redis PUBSUB NUMSUB and CLIENT LIST TYPE pubsub via db-proxy to verify subscription count stays low
  • Run for 24+ hours under normal traffic load and confirm SSE streaming continues to work without degradation
  • Simulate client disconnect (close browser mid-stream) and verify cleanup occurs (check logs and subscription count)
  • Check ElastiCache CloudWatch BytesUsedForCache trends stable/downward compared to pre-fix baseline

Diagnostics (for verifying the fix in production)

SSH into the db-proxy and run these commands to monitor Redis pub/sub health:

# Check total subscribed channels (should stay low and bounded)
redis-cli -h localhost -a '<password>' INFO stats | grep pubsub

# See subscriber client's output buffer usage
redis-cli -h localhost -a '<password>' CLIENT LIST TYPE pubsub

# Check buffer limit configuration
redis-cli -h localhost -a '<password>' CONFIG GET client-output-buffer-limit

… mechanisms

- Added heartbeat functionality to SSEStreamer for improved client connection management.
- Implemented periodic cleanup in RedisEventSubscriber to remove stale subscriptions.
- Updated internal prediction and prediction controllers to handle client disconnects and manage subscriptions more effectively.
- Refactored RedisEventPublisher to streamline event publishing with error handling.

These changes improve the reliability and performance of server-side event streaming and Redis event handling.
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses critical reliability issues in SSE streaming, particularly when Flowise operates in QUEUE mode. The changes focus on preventing resource leaks, such as unmanaged Redis Pub/Sub subscriptions and stale SSE client connections, which previously led to progressive degradation and eventual failure of streaming services. By implementing robust cleanup mechanisms, client disconnect detection, and safe communication patterns, the update significantly enhances the stability and longevity of SSE streaming under sustained usage.

Highlights

  • Redis Pub/Sub Subscription Management: Implemented explicit unsubscription for Redis Pub/Sub channels and introduced a periodic cleanup mechanism to prevent subscription leaks, which was a primary cause of streaming failures.
  • Client Disconnect Detection: Added res.on('close') handlers in prediction controllers to detect premature client disconnects (e.g., browser close, network drops) and trigger immediate cleanup of SSE clients and Redis subscriptions.
  • Robust SSE Streaming Writes: Introduced a safeWrite() helper in SSEStreamer to gracefully handle writes to potentially closed SSE connections, automatically removing dead clients and preventing errors. All streaming methods now utilize this safe write mechanism.
  • SSE Heartbeat Mechanism: Added a periodic heartbeat (:heartbeat) to active SSE connections to keep Application Load Balancer (ALB) connections alive and detect unresponsive clients earlier, improving streaming reliability.
  • Safe Redis Publishing: Implemented a safePublish() helper in RedisEventPublisher to ensure messages are only published when the Redis client is ready, awaiting publish calls, and logging errors more robustly.
  • Application Lifecycle Integration: Integrated the SSE heartbeat and Redis subscription cleanup into the application's startup and graceful shutdown processes.
Changelog
  • packages/server/src/controllers/internal-predictions/index.ts
    • Added res.on('close') event handler to detect client disconnections.
    • Ensured redisSubscriber.unsubscribe(chatId) is called in both the close handler and the finally block for comprehensive cleanup.
  • packages/server/src/controllers/predictions/index.ts
    • Added res.on('close') event handler to detect client disconnections.
    • Ensured redisSubscriber.unsubscribe(chatId) is called in both the close handler and the finally block for comprehensive cleanup.
  • packages/server/src/index.ts
    • Initiated the SSE heartbeat after SSEStreamer initialization.
    • Started the periodic Redis subscription cleanup after the Redis subscriber connects.
    • Stopped the SSE heartbeat during the application's graceful shutdown.
  • packages/server/src/queue/RedisEventPublisher.ts
    • Implemented safePublish() to check Redis client readiness, await publish calls, and use logger for error reporting.
    • Converted all streaming methods to use the safePublish() helper, reducing boilerplate and improving error handling.
    • Replaced console.error with logger.error for consistent logging.
  • packages/server/src/queue/RedisEventSubscriber.ts
    • Added unsubscribe(channel) method to explicitly remove Redis channel subscriptions.
    • Included getSubscriptionCount() for monitoring active subscriptions.
    • Implemented startPeriodicCleanup() to automatically unsubscribe from channels without active SSE clients every 60 seconds.
    • Updated disconnect() to clear the periodic cleanup interval.
  • packages/server/src/utils/SSEStreamer.ts
    • Introduced safeWrite() helper to wrap response.write() with error handling and automatic client removal.
    • Modified removeClient() and streamTTSAbortEvent() to use try/finally blocks, ensuring client deletion even if response.write() fails.
    • Refactored all streamXxxEvent methods to utilize the new safeWrite() helper.
    • Added startHeartbeat() and stopHeartbeat() methods to send periodic SSE comments to keep connections alive.

@gemini-code-assist bot left a comment
Code Review

This pull request provides a comprehensive fix for a critical Redis Pub/Sub subscription leak and improves SSE streaming reliability by introducing proper cleanup for subscriptions, handling client disconnects, and ensuring safe writes to SSE streams. However, a security audit identified critical and high-severity vulnerabilities in client connection management. The use of a user-controlled chatId as a key in a plain JavaScript object exposes the server to prototype pollution, potentially leading to server-wide information disclosure, and the lack of uniqueness and ownership checks for chatId allows SSE session hijacking. It is strongly recommended to use a Map for client management and to generate chatIds server-side with a secure random generator to mitigate these risks. There are also suggestions to improve clarity and maintainability by reducing code duplication and adopting safer iteration patterns.

Comment on lines +29 to 30
private safeWrite(chatId: string, data: string): boolean {
const client = this.clients[chatId]

Severity: critical (security)

The safeWrite method performs a property lookup on the this.clients object using the chatId parameter, which is directly controlled by the user. Because clients is initialized as a plain JavaScript object ({}), an attacker can provide __proto__ as the chatId in a request. This pollutes the prototype of the clients object.

Consequently, any subsequent lookup for a non-existent or disconnected chatId (e.g., during event streaming or periodic cleanup) will return the attacker's connection from the prototype instead of undefined. This leads to a critical server-wide information leak where chat events intended for other users are routed to the attacker's connection. To remediate this, initialize this.clients using Object.create(null) or, preferably, use a Map<string, Client> object which is not susceptible to prototype pollution.

Comment on lines 88 to 89
const apiResponse = await predictionsServices.buildChatflow(req)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)

Severity: high (security)

The chatId used to identify and route Server-Sent Events (SSE) is taken directly from the user-supplied request body without any uniqueness or ownership validation. An attacker can provide a chatId that is already in use by another user's active session. This will overwrite the victim's connection in the server's internal clients map. As a result, any subsequent events generated for the victim's chat session will be routed to the attacker's connection, allowing for session hijacking. The server should be responsible for generating unique, cryptographically secure chatIds (e.g., using uuidv4()) and returning them to the client, rather than allowing the client to provide them.

Comment on lines 58 to 59
const apiResponse = await utilBuildChatflow(req, true)
sseStreamer.streamMetadataEvent(apiResponse.chatId, apiResponse)

Severity: high (security)

Similar to the external predictions controller, the chatId here is user-controlled and used to route SSE events. An attacker with access to the internal API can hijack another user's chat stream within the same workspace by providing a colliding chatId. This allows the attacker to receive real-time chat events intended for another user. Ensure that chatId is generated server-side or validated for uniqueness and ownership before being used to establish an SSE connection.

Comment on lines +67 to +69
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}

Severity: medium

For consistency, you can use the isQueueMode constant defined on line 45. Also, the cleanup logic is duplicated from the res.on('close') handler. It would be cleaner to extract this into a shared function to avoid duplication and potential future bugs if one place is updated and the other is not.

Suggested change:

-    if (process.env.MODE === MODE.QUEUE) {
+    if (isQueueMode) {
         getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
     }

Comment on lines +97 to +99
if (process.env.MODE === MODE.QUEUE) {
getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
}

Severity: medium

For consistency, you can use the isQueueMode constant defined on line 75. Also, the cleanup logic is duplicated from the res.on('close') handler. It would be cleaner to extract this into a shared function to avoid duplication and potential future bugs if one place is updated and the other is not.

Suggested change:

-    if (process.env.MODE === MODE.QUEUE) {
+    if (isQueueMode) {
         getRunningExpressApp().redisSubscriber.unsubscribe(chatId)
     }

Comment on lines +127 to +138
let cleaned = 0
for (const channel of this.subscribedChannels) {
if (!this.sseStreamer.clients[channel]) {
this.unsubscribe(channel)
cleaned++
}
}
if (cleaned > 0) {
logger.info(
`[RedisEventSubscriber] Periodic cleanup: removed ${cleaned} stale subscriptions. Remaining: ${this.subscribedChannels.size}`
)
}

Severity: medium

Modifying a collection (this.subscribedChannels) while iterating over it can be a source of bugs, even though it might be safe for Set in this specific case. A clearer and safer pattern is to first collect the items to be removed and then perform the removal in a separate loop. This improves readability and avoids potential issues.

            const staleChannels = Array.from(this.subscribedChannels).filter(
                (channel) => !this.sseStreamer.clients[channel]
            )
            if (staleChannels.length > 0) {
                for (const channel of staleChannels) {
                    this.unsubscribe(channel)
                }
                logger.info(
                    `[RedisEventSubscriber] Periodic cleanup: removed ${staleChannels.length} stale subscriptions. Remaining: ${this.subscribedChannels.size}`
                )
            }



Development

Successfully merging this pull request may close these issues.

THE SAME ISSUE AS LAST WEEK: Chat window no longer produces any output. No process flow or response is visible.
