diff --git a/package-lock.json b/package-lock.json index 22449b5..b0b7bce 100644 --- a/package-lock.json +++ b/package-lock.json @@ -16,6 +16,7 @@ "express": "^5.1.0", "helmet": "^8.1.0", "mediasoup": "^3.18.0", + "public-ip": "^7.0.1", "redis": "^5.8.0", "zod": "^4.0.17" }, @@ -1482,6 +1483,12 @@ "url": "https://opencollective.com/js-sdsl" } }, + "node_modules/@leichtgewicht/ip-codec": { + "version": "2.0.5", + "resolved": "https://registry.npmjs.org/@leichtgewicht/ip-codec/-/ip-codec-2.0.5.tgz", + "integrity": "sha512-Vo+PSpZG2/fmgmiNzYK9qWRh8h/CHrwD0mo1h1DzL4yzHNSfWYujGTYsWGreD000gcgmZ7K4Ys6Tx9TxtsKdDw==", + "license": "MIT" + }, "node_modules/@napi-rs/wasm-runtime": { "version": "0.2.12", "resolved": "https://registry.npmjs.org/@napi-rs/wasm-runtime/-/wasm-runtime-0.2.12.tgz", @@ -1688,6 +1695,18 @@ "dev": true, "license": "MIT" }, + "node_modules/@sindresorhus/is": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/@sindresorhus/is/-/is-5.6.0.tgz", + "integrity": "sha512-TV7t8GKYaJWsn00tFDqBw8+Uqmr8A0fRU1tvTQhyZzGv0sJCGRQL3JGMI3ucuKo3XIZdUP+Lx7/gh2t3lewy7g==", + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sindresorhus/is?sponsor=1" + } + }, "node_modules/@sinonjs/commons": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/@sinonjs/commons/-/commons-3.0.1.tgz", @@ -1708,6 +1727,18 @@ "@sinonjs/commons": "^3.0.1" } }, + "node_modules/@szmarczak/http-timer": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/@szmarczak/http-timer/-/http-timer-5.0.1.tgz", + "integrity": "sha512-+PmQX0PiAYPMeVYe237LJAYvOMYW1j2rH5YROyS3b4CTVJum34HfRvKvAzozHAQG0TnHNdUfY9nCeUyRAs//cw==", + "license": "MIT", + "dependencies": { + "defer-to-connect": "^2.0.1" + }, + "engines": { + "node": ">=14.16" + } + }, "node_modules/@tsconfig/node10": { "version": "1.0.11", "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.11.tgz", @@ -1862,6 +1893,12 @@ "@types/send": "*" } }, + "node_modules/@types/http-cache-semantics": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/@types/http-cache-semantics/-/http-cache-semantics-4.0.4.tgz", + "integrity": "sha512-1m0bIFVc7eJWyve9S0RnuRgcQqF/Xd5QsUZAZeQFr1Q3/p9JWoQQEqmVy+DPTNpGXwhgIetAoYF8JSc33q29QA==", + "license": "MIT" + }, "node_modules/@types/http-errors": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/@types/http-errors/-/http-errors-2.0.5.tgz", @@ -2879,6 +2916,33 @@ "node": ">= 0.8" } }, + "node_modules/cacheable-lookup": { + "version": "7.0.0", + "resolved": "https://registry.npmjs.org/cacheable-lookup/-/cacheable-lookup-7.0.0.tgz", + "integrity": "sha512-+qJyx4xiKra8mZrcwhjMRMUhD5NR1R8esPkzIYxX96JiecFoxAXFuz/GpR3+ev4PE1WamHip78wV0vcmPQtp8w==", + "license": "MIT", + "engines": { + "node": ">=14.16" + } + }, + "node_modules/cacheable-request": { + "version": "10.2.14", + "resolved": "https://registry.npmjs.org/cacheable-request/-/cacheable-request-10.2.14.tgz", + "integrity": "sha512-zkDT5WAF4hSSoUgyfg5tFIxz8XQK+25W/TLVojJTMKBaxevLBBtLxgqguAuVQB8PVW79FVjHcU+GJ9tVbDZ9mQ==", + "license": "MIT", + "dependencies": { + "@types/http-cache-semantics": "^4.0.2", + "get-stream": "^6.0.1", + "http-cache-semantics": "^4.1.1", + "keyv": "^4.5.3", + "mimic-response": "^4.0.0", + "normalize-url": "^8.0.0", + "responselike": "^3.0.0" + }, + "engines": { + "node": ">=14.16" + } + }, "node_modules/call-bind-apply-helpers": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/call-bind-apply-helpers/-/call-bind-apply-helpers-1.0.2.tgz", @@ -3118,6 +3182,21 @@ "url": "https://github.com/chalk/wrap-ansi?sponsor=1" } }, + "node_modules/clone-regexp": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/clone-regexp/-/clone-regexp-3.0.0.tgz", + "integrity": "sha512-ujdnoq2Kxb8s3ItNBtnYeXdm07FcU0u8ARAT1lQ2YdMwQC+cdiXX8KoqMVuglztILivceTtp4ivqGSmEmhBUJw==", + "license": "MIT", + "dependencies": { + "is-regexp": "^3.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/cluster-key-slot": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", @@ -3191,6 +3270,18 @@ "node": ">= 0.6" } }, + "node_modules/convert-hrtime": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/convert-hrtime/-/convert-hrtime-5.0.0.tgz", + "integrity": "sha512-lOETlkIeYSJWcbbcvjRKGxVMXJR+8+OQb/mTPbA4ObPMytYIsUbuOE0Jzy60hjARYszq1id0j8KgVhC+WGZVTg==", + "license": "MIT", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/convert-source-map": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/convert-source-map/-/convert-source-map-2.0.0.tgz", @@ -3277,6 +3368,33 @@ } } }, + "node_modules/decompress-response": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/decompress-response/-/decompress-response-6.0.0.tgz", + "integrity": "sha512-aW35yZM6Bb/4oJlZncMH2LCoZtJXTRxES17vE3hoRiowU2kWHaJKFkSBDnDR+cm9J+9QhXmREyIfv0pji9ejCQ==", + "license": "MIT", + "dependencies": { + "mimic-response": "^3.1.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, + "node_modules/decompress-response/node_modules/mimic-response": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-3.1.0.tgz", + "integrity": "sha512-z0yWI+4FDrrweS8Zmt4Ej5HdJmky15+L2e6Wgn3+iK5fWzb6T3fhNFq2+MeTRb064c6Wr4N/wv0DzQTjNzHNGQ==", + "license": "MIT", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/dedent": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/dedent/-/dedent-1.6.0.tgz", @@ -3309,6 +3427,15 @@ "node": ">=0.10.0" } }, + "node_modules/defer-to-connect": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/defer-to-connect/-/defer-to-connect-2.0.1.tgz", + "integrity": "sha512-4tvttepXG1VaYGrRibk5EwJd1t4udunSOVMdLSAL6mId1ix438oPwPZMALY41FCijukO1L0twNcGsdzS7dHgDg==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, "node_modules/depd": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/depd/-/depd-2.0.0.tgz", @@ -3338,6 +3465,30 @@ "node": ">=0.3.1" } }, + "node_modules/dns-packet": { + "version": "5.6.1", + "resolved": "https://registry.npmjs.org/dns-packet/-/dns-packet-5.6.1.tgz", + "integrity": "sha512-l4gcSouhcgIKRvyy99RNVOgxXiicE+2jZoNmaNmZ6JXiGajBOJAesk1OBlJuM5k2c+eudGdLxDqXuPCKIj6kpw==", + "license": "MIT", + "dependencies": { + "@leichtgewicht/ip-codec": "^2.0.1" + }, + "engines": { + "node": ">=6" + } + }, + "node_modules/dns-socket": { + "version": "4.2.2", + "resolved": "https://registry.npmjs.org/dns-socket/-/dns-socket-4.2.2.tgz", + "integrity": "sha512-BDeBd8najI4/lS00HSKpdFia+OvUMytaVjfzR9n5Lq8MlZRSvtbI+uLtx1+XmQFls5wFU9dssccTmQQ6nfpjdg==", + "license": "MIT", + "dependencies": { + "dns-packet": "^5.2.4" + }, + "engines": { + "node": ">=6" + } + }, "node_modules/dotenv": { "version": "17.2.1", "resolved": "https://registry.npmjs.org/dotenv/-/dotenv-17.2.1.tgz", @@ -4072,6 +4223,15 @@ "url": "https://github.com/sponsors/isaacs" } }, + "node_modules/form-data-encoder": { + "version": "2.1.4", + "resolved": "https://registry.npmjs.org/form-data-encoder/-/form-data-encoder-2.1.4.tgz", + "integrity": "sha512-yDYSgNMraqvnxiEXO4hi88+YZxaHC6QKzb5N84iRCTDeRO7ZALpir/lVmf/uXUhnwUr2O4HU8s/n6x+yNjQkHw==", + "license": "MIT", + "engines": { + "node": ">= 14.17" + } + }, "node_modules/formdata-polyfill": { "version": "4.0.10", "resolved": "https://registry.npmjs.org/formdata-polyfill/-/formdata-polyfill-4.0.10.tgz", @@ -4133,6 +4293,18 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/function-timeout": { + "version": "0.1.1", + "resolved": "https://registry.npmjs.org/function-timeout/-/function-timeout-0.1.1.tgz", + "integrity": "sha512-0NVVC0TaP7dSTvn1yMiy6d6Q8gifzbvQafO46RtLG/kHJUBNd+pVRGOBoK44wNBvtSPUJRfdVvkFdD3p0xvyZg==", + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/gensync": { "version": "1.0.0-beta.2", "resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz", @@ -4203,7 +4375,6 @@ "version": "6.0.1", "resolved": "https://registry.npmjs.org/get-stream/-/get-stream-6.0.1.tgz", "integrity": "sha512-ts6Wi+2j3jQjqi70w5AlN8DFnkSwC+MqmxEzdEALB2qXZYV3X/b1CTfgPLGJNMeAWxdPfU8FO1ms3NUfaHCPYg==", - "dev": true, "license": "MIT", "engines": { "node": ">=10" @@ -4271,6 +4442,31 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/got": { + "version": "13.0.0", + "resolved": "https://registry.npmjs.org/got/-/got-13.0.0.tgz", + "integrity": "sha512-XfBk1CxOOScDcMr9O1yKkNaQyy865NbYs+F7dr4H0LZMVgCj2Le59k6PqbNHoL5ToeaEQUYh6c6yMfVcc6SJxA==", + "license": "MIT", + "dependencies": { + "@sindresorhus/is": "^5.2.0", + "@szmarczak/http-timer": "^5.0.1", + "cacheable-lookup": "^7.0.0", + "cacheable-request": "^10.2.8", + "decompress-response": "^6.0.0", + "form-data-encoder": "^2.1.2", + "get-stream": "^6.0.1", + "http2-wrapper": "^2.1.10", + "lowercase-keys": "^3.0.0", + "p-cancelable": "^3.0.0", + "responselike": "^3.0.0" + }, + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sindresorhus/got?sponsor=1" + } + }, "node_modules/graceful-fs": { "version": "4.2.11", "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", @@ -4373,6 +4569,12 @@ "dev": true, "license": "MIT" }, + "node_modules/http-cache-semantics": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/http-cache-semantics/-/http-cache-semantics-4.2.0.tgz", + "integrity": "sha512-dTxcvPXqPvXBQpq5dUr6mEMJX4oIEFv6bwom3FDwKRDsuIjjJGANqhBuoAn9c1RQJIdAKav33ED65E2ys+87QQ==", + "license": "BSD-2-Clause" + }, "node_modules/http-errors": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/http-errors/-/http-errors-2.0.0.tgz", @@ -4398,6 +4600,19 @@ "node": ">= 0.8" } }, + "node_modules/http2-wrapper": { + "version": "2.2.1", + "resolved": "https://registry.npmjs.org/http2-wrapper/-/http2-wrapper-2.2.1.tgz", + "integrity": "sha512-V5nVw1PAOgfI3Lmeaj2Exmeg7fenjhRUgz1lPSezy1CuhPYbgQtbQj4jZfEAEMlaL+vupsvhjqCyjzob0yxsmQ==", + "license": "MIT", + "dependencies": { + "quick-lru": "^5.1.1", + "resolve-alpn": "^1.2.0" + }, + "engines": { + "node": ">=10.19.0" + } + }, "node_modules/human-signals": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/human-signals/-/human-signals-2.1.0.tgz", @@ -4511,6 +4726,18 @@ "node": "^18.17.0 || >=20.5.0" } }, + "node_modules/ip-regex": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/ip-regex/-/ip-regex-5.0.0.tgz", + "integrity": "sha512-fOCG6lhoKKakwv+C6KdsOnGvgXnmgfmp0myi3bcNwj3qfwPAxRKWEuFhvEFF7ceYIz6+1jRZ+yguLFAmUNPEfw==", + "license": "MIT", + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/ipaddr.js": { "version": "1.9.1", "resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-1.9.1.tgz", @@ -4582,6 +4809,22 @@ "node": ">=0.10.0" } }, + "node_modules/is-ip": { + "version": "5.0.1", + "resolved": "https://registry.npmjs.org/is-ip/-/is-ip-5.0.1.tgz", + "integrity": "sha512-FCsGHdlrOnZQcp0+XT5a+pYowf33itBalCl+7ovNXC/7o5BhIpG14M3OrpPPdBSIQJCm+0M5+9mO7S9VVTTCFw==", + "license": "MIT", + "dependencies": { + "ip-regex": "^5.0.0", + "super-regex": "^0.2.0" + }, + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-number": { "version": "7.0.0", "resolved": "https://registry.npmjs.org/is-number/-/is-number-7.0.0.tgz", @@ -4598,6 +4841,18 @@ "integrity": "sha512-hvpoI6korhJMnej285dSg6nu1+e6uxs7zG3BYAm5byqDsgJNWwxzM6z6iZiAgQR4TJ30JmBTOwqZUw3WlyH3AQ==", "license": "MIT" }, + "node_modules/is-regexp": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/is-regexp/-/is-regexp-3.1.0.tgz", + "integrity": "sha512-rbku49cWloU5bSMI+zaRaXdQHXnthP6DZ/vLnfdSKyL4zUzuWnomtOEiZZOd+ioQ+avFo/qau3KPTc7Fjy1uPA==", + "license": "MIT", + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/is-stream": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/is-stream/-/is-stream-2.0.1.tgz", @@ -5345,7 +5600,6 @@ "version": "3.0.1", "resolved": "https://registry.npmjs.org/json-buffer/-/json-buffer-3.0.1.tgz", "integrity": "sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==", - "dev": true, "license": "MIT" }, "node_modules/json-parse-even-better-errors": { @@ -5386,7 +5640,6 @@ "version": "4.5.4", "resolved": "https://registry.npmjs.org/keyv/-/keyv-4.5.4.tgz", "integrity": "sha512-oxVHkHR/EJf2CNXnWxRLW6mg7JyCCUcG0DtEGmL2ctUo1PNTin1PUil+r/+4r5MpVgC/fn1kjsx7mjSujKqIpw==", - "dev": true, "license": "MIT", "dependencies": { "json-buffer": "3.0.1" @@ -5465,6 +5718,18 @@ "integrity": "sha512-mNAgZ1GmyNhD7AuqnTG3/VQ26o760+ZYBPKjPvugO8+nLbYfX6TVpJPseBvopbdY+qpZ/lKUnmEc1LeZYS3QAA==", "license": "Apache-2.0" }, + "node_modules/lowercase-keys": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/lowercase-keys/-/lowercase-keys-3.0.0.tgz", + "integrity": "sha512-ozCC6gdQ+glXOQsveKD0YsDy8DSQFjDTz4zyzEHNV5+JP5D62LmfDZ6o1cycFx9ouG940M5dE8C8CTewdj2YWQ==", + "license": "MIT", + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/lru-cache": { "version": "5.1.1", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-5.1.1.tgz", @@ -5636,6 +5901,18 @@ "node": ">=6" } }, + "node_modules/mimic-response": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/mimic-response/-/mimic-response-4.0.0.tgz", + "integrity": "sha512-e5ISH9xMYU0DzrT+jl8q2ze9D6eWBto+I8CNpe+VI+K2J/F/k3PdkdTdz4wvGVH4NTpo+NRYTVIuMQEMMcsLqg==", + "license": "MIT", + "engines": { + "node": "^12.20.0 || ^14.13.1 || >=16.0.0" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/minimatch": { "version": "9.0.5", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-9.0.5.tgz", @@ -5881,6 +6158,18 @@ "node": ">=0.10.0" } }, + "node_modules/normalize-url": { + "version": "8.0.2", + "resolved": "https://registry.npmjs.org/normalize-url/-/normalize-url-8.0.2.tgz", + "integrity": "sha512-Ee/R3SyN4BuynXcnTaekmaVdbDAEiNrHqjQIA37mHU8G9pf7aaAD4ZX3XjBLo6rsdcxA/gtkcNYZLt30ACgynw==", + "license": "MIT", + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/npm-run-path": { "version": "4.0.1", "resolved": "https://registry.npmjs.org/npm-run-path/-/npm-run-path-4.0.1.tgz", @@ -5970,6 +6259,15 @@ "node": ">= 0.8.0" } }, + "node_modules/p-cancelable": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/p-cancelable/-/p-cancelable-3.0.0.tgz", + "integrity": "sha512-mlVgR3PGuzlo0MmTdk4cXqXWlwQDLnONTAg6sm62XkMJEiRxN3GL3SffkYvqwonbkJBcrI7Uvv5Zh9yjvn2iUw==", + "license": "MIT", + "engines": { + "node": ">=12.20" + } + }, "node_modules/p-limit": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz", @@ -6333,6 +6631,23 @@ "dev": true, "license": "MIT" }, + "node_modules/public-ip": { + "version": "7.0.1", + "resolved": "https://registry.npmjs.org/public-ip/-/public-ip-7.0.1.tgz", + "integrity": "sha512-DdNcqcIbI0wEeCBcqX+bmZpUCvrDMJHXE553zgyG1MZ8S1a/iCCxmK9iTjjql+SpHSv4cZkmRv5/zGYW93AlCw==", + "license": "MIT", + "dependencies": { + "dns-socket": "^4.2.2", + "got": "^13.0.0", + "is-ip": "^5.0.1" + }, + "engines": { + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/punycode": { "version": "2.3.1", "resolved": "https://registry.npmjs.org/punycode/-/punycode-2.3.1.tgz", @@ -6396,6 +6711,18 @@ ], "license": "MIT" }, + "node_modules/quick-lru": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/quick-lru/-/quick-lru-5.1.1.tgz", + "integrity": "sha512-WuyALRjWPDGtt/wzJiadO5AXY+8hZ80hVpe6MyivgraREW751X3SbhRvG3eLKOYN+8VEvqLcf3wdnt44Z4S4SA==", + "license": "MIT", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/range-parser": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/range-parser/-/range-parser-1.2.1.tgz", @@ -6465,6 +6792,12 @@ "node": ">=0.10.0" } }, + "node_modules/resolve-alpn": { + "version": "1.2.1", + "resolved": "https://registry.npmjs.org/resolve-alpn/-/resolve-alpn-1.2.1.tgz", + "integrity": "sha512-0a1F4l73/ZFZOakJnQ3FvkJ2+gSTQWz/r2KE5OdDY0TxPm5h4GkqkWWfM47T7HsbnOtcJVEF4epCVy6u7Q3K+g==", + "license": "MIT" + }, "node_modules/resolve-cwd": { "version": "3.0.0", "resolved": "https://registry.npmjs.org/resolve-cwd/-/resolve-cwd-3.0.0.tgz", @@ -6498,6 +6831,21 @@ "node": ">=4" } }, + "node_modules/responselike": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/responselike/-/responselike-3.0.0.tgz", + "integrity": "sha512-40yHxbNcl2+rzXvZuVkrYohathsSJlMTXKryG5y8uciHv1+xDLHQpgjG64JUO9nrEq2jGLH6IZ8BcZyw3wrweg==", + "license": "MIT", + "dependencies": { + "lowercase-keys": "^3.0.0" + }, + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/reusify": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.1.0.tgz", @@ -6996,6 +7344,23 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/super-regex": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/super-regex/-/super-regex-0.2.0.tgz", + "integrity": "sha512-WZzIx3rC1CvbMDloLsVw0lkZVKJWbrkJ0k1ghKFmcnPrW1+jWbgTkTEWVtD9lMdmI4jZEz40+naBxl1dCUhXXw==", + "license": "MIT", + "dependencies": { + "clone-regexp": "^3.0.0", + "function-timeout": "^0.1.0", + "time-span": "^5.1.0" + }, + "engines": { + "node": ">=14.16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/supports-color": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", @@ -7112,6 +7477,21 @@ "node": "*" } }, + "node_modules/time-span": { + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/time-span/-/time-span-5.1.0.tgz", + "integrity": "sha512-75voc/9G4rDIJleOo4jPvN4/YC4GRZrY8yy1uU4lwrB3XEQbWve8zXoO5No4eFrGcTAMYyoY67p8jRQdtA1HbA==", + "license": "MIT", + "dependencies": { + "convert-hrtime": "^5.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/tmpl": { "version": "1.0.5", "resolved": "https://registry.npmjs.org/tmpl/-/tmpl-1.0.5.tgz", diff --git a/package.json b/package.json index 4f58a05..a2d2b8d 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "express": "^5.1.0", "helmet": "^8.1.0", "mediasoup": "^3.18.0", + "public-ip": "^7.0.1", "redis": "^5.8.0", "zod": "^4.0.17" } diff --git a/proto-gen.sh b/proto-gen.sh index 8c4ef9d..7824bf2 100755 --- a/proto-gen.sh +++ b/proto-gen.sh @@ -1,3 +1,3 @@ #!/bin/bash -npx proto-loader-gen-types --grpcLib=@grpc/grpc-js --outDir=src/protos/ src/protos/*.proto +npx proto-loader-gen-types --grpcLib=@grpc/grpc-js --outDir=src/protos/gen/ src/protos/*.proto diff --git a/src/app.ts b/src/app.ts index 33d917c..e354a9e 100644 --- a/src/app.ts +++ b/src/app.ts @@ -1,4 +1,3 @@ -import fs from 'fs'; import express from 'express'; import cors from 'cors'; import helmet from 'helmet'; @@ -8,11 +7,9 @@ import config from './config'; import { Routes } from './routes'; import { redisServer } from './servers/redis-server'; import { grpcServer } from './servers/grpc-server'; - -const serverOption = { - key: fs.readFileSync(config.tls.key, 'utf8'), - cert: fs.readFileSync(config.tls.cert, 'utf8'), -}; +import { MediaNodeData } from './types'; +import { getRedisKey, registerMediaNode } from './lib/utils'; +import { mediaSoupServer } from './servers/mediasoup-server'; const app = express(); app.use(cors(config.cors)); @@ -20,7 +17,9 @@ app.use(helmet()); app.use(express.json()); app.use('/', Routes); -const httpsServer = createServer(serverOption, app); +const httpsServer = createServer(config.httpsServerOptions, app); + +let medianodeData: MediaNodeData; (async (): Promise => { try { @@ -29,6 +28,11 @@ const httpsServer = createServer(serverOption, app); console.log(`Server running on port ${config.port}`); }); await grpcServer.start(); + + await mediaSoupServer.start(); + + medianodeData = await registerMediaNode(); + console.log('Register medianode'); } catch (error) { console.error('Initialization error:', error); process.exit(1); @@ -37,6 +41,12 @@ const httpsServer = createServer(serverOption, app); const shutdown = async (): Promise => { try { + await redisServer.sRem( + getRedisKey['medianodesRunning'](), + JSON.stringify(medianodeData) + ); + console.log('Delete medianode'); + await redisServer.disconnect(); httpsServer.close(); console.log('Application shut down gracefully'); diff --git a/src/config/index.ts b/src/config/index.ts index d2913a1..65d12ea 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -1,26 +1,138 @@ +import fs from 'fs'; +import os from 'os'; import path from 'path'; + import * as dotenv from 'dotenv'; +import { types as mediasoupTypes } from 'mediasoup'; dotenv.config(); -const certPath = path.join(__dirname, '..', 'certs', 'fullchain.pem'); -const keyPath = path.join(__dirname, '..', 'certs', 'privkey.pem'); +const certFile = + process.env.HTTPS_CERT || + path.join(__dirname, '..', 'certs', 'fullchain.pem'); +const keyFile = + process.env.HTTPS_KEY || path.join(__dirname, '..', 'certs', 'privkey.pem'); + +const LISTEN_IP = process.env.LISTEN_IP || '0.0.0.0'; +const ANNOUNCED_ADDRESS = process.env.ANNOUNCED_ADDRESS || '127.0.0.1'; const config = { + nodeId: `mnode-1`, env: process.env.NODE_ENV, cors: { origin: process.env.NODE_ENV === 'production' ? ['https://mitsi.app'] : '*', methods: ['GET', 'POST'], }, - tls: { - cert: process.env.HTTPS_CERT || certPath, - key: process.env.HTTPS_KEY || keyPath, + httpsServerOptions: { + key: fs.readFileSync(keyFile, 'utf8'), + cert: fs.readFileSync(certFile, 'utf8'), }, port: process.env.PORT || 4000, + grpcPort: process.env.GRPC_PORT || 50052, + cpus: Object.keys(os.cpus()).length, + apiServerUrl: process.env.API_SERVER_URL, apiServerApiKey: process.env.API_SERVER_API_KEY, recordingServerUrl: process.env.RECORDING_SERVER_URL, redisServerUrl: process.env.REDIS_SERVER_URL || 'redis://localhost:6379', + + mediasoup: { + maxWorkerLoad: parseInt(process.env.MAX_WORKER_LOAD || '100'), + workerSettings: { + dtlsCertificateFile: certFile, + dtlsPrivateKeyFile: keyFile, + rtcMinPort: parseInt(process.env.RTC_MIN_PORT || '2000'), + rtcMaxPort: parseInt(process.env.RTC_MAX_PORT || '2300'), + logLevel: 'warn' as mediasoupTypes.WorkerLogLevel, + logTags: [ + 'info', + 'ice', + 'dtls', + 'rtp', + 'srtp', + 'rtcp', + 'rtx', + 'bwe', + 'score', + 'simulcast', + 'svc', + 'sctp', + ] as Array, + }, + webRtcServer: { + listenInfos: [ + { + protocol: 'udp', + ip: LISTEN_IP, + announcedAddress: ANNOUNCED_ADDRESS, + }, + { + protocol: 'tcp', + ip: LISTEN_IP, + announcedAddress: ANNOUNCED_ADDRESS, + }, + ] as Array, + }, + routerMediaCodecs: [ + { + kind: 'audio', + mimeType: 'audio/opus', + clockRate: 48000, + channels: 2, + // parameters: { + // 'stereo': 1, + // 'sprop-stereo': 1, + // 'maxplaybackrate': 48000, + // 'useinbandfec': 1 + // } + }, + { + kind: 'video', + mimeType: 'video/VP8', + clockRate: 90000, + parameters: { + 'x-google-start-bitrate': 1000, + }, + }, + // { + // kind: 'video', + // mimeType: 'video/VP9', + // clockRate: 90000, + // parameters: { + // 'profile-id': 2, + // 'x-google-start-bitrate': 1000 + // } + // }, + // { + // kind: 'video', + // mimeType: 'video/h264', + // clockRate: 90000, + // parameters: { + // 'packetization-mode': 1, + // 'profile-level-id': '4d0032', + // 'level-asymmetry-allowed': 1, + // 'x-google-start-bitrate': 1000 + // } + // }, + // { + // kind: 'video', + // mimeType: 'video/h264', + // clockRate: 90000, + // parameters: { + // 'packetization-mode': 1, + // 'profile-level-id': '42e01f', + // 'level-asymmetry-allowed': 1, + // 'x-google-start-bitrate': 1000 + // } + // } + ] as Array, + + transportListenInfo: { + protocol: 'udp', + ip: LISTEN_IP, + announcedAddress: ANNOUNCED_ADDRESS, + } as mediasoupTypes.TransportListenInfo, + }, }; export default config; diff --git a/src/lib/utils.ts b/src/lib/utils.ts new file mode 100644 index 0000000..020d748 --- /dev/null +++ b/src/lib/utils.ts @@ -0,0 +1,37 @@ +import config from '../config'; +import { redisServer } from '../servers/redis-server'; +import { MediaNodeData } from '../types'; + +export const getRedisKey = { + room: (roomId: string): string => `room-${roomId}`, + lobby: (roomId: string): string => `lobby-${roomId}`, + roomPeers: (roomId: string): string => `room-${roomId}-peers`, + roomPeerIds: (roomId: string): string => `room-${roomId}-peerids`, + roomActiveSpeakerPeerId: (roomId: string): string => + `room-${roomId}-activespeakerpeerid`, + roomsOngoing: (): string => `rooms-ongoing`, + medianodesRunning: (): string => `medianodes-running`, + signalnodesRunning: (): string => `signalnodes-running`, + roomMedianodes: (roomId: string): string => `room-${roomId}-medianodes`, + roomSignalnodes: (roomId: string): string => `room-${roomId}-signalnodes`, +}; + +export const registerMediaNode = async (): Promise => { + try { + // const { publicIpv4 } = await import('public-ip'); + // const ip = await publicIpv4(); + const medianodeData: MediaNodeData = { + id: config.nodeId, + ip: '0.0.0.0', + address: `${config.port}`, + grpcPort: `${config.grpcPort}`, + }; + await redisServer.sAdd( + getRedisKey['medianodesRunning'](), + JSON.stringify(medianodeData) + ); + return medianodeData; + } catch (error) { + throw error; + } +}; diff --git a/src/protos/gen/media-signaling.ts b/src/protos/gen/media-signaling.ts new file mode 100644 index 0000000..412467e --- /dev/null +++ b/src/protos/gen/media-signaling.ts @@ -0,0 +1,19 @@ +import type * as grpc from '@grpc/grpc-js'; +import type { MessageTypeDefinition } from '@grpc/proto-loader'; + +import type { MediaSignalingClient as _mediaSignalingPackage_MediaSignalingClient, MediaSignalingDefinition as _mediaSignalingPackage_MediaSignalingDefinition } from './mediaSignalingPackage/MediaSignaling'; +import type { MessageRequest as _mediaSignalingPackage_MessageRequest, MessageRequest__Output as _mediaSignalingPackage_MessageRequest__Output } from './mediaSignalingPackage/MessageRequest'; +import type { MessageResponse as _mediaSignalingPackage_MessageResponse, MessageResponse__Output as _mediaSignalingPackage_MessageResponse__Output } from './mediaSignalingPackage/MessageResponse'; + +type SubtypeConstructor any, Subtype> = { + new(...args: ConstructorParameters): Subtype; +}; + +export interface ProtoGrpcType { + mediaSignalingPackage: { + MediaSignaling: SubtypeConstructor & { service: _mediaSignalingPackage_MediaSignalingDefinition } + MessageRequest: MessageTypeDefinition<_mediaSignalingPackage_MessageRequest, _mediaSignalingPackage_MessageRequest__Output> + MessageResponse: MessageTypeDefinition<_mediaSignalingPackage_MessageResponse, _mediaSignalingPackage_MessageResponse__Output> + } +} + diff --git a/src/protos/gen/mediaSignalingPackage/MediaSignaling.ts b/src/protos/gen/mediaSignalingPackage/MediaSignaling.ts new file mode 100644 index 0000000..9ce9dc6 --- /dev/null +++ b/src/protos/gen/mediaSignalingPackage/MediaSignaling.ts @@ -0,0 +1,23 @@ +// Original file: src/protos/media-signaling.proto + +import type * as grpc from '@grpc/grpc-js' +import type { MethodDefinition } from '@grpc/proto-loader' +import type { MessageRequest as _mediaSignalingPackage_MessageRequest, MessageRequest__Output as _mediaSignalingPackage_MessageRequest__Output } from '../mediaSignalingPackage/MessageRequest'; +import type { MessageResponse as _mediaSignalingPackage_MessageResponse, MessageResponse__Output as _mediaSignalingPackage_MessageResponse__Output } from '../mediaSignalingPackage/MessageResponse'; + +export interface MediaSignalingClient extends grpc.Client { + Message(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_mediaSignalingPackage_MessageRequest, _mediaSignalingPackage_MessageResponse__Output>; + Message(options?: grpc.CallOptions): grpc.ClientDuplexStream<_mediaSignalingPackage_MessageRequest, _mediaSignalingPackage_MessageResponse__Output>; + message(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_mediaSignalingPackage_MessageRequest, _mediaSignalingPackage_MessageResponse__Output>; + message(options?: grpc.CallOptions): grpc.ClientDuplexStream<_mediaSignalingPackage_MessageRequest, _mediaSignalingPackage_MessageResponse__Output>; + +} + +export interface MediaSignalingHandlers extends grpc.UntypedServiceImplementation { + Message: grpc.handleBidiStreamingCall<_mediaSignalingPackage_MessageRequest__Output, _mediaSignalingPackage_MessageResponse>; + +} + +export interface MediaSignalingDefinition extends grpc.ServiceDefinition { + Message: MethodDefinition<_mediaSignalingPackage_MessageRequest, _mediaSignalingPackage_MessageResponse, _mediaSignalingPackage_MessageRequest__Output, _mediaSignalingPackage_MessageResponse__Output> +} diff --git a/src/protos/gen/mediaSignalingPackage/MessageRequest.ts b/src/protos/gen/mediaSignalingPackage/MessageRequest.ts new file mode 100644 index 0000000..6b94a14 --- /dev/null +++ b/src/protos/gen/mediaSignalingPackage/MessageRequest.ts @@ -0,0 +1,14 @@ +// Original file: src/protos/media-signaling.proto + + +export interface MessageRequest { + 'action'?: (string); + 'args'?: (string); + 'requestId'?: (string); +} + +export interface MessageRequest__Output { + 'action'?: (string); + 'args'?: (string); + 'requestId'?: (string); +} diff --git a/src/protos/gen/mediaSignalingPackage/MessageResponse.ts b/src/protos/gen/mediaSignalingPackage/MessageResponse.ts new file mode 100644 index 0000000..4824732 --- /dev/null +++ b/src/protos/gen/mediaSignalingPackage/MessageResponse.ts @@ -0,0 +1,14 @@ +// Original file: src/protos/media-signaling.proto + + +export interface MessageResponse { + 'action'?: (string); + 'args'?: (string); + 'requestId'?: (string); +} + +export interface MessageResponse__Output { + 'action'?: (string); + 'args'?: (string); + 'requestId'?: (string); +} diff --git a/src/protos/media-signaling.proto b/src/protos/media-signaling.proto index 83160de..4d23769 100644 --- a/src/protos/media-signaling.proto +++ b/src/protos/media-signaling.proto @@ -1,16 +1,19 @@ syntax = "proto3"; -package media_signaling_package; + +package mediaSignalingPackage; service MediaSignaling { - rpc SendMessage(stream SendMessageRequest) returns (stream SendMessageResponse) {}; -} -message SendMessageRequest { - string type = 1; - map args = 2; + rpc Message(stream MessageRequest) returns (stream MessageResponse) {}; } -message SendMessageResponse { - string type = 1; - map args = 2; +message MessageRequest { + string action = 1; + string args = 2; + string requestId = 3; } +message MessageResponse { + string action = 1; + string args = 2; + string requestId = 3; +} diff --git a/src/protos/media-signaling.ts b/src/protos/media-signaling.ts deleted file mode 100644 index b4f09b7..0000000 --- a/src/protos/media-signaling.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type * as grpc from '@grpc/grpc-js'; -import type { MessageTypeDefinition } from '@grpc/proto-loader'; - -import type { MediaSignalingClient as _media_signaling_package_MediaSignalingClient, MediaSignalingDefinition as _media_signaling_package_MediaSignalingDefinition } from './media_signaling_package/MediaSignaling'; -import type { SendMessageRequest as _media_signaling_package_SendMessageRequest, SendMessageRequest__Output as _media_signaling_package_SendMessageRequest__Output } from './media_signaling_package/SendMessageRequest'; -import type { SendMessageResponse as _media_signaling_package_SendMessageResponse, SendMessageResponse__Output as _media_signaling_package_SendMessageResponse__Output } from './media_signaling_package/SendMessageResponse'; - -type SubtypeConstructor any, Subtype> = { - new(...args: ConstructorParameters): Subtype; -}; - -export interface ProtoGrpcType { - media_signaling_package: { - MediaSignaling: SubtypeConstructor & { service: _media_signaling_package_MediaSignalingDefinition } - SendMessageRequest: MessageTypeDefinition<_media_signaling_package_SendMessageRequest, _media_signaling_package_SendMessageRequest__Output> - SendMessageResponse: MessageTypeDefinition<_media_signaling_package_SendMessageResponse, _media_signaling_package_SendMessageResponse__Output> - } -} - diff --git a/src/protos/media_signaling_package/MediaSignaling.ts b/src/protos/media_signaling_package/MediaSignaling.ts deleted file mode 100644 index 05b30dc..0000000 --- a/src/protos/media_signaling_package/MediaSignaling.ts +++ /dev/null @@ -1,23 +0,0 @@ -// Original file: src/protos/media-signaling.proto - -import type * as grpc from '@grpc/grpc-js' -import type { MethodDefinition } from '@grpc/proto-loader' -import type { SendMessageRequest as _media_signaling_package_SendMessageRequest, SendMessageRequest__Output as _media_signaling_package_SendMessageRequest__Output } from '../media_signaling_package/SendMessageRequest'; -import type { SendMessageResponse as _media_signaling_package_SendMessageResponse, SendMessageResponse__Output as _media_signaling_package_SendMessageResponse__Output } from '../media_signaling_package/SendMessageResponse'; - -export interface MediaSignalingClient extends grpc.Client { - SendMessage(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_media_signaling_package_SendMessageRequest, _media_signaling_package_SendMessageResponse__Output>; - SendMessage(options?: grpc.CallOptions): grpc.ClientDuplexStream<_media_signaling_package_SendMessageRequest, _media_signaling_package_SendMessageResponse__Output>; - sendMessage(metadata: grpc.Metadata, options?: grpc.CallOptions): grpc.ClientDuplexStream<_media_signaling_package_SendMessageRequest, _media_signaling_package_SendMessageResponse__Output>; - sendMessage(options?: grpc.CallOptions): grpc.ClientDuplexStream<_media_signaling_package_SendMessageRequest, _media_signaling_package_SendMessageResponse__Output>; - -} - -export interface MediaSignalingHandlers extends grpc.UntypedServiceImplementation { - SendMessage: grpc.handleBidiStreamingCall<_media_signaling_package_SendMessageRequest__Output, _media_signaling_package_SendMessageResponse>; - -} - -export interface MediaSignalingDefinition extends grpc.ServiceDefinition { - SendMessage: MethodDefinition<_media_signaling_package_SendMessageRequest, _media_signaling_package_SendMessageResponse, _media_signaling_package_SendMessageRequest__Output, _media_signaling_package_SendMessageResponse__Output> -} diff --git a/src/protos/media_signaling_package/SendMessageRequest.ts b/src/protos/media_signaling_package/SendMessageRequest.ts deleted file mode 100644 index d533e6d..0000000 --- a/src/protos/media_signaling_package/SendMessageRequest.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Original file: src/protos/media-signaling.proto - - -export interface SendMessageRequest { - 'type'?: (string); - 'args'?: ({[key: string]: string}); -} - -export interface SendMessageRequest__Output { - 'type'?: (string); - 'args'?: ({[key: string]: string}); -} diff --git a/src/protos/media_signaling_package/SendMessageResponse.ts b/src/protos/media_signaling_package/SendMessageResponse.ts deleted file mode 100644 index 3502749..0000000 --- a/src/protos/media_signaling_package/SendMessageResponse.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Original file: src/protos/media-signaling.proto - - -export interface SendMessageResponse { - 'type'?: (string); - 'args'?: ({[key: string]: string}); -} - -export interface SendMessageResponse__Output { - 'type'?: (string); - 'args'?: ({[key: string]: string}); -} diff --git a/src/servers/grpc-server.ts b/src/servers/grpc-server.ts index f7c692b..5052762 100644 --- a/src/servers/grpc-server.ts +++ b/src/servers/grpc-server.ts @@ -1,77 +1,543 @@ +import path from 'path'; +import { EventEmitter } from 'events'; + import * as grpc from '@grpc/grpc-js'; import * as protoLoader from '@grpc/proto-loader'; -import path from 'path'; -import { ProtoGrpcType } from '../protos/media-signaling'; -import { MediaSignalingHandlers } from '../protos/media_signaling_package/MediaSignaling'; -class GrpcServer { +import { MessageRequest } from '../protos/gen/mediaSignalingPackage/MessageRequest'; +import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; +import { MediaSignalingHandlers } from '../protos/gen/mediaSignalingPackage/MediaSignaling'; +import { ProtoGrpcType } from '../protos/gen/media-signaling'; + +import config from '../config'; +import SignalNode from '../services/signalnode'; +import { MediaSignalingActions as MSA } from '../types/actions'; + +interface ServerStats { + totalConnections: number; + activeConnections: number; + totalMessagesReceived: number; + totalMessagesSent: number; + uptime: number; + startTime: Date; + errors: { + connectionErrors: number; + messageErrors: number; + protocolErrors: number; + }; +} + +interface ServerConfig { + maxConnections: number; + messageTimeout: number; + shutdownGracePeriod: number; + healthCheckInterval: number; + metricsReportInterval: number; +} + +enum ServerState { + STOPPED = 'STOPPED', + STARTING = 'STARTING', + RUNNING = 'RUNNING', + STOPPING = 'STOPPING', + ERROR = 'ERROR', +} + +class GrpcServer extends EventEmitter { private static instance: GrpcServer | null = null; private server: grpc.Server; - private connections: Map; + private serverState: ServerState; + private startTime: Date; + private stats: ServerStats; + private cleanupInterval: NodeJS.Timeout | null; + private healthCheckInterval: NodeJS.Timeout | null; + private metricsInterval: NodeJS.Timeout | null; + private readonly config: ServerConfig; + private shutdownPromise: Promise | null = null; + + private constructor(serverConfig?: Partial) { + super(); + + this.config = { + maxConnections: 1000, + messageTimeout: 30000, + shutdownGracePeriod: 10000, + healthCheckInterval: 30000, + metricsReportInterval: 60000, + ...serverConfig, + }; + + this.server = new grpc.Server({ + 'grpc.keepalive_time_ms': 10000, + 'grpc.keepalive_timeout_ms': 5000, + 'grpc.keepalive_permit_without_calls': 1, + 'grpc.http2.max_pings_without_data': 0, + 'grpc.http2.min_time_between_pings_ms': 10000, + 'grpc.http2.min_ping_interval_without_data_ms': 300000, + 'grpc.max_connection_idle_ms': 300000, + 'grpc.max_connection_age_ms': 600000, + 'grpc.max_connection_age_grace_ms': 30000, + 'grpc.max_receive_message_length': 4 * 1024 * 1024, // 4MB + 'grpc.max_send_message_length': 4 * 1024 * 1024, // 4MB + }); + + this.serverState = ServerState.STOPPED; + this.startTime = new Date(); + this.cleanupInterval = null; + this.healthCheckInterval = null; + this.metricsInterval = null; + + this.stats = { + totalConnections: 0, + activeConnections: 0, + totalMessagesReceived: 0, + totalMessagesSent: 0, + uptime: 0, + startTime: this.startTime, + errors: { + connectionErrors: 0, + messageErrors: 0, + protocolErrors: 0, + }, + }; - private constructor() { - this.server = new grpc.Server(); - this.connections = new Map(); this.setup(); + this.startPeriodicTasks(); + + // Graceful shutdown handling + process.on('SIGINT', () => this.gracefulShutdown('SIGINT')); + process.on('SIGTERM', () => this.gracefulShutdown('SIGTERM')); + process.on('uncaughtException', error => + this.handleUncaughtException(error) + ); } - static getInstance(): GrpcServer { + static getInstance(serverConfig?: Partial): GrpcServer { if (!GrpcServer.instance) { - GrpcServer.instance = new GrpcServer(); + GrpcServer.instance = new GrpcServer(serverConfig); } - return GrpcServer.instance; } - async start(port: number = 50052): Promise { + async start(): Promise { + if (this.serverState === ServerState.RUNNING) { + console.log('๐Ÿ”„ gRPC server is already running'); + return; + } + + if (this.serverState === ServerState.STARTING) { + console.log('โณ gRPC server is already starting'); + return; + } + + this.setState(ServerState.STARTING); + try { - this.server.bindAsync( - `0.0.0.0:${port}`, - grpc.ServerCredentials.createInsecure(), - (err, port) => { - if (err) { - console.error(err); - return; + await new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + reject(new Error('Server start timeout')); + }, 30000); + + this.server.bindAsync( + `0.0.0.0:${config.grpcPort}`, + grpc.ServerCredentials.createInsecure(), + (err, port) => { + clearTimeout(timeoutId); + + if (err) { + console.error('โŒ Failed to bind gRPC server:', err); + this.setState(ServerState.ERROR); + this.stats.errors.connectionErrors++; + reject(err); + return; + } + + this.setState(ServerState.RUNNING); + this.startTime = new Date(); + this.stats.startTime = this.startTime; + + console.log(`โœ… gRPC Media Signaling Server started successfully`); + console.log(`๐ŸŒ Server listening on 0.0.0.0:${port}`); + console.log(`๐Ÿ“Š Max connections: ${this.config.maxConnections}`); + + this.emit('serverStarted', { port, timestamp: new Date() }); + resolve(); } - console.log(`Your server as started on port ${port}`); - } - ); + ); + }); } catch (error) { - console.error(error); + console.error('๐Ÿ’ฅ Critical error starting gRPC server:', error); + this.setState(ServerState.ERROR); + this.emit('serverError', { error, timestamp: new Date() }); throw error; } } private setup(): void { - const PROTO_FILE = path.resolve( - __dirname, - '../protos/media-signaling.proto' - ); + try { + const PROTO_FILE = path.resolve( + __dirname, + '../protos/media-signaling.proto' + ); + + const packageDefinition = protoLoader.loadSync(PROTO_FILE, { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true, + }); - const packageDefinition = protoLoader.loadSync(PROTO_FILE); - const protoDescriptor = grpc.loadPackageDefinition( - packageDefinition - ) as unknown as ProtoGrpcType; + const protoDescriptor = grpc.loadPackageDefinition( + packageDefinition + ) as unknown as ProtoGrpcType; - const mediaSignaling = - protoDescriptor.media_signaling_package.MediaSignaling; + const mediaSignaling = + protoDescriptor.mediaSignalingPackage.MediaSignaling; - this.server.addService(mediaSignaling.service, { - SendMessage: call => { - call.on('data', chunk => { - console.log('Message from client'); - console.log(chunk); + this.server.addService(mediaSignaling.service, { + Message: this.handleMessage.bind(this), + } as MediaSignalingHandlers); + + console.log('๐Ÿ“‹ gRPC service definitions loaded successfully'); + } catch (error) { + console.error('โŒ Failed to setup gRPC service:', error); + throw error; + } + } + + private handleMessage( + call: grpc.ServerDuplexStream + ): void { + const connectionId = this.generateConnectionId(); + const clientMetadata = call.metadata; + const clientId = + clientMetadata.get('clientid')[0]?.toString() || connectionId; + const remoteAddress = call.getPeer(); + + console.log(`๐Ÿ”Œ New gRPC connection established`); + console.log(` Client ID: ${clientId}`); + console.log(` Connection ID: ${connectionId}`); + console.log(` Remote Address: ${remoteAddress}`); + + // Check connection limits + if (this.stats.activeConnections >= this.config.maxConnections) { + console.warn( + `โš ๏ธ Connection limit reached (${this.config.maxConnections}), rejecting client ${clientId}` + ); + call.destroy(new Error('Server connection limit reached')); + return; + } + + try { + // Create SignalNode with enhanced error handling + new SignalNode({ + id: clientId, + call, + connectionId, + }); + + // Update server stats + this.stats.totalConnections++; + this.stats.activeConnections++; + + // Set up connection cleanup + const cleanup = (): void => { + this.stats.activeConnections = Math.max( + 0, + this.stats.activeConnections - 1 + ); + this.emit('connectionClosed', { + clientId, + connectionId, + timestamp: new Date(), + activeConnections: this.stats.activeConnections, }); + }; + + call.on('close', cleanup); + call.on('cancelled', cleanup); + + this.emit('connectionOpened', { + clientId, + connectionId, + remoteAddress, + timestamp: new Date(), + activeConnections: this.stats.activeConnections, + }); + } catch (error) { + console.error( + `โŒ Error creating SignalNode for client ${clientId}:`, + error + ); + this.stats.errors.connectionErrors++; + // call.destroy(error); + } + } + + private generateConnectionId(): string { + return `conn_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } + + private setState(newState: ServerState): void { + if (this.serverState !== newState) { + const oldState = this.serverState; + this.serverState = newState; + console.log(`๐Ÿ“ก Server state changed: ${oldState} -> ${newState}`); + this.emit('stateChanged', { oldState, newState, timestamp: new Date() }); + } + } + + private startPeriodicTasks(): void { + // Health check interval + this.healthCheckInterval = setInterval(() => { + this.performHealthCheck(); + }, this.config.healthCheckInterval); + + // Metrics reporting interval + this.metricsInterval = setInterval(() => { + this.reportMetrics(); + }, this.config.metricsReportInterval); - call.write({ - type: 'confirm connection', - args: { - status: 'success', - }, + // Cleanup interval for stale connections + this.cleanupInterval = setInterval(() => { + this.cleanupStaleConnections(); + }, 60000); // Every minute + } + + private performHealthCheck(): void { + try { + const stats = this.getStats(); + const memoryUsage = process.memoryUsage(); + + console.log( + `๐Ÿ’— Health Check - Active: ${stats.activeConnections}, Memory: ${Math.round(memoryUsage.heapUsed / 1024 / 1024)}MB` + ); + + this.emit('healthCheck', { + stats, + memoryUsage, + timestamp: new Date(), + }); + + // Check for memory leaks + if (memoryUsage.heapUsed > 512 * 1024 * 1024) { + // 512MB threshold + console.warn('โš ๏ธ High memory usage detected:', memoryUsage); + this.emit('memoryWarning', { memoryUsage, timestamp: new Date() }); + } + } catch (error) { + console.error('โŒ Health check failed:', error); + } + } + + private reportMetrics(): void { + const stats = this.getStats(); + console.log(`๐Ÿ“Š Server Metrics:`, { + uptime: `${Math.round(stats.uptime / 1000 / 60)}min`, + connections: `${stats.activeConnections}/${stats.totalConnections}`, + messages: `R:${stats.totalMessagesReceived} S:${stats.totalMessagesSent}`, + errors: stats.errors, + }); + + this.emit('metricsReport', { stats, timestamp: new Date() }); + } + + private cleanupStaleConnections(): void { + try { + const nodes = SignalNode.getNodes(); + const staleThreshold = 5 * 60 * 1000; // 5 minutes + let cleanedCount = 0; + + nodes.forEach(node => { + if (node.isStale(staleThreshold)) { + console.log(`๐Ÿงน Cleaning up stale connection: ${node.id}`); + node.forceDisconnect('stale_connection_cleanup'); + cleanedCount++; + } + }); + + if (cleanedCount > 0) { + console.log(`๐Ÿงน Cleaned up ${cleanedCount} stale connections`); + } + } catch (error) { + console.error('โŒ Error during cleanup:', error); + } + } + + async stop(): Promise { + if (this.shutdownPromise) { + return this.shutdownPromise; + } + + if (this.serverState === ServerState.STOPPED) { + console.log('โน๏ธ gRPC server is already stopped'); + return; + } + + this.setState(ServerState.STOPPING); + console.log('โน๏ธ Stopping gRPC server...'); + + this.shutdownPromise = this.performShutdown(); + return this.shutdownPromise; + } + + private async performShutdown(): Promise { + try { + // Stop accepting new connections + console.log('๐Ÿšซ Stopping new connection acceptance...'); + + // Close all active connections gracefully + await this.closeAllConnections(); + + // Clear all intervals + this.clearIntervals(); + + // Shutdown server + await new Promise(resolve => { + const timeoutId = setTimeout(() => { + console.warn('โš ๏ธ Graceful shutdown timeout, forcing shutdown'); + this.server.forceShutdown(); + resolve(); + }, this.config.shutdownGracePeriod); + + this.server.tryShutdown(error => { + clearTimeout(timeoutId); + if (error) { + console.error('โŒ Error during server shutdown:', error); + this.server.forceShutdown(); + } + resolve(); }); - }, - } as MediaSignalingHandlers); + }); + + this.setState(ServerState.STOPPED); + console.log('โœ… gRPC server stopped successfully'); + this.emit('serverStopped', { timestamp: new Date() }); + } catch (error) { + console.error('๐Ÿ’ฅ Error during shutdown:', error); + this.setState(ServerState.ERROR); + throw error; + } finally { + this.shutdownPromise = null; + } + } + + private clearIntervals(): void { + if (this.cleanupInterval) { + clearInterval(this.cleanupInterval); + this.cleanupInterval = null; + } + if (this.healthCheckInterval) { + clearInterval(this.healthCheckInterval); + this.healthCheckInterval = null; + } + if (this.metricsInterval) { + clearInterval(this.metricsInterval); + this.metricsInterval = null; + } + } + + private async closeAllConnections(): Promise { + const nodes = SignalNode.getNodes(); + console.log(`๐Ÿ”Œ Closing ${nodes.length} active connections...`); + + if (nodes.length === 0) { + return; + } + + const closePromises = nodes.map(node => + node.gracefulDisconnect('server_shutdown').catch(error => { + console.warn(`โš ๏ธ Error closing connection ${node.id}:`, error); + }) + ); + + await Promise.allSettled(closePromises); + console.log('โœ… All connections closed'); + } + + private async gracefulShutdown(signal: string): Promise { + console.log(`\n๐Ÿ›‘ Received ${signal}, starting graceful shutdown...`); + + try { + await this.stop(); + this.removeAllListeners(); + console.log('โœ… Graceful shutdown completed'); + process.exit(0); + } catch (error) { + console.error('๐Ÿ’ฅ Error during graceful shutdown:', error); + process.exit(1); + } + } + + private handleUncaughtException(error: Error): void { + console.error('๐Ÿšจ Uncaught Exception:', error); + this.emit('uncaughtException', { error, timestamp: new Date() }); + + // Attempt graceful shutdown + this.gracefulShutdown('uncaughtException').catch(() => { + process.exit(1); + }); + } + + // Public API methods + public getStats(): ServerStats { + return { + ...this.stats, + uptime: Date.now() - this.stats.startTime.getTime(), + }; + } + + public getState(): ServerState { + return this.serverState; + } + + public isRunning(): boolean { + return this.serverState === ServerState.RUNNING; + } + + public incrementMessageStats(sent: number = 0, received: number = 0): void { + this.stats.totalMessagesSent += sent; + this.stats.totalMessagesReceived += received; + } + + public incrementErrorStats(type: keyof ServerStats['errors']): void { + this.stats.errors[type]++; + } + + public broadcastMessage(message: { + type: MSA; + args?: { [key: string]: unknown }; + excludeIds?: string[]; + }): { sent: number; failed: number } { + const nodes = SignalNode.getNodes(); + const eligibleNodes = message.excludeIds + ? nodes.filter(node => !message.excludeIds!.includes(node.id)) + : nodes; + + let sent = 0; + let failed = 0; + + eligibleNodes.forEach(node => { + if (node.sendMessage(message.type, message.args)) { + sent++; + } else { + failed++; + } + }); + + console.log(`๐Ÿ“ข Broadcast ${message.type}: ${sent} sent, ${failed} failed`); + return { sent, failed }; + } + + public getConnectionById(id: string): SignalNode | undefined { + return SignalNode.getNodeById(id); + } + + public getAllConnections(): SignalNode[] { + return SignalNode.getNodes(); } } export const grpcServer = GrpcServer.getInstance(); +export { ServerState, GrpcServer }; diff --git a/src/servers/mediasoup-server.ts b/src/servers/mediasoup-server.ts new file mode 100644 index 0000000..d27e0db --- /dev/null +++ b/src/servers/mediasoup-server.ts @@ -0,0 +1,461 @@ +import { + createWorker, + types as mediasoupTypes, + observer as mediasoupObserver, +} from 'mediasoup'; +import config from '../config'; + +interface WorkerInfo { + pid: number; + worker: mediasoupTypes.Worker; + load: number; + isHealthy: boolean; + usage?: mediasoupTypes.WorkerResourceUsage; +} + +interface ServerMetrics { + totalWorkers: number; + activeWorkers: number; + totalLoad: number; + averageLoad: number; +} + +export class MediasoupServer { + private static instance: MediasoupServer | null = null; + private workers: Map = new Map(); + private isRunning: boolean = false; + private routerRtpCapabilities: mediasoupTypes.RtpCapabilities | null = null; + private initializationPromise: Promise | null = null; + private readonly maxWorkerLoad: number; + + private constructor() { + this.maxWorkerLoad = config.mediasoup.maxWorkerLoad; + this.setupGracefulShutdown(); + } + + static getInstance(): MediasoupServer { + if (!MediasoupServer.instance) { + MediasoupServer.instance = new MediasoupServer(); + } + return MediasoupServer.instance; + } + + async waitForInitialization(): Promise { + if (this.initializationPromise) { + await this.initializationPromise; + } + } + + async start(): Promise { + try { + this.observe(); + await this.createWorkers(); + this.isRunning = true; + + console.info('MediasoupServer started successfully'); + } catch (error) { + console.error('Failed to start MediasoupServer:', error); + throw error; + } + } + + private async createWorkers(): Promise { + const workerPromises: Promise[] = []; + const numWorkers = config.cpus; + + for (let i = 0; i < numWorkers; i++) { + workerPromises.push(this.createSingleWorker(i)); + } + + await Promise.all(workerPromises); + console.info(`Created ${numWorkers} mediasoup workers`); + } + + private async createSingleWorker(index: number): Promise { + try { + const worker = await createWorker({ + ...config.mediasoup.workerSettings, + appData: { index }, + }); + + // Create WebRTC server for this worker + await worker.createWebRtcServer(config.mediasoup.webRtcServer); + + this.setupWorkerEventHandlers(worker); + + const workerInfo: WorkerInfo = { + pid: worker.pid, + worker, + load: 0, + isHealthy: true, + }; + + this.workers.set(worker.pid, workerInfo); + console.info(`Worker ${worker.pid} created successfully`); + } catch (error) { + console.error(`Failed to create worker ${index}:`, error); + throw error; + } + } + + private setupWorkerEventHandlers(worker: mediasoupTypes.Worker): void { + worker.once('died', () => { + console.error(`Worker ${worker.pid} died unexpectedly`); + // this.handleWorkerDeath(worker.pid); + }); + + worker.on('subprocessclose', () => { + console.warn(`Worker ${worker.pid} subprocess closed`); + }); + + // Handle resource usage monitoring + worker.observer.on('close', () => { + console.info(`Worker ${worker.pid} closed`); + this.workers.delete(worker.pid); + }); + } + + increaseWorkerLoad(workerPid: number): boolean { + this.ensureRunning(); + + const workerInfo = this.workers.get(workerPid); + if (!workerInfo) { + console.warn(`Worker ${workerPid} not found`); + return false; + } + + if (workerInfo.load >= this.maxWorkerLoad) { + console.warn(`Worker ${workerPid} is at maximum load`); + return false; + } + + workerInfo.load += 1; + return true; + } + + decreaseWorkerLoad(workerPid: number): boolean { + this.ensureRunning(); + + const workerInfo = this.workers.get(workerPid); + if (!workerInfo) { + console.warn(`Worker ${workerPid} not found`); + return false; + } + + workerInfo.load = Math.max(0, workerInfo.load - 1); + return true; + } + + getHealthyWorkers(): WorkerInfo[] { + return Array.from(this.workers.values()).filter(info => info.isHealthy); + } + + getLeastLoadedWorker(): mediasoupTypes.Worker | null { + const healthyWorkers = this.getHealthyWorkers(); + + if (healthyWorkers.length === 0) { + console.error('No healthy workers available'); + return null; + } + + const leastLoaded = healthyWorkers.reduce((min, current) => + current.load < min.load ? current : min + ); + + return leastLoaded.worker; + } + + async getWorkerWithCapacity(): Promise { + const worker = this.getLeastLoadedWorker(); + if (!worker) return null; + + const workerInfo = this.workers.get(worker.pid); + if (!workerInfo || workerInfo.load >= this.maxWorkerLoad) { + console.warn('All workers are at capacity'); + return null; + } + + return worker; + } + + async getRouterRtpCapabilities(): Promise { + this.ensureRunning(); + + if (this.routerRtpCapabilities) { + return this.routerRtpCapabilities; + } + + try { + const worker = this.getLeastLoadedWorker(); + if (!worker) { + throw new Error('No healthy workers available for router creation'); + } + + const router = await worker.createRouter({ + mediaCodecs: config.mediasoup.routerMediaCodecs, + }); + + this.routerRtpCapabilities = router.rtpCapabilities; + + // Close the temporary router + router.close(); + + return this.routerRtpCapabilities; + } catch (error) { + console.error('Failed to get router RTP capabilities:', error); + throw error; + } + } + + getServerMetrics(): ServerMetrics { + const workerInfos = Array.from(this.workers.values()); + const activeWorkers = workerInfos.filter(info => info.isHealthy); + const totalLoad = activeWorkers.reduce((sum, info) => sum + info.load, 0); + + return { + totalWorkers: workerInfos.length, + activeWorkers: activeWorkers.length, + totalLoad, + averageLoad: + activeWorkers.length > 0 ? totalLoad / activeWorkers.length : 0, + }; + } + + gracefulShutdown(): void { + console.info('Starting graceful shutdown...'); + this.isRunning = false; + + Array.from(this.workers.values()).map(workerInfo => { + try { + workerInfo.worker.close(); + } catch (error) { + console.error(`Error closing worker ${workerInfo.worker.pid}:`, error); + } + }); + + this.workers.clear(); + console.info('Graceful shutdown completed'); + } + + private ensureRunning(): void { + if (!this.isRunning) { + throw new Error( + 'MediasoupServer is not running. Call waitForInitialization() first.' + ); + } + } + + private setupGracefulShutdown(): void { + const shutdown = (): void => { + this.gracefulShutdown(); + process.exit(0); + }; + + process.on('SIGINT', shutdown); + process.on('SIGTERM', shutdown); + process.on('uncaughtException', error => { + console.error('Uncaught exception:', error); + shutdown(); + }); + process.on('unhandledRejection', (reason, promise) => { + console.error('Unhandled rejection at:', promise, 'reason:', reason); + shutdown(); + }); + } + + private observe(): void { + mediasoupObserver.on('newworker', worker => { + // Initialize worker app data + worker.appData.routers = new Map(); + worker.appData.transports = new Map(); + worker.appData.producers = new Map(); + worker.appData.consumers = new Map(); + worker.appData.dataProducers = new Map(); + worker.appData.dataConsumers = new Map(); + worker.appData.webRtcServer = null; + + // Set up observer chain for resource tracking + this.setupWorkerObservers(worker); + }); + } + + private setupWorkerObservers(worker: mediasoupTypes.Worker): void { + worker.observer.on('close', () => { + console.info(`Worker ${worker.pid} observer closed`); + }); + + worker.observer.on('newwebrtcserver', webRtcServer => { + console.info(`WebRTC server created for worker ${worker.pid}`); + worker.appData.webRtcServer = webRtcServer; + }); + + worker.observer.on('newrouter', router => { + this.setupRouterObservers(router, worker); + }); + } + + private setupRouterObservers( + router: mediasoupTypes.Router, + worker: mediasoupTypes.Worker + ): void { + // Initialize router app data + router.appData.transports = new Map(); + router.appData.producers = new Map(); + router.appData.consumers = new Map(); + router.appData.dataProducers = new Map(); + router.appData.dataConsumers = new Map(); + router.appData.webRtcServer = worker.appData.webRtcServer; + router.appData.worker = worker; + + // Add router to worker's collection + (worker.appData.routers as Map).set( + router.id, + router + ); + + router.observer.on('close', () => { + (worker.appData.routers as Map).delete( + router.id + ); + }); + + router.observer.on('newtransport', transport => { + this.setupTransportObservers(transport, router, worker); + }); + } + + private setupTransportObservers( + transport: mediasoupTypes.Transport, + router: mediasoupTypes.Router, + worker: mediasoupTypes.Worker + ): void { + // Initialize transport app data + transport.appData.producers = new Map(); + transport.appData.consumers = new Map(); + transport.appData.dataProducers = new Map(); + transport.appData.dataConsumers = new Map(); + transport.appData.router = router; + + // Add transport to collections + (router.appData.transports as Map).set( + transport.id, + transport + ); + (worker.appData.transports as Map).set( + transport.id, + transport + ); + + transport.observer.on('close', () => { + ( + router.appData.transports as Map + ).delete(transport.id); + ( + worker.appData.transports as Map + ).delete(transport.id); + }); + + transport.observer.on('newproducer', producer => { + this.setupProducerObservers(producer, transport, router, worker); + }); + + transport.observer.on('newconsumer', consumer => { + this.setupConsumerObservers(consumer, transport, router, worker); + }); + } + + private setupProducerObservers( + producer: mediasoupTypes.Producer, + transport: mediasoupTypes.Transport, + router: mediasoupTypes.Router, + worker: mediasoupTypes.Worker + ): void { + producer.appData.transport = transport; + + // Add producer to collections + (transport.appData.producers as Map).set( + producer.id, + producer + ); + (router.appData.producers as Map).set( + producer.id, + producer + ); + (worker.appData.producers as Map).set( + producer.id, + producer + ); + + producer.observer.on('close', () => { + ( + transport.appData.producers as Map + ).delete(producer.id); + (router.appData.producers as Map).delete( + producer.id + ); + (worker.appData.producers as Map).delete( + producer.id + ); + }); + } + + private setupConsumerObservers( + consumer: mediasoupTypes.Consumer, + transport: mediasoupTypes.Transport, + router: mediasoupTypes.Router, + worker: mediasoupTypes.Worker + ): void { + consumer.appData.transport = transport; + + // Add consumer to collections + (transport.appData.consumers as Map).set( + consumer.id, + consumer + ); + (router.appData.consumers as Map).set( + consumer.id, + consumer + ); + (worker.appData.consumers as Map).set( + consumer.id, + consumer + ); + + consumer.observer.on('close', () => { + ( + transport.appData.consumers as Map + ).delete(consumer.id); + (router.appData.consumers as Map).delete( + consumer.id + ); + (worker.appData.consumers as Map).delete( + consumer.id + ); + }); + } + + // Utility methods for debugging and monitoring + + async getWorkerResourceUsage(workerPid: number): Promise { + const workerInfo = this.workers.get(workerPid); + if (!workerInfo) return null; + return { + ...workerInfo, + usage: await workerInfo.worker.getResourceUsage(), + }; + } + + async getWorkersResourceUsage(): Promise { + const usagePromises = Array.from(this.workers.values()).map( + async workerInfo => ({ + ...workerInfo, + usage: await workerInfo.worker.getResourceUsage(), + }) + ); + + return await Promise.all(usagePromises); + } +} + +// Export singleton instance +export const mediaSoupServer = MediasoupServer.getInstance(); diff --git a/src/servers/redis-server.ts b/src/servers/redis-server.ts index be4cd8c..fce89d4 100644 --- a/src/servers/redis-server.ts +++ b/src/servers/redis-server.ts @@ -1,7 +1,7 @@ import { createClient, RedisClientType, SetOptions } from 'redis'; import config from '../config'; -import { PubSubEvents } from '../types/events'; +import { PubSubActions } from '../types/actions'; class RedisServer { private static instance: RedisServer | null = null; @@ -40,12 +40,12 @@ class RedisServer { private async subscribe(): Promise { if (!this.isConnected) throw new Error('Redis clients are not connected. Call connect() first'); - await this.subClient.subscribe(PubSubEvents.Message, message => { + await this.subClient.subscribe(PubSubActions.Message, message => { const { event, args, }: { - event: PubSubEvents; + event: PubSubActions; args: { [key: string]: unknown }; } = JSON.parse(message); @@ -61,14 +61,14 @@ class RedisServer { event, args, }: { - event: PubSubEvents; + event: PubSubActions; args: { [key: string]: unknown }; }): Promise { if (!this.isConnected) throw new Error('Redis clients are not connected. Call connect() first'); const message = JSON.stringify({ event, args }); - await this.pubClient.publish(PubSubEvents.Message, message); + await this.pubClient.publish(PubSubActions.Message, message); console.info(`Message published to channe ${message}`); } diff --git a/src/services/medianode.ts b/src/services/medianode.ts new file mode 100644 index 0000000..e8dab4f --- /dev/null +++ b/src/services/medianode.ts @@ -0,0 +1,211 @@ +import { EventEmitter } from 'events'; +import { types as mediasoupTypes } from 'mediasoup'; +import Room from './room'; +import { AppDataWithRouterId, TransportConnectionParams } from '../types'; +import config from '../config'; + +class MediaNode extends EventEmitter { + id: string; + roomId: string; + // map routerId to pipetransport + sendPipeTransports: Map< + string, + mediasoupTypes.PipeTransport + >; + recvRouter: mediasoupTypes.Router; + // map of remote sendTranportId to recvPipeTransport + recvPipeTransports: Map< + string, + mediasoupTypes.PipeTransport + >; + producers: Map; + consumers: Map; + + private static MediaNodes: Map = new Map(); + + constructor({ + id, + roomId, + sendPipeTransports, + recvRouter, + }: { + id: string; + roomId: string; + recvRouter: mediasoupTypes.Router; + sendPipeTransports: Map< + string, + mediasoupTypes.PipeTransport + >; + }) { + super(); + + this.id = id; + this.roomId = roomId; + + this.producers = new Map(); + this.consumers = new Map(); + this.recvRouter = recvRouter; + this.sendPipeTransports = sendPipeTransports; + this.recvPipeTransports = new Map(); + + // MediaNode.MediaNodes.set(id, this) + // consider increasing workerload systematically. + } + + close(): void { + this.producers.clear(); + this.consumers.clear(); + this.sendPipeTransports.clear(); + + // this.emit(SERVICE_EVENTS.close); + this.removeAllListeners(); + } + + static async create({ + room, + mediaNodeId, + }: { + room: Room; + mediaNodeId: string; + }): Promise { + try { + const sendRouters = Array.from(room.getRouters()); + const sendPipeTransports = await Promise.all( + sendRouters.map(router => room.createPipeTransport({ router })) + ); + + const sendPipeTansportsMap = new Map< + string, + mediasoupTypes.PipeTransport + >(); + for (const transport of sendPipeTransports) { + const routerId = transport.appData.routerId as string; + sendPipeTansportsMap.set(routerId, transport); + } + + const recvRouter = room.getLeastLoadedRouter(); + if (!recvRouter) throw 'No router found'; + // const recvPipeTransport = await meeting.createPipeTransport({ router: recvRouter }); + + const mediaNode = new MediaNode({ + id: mediaNodeId, + roomId: room.roomId, + sendPipeTransports: sendPipeTansportsMap, + recvRouter, + }); + + room.addMediaNode(mediaNode); + return mediaNode; + } catch (error) { + console.error('create medianode failed', error); + throw error; + } + } + + static getMediaNdode = (nodeId: string): MediaNode | undefined => { + return MediaNode.MediaNodes.get(nodeId); + }; + + async connectPipeTransport({ + connectionParam, + transport, + }: { + connectionParam: TransportConnectionParams; + transport: mediasoupTypes.PipeTransport; + }): Promise { + try { + await transport.connect({ + ip: connectionParam.ip, + port: connectionParam.port, + srtpParameters: connectionParam.srtpParameters, + }); + } catch (error) { + console.error('connectPipeTransport failed', error); + } + } + + async createRecvPipeTransport(): Promise { + try { + const pipeTransport = await this.recvRouter.createPipeTransport({ + listenInfo: config.mediasoup.transportListenInfo, + enableSctp: true, + numSctpStreams: { OS: 1024, MIS: 1024 }, + enableRtx: false, + enableSrtp: false, + appData: { + routerId: this.recvRouter.id, + }, + }); + return pipeTransport; + } catch (error) { + console.error('createRecvPipeTransport failed', error); + throw error; + } + } + + getSendPipeTransportsConnectionParam(): TransportConnectionParams[] { + // get array of the connection params of send tranports + const connectionParams: TransportConnectionParams[] = []; + + this.sendPipeTransports.forEach((transport, routerId) => { + connectionParams.push({ + routerId, + transportId: transport.id, + sendTransportId: transport.id, + ip: transport.tuple.localIp, + port: transport.tuple.localPort, + srtpParameters: transport.srtpParameters, + }); + }); + + return connectionParams; + } + + getSendPipeTransport( + routerId: string + ): mediasoupTypes.PipeTransport | undefined { + return this.sendPipeTransports.get(routerId); + } + + getRecvPipeTransport( + remoteSendTranportId: string + ): mediasoupTypes.PipeTransport | undefined { + return this.recvPipeTransports.get(remoteSendTranportId); + } + + removeConsumer(id: string): void { + this.consumers.delete(id); + } + + addConsumer(consumer: mediasoupTypes.Consumer): void { + this.consumers.set(consumer.id, consumer); + consumer.observer.on('close', () => { + this.consumers.delete(consumer.id); + }); + } + getConsumer(id: string): mediasoupTypes.Consumer | undefined { + return this.consumers.get(id); + } + + // Producers methods + addProducer(producer: mediasoupTypes.Producer): void { + this.producers.set(producer.id, producer); + producer.observer.on('close', () => { + this.producers.delete(producer.id); + }); + } + + getProducers(): mediasoupTypes.Producer[] { + return Array.from(this.producers.values()); + } + + getProducer(id: string): mediasoupTypes.Producer | undefined { + return this.producers.get(id); + } + + removeProducer(id: string): void { + this.producers.delete(id); + } +} + +export default MediaNode; diff --git a/src/services/peer.ts b/src/services/peer.ts new file mode 100644 index 0000000..102e20b --- /dev/null +++ b/src/services/peer.ts @@ -0,0 +1,151 @@ +import EventEmitter from 'events'; +import { types as mediasoupTypes } from 'mediasoup'; +import { PeerType, ProducerSource } from '../types'; +import { mediaSoupServer } from '../servers/mediasoup-server'; + +class Peer extends EventEmitter { + id: string; + roomId: string; + closed: boolean; + signalNodeId: string; + type: PeerType; + + rtpCapabilities: mediasoupTypes.RtpCapabilities; + transports: Map; + producers: Map; + consumers: Map; + + router: mediasoupTypes.Router; + workerPid: number; + + static peers = new Map(); + + constructor({ + id, + roomId, + router, + rtpCapabilities, + signalNodeId, + type, + }: { + id: string; + roomId: string; + router: mediasoupTypes.Router; + rtpCapabilities: mediasoupTypes.RtpCapabilities; + signalNodeId: string; + type: PeerType; + }) { + super(); + this.id = id; + this.roomId = roomId; + this.closed = false; + this.rtpCapabilities = rtpCapabilities; + this.router = router; + this.workerPid = (router.appData.worker as mediasoupTypes.Worker).pid; + this.transports = new Map(); + this.producers = new Map(); + this.consumers = new Map(); + this.signalNodeId = signalNodeId; + this.type = type; + // increment worker load + } + + close(): void { + this.closed = true; + + for (const consumer of this.consumers.values()) { + consumer.close(); + } + for (const producer of this.producers.values()) { + producer.close(); + } + for (const transport of this.transports.values()) { + transport.close(); + } + + this.transports.clear(); + this.producers.clear(); + this.consumers.clear(); + + mediaSoupServer.decreaseWorkerLoad(this.workerPid); + + this.removeAllListeners(); // Prevent potential memory leaks + console.info('Peer closed'); + } + + // transport methods + addTransport(transport: mediasoupTypes.WebRtcTransport): void { + this.transports.set(transport.id, transport); + transport.observer.on('close', () => { + this.transports.delete(transport.id); + }); + } + + getTransport(id: string): mediasoupTypes.WebRtcTransport | undefined { + return this.transports.get(id); + } + + removeTransport(id: string): void { + this.transports.delete(id); + } + + // Producer methods + addProducer(producer: mediasoupTypes.Producer): void { + this.producers.set(producer.id, producer); + producer.observer.on('close', () => { + this.producers.delete(producer.id); + }); + } + + getProducer(id: string): mediasoupTypes.Producer | undefined { + return this.producers.get(id); + } + + removeProducer(id: string): void { + this.producers.delete(id); + } + + getProducersBySource(source: ProducerSource): mediasoupTypes.Producer[] { + const allProducers = Array.from(this.producers.values()); + const producers = allProducers.filter( + producer => producer.appData.source === source + ); + return producers; + } + + // Consumers methods + addConsumer(consumer: mediasoupTypes.Consumer): void { + this.consumers.set(consumer.id, consumer); + consumer.observer.on('close', () => { + this.consumers.delete(consumer.id); + }); + } + + getConsumer(id: string): mediasoupTypes.Consumer | undefined { + return this.consumers.get(id); + } + + getConsumerByProducerId( + producerId: string + ): mediasoupTypes.Consumer | undefined { + const allConsumers = Array.from(this.consumers.values()); + const consumer = allConsumers.find( + consumer => consumer.producerId === producerId + ); + return consumer; + } + + getConsumersBySource(source: ProducerSource): mediasoupTypes.Consumer[] { + const allConsumers = Array.from(this.consumers.values()); + const consumer = allConsumers.filter( + consumer => consumer.appData.source === source + ); + return consumer; + } + + removeConsumer(id: string): void { + this.consumers.delete(id); + } +} + +export default Peer; diff --git a/src/services/room.ts b/src/services/room.ts new file mode 100644 index 0000000..7580efb --- /dev/null +++ b/src/services/room.ts @@ -0,0 +1,387 @@ +import { types as mediasoupTypes } from 'mediasoup'; +import { EventEmitter } from 'events'; +import Peer from './peer'; +import config from '../config'; +import { mediaSoupServer } from '../servers/mediasoup-server'; +import { redisServer } from '../servers/redis-server'; +import { getRedisKey } from '../lib/utils'; +import { ServiceActions } from '../types/actions'; +import MediaNode from './medianode'; +import { AppDataWithRouterId } from '../types'; + +class Room extends EventEmitter { + roomId: string; + + activeSpeaker: { + peerId: string | null; + timestamp: number; + }; + closed: boolean; + // peers + + private peers: Map; + + //media nodes + mediaNodes: Map; + // mediasoup details + // workers: Map; + routers: Map; + audioLevelObservers: Map; + // all meets in the server + private static rooms = new Map(); + + constructor({ + roomId, + routers, + audioLevelObservers, + }: { + roomId: string; + routers: Map; + audioLevelObservers: Map; + }) { + super(); + this.roomId = roomId; + + this.peers = new Map(); + this.mediaNodes = new Map(); + + this.closed = false; + this.routers = routers; + this.audioLevelObservers = audioLevelObservers; + this.activeSpeaker = { + peerId: null, + timestamp: 0, + }; + + this.handleAudioLevelObserver(); + // this.handleEvents() + + Room.addRoom(this.roomId, this); + } + + close(): void { + // console.log('Closing room') + if (this.closed) return; + + for (const peer of this.getPeers()) { + peer.close(); + } + + // close routerss + for (const router of this.routers.values()) { + this.audioLevelObservers.delete(router.id); + router.close(); + } + + this.audioLevelObservers.clear(); + this.routers.clear(); + this.peers.clear(); + + Room.rooms.delete(this.roomId); + + // this.emit(SERVICE_EVENTS.close); + + this.removeAllListeners(); + + console.info(`Meeting - ${this.roomId} CLOSED`); + } + + static async create(roomId: string): Promise { + try { + const routers: Map = new Map(); + const audioLevelObservers: Map< + string, + mediasoupTypes.AudioLevelObserver + > = new Map(); + + for (const workerInfo of mediaSoupServer.getHealthyWorkers()) { + const router = await workerInfo.worker.createRouter({ + mediaCodecs: config.mediasoup.routerMediaCodecs, + }); + routers.set(router.id, router); + + const audioLevelObserver = await router.createAudioLevelObserver({ + maxEntries: 1, + threshold: -80, + interval: 1800, + appData: { + peerId: null, + volume: -1000, + }, + }); + + audioLevelObservers.set(router.id, audioLevelObserver); + } + + const room = new Room({ roomId, routers, audioLevelObservers }); + + // publish mediaNodeJoinMeeting + // redisServer.publish + + return room; + } catch (error) { + console.error('create meeting failed', error); + throw error; + } + } + + static addRoom(roomId: string, room: Room): void { + Room.rooms.set(roomId, room); + } + + static getRoom(roomId: string): Room | undefined { + return Room.rooms.get(roomId); + } + + static removeRoom(roomId: string): void { + Room.rooms.delete(roomId); + } + + // peers + addPeer(peer: Peer): void { + this.peers.set(peer.id, peer); + // this.emit(SERVICE_EVENTS.peerAdded, peer); + this.handlePeerEvents(peer); + } + + getPeer(id: string): Peer | undefined { + return this.peers.get(id); + } + + getPeers(): Peer[] { + return Array.from(this.peers.values()); + } + + removePeer(id: string): void { + // const peer = this.peers.get(id); + this.peers.delete(id); + + // if (peer) this.emit(SERVICE_EVENTS.peerRemoved, peer); + } + + getRouters(): mediasoupTypes.Router[] { + return Array.from(this.routers.values()); + } + + getRouterRtpCapabilities(): mediasoupTypes.RtpCapabilities { + return Array.from(this.routers.values())[0].rtpCapabilities; + } + + async assignRouterToPeer(): Promise { + const router = this.getLeastLoadedRouter(); + if (router) { + await this.pipeProducersToRouter(router); + return router; + } + return null; + } + + getRoutersToPipeTo( + originRouter: mediasoupTypes.Router + ): mediasoupTypes.Router[] { + return Array.from(this.routers.values()).filter( + router => router.id !== originRouter.id + ); + } + + getLeastLoadedRouter(): mediasoupTypes.Router | null { + // the least loaded router, + // is the room router of the least loaded worker + + const leastLoadedWorker = mediaSoupServer.getLeastLoadedWorker(); + + if (!leastLoadedWorker) throw 'Least Loaded Worker not found'; + + for (const routerId of ( + leastLoadedWorker.appData.routers as Map + ).keys()) { + const router = this.routers.get(routerId); + if (router) { + return router; + } + } + + return null; + } + + private async pipeProducersToRouter( + router: mediasoupTypes.Router + ): Promise { + try { + const peersToPipe = Array.from(this.peers.values()).filter( + peer => peer.router.id !== router.id + ); + for (const peer of peersToPipe) { + const srcRouter = peer.router; + if (srcRouter) { + for (const producerId of peer.producers.keys()) { + if ( + ( + router.appData.producers as Map + ).has(producerId) + ) { + continue; + } + await srcRouter.pipeToRouter({ + producerId, + router, + }); + } + } + } + } catch (error) { + console.error('pipeProducersToRouter', error); + } + } + + private handleAudioLevelObserver(): void { + this.audioLevelObservers.forEach(audioLevelObserver => { + audioLevelObserver.on('volumes', volumes => { + const { producer, volume } = volumes[0]; + + audioLevelObserver.appData = { + ...audioLevelObserver.appData, + peerId: producer.appData.peerId, + volume: volume, + // speakerIds + }; + this.broadcastActiveSpeakerInfo(); + }); + + audioLevelObserver.on('silence', () => { + audioLevelObserver.appData = { + ...audioLevelObserver.appData, + peerId: null, + volume: -1000, + // speakerIds: [] + }; + + this.broadcastActiveSpeakerInfo(); + }); + }); + } + + private broadcastActiveSpeakerInfo = (): void => { + let peerId = null; + let maxVolume = -1000; + const speakerIds: string[] = []; + + this.audioLevelObservers.forEach(audioLevelObserver => { + const tmpPeerId = audioLevelObserver.appData.peerId as string; + const tmpVolume = audioLevelObserver.appData.volume as number; + if (tmpPeerId) { + if (tmpVolume > maxVolume) { + peerId = tmpPeerId; + maxVolume = tmpVolume; + } + speakerIds.push(tmpPeerId); + } + }); + + if (this.activeSpeaker.peerId === peerId) return; + // this ensures that the minimun gap between the previous and the current + // active speaker is 1 second + if (Date.now() > this.activeSpeaker.timestamp + 2000) { + this.activeSpeaker = { + peerId, + timestamp: Date.now(), + }; + + // store meeting active speaker peerid in db. + // todo may require a different position when optimising for multiple media servers] + redisServer.set( + getRedisKey['roomActiveSpeakerPeerId'](this.roomId), + JSON.stringify(peerId) + ); + + // todo work on optimising this in future to send to once to a signal node and the signal node broadcast to all peers + // there is an edge case this implemenation is not working for. + // it is not broadcasting for peers in other servers. + for (const peer of this.peers.values()) { + if (peer.id === peerId) continue; + // Todo optimise + // peer.signalNode.sendMessage(SIGNALLING_EVENTS.activeSpeaker, { + // peerId: peer.id, + // peerType: peer.type, + // roomId: this.meetingId, + // volume: maxVolume, + // speakerIds, + // activeSpeakerPeerId: peerId, + // }); + } + } + }; + + private handlePeerEvents(peer: Peer): void { + peer.on(ServiceActions.Close, () => { + if (!this.getPeer(peer.id)) return; + this.removePeer(peer.id); + }); + } + + async createPipeTransport({ + router, + }: { + router: mediasoupTypes.Router; + }): Promise> { + const pipeTransport = await router.createPipeTransport({ + listenInfo: config.mediasoup.transportListenInfo, + enableSctp: true, + numSctpStreams: { OS: 1024, MIS: 1024 }, + enableRtx: false, + enableSrtp: false, + appData: { + routerId: router.id, + }, + }); + return pipeTransport; + } + + async createPipeConsumersForExistingProducers({ + consumingMediaNode, + }: { + consumingMediaNode: MediaNode; + }): Promise { + const peers = this.getPeers(); + for (const peer of peers) { + const peerProducers = peer.producers.values(); + for (const producer of peerProducers) { + this.createPipeConsumer({ + producer, + producerPeerId: peer.id, + consumingMediaNode, + }); + } + } + } + + async createPipeConsumer({ + producer, + producerPeerId, + consumingMediaNode, + }: { + producer: mediasoupTypes.Producer; + producerPeerId: string; + consumingMediaNode: MediaNode; + }): Promise { + try { + // const router = this.getLeastLoadedRouter(); + console.log(producer, producerPeerId, consumingMediaNode); + } catch (error) { + console.error(`Pipe Consumer failed ${error}`); + } + } + + addMediaNode(mediaNode: MediaNode): void { + this.mediaNodes.set(mediaNode.id, mediaNode); + mediaNode.on(ServiceActions.Close, () => { + this.mediaNodes.delete(mediaNode.id); + }); + } + + getMediaNodes(): MediaNode[] { + return Array.from(this.mediaNodes.values()); + } +} + +export default Room; diff --git a/src/services/signalnode.ts b/src/services/signalnode.ts new file mode 100644 index 0000000..b108ecb --- /dev/null +++ b/src/services/signalnode.ts @@ -0,0 +1,794 @@ +import EventEmitter from 'events'; +import * as grpc from '@grpc/grpc-js'; + +import { MediaSignalingActions as MSA } from '../types/actions'; +import { MessageRequest } from '../protos/gen/mediaSignalingPackage/MessageRequest'; +import { MessageResponse } from '../protos/gen/mediaSignalingPackage/MessageResponse'; +import { grpcServer } from '../servers/grpc-server'; +import { mediaSoupServer } from '../servers/mediasoup-server'; + +enum ConnectionState { + CONNECTING = 'CONNECTING', + CONNECTED = 'CONNECTED', + DISCONNECTING = 'DISCONNECTING', + DISCONNECTED = 'DISCONNECTED', + ERROR = 'ERROR', +} + +interface ConnectionMetrics { + connectedAt: number; + lastActivity: number; + lastHeartbeat: number; + messagesSent: number; + messagesReceived: number; + errors: number; + heartbeatsReceived: number; + heartbeatsMissed: number; +} + +interface MessageQueueItem { + action: MSA; + args?: { [key: string]: unknown }; + timestamp: number; + retries: number; + id: string; +} + +class SignalNode extends EventEmitter { + id: string; + connectionId: string; + call: grpc.ServerDuplexStream; + metadata: grpc.Metadata; + private connectionState: ConnectionState; + private metrics: ConnectionMetrics; + private heartbeatInterval?: NodeJS.Timeout; + private messageQueue: MessageQueueItem[] = []; + private maxQueueSize: number = 100; + private messageTimeout: number = 30000; + private heartbeatTimeout: number = 60000; + private maxConsecutiveErrors: number = 5; + private consecutiveErrors: number = 0; + private isShuttingDown: boolean = false; + private pendingRequests: Map void>; + + static signalNodes = new Map(); + + constructor({ + id, + call, + connectionId, + }: { + id: string; + call: grpc.ServerDuplexStream; + connectionId?: string; + }) { + super(); + + this.id = id; + this.connectionId = connectionId || this.generateConnectionId(); + this.call = call; + this.metadata = call.metadata; + this.connectionState = ConnectionState.CONNECTING; + this.pendingRequests = new Map(); + + const now = Date.now(); + this.metrics = { + connectedAt: now, + lastActivity: now, + lastHeartbeat: now, + messagesSent: 0, + messagesReceived: 0, + errors: 0, + heartbeatsReceived: 0, + heartbeatsMissed: 0, + }; + + // Add to static collection with duplicate handling + if (SignalNode.signalNodes.has(id)) { + console.warn( + `โš ๏ธ SignalNode with ID ${id} already exists, removing old instance` + ); + const oldNode = SignalNode.signalNodes.get(id); + oldNode?.forceDisconnect('duplicate_connection'); + } + + SignalNode.signalNodes.set(id, this); + + this.initialize(); + + console.log( + `โœ… New SignalNode created - ID: ${this.id}, Connection: ${this.connectionId}` + ); + } + + private initialize(): void { + try { + this.setupMessageHandlers(); + this.setupHeartbeat(); + this.setState(ConnectionState.CONNECTED); + this.sendConnectionConfirmation(); + } catch (error) { + console.error(`โŒ Error initializing SignalNode ${this.id}:`, error); + this.handleError(error as Error, 'initialization_error'); + } + } + + private generateConnectionId(): string { + return `${this.id}_${Date.now()}_${Math.random().toString(36).substr(2, 6)}`; + } + + private setState(newState: ConnectionState): void { + if (this.connectionState !== newState) { + const oldState = this.connectionState; + this.connectionState = newState; + console.log(`๐Ÿ“ก SignalNode ${this.id} state: ${oldState} -> ${newState}`); + this.emit('stateChanged', { oldState, newState, nodeId: this.id }); + } + } + + private setupHeartbeat(): void { + this.clearHeartbeat(); + + this.heartbeatInterval = setInterval(() => { + if (!this.isActive()) { + return; + } + + const timeSinceLastActivity = Date.now() - this.metrics.lastActivity; + // const timeSinceLastHeartbeat = Date.now() - this.metrics.lastHeartbeat; + + // Check for stale connection + if (timeSinceLastActivity > this.heartbeatTimeout) { + console.warn( + `๐Ÿ’” Connection ${this.id} is stale (${timeSinceLastActivity}ms since last activity)` + ); + this.metrics.heartbeatsMissed++; + + if (this.metrics.heartbeatsMissed >= 3) { + this.handleError(new Error('Heartbeat timeout'), 'heartbeat_timeout'); + return; + } + } + + // Send heartbeat + try { + if ( + this.sendMessage(MSA.Heartbeat, { + timestamp: Date.now(), + connectionId: this.connectionId, + }) + ) { + this.metrics.lastHeartbeat = Date.now(); + } + } catch (error) { + console.error(`โŒ Error sending heartbeat to ${this.id}:`, error); + this.handleError(error as Error, 'heartbeat_error'); + } + }, 30000); // Send heartbeat every 30 seconds + } + + private clearHeartbeat(): void { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = undefined; + } + } + + private setupMessageHandlers(): void { + if (!this.call) { + throw new Error('gRPC call is null'); + } + + // Handle incoming messages + this.call.on('data', (message: MessageRequest) => { + this.handleIncomingMessage(message); + }); + + // Handle connection events + this.call.on('end', () => { + console.log(`๐Ÿ“ค Client ${this.id} ended the connection gracefully`); + this.handleClientDisconnection('client_ended'); + }); + + this.call.on('cancelled', () => { + console.log(`๐Ÿšซ Client ${this.id} cancelled the connection`); + this.handleClientDisconnection('cancelled'); + }); + + this.call.on('error', (error: Error) => { + console.error(`๐Ÿ’ฅ Stream error for client ${this.id}:`, error); + this.handleError(error, 'stream_error'); + }); + + this.call.on('close', () => { + console.log(`๐Ÿ”Œ Stream closed for client ${this.id}`); + if (this.connectionState === ConnectionState.CONNECTED) { + this.handleClientDisconnection('stream_closed'); + } + }); + } + + private handleIncomingMessage(message: MessageRequest): void { + try { + this.metrics.messagesReceived++; + this.metrics.lastActivity = Date.now(); + this.consecutiveErrors = 0; // Reset error count on successful message + + const { action, args, requestId } = message; + + if (requestId?.length) { + const resolver = this.pendingRequests.get(requestId); + if (resolver) { + // this means this instance initiated this request for response . + // resolve and return + resolver(message); + this.pendingRequests.delete(requestId); + return; + } + } + + if (!action) { + console.warn(`โš ๏ธ Received message without action from ${this.id}`); + this.handleError( + new Error('Missing action in message'), + 'protocol_error' + ); + return; + } + + console.log(`๐Ÿ“จ Received message from ${this.id}: ${action}`); + + let parsedArgs: { [key: string]: unknown } = {}; + if (args) { + try { + parsedArgs = JSON.parse(args); + } catch (parseError) { + console.error( + `โŒ Failed to parse message args from ${this.id}:`, + parseError + ); + this.handleError(parseError as Error, 'parse_error'); + return; + } + } + + // Handle special system messages + if (action === MSA.Heartbeat) { + this.handleHeartbeat(parsedArgs); + return; + } + + // Find and execute handler + const handler = this.actionHandlers[action as MSA]; + if (handler) { + try { + handler(parsedArgs); + } catch (handlerError) { + console.error( + `โŒ Error in handler for action ${action} from ${this.id}:`, + handlerError + ); + this.handleError(handlerError as Error, 'handler_error'); + } + } else { + console.warn(`โš ๏ธ No handler for action ${action} from ${this.id}`); + this.emit('unhandledMessage', { + nodeId: this.id, + action, + args: parsedArgs, + }); + } + + this.emit('messageReceived', { + nodeId: this.id, + action, + args: parsedArgs, + timestamp: Date.now(), + }); + } catch (error) { + console.error(`๐Ÿ’ฅ Error handling message from ${this.id}:`, error); + this.handleError(error as Error, 'message_handling_error'); + } + } + + private handleHeartbeat(args: { [key: string]: unknown }): void { + console.log(args); + this.metrics.heartbeatsReceived++; + this.metrics.heartbeatsMissed = 0; // Reset missed heartbeats + + // Respond to heartbeat + this.sendMessage(MSA.HeartbeatAck, { + timestamp: Date.now(), + connectionId: this.connectionId, + metrics: { + messagesSent: this.metrics.messagesSent, + messagesReceived: this.metrics.messagesReceived, + uptime: Date.now() - this.metrics.connectedAt, + }, + }); + } + + private handleError(error: Error, context: string): void { + this.metrics.errors++; + this.consecutiveErrors++; + + console.error( + `๐Ÿ’ฅ SignalNode ${this.id} error [${context}]:`, + error.message + ); + + this.emit('error', { + nodeId: this.id, + error, + context, + consecutiveErrors: this.consecutiveErrors, + timestamp: Date.now(), + }); + + // Disconnect if too many consecutive errors + if (this.consecutiveErrors >= this.maxConsecutiveErrors) { + console.error( + `๐Ÿšซ Too many consecutive errors (${this.consecutiveErrors}) for ${this.id}, disconnecting` + ); + this.handleClientDisconnection('too_many_errors', error); + } + } + + private handleClientDisconnection(reason: string, error?: Error): void { + if (this.isShuttingDown) { + return; // Already handled + } + + console.log( + `๐Ÿ”Œ Client ${this.id} disconnected (${reason})`, + error ? `: ${error.message}` : '' + ); + + this.setState(ConnectionState.DISCONNECTED); + this.cleanup(); + + this.emit('disconnected', { + nodeId: this.id, + reason, + error, + metrics: this.getMetrics(), + timestamp: Date.now(), + }); + } + + private cleanup(): void { + this.isShuttingDown = true; + + // Clear heartbeat + this.clearHeartbeat(); + + // Clear message queue + this.messageQueue = []; + + // Remove from static collection + SignalNode.signalNodes.delete(this.id); + + // Remove all listeners + this.removeAllListeners(); + + console.log(`๐Ÿงน Cleaned up SignalNode ${this.id}`); + } + + private async sendConnectionConfirmation(): Promise { + const rtpCapabilities = await mediaSoupServer.getRouterRtpCapabilities(); + this.sendMessage(MSA.Connected, { + status: 'success', + nodeId: this.id, + connectionId: this.connectionId, + message: 'Successfully connected to Media Signaling Server', + timestamp: Date.now(), + serverMetrics: grpcServer.getStats() || {}, + routerRtpCapabilities: rtpCapabilities, + }); + } + + // Public methods + sendMessage(action: MSA, args?: { [key: string]: unknown }): boolean { + if (!this.isActive()) { + console.warn(`โš ๏ธ Cannot send message to ${this.id}: node is inactive`); + return false; + } + + if (!this.call) { + console.warn(`โš ๏ธ Cannot send message to ${this.id}: call is null`); + return false; + } + + try { + const messageId = this.generateMessageId(); + const message = { + action, + args: JSON.stringify(args || {}), + }; + + this.call.write(message); + + this.metrics.messagesSent++; + this.metrics.lastActivity = Date.now(); + grpcServer.incrementMessageStats(1, 0); + + console.log(`๐Ÿ“ค Sent ${action} to ${this.id} (${messageId})`); + + return true; + } catch (error) { + console.error(`โŒ Error sending message to ${this.id}:`, error); + this.handleError(error as Error, 'send_message_error'); + return false; + } + } + + async sendMessageForResponse( + action: MSA, + args?: { [key: string]: unknown } + ): Promise { + if (!this.call) { + console.warn( + `โš ๏ธ Cannot send message to MediaNode ${this.id}: not connected` + ); + return null; + } + + try { + const requestId = crypto.randomUUID(); + const message: MessageRequest = { + action, + args: JSON.stringify(args || {}), + requestId, + }; + + return new Promise(resolve => { + if (this.call) { + this.pendingRequests.set(requestId, resolve); // save resolve + this.call.write(message); + } + }); + } catch (error) { + console.error(`โŒ Error sending message to MediaNode ${this.id}:`, error); + throw error; + } + } + + queueMessage(action: MSA, args?: { [key: string]: unknown }): boolean { + if (this.messageQueue.length >= this.maxQueueSize) { + console.warn( + `โš ๏ธ Message queue full for ${this.id}, dropping oldest message` + ); + this.messageQueue.shift(); // Remove oldest message + } + + const messageItem: MessageQueueItem = { + action, + args, + timestamp: Date.now(), + retries: 0, + id: this.generateMessageId(), + }; + + this.messageQueue.push(messageItem); + console.log( + `๐Ÿ“ Queued message ${action} for ${this.id} (queue size: ${this.messageQueue.length})` + ); + + return true; + } + + flushMessageQueue(): number { + if (!this.isActive() || this.messageQueue.length === 0) { + return 0; + } + + let sentCount = 0; + const messages = [...this.messageQueue]; + this.messageQueue = []; + + messages.forEach(messageItem => { + if (this.sendMessage(messageItem.action, messageItem.args)) { + sentCount++; + } else { + // Re-queue failed messages if under retry limit + if (messageItem.retries < 3) { + messageItem.retries++; + this.messageQueue.push(messageItem); + } + } + }); + + if (sentCount > 0) { + console.log(`๐Ÿ“ค Flushed ${sentCount} queued messages for ${this.id}`); + } + + return sentCount; + } + + async gracefulDisconnect( + reason: string = 'graceful_shutdown' + ): Promise { + if (this.connectionState === ConnectionState.DISCONNECTED) { + return; + } + + console.log(`๐Ÿ‘‹ Gracefully disconnecting ${this.id} (${reason})`); + this.setState(ConnectionState.DISCONNECTING); + + try { + // Send disconnect notification + this.sendMessage(MSA.ServerShutdown, { + message: 'Server is shutting down', + reason, + timestamp: Date.now(), + }); + + // Flush any remaining messages + this.flushMessageQueue(); + + // Wait a bit for messages to be sent + await new Promise(resolve => setTimeout(resolve, 1000)); + + // End the call gracefully + if (this.call) { + this.call.end(); + } + } catch (error) { + console.warn( + `โš ๏ธ Error during graceful disconnect for ${this.id}:`, + error + ); + } finally { + this.handleClientDisconnection(reason); + } + } + + forceDisconnect(reason: string = 'force_disconnect'): void { + console.log(`๐Ÿ’ฅ Force disconnecting ${this.id} (${reason})`); + + try { + if (this.call) { + this.call.destroy(); + } + } catch (error) { + console.warn(`โš ๏ธ Error during force disconnect for ${this.id}:`, error); + } finally { + this.handleClientDisconnection(reason); + } + } + + // Utility methods + isActive(): boolean { + return ( + this.connectionState === ConnectionState.CONNECTED && !this.isShuttingDown + ); + } + + isStale(threshold: number = 300000): boolean { + // 5 minutes default + return Date.now() - this.metrics.lastActivity > threshold; + } + + getMetrics(): ConnectionMetrics { + return { ...this.metrics }; + } + + getState(): ConnectionState { + return this.connectionState; + } + + getUptime(): number { + return Date.now() - this.metrics.connectedAt; + } + + getQueueSize(): number { + return this.messageQueue.length; + } + + private generateMessageId(): string { + return `msg_${Date.now()}_${Math.random().toString(36).substr(2, 6)}`; + } + + // Action handlers for different message types + private actionHandlers: { + [key in MSA]?: ( + args: { [key: string]: unknown }, + requestId?: string + ) => void; + } = { + [MSA.Connected]: args => { + console.log(`โœ… Connection confirmed from ${this.id}:`, args); + this.emit('connectionConfirmed', { nodeId: this.id, args }); + }, + + [MSA.Ping]: (args, requestId) => { + console.log('Signal Server Pinged Mediaserver'); + this.call.write({ + action: MSA.Pong, + args: JSON.stringify(args), + requestId, + }); + }, + + [MSA.HeartbeatAck]: args => { + console.log(`๐Ÿ’— Heartbeat acknowledged by ${this.id}`, args); + this.metrics.lastActivity = Date.now(); + }, + + // Add more handlers as needed for your specific actions + }; + + // Static methods for managing all nodes + static getNodes(): SignalNode[] { + return Array.from(SignalNode.signalNodes.values()); + } + + static getNodeById(id: string): SignalNode | undefined { + return SignalNode.signalNodes.get(id); + } + + static getActiveNodes(): SignalNode[] { + return Array.from(SignalNode.signalNodes.values()).filter(node => + node.isActive() + ); + } + + static getNodeCount(): number { + return SignalNode.signalNodes.size; + } + + static getActiveNodeCount(): number { + return this.getActiveNodes().length; + } + + static async disconnectAll(): Promise { + console.log( + `๐Ÿ›‘ Disconnecting all ${SignalNode.signalNodes.size} signal nodes...` + ); + + const nodes = Array.from(SignalNode.signalNodes.values()); + const disconnectPromises = nodes.map(node => + node.gracefulDisconnect('disconnect_all').catch(error => { + console.warn(`โš ๏ธ Error disconnecting node ${node.id}:`, error); + node.forceDisconnect('disconnect_all_force'); + }) + ); + + await Promise.allSettled(disconnectPromises); + SignalNode.signalNodes.clear(); + console.log('โœ… All signal nodes disconnected'); + } + + static broadcastMessage( + action: MSA, + args?: { [key: string]: unknown }, + options?: { + excludeIds?: string[]; + onlyActive?: boolean; + queueIfUnavailable?: boolean; + } + ): { sent: number; queued: number; failed: number } { + const opts = { + excludeIds: [], + onlyActive: true, + queueIfUnavailable: false, + ...options, + }; + + let nodes = Array.from(SignalNode.signalNodes.values()); + + // Filter nodes based on options + if (opts.excludeIds.length > 0) { + nodes = nodes.filter(node => !opts.excludeIds!.includes(node.id)); + } + + if (opts.onlyActive) { + nodes = nodes.filter(node => node.isActive()); + } + + let sent = 0; + let queued = 0; + let failed = 0; + + nodes.forEach(node => { + if (node.isActive()) { + if (node.sendMessage(action, args)) { + sent++; + } else { + failed++; + } + } else if (opts.queueIfUnavailable) { + if (node.queueMessage(action, args)) { + queued++; + } else { + failed++; + } + } else { + failed++; + } + }); + + console.log( + `๐Ÿ“ข Broadcast ${action}: ${sent} sent, ${queued} queued, ${failed} failed` + ); + return { sent, queued, failed }; + } + + static getConnectionStats(): { + total: number; + active: number; + connecting: number; + disconnecting: number; + disconnected: number; + error: number; + } { + const nodes = Array.from(SignalNode.signalNodes.values()); + + return { + total: nodes.length, + active: nodes.filter(n => n.connectionState === ConnectionState.CONNECTED) + .length, + connecting: nodes.filter( + n => n.connectionState === ConnectionState.CONNECTING + ).length, + disconnecting: nodes.filter( + n => n.connectionState === ConnectionState.DISCONNECTING + ).length, + disconnected: nodes.filter( + n => n.connectionState === ConnectionState.DISCONNECTED + ).length, + error: nodes.filter(n => n.connectionState === ConnectionState.ERROR) + .length, + }; + } + + static getDetailedMetrics(): { + totalNodes: number; + activeNodes: number; + totalMessagesSent: number; + totalMessagesReceived: number; + totalErrors: number; + avgUptime: number; + nodes: Array<{ + id: string; + connectionId: string; + state: ConnectionState; + metrics: ConnectionMetrics; + uptime: number; + queueSize: number; + }>; + } { + const nodes = Array.from(SignalNode.signalNodes.values()); + const activeNodes = nodes.filter(n => n.isActive()); + + return { + totalNodes: nodes.length, + activeNodes: activeNodes.length, + totalMessagesSent: nodes.reduce( + (sum, node) => sum + node.metrics.messagesSent, + 0 + ), + totalMessagesReceived: nodes.reduce( + (sum, node) => sum + node.metrics.messagesReceived, + 0 + ), + totalErrors: nodes.reduce((sum, node) => sum + node.metrics.errors, 0), + avgUptime: + nodes.length > 0 + ? nodes.reduce((sum, node) => sum + node.getUptime(), 0) / + nodes.length + : 0, + nodes: nodes.map(node => ({ + id: node.id, + connectionId: node.connectionId, + state: node.connectionState, + metrics: node.getMetrics(), + uptime: node.getUptime(), + queueSize: node.getQueueSize(), + })), + }; + } +} + +export default SignalNode; +export { ConnectionState, SignalNode }; diff --git a/src/types/actions.ts b/src/types/actions.ts new file mode 100644 index 0000000..343cea6 --- /dev/null +++ b/src/types/actions.ts @@ -0,0 +1,56 @@ +export enum SignalingClientActions { + Message = 'message', + Heartbeat = 'heartbeat', + Connected = 'connected', + JoinRoom = 'join-room', + JoinVisitors = 'join-visitors', + JoinWaiters = 'join-waiters', + GetRoomData = 'get-room-data', + GetRtpCapabilities = 'get-rtp-capabilities', +} +export enum PubSubActions { + Message = 'message', + EndMeeting = 'end-meeting', +} + +export enum ServiceActions { + Close = 'close', +} + +export enum MediaSignalingActions { + // Connection lifecycle + Connected = 'connected', + Disconnect = 'disconnect', + Reconnect = 'reconnect', + + // Health monitoring + Heartbeat = 'heartbeat', + HeartbeatAck = 'heartbeat_ack', + Ping = 'ping', + Pong = 'pong', + + // Server management` + ServerShutdown = 'server_shutdown', + ServerRestart = 'server_restart', + + // Error handling + Error = 'error', + ConnectionError = 'connection_error', + + // Media specific actions (add your custom actions here) + MediaOffer = 'media_offer', + MediaAnswer = 'media_answer', + IceCandidate = 'ice_candidate', + MediaStreamStart = 'media_stream_start', + MediaStreamStop = 'media_stream_stop', + + // Room/channel management + JoinRoom = 'join_room', + LeaveRoom = 'leave_room', + RoomUpdate = 'room_update', + + // Custom actions placeholder + Custom = 'custom', + + RtpCapabilities = 'rtp_capabilities', +} diff --git a/src/types/events.ts b/src/types/events.ts deleted file mode 100644 index a5fdc40..0000000 --- a/src/types/events.ts +++ /dev/null @@ -1,17 +0,0 @@ -export enum SignallingEvents { - Message = 'message', - Connected = 'connected', - JoinRoom = 'join-room', - JoinVisitors = 'join-visitors', - JoinWaiters = 'join-waiters', - GetRoomData = 'get-room-data', - GetRtpCapabilities = 'get-rtp-capabilities', -} -export enum PubSubEvents { - Message = 'message', - EndMeeting = 'end-meeting', -} - -export enum ServiceEvents { - Close = 'close', -} diff --git a/src/types/interfaces.ts b/src/types/index.ts similarity index 73% rename from src/types/interfaces.ts rename to src/types/index.ts index 76fe77f..e1effb1 100644 --- a/src/types/interfaces.ts +++ b/src/types/index.ts @@ -1,3 +1,5 @@ +import { types as mediasoupTypes } from 'mediasoup'; + export type ProducerSource = 'mic' | 'camera' | 'screen' | 'screenAudio'; export type TransportKind = 'producer' | 'consumer'; @@ -6,7 +8,7 @@ export type AckCallback = (res: { error?: Error | unknown | null; response?: T; }) => void; -export type PeerType = 'Recorder' | 'Attendee'; +export type PeerType = 'Recorder' | 'Participant'; export enum Role { Moderator = 'Moderator', @@ -117,7 +119,8 @@ export interface ChatData { export interface MediaNodeData { id: string; ip: string; - host: string; + address: string; + grpcPort: string; } export interface Reaction { @@ -128,3 +131,30 @@ export interface Reaction { position: `${number}%`; timestamp: number; } + +export interface PipeConsumerParams { + producerId: string; + kind: mediasoupTypes.MediaKind; + producerPaused: boolean; + rtpParameters: mediasoupTypes.RtpParameters; + sendTranportId: string; + roomId: string; + recvMediaNodeId: string; + sendMediaNodeId: string; + producerPeerId: string; + appData: mediasoupTypes.AppData; +} + +export interface TransportConnectionParams { + routerId: string; + transportId: string; + sendTransportId?: string; + recvTransportId?: string; + ip: string; + port: number; + srtpParameters?: mediasoupTypes.SrtpParameters; +} + +export type AppDataWithRouterId = mediasoupTypes.AppData & { routerId: string }; + +// WorkerData, RouterData, TransportData, ConsumerData, Producer