
Commit 1308cdd

Merge pull request #28 from join-a-groupchat/27-update-readme-to-explain-codebase
Updated ReadME Documentation
2 parents de0f6e2 + 183481e commit 1308cdd

96 files changed

Lines changed: 5095 additions & 46 deletions


.dockerignore

Lines changed: 1 addition & 2 deletions
@@ -6,7 +6,6 @@ npm-debug.log
.env
.env.local
.env.prod
-config

# Git
.git
@@ -40,4 +39,4 @@ build

# ignore tests
tests
-coverage
+coverage
Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+name: Deploy Frontend to Cloudflare Pages
+
+on:
+  workflow_dispatch:
+
+jobs:
+  deploy:
+    runs-on: ubuntu-latest
+    environment: production
+
+    steps:
+      - name: Checkout repository
+        uses: actions/checkout@v3
+
+      - name: Install Node.js
+        uses: actions/setup-node@v3
+        with:
+          node-version: '20'
+
+      - name: Install dependencies
+        working-directory: frontend
+        run: npm install
+
+      - name: Build frontend
+        working-directory: frontend
+        run: npm run build
+
+      - name: Deploy to Cloudflare Pages
+        uses: cloudflare/pages-action@v1
+        with:
+          apiToken: ${{ secrets.CLOUDFLARE_API_TOKEN }}
+          accountId: ${{ secrets.CLOUDFLARE_ACCOUNT_ID }}
+          projectName: rtc-frontend
+          directory: frontend/dist

.github/workflows/push-images.yml

Lines changed: 1 addition & 3 deletions
@@ -1,9 +1,7 @@
name: Build and push images to GHCR

on:
-  push:
-    branches:
-      - main
+  workflow_dispatch:

permissions:
  contents: read

.github/workflows/server-tests.yml

Lines changed: 0 additions & 3 deletions
@@ -6,9 +6,6 @@ on:
      - staging
    paths:
      - 'server/**'
-  push:
-    branches:
-      - staging

jobs:
  test:

.github/workflows/sonarqube.yml

Lines changed: 3 additions & 4 deletions
@@ -1,10 +1,9 @@
name: SonarQube Scan

on:
-  workflow_run:
-    workflows: ["Unit Tests for Server"]
-    types:
-      - completed
+  pull_request:
+    branches:
+      - staging

jobs:
  scan:

README.md

Lines changed: 142 additions & 28 deletions
@@ -1,58 +1,172 @@
-# prototype-rtc-js
-Real -time scalable group chat app with uWebSockets, Redis, and Postgres
+# Real Time Chat

-### How to run (Local) >>
+A real-time, scalable group chat application built with uWebSockets.js, Redis, and PostgreSQL. The application demonstrates enterprise-grade resilience patterns, including distributed message processing, automatic failover, graceful shutdown, and robust error recovery.

-Using code?
-```bash
-node server.js
+## Overview

-node consumer.js
-```
-Also run local redis instance.
+This application provides a high-performance real-time chat system that can scale horizontally across multiple server instances. Messages flow from WebSocket clients through Redis streams for persistence and distribution, with dedicated consumer processes handling database persistence. The architecture is designed to handle high message volumes while maintaining data integrity and system availability even when individual components fail.
+
+The system consists of two main components: a WebSocket server that handles client connections and message broadcasting, and a consumer service that processes messages from Redis streams and persists them to PostgreSQL. Both components are designed to run as multiple instances, sharing the workload and providing redundancy.
+
+## Architecture
+
+### Server Component
+
+The server component (`server/server-fanout.js`) handles WebSocket connections using uWebSockets.js for high-performance, low-latency communication. When a client sends a chat message, the server runs it through a validation and filtering pipeline, then broadcasts it locally to all clients connected to that instance. Simultaneously, the message is published to a Redis pub/sub channel for distribution to other server instances and added to a Redis stream for persistence and later processing by consumer instances.
+
+Each server instance is assigned a unique server ID (generated using cryptographically secure random values), which is included in published messages to prevent echo loops: servers ignore messages they themselves published. The server maintains an in-memory set of connected WebSocket clients and uses Redis pub/sub to coordinate message distribution across instances, enabling horizontal scaling. When clients connect, the server loads recent message history from PostgreSQL, so users see the conversation context immediately upon joining.
+
+### Consumer Component
+
+The consumer component (`consumer/consumer.js`) is responsible for reading messages from Redis streams and persisting them to PostgreSQL. It uses Redis consumer groups to let multiple consumer instances share the message-processing workload. Each consumer instance joins a consumer group with a unique consumer name (also generated using secure random values), allowing Redis to distribute messages automatically across the available consumers. This design enables horizontal scaling of the persistence layer: as message volume increases, additional consumer instances can be added to handle the load.
+
+The consumer implements a batch processing system that buffers messages in memory before writing them to the database, significantly reducing database write operations and improving overall throughput. Messages are flushed to the database either when the buffer reaches a configured batch size (default 200 messages) or after a timeout period (default 200 ms), whichever comes first. This dual-trigger mechanism ensures both high throughput under load and low latency for smaller message volumes.
+
+## Resilience Mechanisms
+
+### Redis Consumer Groups and Load Sharing
+
+The application leverages Redis consumer groups to distribute message processing across multiple consumer instances. When a consumer starts, it creates or joins a consumer group on the Redis stream. Each consumer instance is assigned a unique consumer name (generated using `crypto.getRandomValues`), allowing Redis to track which consumer is processing which messages. When multiple consumers are active in the same group, Redis distributes incoming messages across them in a round-robin-like fashion, keeping the workload balanced.
+
+The consumer uses `XREADGROUP` to read messages from the stream, specifying the consumer group name and its unique consumer identifier. Redis ensures that each message is delivered to exactly one consumer in the group, preventing duplicate processing. Messages are read in batches (configurable via `XREAD_COUNT`, default 500) with a short blocking timeout (default 10 ms) to balance latency against CPU efficiency. This means that with three consumer instances running, each processes roughly one-third of the messages, and if one instance fails or is stopped, the remaining instances automatically take over its share of the workload.
+
+### Failover and Message Recovery
+
+The system implements message recovery mechanisms so that no messages are lost even when consumer instances fail. When a consumer crashes or is terminated unexpectedly, messages that were delivered to it but not yet acknowledged remain in Redis as "pending" messages. The application uses Redis's `XAUTOCLAIM` command to recover these pending messages when a consumer starts up or when they exceed a minimum idle-time threshold (default 10 seconds).
+
+On startup, each consumer instance first calls `recoverPendingMessages()`, which uses `XAUTOCLAIM` to claim pending messages that were assigned to other consumers but never acknowledged. This process iterates through all pending messages, claims those that have been idle longer than the configured threshold, and processes them before normal consumption begins. Messages are therefore never permanently lost: if consumer instance A crashes while processing a batch, consumer instance B (or A after a restart) will automatically claim and process those messages.
+
+The recovery mechanism is designed to be non-blocking and fault-tolerant. If recovery encounters errors, it logs them and continues with normal operation, preventing a single corrupted message from blocking the entire consumer. Messages are recovered in batches (default 200 per iteration) to avoid overwhelming the system, with small delays between batches to allow other operations to proceed. This design lets the system recover from failures gracefully while maintaining high availability.
+
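An `XAUTOCLAIM` loop in the spirit of the `recoverPendingMessages()` described above could look like this. A sketch under the same assumptions (`redis` v4 client, illustrative stream/group names); the repo's actual function also processes and acknowledges what it claims, which is omitted here.

```javascript
// Claim and return pending entries that other consumers left unacknowledged
// for longer than minIdleMs. '0-0' starts the scan at the beginning of the
// pending entries list; Redis returns the cursor for the next iteration.
async function recoverPendingMessages(client, consumerName, {
  stream = 'chat_stream', group = 'cg1', minIdleMs = 10000, count = 200,
} = {}) {
  let cursor = '0-0';
  const recovered = [];
  for (;;) {
    const res = await client.xAutoClaim(
      stream, group, consumerName, minIdleMs, cursor, { COUNT: count });
    recovered.push(...res.messages);
    if (res.nextId === '0-0') break; // the whole pending list has been scanned
    cursor = res.nextId;
  }
  return recovered;
}
```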
+### Batch Processing and Buffering
+
+The consumer implements a batch processing system that significantly improves database write performance and reduces connection overhead. Messages are accumulated in an in-memory buffer as they arrive from Redis. Two triggers flush messages to the database: a size-based trigger (when the buffer reaches the configured batch size) and a time-based trigger (after a configured timeout period). This dual-trigger approach performs well under varying load conditions: high message volumes trigger immediate flushes for throughput, while low volumes are still flushed periodically to keep latency acceptable.
+
+The batch processor uses `p-limit` to control concurrent flush operations, preventing database connection pool exhaustion under high load. By default, only one flush operation runs at a time (configurable via `FLUSH_CONCURRENCY`), ensuring that database writes are serialized and connection pool limits are respected. When a flush operation completes successfully, all messages in that batch are acknowledged in Redis using `XACK`, removing them from the pending entries list. If a flush operation fails (for example, due to database connectivity issues), the messages are re-queued into the buffer and retried after a short delay, ensuring eventual consistency.
+
+The buffering strategy also includes special handling for recovered pending messages. When messages are recovered from other failed consumers, they are processed in smaller batches (100 messages) with delays between batches to avoid overwhelming the system during recovery. This pacing ensures that recovery does not impact the processing of new messages arriving in real time.
+
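The dual-trigger buffer described above can be sketched as follows. This is illustrative only; the repo's actual implementation also integrates `p-limit`, `XACK`, and failure re-queueing, which are reduced to a comment here.

```javascript
// Flush when the buffer reaches batchSize, or batchTimeoutMs after the
// first message was queued, whichever comes first.
function createBuffer(flushFn, { batchSize = 200, batchTimeoutMs = 200 } = {}) {
  let buf = [];
  let timer = null;
  const flush = () => {
    if (timer) { clearTimeout(timer); timer = null; }
    if (buf.length === 0) return;
    const batch = buf;
    buf = [];
    flushFn(batch); // on failure the real consumer re-queues the batch
  };
  return {
    push(msg) {
      buf.push(msg);
      if (buf.length >= batchSize) flush();                       // size trigger
      else if (!timer) timer = setTimeout(flush, batchTimeoutMs); // time trigger
    },
    flush,
    size: () => buf.length,
  };
}
```

Only the first queued message arms the timer, so an idle buffer costs nothing and a busy buffer flushes on size alone.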
+### Graceful Shutdown
+
+Both the server and consumer components implement graceful shutdown to ensure data integrity and clean resource cleanup. When a shutdown signal is received (SIGINT or SIGTERM), the consumer enters a controlled shutdown sequence: it first sets a shutdown flag so the main processing loop stops accepting new messages, then forces a flush of any remaining messages in the buffer so no data is lost. The system waits for all pending flush operations to complete using the concurrency limiter, ensuring that in-flight database writes finish before connections are closed.
+
+The shutdown process also handles uncaught exceptions and unhandled promise rejections, so even unexpected errors trigger a graceful shutdown rather than an abrupt termination. All Redis and database connections are properly closed, and the process exits with an appropriate status code. The server component likewise handles shutdown signals, cleaning up Redis subscriptions and closing WebSocket connections. Graceful shutdown is critical for production deployments where instances may be restarted for updates or moved between hosts, as it prevents message loss and connection leaks.
+
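The shutdown sequence above can be sketched like this. The hook names `flushRemaining`, `waitForFlushes`, and `closeAll` are placeholders standing in for the repo's actual functions, not its real API.

```javascript
// Stop intake, drain the buffer, wait for in-flight writes, close connections.
function installShutdown({ flushRemaining, waitForFlushes, closeAll }) {
  let shuttingDown = false;
  async function shutdown(code = 0) {
    if (shuttingDown) return;   // idempotent: signals can arrive more than once
    shuttingDown = true;        // main loop checks this flag and stops reading
    await flushRemaining();     // force-flush the in-memory buffer
    await waitForFlushes();     // let in-flight database writes finish
    await closeAll();           // close Redis and Postgres connections
    process.exit(code);
  }
  process.on('SIGINT', () => shutdown(0));
  process.on('SIGTERM', () => shutdown(0));
  // Unexpected errors also go through the graceful path, with a failure code.
  process.on('uncaughtException', (err) => { console.error(err); shutdown(1); });
  process.on('unhandledRejection', (err) => { console.error(err); shutdown(1); });
  return { isShuttingDown: () => shuttingDown, shutdown };
}
```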
+### Database Resilience
+
+The application uses PostgreSQL connection pooling to manage database connections efficiently and handle connection failures gracefully. Both the server and consumer components create connection pools with configurable limits (default 5 connections), idle timeouts (30 seconds), and connection timeouts (20 seconds). The pool manages the connection lifecycle automatically, creating new connections as needed and closing idle ones to free resources.
+
+Database operations use transactions to ensure atomicity. When inserting messages, the consumer wraps the batch insert in a `BEGIN`/`COMMIT` transaction. If any error occurs during the insert, the transaction is rolled back and the messages are re-queued into the buffer for retry. The database service also uses `ON CONFLICT DO NOTHING` to prevent duplicate inserts if the same message is processed more than once (which can occur during recovery scenarios).
+
+The pool includes error event handlers that log connection errors without crashing the application. If a connection fails, the pool automatically creates a new connection for subsequent queries. This design ensures that temporary database connectivity issues do not cause permanent service disruption: the application continues to buffer messages and retry database operations until connectivity is restored.
+
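The transactional batch insert described above might look roughly like this with the `pg` package. The `messages` table name and its columns are assumptions for illustration, not the repo's actual schema.

```javascript
// pool: an instance of require('pg').Pool. All-or-nothing batch insert with
// idempotent conflict handling, so redelivered messages are not duplicated.
async function insertBatch(pool, messages) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN');
    for (const m of messages) {
      await client.query(
        `INSERT INTO messages (id, sender, body, created_at)
         VALUES ($1, $2, $3, $4)
         ON CONFLICT (id) DO NOTHING`,   // idempotent under redelivery
        [m.id, m.sender, m.body, m.createdAt]);
    }
    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');      // caller re-queues the batch
    throw err;
  } finally {
    client.release();                    // return the connection to the pool
  }
}
```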
+### Error Handling and Health Monitoring
+
+The application implements error handling at multiple levels. The main processing loops in both the server and consumer components are wrapped in try-catch blocks that log errors and continue operating rather than crashing. When errors occur during message processing, they are logged with full stack traces for debugging, and the system uses exponential backoff for retries to avoid overwhelming failing services.
+
+The consumer exposes a health status report (via `getHealthStatus()`) covering the operational state of all components, including Redis connection status, database connection status, current buffer size, and whether the application is running. Orchestration systems (such as Kubernetes) can use this information to determine whether an instance is healthy and ready to receive traffic. The health checks verify both connection state and operational readiness, providing accurate status for load balancers and monitoring systems.
+
+Error recovery is designed to be automatic and transparent. If Redis becomes temporarily unavailable, the consumer's read operations fail, but the main loop logs the issue and retries after a short delay (200 ms). Similarly, if database writes fail, messages are re-queued and retried, ensuring eventual consistency. This self-healing design means the system can recover from transient failures without manual intervention, making it suitable for production environments where network issues and service restarts are common.
+
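A health report in the spirit of the `getHealthStatus()` mentioned above could be shaped like this. The field names are modeled on the prose, not the repo's exact return value.

```javascript
// Combine component states into a single report that an orchestrator or
// load balancer can poll.
function getHealthStatus({ redisReady, dbReady, bufferSize, running }) {
  return {
    status: redisReady && dbReady && running ? 'healthy' : 'degraded',
    redis: redisReady ? 'connected' : 'disconnected',
    database: dbReady ? 'connected' : 'disconnected',
    bufferSize,   // a growing buffer can signal a stalled database
    running,
  };
}
```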
+## Technical Details
+
+### Configuration
+
+The application uses environment variables for configuration, loaded from `config/.env`. Key configuration options include:
+
+**Redis Configuration:**
+- `REDIS_URL` or `REDIS_HOST`, `REDIS_PORT`, `REDIS_PWD`: Redis connection details
+- `STREAM_KEY`: Redis stream name (default: "chat_stream")
+- `CONSUMER_GROUP`: Consumer group name (default: "cg1")
+- `CONSUMER_NAME`: Consumer instance name (auto-generated if not provided)
+
+**Consumer Settings:**
+- `XREAD_COUNT`: Messages to read per operation (default: 500)
+- `XREAD_BLOCK_MS`: Block timeout for reads (default: 10ms)
+- `BATCH_SIZE`: Buffer size before flush (default: 200)
+- `BATCH_TIMEOUT_MS`: Time-based flush trigger (default: 200ms)
+- `FLUSH_CONCURRENCY`: Concurrent flush operations (default: 1)
+- `CLAIM_MIN_IDLE_MS`: Minimum idle time for pending recovery (default: 10000ms)
+- `CLAIM_COUNT`: Messages to claim per recovery iteration (default: 200)
+
+**Server Configuration:**
+- `PORT`: WebSocket server port (default: 9001)
+- `SERVER_ID`: Unique server identifier (auto-generated if not provided)
+- `ALLOWED_ORIGINS`: CORS allowed origins (comma-separated)
+
+**Database Configuration:**
+- `SUPABASE_DB_URL`: PostgreSQL connection string
+
+### Running the Application
+
+#### Local Development
+
+Start Redis and PostgreSQL instances:
```bash
docker start redis-local
-```
-Also run local Postgres instance.
-```bash
docker start local-pg
```
-Don't forget to change env variables to local redis and pg
+
+Set environment variables in `config/.env`:
```bash
SUPABASE_DB_URL=postgres://postgres:postgres@localhost:5432/messages
+REDIS_HOST=localhost
+REDIS_PORT=6379
```

-Using containers to run server?
+Run the server:
```bash
-docker build -t server-local -f Dockerfile.server .
+node server/server-fanout.js
+```

-docker run --env-file .env-local -p 9001:9001 server-local
+Run the consumer (in a separate terminal):
+```bash
+node consumer/consumer.js
```
-Run local redis instance too.

-### How to run (PM2 Multiple Instances) >>
+#### Docker Deployment

+Build and run the server:
```bash
-pm2 start server.js -i <instance-num>
+docker build -t server-local -f server/Dockerfile.server .
+docker run --env-file config/.env-local -p 9001:9001 server-local
```

-Also run local redis instance.
-Change local vars to local redis host and port.
+Build and run the consumer:
+```bash
+docker build -t consumer-local -f consumer/Dockerfile.consumer .
+docker run --env-file config/.env-local consumer-local
+```
+
+#### Multiple Instances with PM2

-Check logs
+Run multiple server instances:
```bash
-pm2 logs
+pm2 start server/server-fanout.js -i <instance-num>
```

-Stop all instances
+Run multiple consumer instances:
```bash
-pm2 stop all
+pm2 start consumer/consumer.js -i <instance-num>
```

-### Testing >>
+Check logs:
+```bash
+pm2 logs
+```
+
+### Testing

-Server Unit Tests
+Run server unit tests:
```bash
cd server
npm run test:unit
-```
+```
+
+## Dependencies
+
+- **uWebSockets.js**: High-performance WebSocket server
+- **redis**: Redis client for Node.js
+- **pg**: PostgreSQL client with connection pooling
+- **p-limit**: Concurrency control for batch operations
+- **dotenv**: Environment variable management

consumer/Dockerfile.consumer

Lines changed: 1 addition & 1 deletion
@@ -16,4 +16,4 @@ COPY consumer/ ./consumer/
COPY config/ ./config/

# Small health-check-friendly default command
-CMD ["node", "consumer/consumer.js"]
+CMD ["node", "consumer/consumer.js"]

frontend

Lines changed: 0 additions & 1 deletion
This file was deleted.

frontend/.env.example

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+# Supabase Configuration
+# Copy this file to .env and fill in your actual values
+VITE_SUPABASE_URL=your_supabase_project_url
+VITE_SUPABASE_ANON_KEY=your_supabase_anon_key
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+fix: UI fix
