diff --git a/.github/workflows/docker-build.yml b/.github/workflows/docker-build.yml new file mode 100644 index 0000000..66ed563 --- /dev/null +++ b/.github/workflows/docker-build.yml @@ -0,0 +1,30 @@ +name: Build and Publish Docker Image +on: + push: + branches: + - main + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to Docker Hub + uses: docker/login-action@v3 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_PASSWORD }} + + - name: Build and push docker image + uses: docker/build-push-action@v5 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ secrets.DOCKERHUB_USERNAME }}/mitsi-media:latest diff --git a/package-lock.json b/package-lock.json index b762196..b63c97a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -15,6 +15,7 @@ "dotenv": "^17.2.1", "express": "^5.1.0", "helmet": "^8.1.0", + "ioredis": "^5.8.2", "mediasoup": "3.19.4", "public-ip": "^7.0.1", "redis": "^5.8.0", @@ -902,6 +903,12 @@ "url": "https://github.com/sponsors/nzakas" } }, + "node_modules/@ioredis/commands": { + "version": "1.4.0", + "resolved": "https://registry.npmjs.org/@ioredis/commands/-/commands-1.4.0.tgz", + "integrity": "sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==", + "license": "MIT" + }, "node_modules/@isaacs/cliui": { "version": "8.0.2", "resolved": "https://registry.npmjs.org/@isaacs/cliui/-/cliui-8.0.2.tgz", @@ -3426,6 +3433,15 @@ "node": ">=10" } }, + "node_modules/denque": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/denque/-/denque-2.1.0.tgz", + "integrity": "sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==", + "license": "Apache-2.0", + "engines": { + "node": ">=0.10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -4715,6 +4731,30 @@ "node": "^18.17.0 || >=20.5.0" } }, + "node_modules/ioredis": { + "version": "5.8.2", + "resolved": "https://registry.npmjs.org/ioredis/-/ioredis-5.8.2.tgz", + "integrity": "sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==", + "license": "MIT", + "dependencies": { + "@ioredis/commands": "1.4.0", + "cluster-key-slot": "^1.1.0", + "debug": "^4.3.4", + "denque": "^2.1.0", + "lodash.defaults": "^4.2.0", + "lodash.isarguments": "^3.1.0", + "redis-errors": "^1.2.0", + "redis-parser": "^3.0.0", + "standard-as-callback": "^2.1.0" + }, + "engines": { + "node": ">=12.22.0" + }, + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/ioredis" + } + }, "node_modules/ip-regex": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/ip-regex/-/ip-regex-5.0.0.tgz", @@ -5687,6 +5727,18 @@ "integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==", "license": "MIT" }, + "node_modules/lodash.defaults": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/lodash.defaults/-/lodash.defaults-4.2.0.tgz", + "integrity": "sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==", + "license": "MIT" + }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==", + "license": "MIT" + }, "node_modules/lodash.memoize": { "version": "4.1.2", "resolved": "https://registry.npmjs.org/lodash.memoize/-/lodash.memoize-4.1.2.tgz", @@ -6774,6 +6826,27 @@ "node": ">= 18" } }, + "node_modules/redis-errors": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/redis-errors/-/redis-errors-1.2.0.tgz", + "integrity": "sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==", + "license": "MIT", + "engines": { + "node": ">=4" + } + }, + "node_modules/redis-parser": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/redis-parser/-/redis-parser-3.0.0.tgz", + "integrity": "sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==", + "license": "MIT", + "dependencies": { + "redis-errors": "^1.0.0" + }, + "engines": { + "node": ">=4" + } + }, "node_modules/require-directory": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/require-directory/-/require-directory-2.1.1.tgz", @@ -7152,6 +7225,12 @@ "node": ">=8" } }, + "node_modules/standard-as-callback": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/standard-as-callback/-/standard-as-callback-2.1.0.tgz", + "integrity": "sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==", + "license": "MIT" + }, "node_modules/statuses": { "version": "2.0.2", "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.2.tgz", diff --git a/package.json b/package.json index 771b3dc..e7214a2 100644 --- a/package.json +++ b/package.json @@ -48,6 +48,7 @@ "dotenv": "^17.2.1", "express": "^5.1.0", "helmet": "^8.1.0", + "ioredis": "^5.8.2", "mediasoup": "3.19.4", "public-ip": "^7.0.1", "redis": "^5.8.0", diff --git a/src/app.ts b/src/app.ts index baa1407..2c9a499 100644 --- a/src/app.ts +++ b/src/app.ts @@ -5,12 +5,11 @@ import { createServer } from 'https'; import config from './config'; import { Routes } from './routes'; -import { redisServer } from './servers/redis-server'; import { grpcServer } from './servers/grpc-server'; -import { MediaNodeData } from './types'; -import { getRedisKey, registerMediaNode } from './lib/utils'; +import { getRedisKey, handleHeartBeat, registerMediaNode } from './lib/utils'; import { mediaSoupServer } from './servers/mediasoup-server'; import { Actions } from './types/actions'; +import { ioRedisServer } from './servers/ioredis-server'; const app = express(); app.use(cors(config.cors)); @@ -20,11 +19,12 @@ app.use('/', Routes); const httpsServer = createServer(config.httpsServerOptions, app); -let medianodeData: MediaNodeData; +// let medianodeData: MediaNodeData; +let heartBeatInterval: NodeJS.Timeout; (async (): Promise => { try { - await redisServer.connect(); + await ioRedisServer.connect(); httpsServer.listen(config.port, () => { console.log(`Server running on port ${config.port}`); }); @@ -32,7 +32,10 @@ let medianodeData: MediaNodeData; await mediaSoupServer.start(); - medianodeData = await registerMediaNode(); + await registerMediaNode(); + + heartBeatInterval = setInterval(handleHeartBeat, 120000); + console.log('Register medianode'); } catch (error) { console.error('Initialization error:', error); @@ -42,19 +45,20 @@ let medianodeData: MediaNodeData; const shutdown = async (): Promise => { try { - await redisServer.sRem( - getRedisKey['medianodes'](), - JSON.stringify(medianodeData) - ); - await redisServer.publish({ + await ioRedisServer.publish({ channel: Actions.Message, action: Actions.MediaNodeRemoved, - args: { id: medianodeData.id }, + args: { id: config.nodeId }, }); + + await ioRedisServer.del(getRedisKey['medianode'](config.nodeId)); + console.log('Delete medianode'); httpsServer.close(); mediaSoupServer.shutdown(); - await redisServer.disconnect(); + await ioRedisServer.disconnect(); + + clearInterval(heartBeatInterval); console.log('Application shut down gracefully'); process.exit(0); } catch (err) { diff --git a/src/lib/utils.ts b/src/lib/utils.ts index 1af989d..0749ca0 100644 --- a/src/lib/utils.ts +++ b/src/lib/utils.ts @@ -1,9 +1,10 @@ import config from '../config'; -import { redisServer } from '../servers/redis-server'; +import { ioRedisServer } from '../servers/ioredis-server'; import { MediaNodeData } from '../types'; import { Actions } from '../types/actions'; export const HEARTBEAT_TIMEOUT = 60000; // 90 seconds / 1.30mins +export const MEDIANODE_TTL = 300000; // 180seconds 3mins export const getRedisKey = { room: (roomId: string): string => `room:${roomId}`, @@ -13,7 +14,8 @@ export const getRedisKey = { roomActiveSpeakerPeerId: (roomId: string): string => `room:${roomId}:activespeakerpeerid`, rooms: (): string => `rooms`, - medianodes: (): string => `medianodes`, + medianodes: (): string => `medianodes`, // todo delete + medianode: (nodeId: string): string => `medianodes:${nodeId}`, signalnodes: (): string => `signalnodes`, roomMedianodes: (roomId: string): string => `room:${roomId}:medianodes`, roomSignalnodes: (roomId: string): string => `room:${roomId}:signalnodes`, @@ -34,13 +36,23 @@ export const registerMediaNode = async (): Promise => { }; // todo remove data from redis if it matches this node id - await redisServer.sAdd( - getRedisKey['medianodes'](), - JSON.stringify(medianodeData) - ); + // await redisServer.sAdd( + // {etRedisKey['medianodes'](), + // JSON.stringify(medianodeData) + // ); + + // register medianode in redis hash for easy lookup + await ioRedisServer.hSet(getRedisKey['medianode'](medianodeData.id), { + ...medianodeData, + }); + // set ttl for medianode + await ioRedisServer.expire( + getRedisKey['medianode'](medianodeData.id), + MEDIANODE_TTL / 1000 + ); // publish update to notify other services - await redisServer.publish({ + await ioRedisServer.publish({ channel: Actions.Message, action: Actions.MediaNodeAdded, args: { ...medianodeData }, @@ -51,6 +63,13 @@ export const registerMediaNode = async (): Promise => { } }; +export const handleHeartBeat = async (): Promise => { + ioRedisServer + .expire(getRedisKey['medianode'](config.nodeId), MEDIANODE_TTL / 1000) + .catch(error => console.log(error)); + console.log('handleHeartBeat'); +}; + export const parseArguments = (args?: string): { [key: string]: unknown } => { let parsedArgs: { [key: string]: unknown } = {}; if (args) { diff --git a/src/servers/grpc-server.ts b/src/servers/grpc-server.ts index 07eca8e..c8d0470 100644 --- a/src/servers/grpc-server.ts +++ b/src/servers/grpc-server.ts @@ -26,30 +26,12 @@ class GrpcServer extends EventEmitter { this.server = new grpc.Server(); - // this.server = new grpc.Server({ - // 'grpc.keepalive_time_ms': 10000, - // 'grpc.keepalive_timeout_ms': 5000, - // 'grpc.keepalive_permit_without_calls': 1, - // 'grpc.http2.max_pings_without_data': 0, - // 'grpc.http2.min_time_between_pings_ms': 10000, - // 'grpc.http2.min_ping_interval_without_data_ms': 300000, - // 'grpc.max_connection_idle_ms': 300000, - // 'grpc.max_connection_age_ms': 600000, - // 'grpc.max_connection_age_grace_ms': 30000, - // 'grpc.max_receive_message_length': 4 * 1024 * 1024, // 4MB - // 'grpc.max_send_message_length': 4 * 1024 * 1024, // 4MB - // }); - this.startTime = new Date(); this.cleanupInterval = null; this.healthCheckInterval = null; this.metricsInterval = null; this.setup(); - - // Graceful shutdown handling - // process.on('SIGINT', () => this.gracefulShutdown('SIGINT')); - // process.on('SIGTERM', () => this.gracefulShutdown('SIGTERM')); } static getInstance(): GrpcServer { diff --git a/src/servers/ioredis-server.ts b/src/servers/ioredis-server.ts new file mode 100644 index 0000000..c9f7b4a --- /dev/null +++ b/src/servers/ioredis-server.ts @@ -0,0 +1,259 @@ +import Redis from 'ioredis'; + +import { Actions as PSA } from '../types/actions'; +import config from '../config'; + +class IORedisServer { + private static instance: IORedisServer | null = null; + private pubClient: Redis; + private subClient: Redis; + + private isConnected: boolean = false; + + private constructor() { + this.pubClient = new Redis(config.redisServerUrl); + this.subClient = this.pubClient.duplicate(); + } + + static getInstance(): IORedisServer { + if (!IORedisServer.instance) { + IORedisServer.instance = new IORedisServer(); + } + return IORedisServer.instance; + } + + async connect(): Promise { + if (this.isConnected) { + console.log('Redis clients already connected'); + return; + } + try { + await Promise.all([this.pubClient.ping(), this.subClient.ping()]); + this.isConnected = true; + await this.subscribe(PSA.Message); + console.log('ioredis connected'); + } catch (error) { + console.error(error); + throw error; + } + } + + async publish({ + channel, + action, + args, + }: { + channel: string; + action: PSA; + args: { [key: string]: unknown }; + }): Promise { + if (!this.isConnected) + throw new Error('Redis clients are not connected. Call connect() first'); + + const message = JSON.stringify({ action, args }); + await this.pubClient.publish(channel, message); + console.info(`Message published to channel ${message}`); + } + + getPubClient(): Redis { + return this.pubClient; + } + + getSubClient(): Redis { + return this.subClient; + } + + async subscribe(channel: string): Promise { + if (!this.isConnected) + throw new Error('Redis clients are not connected. Call connect() first'); + console.log(`Subscribing to channel "${channel}"`); + + await this.subClient.subscribe(channel, (err, count) => { + if (err) { + console.error('Failed to subscribe:', err); + } else { + console.log(`Subscribed to ${count} channel(s)`); + } + }); + + this.subClient.on('message', (channel, message) => { + const { + action, + args, + }: { + action: PSA; + args: { [key: string]: unknown }; + } = JSON.parse(message); + console.log( + `got pubsub event from channel -> ${channel} message -> ${message}` + ); + // check and ignore if publisher + const handler = this.pubSubHander[action]; + + if (handler) handler(args); + }); + + console.log(`Subscribed to channel "${channel}"`); + } + + async unsubscribe(channel: string): Promise { + if (!this.isConnected) + throw new Error('Redis clients are not connected. Call connect() first'); + + await this.subClient.unsubscribe(channel); + console.log(`Unsubscribed from channel "${channel}"`); + } + + // Set operations + + async set(key: string, value: string | number): Promise { + return await this.pubClient.set(key, value); + } + + async setex(key: string, value: string, seconds: number): Promise { + return await this.pubClient.setex(key, value, seconds); + } + + async setnx(key: string, value: string): Promise { + return await this.pubClient.setnx(key, value); + } + + async get(key: string): Promise { + return await this.pubClient.get(key); + } + + async sAdd(key: string, member: string): Promise { + return await this.pubClient.sadd(key, member); + } + + async sRem(key: string, member: string): Promise { + return await this.pubClient.srem(key, member); + } + + async sIsMember(key: string, member: string): Promise { + return await this.pubClient.sismember(key, member); + } + + async sMembers(key: string): Promise { + return await this.pubClient.smembers(key); + } + + async scan( + cursor: number | string, + options: { + MATCH: string; + COUNT: number; + } + ): Promise<{ + cursor: string; + keys: string[]; + }> { + const stringCursor = + typeof cursor === 'number' ? cursor.toString() : cursor; + + const [nextCursor, keys] = await this.pubClient.scan( + stringCursor, + 'MATCH', + options.MATCH, + 'COUNT', + options.COUNT + ); + + return { + cursor: nextCursor, + keys: keys, + }; + } + + async exists(key: string): Promise { + return await this.pubClient.exists(key); + } + + // Hash operations + + async hSet( + key: string, + fieldOrValue: string | Record, + value?: string | number + ): Promise { + if (typeof fieldOrValue === 'string') { + if (value === undefined) { + throw new Error('Value must be provided when field is a string'); + } + return await this.pubClient.hset(key, fieldOrValue, value); + } + + if (typeof fieldOrValue === 'object') { + return await this.pubClient.hset(key, fieldOrValue); + } + throw new Error('Invalid arguments for hSet'); + } + + async hGet(key: string, field: string): Promise { + return await this.pubClient.hget(key, field); + } + + async hDel(key: string, field: string): Promise { + return await this.pubClient.hdel(key, field); + } + + async hGetAll(key: string): Promise<{ [key: string]: string }> { + return await this.pubClient.hgetall(key); + } + + async hkeys(key: string): Promise { + return await this.pubClient.hkeys(key); + } + + async hVals(key: string): Promise { + return await this.pubClient.hvals(key); + } + + async hLen(key: string): Promise { + return await this.pubClient.hlen(key); + } + + async del(key: string): Promise { + return await this.pubClient.del(key); + } + + async expire(key: string, seconds: number): Promise { + return await this.pubClient.expire(key, seconds); + } + + // async hExpire( + // key: string, + // fields: string[], + // seconds: number, + // mode?: 'NX' | 'XX' | 'GT' | 'LT' | undefined + // ): Promise { + // const args: any[] = [key, seconds, ...fields]; + // if (mode) { + // args.push(mode); + // } + // return await this.pubClient.hexpire(...args); + // } + + async persist(key: string): Promise { + return await this.pubClient.persist(key); + } + + async disconnect(): Promise { + if (this.isConnected) { + await Promise.all([this.pubClient.quit(), this.subClient.quit()]); + IORedisServer.instance = null; + this.isConnected = false; + console.log('Redis clients disconnected'); + } + } + + private pubSubHander: { + [key in PSA]?: (args: { [key: string]: unknown }) => void; + } = { + [PSA.RemovePeer]: args => { + console.log(args); + }, + }; +} + +export const ioRedisServer = IORedisServer.getInstance(); diff --git a/src/servers/redis-server.ts b/src/servers/redis-server.ts index a4c4ca9..df357a0 100644 --- a/src/servers/redis-server.ts +++ b/src/servers/redis-server.ts @@ -108,6 +108,89 @@ class RedisServer { return await this.pubClient.sMembers(key); } + async scan( + cursor: number | string, + options: { + MATCH: string; + COUNT: number; + } + ): Promise<{ + cursor: string; + keys: string[]; + }> { + const stringCursor = + typeof cursor === 'number' ? cursor.toString() : cursor; + + return await this.pubClient.scan(stringCursor, options); + } + + async exists(key: string): Promise { + return await this.pubClient.exists(key); + } + + async hSet( + key: string, + fieldOrValue: string | Record, + value?: string | number + ): Promise { + if (typeof fieldOrValue === 'string') { + if (value === undefined) { + throw new Error('Value must be provided when field is a string'); + } + return await this.pubClient.hSet(key, fieldOrValue, value); + } + + if (typeof fieldOrValue === 'object') { + return await this.pubClient.hSet(key, fieldOrValue); + } + throw new Error('Invalid arguments for hSet'); + } + + async hGet(key: string, field: string): Promise { + return await this.pubClient.hGet(key, field); + } + + async hDel(key: string, field: string): Promise { + return await this.pubClient.hDel(key, field); + } + + async hGetAll(key: string): Promise<{ [key: string]: string | number }> { + return await this.pubClient.hGetAll(key); + } + + async hkeys(key: string): Promise { + return await this.pubClient.hKeys(key); + } + + async hVals(key: string): Promise { + return await this.pubClient.hVals(key); + } + + async hLen(key: string): Promise { + return await this.pubClient.hLen(key); + } + + async del(key: string): Promise { + return await this.pubClient.del(key); + } + + async expire(key: string, seconds: number): Promise { + return await this.pubClient.expire(key, seconds); + } + + async hExpire( + key: string, + fields: string[], + seconds: number, + mode?: 'NX' | 'XX' | 'GT' | 'LT' | undefined + ): Promise { + return await this.pubClient.hExpire(key, fields, seconds, mode); + } + + async persist(key: string): Promise { + return await this.pubClient.persist(key); + } + async disconnect(): Promise { if (this.isConnected) { await Promise.all([this.pubClient.quit(), this.subClient.quit()]); @@ -134,4 +217,4 @@ class RedisServer { }; } -export const redisServer = RedisServer.getInstance(); +export default RedisServer; diff --git a/src/services/room.ts b/src/services/room.ts index 7271650..4bcae8a 100644 --- a/src/services/room.ts +++ b/src/services/room.ts @@ -3,7 +3,7 @@ import { EventEmitter } from 'events'; import Peer from './peer'; import config from '../config'; import { mediaSoupServer } from '../servers/mediasoup-server'; -import { redisServer } from '../servers/redis-server'; +import { ioRedisServer } from '../servers/ioredis-server'; import { getRedisKey } from '../lib/utils'; import { Actions } from '../types/actions'; import MediaNode from './medianode'; @@ -289,7 +289,7 @@ class Room extends EventEmitter { // store meeting active speaker peerid in db. // todo may require a different position when optimising for multiple media servers] - redisServer.set( + ioRedisServer.set( getRedisKey['roomActiveSpeakerPeerId'](this.roomId), JSON.stringify(peerId) );