diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..571cd0f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,13 @@ +# GIT +.git +.gitignore + +# Build +/target + +# IDE +.vscode + +# Environment +.env +.env.example \ No newline at end of file diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..d61423d --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,24 @@ +name: build +on: + pull_request: + branches: + - main + - dev +jobs: + build: + name: Build + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v3 + - name: Build an image from Dockerfile + run: docker build -t docker.io/account0123/volcano:${{ github.sha }} . + - name: Run Trivy vulnerability scanner + uses: aquasecurity/trivy-action@0.28.0 + with: + image-ref: 'docker.io/account0123/volcano:${{ github.sha }}' + format: 'table' + exit-code: '1' + ignore-unfixed: true + vuln-type: 'os,library' + severity: 'CRITICAL,HIGH' \ No newline at end of file diff --git a/.gitignore b/.gitignore index ccb5166..34dfdb1 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target -.vscode \ No newline at end of file +.vscode +config.toml \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index d331488..a2184fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,55 @@ dependencies = [ "memchr", ] +[[package]] +name = "anstream" +version = "0.6.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "862ed96ca487e809f1c8e5a8447f6ee2cf102f846893800b20cebdf541fc6bbd" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.82" @@ -236,9 +285,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.94" +version = "1.2.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f6e324229dc011159fcc089755d1e2e216a90d43a7dea6853ca740b84f35e7" +checksum = "d0fc897dc1e865cc67c0e05a836d9d3f1df3cbe442aa4a9473b18e12624a4951" +dependencies = [ + "shlex", +] [[package]] name = "ccm" @@ -268,6 +320,52 @@ dependencies = [ "inout", ] +[[package]] +name = "clap" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40b6887a1d8685cebccf115538db5c0efe625ccac9696ad45c409d96566e910f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0c66c08ce9f0c698cbce5c0279d0bb6ac936d8674174fe48f736533b964f59e" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c7947ae4cc3d851207c1adb5b5e260ff0cca11446b1d6d1423788e442257ce" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675" + +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "const-oid" version = "0.9.6" @@ -686,6 +784,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -771,9 +875,9 @@ dependencies = [ [[package]] name = "interceptor" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5ab04c530fd82e414e40394cabe5f0ebfe30d119f10fe29d6e3561926af412e" +checksum = "1ac0781c825d602095113772e389ef0607afcb869ae0e68a590d8e0799cdcef8" dependencies = [ "async-trait", "bytes", @@ -806,6 +910,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" + [[package]] name = "itoa" version = "1.0.11" @@ -829,9 +939,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.153" +version = "0.2.172" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "lock_api" @@ -1301,24 +1411,23 @@ dependencies = [ [[package]] name = "ring" -version = "0.17.8" +version = "0.17.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" +checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", "getrandom", "libc", - "spin", "untrusted", "windows-sys 0.52.0", ] [[package]] name = "rtcp" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8306430fb118b7834bbee50e744dc34826eca1da2158657a3d6cbc70e24c2096" +checksum = "e9689528bf3a9eb311fd938d05516dd546412f9ce4fffc8acfc1db27cc3dbf72" dependencies = [ "bytes", "thiserror", @@ -1327,9 +1436,9 @@ dependencies = [ [[package]] name = "rtp" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e68baca5b6cb4980678713f0d06ef3a432aa642baefcbfd0f4dd2ef9eb5ab550" +checksum = "c54733451a67d76caf9caa07a7a2cec6871ea9dda92a7847f98063d459200f4b" dependencies = [ "bytes", "memchr", @@ -1366,9 +1475,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.15" +version = "0.23.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" +checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" dependencies = [ "once_cell", "ring", @@ -1380,15 +1489,18 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.10.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" +checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79" +dependencies = [ + "zeroize", +] [[package]] name = "rustls-webpki" -version = "0.102.8" +version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ "ring", "rustls-pki-types", @@ -1409,9 +1521,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "sdp" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a526161f474ae94b966ba622379d939a8fe46c930eebbadb73e339622599d5" +checksum = "4cd277015eada44a0bb810a4b84d3bf6e810573fa62fb442f457edf6a1087a69" dependencies = [ "rand", "substring", @@ -1481,10 +1593,11 @@ dependencies = [ [[package]] name = "server" -version = "0.1.0" +version = "0.1.2" dependencies = [ "anyhow", "async-trait", + "clap", "futures", "log", "postage", @@ -1520,6 +1633,12 @@ dependencies = [ "digest", ] +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -1573,12 +1692,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "spki" version = "0.7.3" @@ -1595,11 +1708,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "stun" -version = "0.7.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea256fb46a13f9204e9dee9982997b2c3097db175a9fddaa8350310d03c4d5a3" +checksum = "7dbc2bab375524093c143dc362a03fb6a1fb79e938391cdb21665688f88a088a" dependencies = [ "base64", "crc", @@ -1836,9 +1955,9 @@ dependencies = [ [[package]] name = "turn" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0044fdae001dd8a1e247ea6289abf12f4fcea1331a2364da512f9cd680bbd8cb" +checksum = "3f5aea1116456e1da71c45586b87c72e3b43164fbf435eb93ff6aa475416a9a4" dependencies = [ "async-trait", "base64", @@ -1926,6 +2045,12 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.8.0" @@ -1943,7 +2068,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "volcano-sfu" -version = "0.3.4" +version = "0.3.9" dependencies = [ "anyhow", "async-trait", @@ -2045,9 +2170,9 @@ dependencies = [ [[package]] name = "webrtc" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30367074d9f18231d28a74fab0120856b2b665da108d71a12beab7185a36f97b" +checksum = "24bab7195998d605c862772f90a452ba655b90a2f463c850ac032038890e367a" dependencies = [ "arc-swap", "async-trait", @@ -2089,9 +2214,9 @@ dependencies = [ [[package]] name = "webrtc-data" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dec93b991efcd01b73c5b3503fa8adba159d069abe5785c988ebe14fcf8f05d1" +checksum = "4e97b932854da633a767eff0cc805425a2222fc6481e96f463e57b015d949d1d" dependencies = [ "bytes", "log", @@ -2104,9 +2229,9 @@ dependencies = [ [[package]] name = "webrtc-dtls" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b7c9b89fc909f9da0499283b1112cd98f72fec28e55a54a9e352525ca65cd95c" +checksum = "5ccbe4d9049390ab52695c3646c1395c877e16c15fb05d3bda8eee0c7351711c" dependencies = [ "aes", "aes-gcm", @@ -2141,9 +2266,9 @@ dependencies = [ [[package]] name = "webrtc-ice" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0348b28b593f7709ac98d872beb58c0009523df652c78e01b950ab9c537ff17d" +checksum = "eb51bde0d790f109a15bfe4d04f1b56fb51d567da231643cb3f21bb74d678997" dependencies = [ "arc-swap", "async-trait", @@ -2166,9 +2291,9 @@ dependencies = [ [[package]] name = "webrtc-mdns" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6dfe9686c6c9c51428da4de415cb6ca2dc0591ce2b63212e23fd9cccf0e316b" +checksum = "979cc85259c53b7b620803509d10d35e2546fa505d228850cbe3f08765ea6ea8" dependencies = [ "log", "socket2", @@ -2179,9 +2304,9 @@ dependencies = [ [[package]] name = "webrtc-media" -version = "0.9.0" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e153be16b8650021ad3e9e49ab6e5fa9fb7f6d1c23c213fd8bbd1a1135a4c704" +checksum = "80041211deccda758a3e19aa93d6b10bc1d37c9183b519054b40a83691d13810" dependencies = [ "byteorder", "bytes", @@ -2192,9 +2317,9 @@ dependencies = [ [[package]] name = "webrtc-sctp" -version = "0.11.0" +version = "0.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5faf3846ec4b7e64b56338d62cbafe084aa79806b0379dff5cc74a8b7a2b3063" +checksum = "07439c134425d51d2f10907aaf2f815fdfb587dce19fe94a4ae8b5faf2aae5ae" dependencies = [ "arc-swap", "async-trait", @@ -2210,9 +2335,9 @@ dependencies = [ [[package]] name = "webrtc-srtp" -version = "0.14.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "771db9993712a8fb3886d5be4613ebf27250ef422bd4071988bf55f1ed1a64fa" +checksum = "01e773f79b09b057ffbda6b03fe7b43403b012a240cf8d05d630674c3723b5bb" dependencies = [ "aead", "aes", @@ -2233,9 +2358,9 @@ dependencies = [ [[package]] name = "webrtc-util" -version = "0.10.0" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1438a8fd0d69c5775afb4a71470af92242dbd04059c61895163aa3c1ef933375" +checksum = "64bfb10dbe6d762f80169ae07cf252bafa1f764b9594d140008a0231c0cdce58" dependencies = [ "async-trait", "bitflags", diff --git a/Cargo.toml b/Cargo.toml index 93bf034..ed69efe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] members = ["volcano-sfu", "server"] -resolver = "2" \ No newline at end of file +resolver = "2" diff --git a/Dockerfile b/Dockerfile index c3e091c..cf14092 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,31 +6,23 @@ WORKDIR /home/rust RUN mkdir volcano WORKDIR /home/rust/volcano COPY Cargo.toml Cargo.lock ./ - -# Create lib -RUN USER=root cargo new --lib volcano-sfu COPY volcano-sfu ./volcano-sfu - -# Create server -RUN USER=root cargo new --bin server COPY server ./server # Build -RUN cargo build --locked --release -RUN cargo install --path server -RUN rm */src/*.rs target/release/deps/volcano* +RUN cargo install --locked --path server --root /usr/local # Bundle -FROM debian:bookworm -COPY --from=build /usr/local/cargo/bin/server ./volcano-server -COPY config.toml ./config.toml +FROM gcr.io/distroless/cc-debian12 +COPY --from=build /usr/local/bin/server /etc/volcano/volcano-server +COPY config.example.toml /etc/volcano/config.toml # Signaling server port EXPOSE 4000/tcp -#TURN server port -EXPOSE 3478/udp +# TURN server port +#EXPOSE 3478/udp ENV RUST_LOG=debug -CMD ["./volcano-server"] \ No newline at end of file +CMD ["./etc/volcano/volcano-server"] \ No newline at end of file diff --git a/README.md b/README.md index a175d68..807af43 100644 --- a/README.md +++ b/README.md @@ -4,17 +4,28 @@ Rust implementation of a WebRTC Selective Forwarding Unit A selective forwarding unit is a video routing service which allows webrtc sessions to scale more efficiently. -## Build and run +## Development -### Build +### Setup + +Clone the repository. ```sh -cargo build +git clone https://github.com/MilcaoStudio/volcano.git +cd volcano ``` -### Run server +Copy `config.example.toml` to `config.toml`. +```sh +cp config.example.toml config.toml +``` + +### Build and run + +It's recommended to enable logging for `server` and `volcano_sfu`. +Set `RUST_LOG="debug"` to enable debug level for all crates (including `webrtc`). ```sh -RUST_LOG=debug -./target/debug/server +RUST_LOG="server, volcano_sfu" +cargo run ``` ### License diff --git a/config.toml b/config.example.toml similarity index 94% rename from config.toml rename to config.example.toml index 6cd98bc..8f45904 100644 --- a/config.toml +++ b/config.example.toml @@ -8,16 +8,16 @@ maxpackettrack = 500 # Values from [0-127] where 0 is the loudest. # Audio levels are read from rtp extension header according to: # https://tools.ietf.org/html/rfc6464 -audiolevelthreshold = 40 +audiolevelthreshold = 65 # Sets the interval in which the SFU will check the audio level # in [ms]. If the active speaker has changed, the sfu will # emit an event to clients. -audiolevelinterval=1000 +audiolevelinterval=500 # Sets minimum percentage of events required to fire an audio level # according to the expected events from the audiolevelinterval, # calculated as audiolevelinterval/packetization time (20ms for 8kHz) # Values from [0-100] -audiolevelfilter = 20 +audiolevelfilter = 50 withstats = false [router.simulcast] @@ -31,17 +31,6 @@ enabletemporallayer = false # Single port, portrange will not work if you enable this # singleport = 5000 -# Range of ports that ion accepts WebRTC traffic on -# Format: [min, max] and max - min >= 100 -portrange = [5000, 5200] -# if sfu behind nat, set iceserver -# [[webrtc.iceserver]] -# urls = ["stun:stun.stunprotocol.org:3478"] -# [[webrtc.iceserver]] -# urls = ["turn:turn.awsome.org:3478"] -# username = "awsome" -# credential = "awsome" - # sdp semantics: # "unified-plan" # "plan-b" @@ -50,6 +39,17 @@ sdpsemantics = "unified-plan" # toggle multicast dns support: https://tools.ietf.org/html/draft-mdns-ice-candidates-00 mdns = false +# Range of ports that ion accepts WebRTC traffic on +# Format: [min, max] and max - min >= 100 +portrange = [5000, 5200] +# if sfu behind nat, set iceserver +[[webrtc.iceservers]] +urls = ["stun:stun.l.google.com:19302", "stun:stun1.l.google.com:19302"] +[[webrtc.iceservers]] +# urls = ["turn:turn.awsome.org:3478"] +# username = "awsome" +# credential = "awsome" + [webrtc.candidates] # In case you're deploying ion-sfu on a server which is configured with # a 1:1 NAT (e.g., Amazon EC2), you might want to also specify the public diff --git a/server/Cargo.toml b/server/Cargo.toml index 5aadeea..ebe4545 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "server" -version = "0.1.0" +version = "0.1.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -24,8 +24,9 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.115" # WebRTC -webrtc = "0.12.0" +webrtc = "0.13.0" anyhow = "1.0.82" postage = "0.5.0" -volcano-sfu = {path = "../volcano-sfu"} \ No newline at end of file +volcano-sfu = {path = "../volcano-sfu"} +clap = { version = "4.5.40", features = ["derive"] } diff --git a/server/src/main.rs b/server/src/main.rs index c86771f..9962007 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,3 +1,8 @@ +use std::fs; + +use clap::Parser; +use volcano_sfu::rtc::config; + #[macro_use] extern crate log; #[macro_use] @@ -5,10 +10,21 @@ extern crate serde; pub mod signaling; +#[derive(clap::Parser)] +struct Cli { + #[arg(short = 'c', long = "config", default_value = "./config.toml")] + config_path: String, +} + #[tokio::main] async fn main() -> anyhow::Result<()> { - pretty_env_logger::init(); - signaling::server::launch("0.0.0.0:4000", Box::new(move |token| { + pretty_env_logger::init_timed(); + let cli = Cli::parse(); + let content = fs::read_to_string(cli.config_path).inspect_err(|e| error!("Error loading config file: {e}")).unwrap_or_default(); + let config = config::load(&content) + .inspect_err(|e| error!("Error loading config: {e}. Loading default config.")) + .unwrap_or_default(); + signaling::server::launch("0.0.0.0:4000", config, Box::new(move |token| { Box::pin(async move { use signaling::server::{UserCapabilities, UserInformation}; @@ -24,4 +40,4 @@ async fn main() -> anyhow::Result<()> { }) }) })).await -} +} \ No newline at end of file diff --git a/server/src/signaling/client.rs b/server/src/signaling/client.rs index b24ed47..c9143fb 100644 --- a/server/src/signaling/client.rs +++ b/server/src/signaling/client.rs @@ -2,7 +2,8 @@ use std::sync::Arc; use anyhow::Result; use futures::{ - future::{select, Either}, pin_mut, FutureExt, TryStreamExt + future::{select, Either}, + pin_mut, FutureExt, TryStreamExt, }; use postage::stream::Stream; use volcano_sfu::rtc::{ @@ -10,7 +11,12 @@ use volcano_sfu::rtc::{ peer::{JoinConfig, Peer}, room::{Room, RoomSignal}, }; -use webrtc::{ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::session_description::RTCSessionDescription}; +use webrtc::{ + ice_transport::{ + ice_candidate::RTCIceCandidateInit, ice_connection_state::RTCIceConnectionState, + }, + peer_connection::sdp::session_description::RTCSessionDescription, +}; use super::{ packets::{PacketC2S, PacketS2C, ServerError}, @@ -61,22 +67,37 @@ impl Client { let ws_worker = async { // Read incoming messages while let Some(msg) = read.try_next().await? { - debug!("Websocket message received."); - match PacketC2S::from(msg) { - Ok(packet) => self.handle_message(packet, &write).await?, - Err(e) => { - match e { - ServerError::UnknownRequest => write - .send(PacketS2C::Error { - error: e.to_string(), - }) - .await?, - _ => { - debug!("Websocket message is not a packet."); - error!("Error message not handled: {e}"); - } + debug!("[Incoming] Message received."); + + match PacketC2S::from(&msg) { + Ok(packet) => { + info!("[Incoming] C->S: {:?}", packet); + let result = self.handle_message(packet, &write).await; + match result { + Ok(_) => debug!("[Incoming] Done!"), + Err(e) => error!("[Incoming] Error: {e}"), } } + Err(e) => match e { + ServerError::UnknownRequest => { + write + .send(PacketS2C::Error { + error: e.to_string(), + }) + .await? + } + ServerError::UnproccesableEntity => { + debug!("[Incoming] Message is not text."); + debug!( + "msg -> {}", + msg.into_text().unwrap_or_else(|e| e.to_string()) + ) + } + _ => { + debug!("Websocket message is not a packet."); + error!("Error message not handled: {e}"); + } + }, } } @@ -85,7 +106,7 @@ impl Client { Ok(()) } .fuse(); - + let room_worker = async { debug!("Created room listener"); let mut listener = signal.listener(); @@ -98,7 +119,8 @@ impl Client { // TODO: maybe throw an error for listener being closed? info!("Closing room listener"); anyhow::Ok(()) - }.fuse(); + } + .fuse(); // Pin futures on the stack pin_mut!(ws_worker, room_worker); @@ -112,27 +134,26 @@ impl Client { /// Clean up after ourselves by disconnecting from the room, /// closing the peer connection and removing tracks. pub async fn lifecycle_clean_up(&mut self) -> Result<()> { - info!("User {} disconnected", self.user.id); + let user_id = &self.user.id; + info!("User {} disconnected", user_id); if let Some(room) = &self.room { - room.unsubscribe_signal(&self.user.id).await; - room.remove_user(&self.user.id).await; + room.unsubscribe_signal(user_id).await; + room.remove_peer(user_id).await; + room.remove_user(user_id).await; if room.is_empty() { - room.close().await; + debug!("Room {} is empty. Should clean up?", room.id); } } - self.peer.clean_up().await + Ok(()) } /// Handle incoming packet async fn handle_message(&mut self, packet: PacketC2S, write: &Sender) -> Result<()> { - info!("C->S: {:?}", packet); - let peer = self.peer.clone(); match packet { PacketC2S::Answer { description } => peer.set_remote_description(description).await, - PacketC2S::Connect { .. } => Err(ServerError::AlreadyConnected.into()), + PacketC2S::Connect { .. } => write.send(PacketS2C::ServerError { error: ServerError::AlreadyConnected }).await, PacketC2S::Continue { .. } => { - // TODO: Add Continue event Ok(()) } @@ -142,15 +163,16 @@ impl Client { offer, cfg, } => { - let room = Room::get(&room_id); + let router_config = &peer.config().router; + let room = Room::get_or_create(&room_id, router_config); self.room = Some(room.clone()); - self.handle_join(write, room.clone(), offer, cfg, id).await + self.handle_join(write, room, offer, &cfg, id).await } PacketC2S::Leave => { match &self.room { Some(room) => { // Close all peers - self.peer.clean_up().await?; + room.remove_peer(&self.user.id).await; // Remove user room.remove_user(&self.user.id).await; if room.is_empty() { @@ -163,12 +185,10 @@ impl Client { } } PacketC2S::Remove { removed_tracks: _ } => Ok(()), - PacketC2S::Offer {id, description } => { + PacketC2S::Offer { id, description } => { Self::handle_offer(peer, write.clone(), description, id).await } - PacketC2S::Trickle { candidate, target } => { - peer.trickle(candidate, target).await - } + PacketC2S::Trickle { candidate, target } => peer.trickle(candidate, target).await, } } @@ -177,15 +197,15 @@ impl Client { write: &Sender, room: Arc, initial_offer: RTCSessionDescription, - cfg: JoinConfig, + cfg: &JoinConfig, id: u32, ) -> Result<()> { - // Signaling was experimental. // room.subscribe_signal(self.signal.clone()).await; - let peer = self.peer.clone(); + let peer = &self.peer; let write_out_1 = write.clone(); let write_out_2 = write.clone(); + let write_out_3 = write.clone(); peer.on_offer(Box::new(move |offer| { let write_in = write_out_1.clone(); Box::pin(async move { @@ -211,14 +231,32 @@ impl Client { )) .await; let peer_id = peer.id().clone(); - peer.register_on_ice_connection_state_change(Box::new( move |state| { + peer.register_on_ice_connection_state_change(Box::new(move |state| { let peer_id_in = peer_id.clone(); + let write_in = write_out_3.clone(); Box::pin(async move { - debug!("[Publisher {}] ICE connection state changed to: {}", peer_id_in, state); + debug!( + "[Publisher {}] ICE connection state changed to: {}", + peer_id_in, state + ); + match state { + RTCIceConnectionState::Failed => { + if let Err(err) = write_in + .send(PacketS2C::ServerError { + error: ServerError::PeerConnectionFailed, + }) + .await + { + error!("Write failed: {err}"); + }; + } + _ => {} + } }) - })).await; + })) + .await; - if let Err(err) = peer.join(room.id.clone(), cfg).await { + if let Err(err) = peer.join(room.clone(), cfg).await { error!("join error: {}", err); return Err(err); } @@ -235,10 +273,20 @@ impl Client { } Err(err) => { // Client should know error - write.send(PacketS2C::Error { error: err.to_string() }).await?; + write + .send(PacketS2C::Error { + error: err.to_string(), + }) + .await?; error!("answer error: {}", err); } }; + + // Send room info + let room_info = room.get_room_info(); + if let Err(err) = write.send(PacketS2C::RoomInfo { room: room_info }).await { + error!("send room info error: {}", err); + }; Ok(()) } diff --git a/server/src/signaling/packets.rs b/server/src/signaling/packets.rs index 43d03ae..d9ae5ee 100644 --- a/server/src/signaling/packets.rs +++ b/server/src/signaling/packets.rs @@ -2,7 +2,10 @@ use thiserror::Error; use tokio_tungstenite::tungstenite::Message; use volcano_sfu::rtc::{peer::JoinConfig, room::RoomInfo}; -use webrtc::{ice_transport::ice_candidate::RTCIceCandidateInit, peer_connection::sdp::session_description::RTCSessionDescription}; +use webrtc::{ + ice_transport::{ice_candidate::RTCIceCandidateInit, ice_server::RTCIceServer}, + peer_connection::sdp::session_description::RTCSessionDescription, +}; /// Available types of media tracks #[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] @@ -48,9 +51,7 @@ pub enum Negotiation { #[serde(tag = "type")] pub enum PacketC2S { /// Answer (from client subscriber) - Answer { - description: RTCSessionDescription, - }, + Answer { description: RTCSessionDescription }, /// Offer (from negotiation) Offer { id: u32, @@ -101,7 +102,9 @@ pub enum PacketS2C { /// Accept authentication Accept { id: u32, - available_rooms: Vec + user_id: String, + ice_servers: Vec, + available_rooms: Vec, }, /// Answer (for client publisher) Answer { @@ -115,6 +118,9 @@ pub enum PacketS2C { /// IDs of tracks that are no longer being produced removed_tracks: Vec, }, + RoomInfo { + room: RoomInfo, + }, /// Offer (for client subscriber) Offer { description: RTCSessionDescription, @@ -152,7 +158,13 @@ pub enum PacketS2C { user_id: String, }, /// Disconnection error - Error { error: String }, + Error { + error: String, + }, + /// Custom server error + ServerError { + error: ServerError, + }, } /// An error occurred on the server @@ -166,14 +178,20 @@ pub enum ServerError { FailedToAuthenticate, #[error("Already connected to a room!")] AlreadyConnected, + #[error("Not authenticated in this session.")] + NotAuthenticated, #[error("Not connected to any room!")] NotConnected, #[error("Media type already has an existing track!")] MediaTypeSatisfied, - #[error("Request cannot be parsed.")] - NotDeserializable, + #[error("Peer connection failed!")] + PeerConnectionFailed, + #[error("Bad Request. Reason: {reason}")] + BadRequest { reason: String }, #[error("Request type is unknown.")] UnknownRequest, + #[error("Received message is not a text.")] + UnproccesableEntity, } impl std::fmt::Display for MediaType { @@ -189,18 +207,19 @@ impl std::fmt::Display for MediaType { impl PacketC2S { /// Create a packet from incoming Message - pub fn from(message: Message) -> Result { + pub fn from(message: &Message) -> Result { if let Message::Text(text) = message { - match serde_json::from_str(&text) { + match serde_json::from_str(text) { Ok(packet) => Ok(packet), Err(e) => { - error!("Error: {e}"); error!("Tried to parse packet: {text}"); - Err(ServerError::UnknownRequest) + let reason = e.to_string(); + error!("Error: {reason}"); + Err(ServerError::BadRequest { reason }) } } } else { - Err(ServerError::NotDeserializable) + Err(ServerError::UnproccesableEntity) } } } @@ -249,4 +268,4 @@ pub struct RemoteMedia { pub audio: bool, #[serde(skip_serializing_if = "Option::is_none")] pub layers: Option>, -} \ No newline at end of file +} diff --git a/server/src/signaling/server.rs b/server/src/signaling/server.rs index 4282535..073de8d 100644 --- a/server/src/signaling/server.rs +++ b/server/src/signaling/server.rs @@ -1,14 +1,11 @@ use anyhow::Result; use futures::{Future, StreamExt}; -use std::{fs, pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc}; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; -use volcano_sfu::{ - rtc::{ - config::{self, WebRTCTransportConfig}, +use volcano_sfu::rtc::{ + config::{Config, WebRTCTransportConfig}, room::Room, - }, - turn, -}; + }; use super::{ client::Client, @@ -39,22 +36,18 @@ type AuthFn = Box< >; /// Launch a new signaling server -pub async fn launch(addr: A, auth: AuthFn) -> Result<()> { +pub async fn launch(addr: A, config: Config, auth: AuthFn) -> Result<()> { // Create TCP listener let try_socket = TcpListener::bind(addr).await; let listener = try_socket.expect("Failed to bind"); info!("Server listening on {}", listener.local_addr().unwrap()); - - let content = fs::read_to_string("./config.toml")?; - let c = config::load(&content) - .inspect_err(|e| error!("Error loading config: {e}. Loading default config.")) - .unwrap_or_default(); - let config = c.clone(); - if c.turn.enabled { - turn::init_turn_server(c.turn, c.turn_auth).await?; - } + + //if c.turn.enabled { + // turn::init_turn_server(c.turn, c.turn_auth).await?; + //} let webrtc_config = Arc::new(WebRTCTransportConfig::new(&config).await?); + info!("WebRTC configuration for SFU v{} loaded!", webrtc_config.version); // Accept new connections let auth = Arc::new(auth); while let Ok((stream, _)) = listener.accept().await { @@ -86,12 +79,7 @@ async fn accept_connection(stream: TcpStream, auth: Arc, w: Arc = None; while let Some(msg) = read.next().await { - match PacketC2S::from(msg?) { + match PacketC2S::from(&msg?) { Ok(packet) => match packet { PacketC2S::Connect { id, @@ -114,6 +102,7 @@ async fn handle_connection( if let Ok(user) = (auth)(token).await { info!("Authenticated user {}", user.id); + let user_id = user.id.clone(); // Create a new client client = Some(Client::new(user, Arc::clone(&w)).await?); @@ -124,16 +113,24 @@ async fn handle_connection( write .send(PacketS2C::Accept { id, + user_id, + ice_servers: w.configuration.ice_servers.clone(), available_rooms, }) .await?; } break; } - _ => {} + _ => { + write + .send(PacketS2C::ServerError { + error: ServerError::NotAuthenticated, + }) + .await?; + } }, Err(e) => { - error!("Error: {e}"); + write.send(PacketS2C::ServerError { error: e }).await?; } } } diff --git a/volcano-sfu/Cargo.toml b/volcano-sfu/Cargo.toml index 5ae6200..c17c3f9 100644 --- a/volcano-sfu/Cargo.toml +++ b/volcano-sfu/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "volcano-sfu" -version = "0.3.4" +version = "0.3.9" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -29,6 +29,6 @@ serde = { version = "1.0.197", features = ["derive"] } serde_json = "1.0.115" # WebRTC -webrtc = "0.12.0" +webrtc = "0.13.0" toml = "0.8.19" diff --git a/volcano-sfu/src/buffer/buffer.rs b/volcano-sfu/src/buffer/buffer.rs index 2cf6ef7..7029ea4 100644 --- a/volcano-sfu/src/buffer/buffer.rs +++ b/volcano-sfu/src/buffer/buffer.rs @@ -37,7 +37,7 @@ pub type OnFeedbackCallBackFn = Box< >; pub type OnAudioLevelFn = - Box Pin + Send + 'static>>) + Send + Sync>; + Box Pin + Send + 'static>>) + Send + Sync>; pub enum BufferPacketType { RTPBufferPacket = 1, RTCPBufferPacket = 2, @@ -510,7 +510,7 @@ impl AtomicBuffer { if let Ok(data) = rv { let mut handler = buffer.on_audio_level.lock().await; if let Some(f) = &mut *handler { - f(data.level).await; + f(data.voice, data.level).await; } } } diff --git a/volcano-sfu/src/rtc/config.rs b/volcano-sfu/src/rtc/config.rs index 7e890ea..fdcf9d2 100644 --- a/volcano-sfu/src/rtc/config.rs +++ b/volcano-sfu/src/rtc/config.rs @@ -4,13 +4,19 @@ use tokio::{net::UdpSocket, sync::Mutex}; use webrtc::{api::setting_engine::SettingEngine, ice::{mdns::MulticastDnsMode, udp_mux::{UDPMuxDefault, UDPMuxParams}, udp_network::{EphemeralUDP, UDPNetwork}}, ice_transport::{ice_candidate_type::RTCIceCandidateType, ice_server::RTCIceServer}, peer_connection::{configuration::RTCConfiguration, policy::sdp_semantics::RTCSdpSemantics}, turn::auth::AuthHandler,}; use anyhow::Result; -use crate::turn::{self, TurnConfig}; +use crate::turn::TurnConfig; use crate::{buffer::factory::AtomicFactory, track::error::ConfigError}; +// 4096 port range +pub const ICE_MIN_PORT: u16 = 36864; +pub const ICE_MAX_PORT: u16 = 40959; + #[derive(Clone, Deserialize)] struct ICEServerConfig { urls: Vec, - user_name: String, + #[serde(default)] + username: String, + #[serde(default)] credential: String, } #[derive(Clone, Default, Deserialize)] @@ -22,6 +28,7 @@ struct Candidates { } #[derive(Default)] pub struct WebRTCTransportConfig { + pub version: String, pub configuration: RTCConfiguration, pub setting: SettingEngine, pub router: RouterConfig, @@ -41,6 +48,7 @@ pub struct WebRTCConfig { ice_single_port: Option, #[serde(rename = "portrange")] pub ice_port_range: Option>, + #[serde(rename = "iceservers")] ice_servers: Option>, candidates: Candidates, #[serde(rename = "sdpsemantics")] @@ -67,11 +75,9 @@ pub struct RouterConfig { #[serde(rename = "audiolevelinterval")] pub audio_level_interval: i32, #[serde(rename = "audiolevelthreshold")] - #[allow(dead_code)] - audio_level_threshold: u8, + pub audio_level_threshold: u8, #[serde(rename = "audiolevelfilter")] - #[allow(dead_code)] - audio_level_filter: i32, + pub audio_level_filter: i32, pub simulcast: SimulcastConfig, } @@ -111,8 +117,8 @@ impl WebRTCTransportConfig { let mut ice_port_end: u16 = 0; if c.turn.enabled && c.turn.port_range.is_none() { - ice_port_start = turn::ICE_MIN_PORT; - ice_port_end = turn::ICE_MAX_PORT; + ice_port_start = ICE_MIN_PORT; + ice_port_end = ICE_MAX_PORT; } else if let Some(ice_port_range) = &c.webrtc.ice_port_range { if ice_port_range.len() == 2 { ice_port_start = ice_port_range[0]; @@ -129,24 +135,21 @@ impl WebRTCTransportConfig { } - let mut ice_servers: Vec = vec![RTCIceServer { - urls: vec!["stun:stun.l.google.com:19302".to_owned(), "stun:stun1.l.google.com:19302".to_owned(), "stun:stun.12connect.com:3478".to_owned()], - ..Default::default() - }]; - - if let Some(ice_lite) = c.webrtc.candidates.ice_lite { - if ice_lite { - se.set_lite(ice_lite); - } else if let Some(ice_servers_cfg) = &c.webrtc.ice_servers { - for ice_server in ice_servers_cfg { - let s = RTCIceServer { - urls: ice_server.urls.clone(), - username: ice_server.user_name.clone(), - credential: ice_server.credential.clone(), - }; - - ice_servers.push(s); - } + let mut ice_servers: Vec = Vec::default(); + let ice_lite = c.webrtc.candidates.ice_lite.unwrap_or_default(); + se.set_lite(ice_lite); + + if !ice_lite { + if let Some(ice_servers_cfg) = &c.webrtc.ice_servers { + for ice_server in ice_servers_cfg { + let s = RTCIceServer { + urls: ice_server.urls.clone(), + username: ice_server.username.clone(), + credential: ice_server.credential.clone(), + }; + + ice_servers.push(s); + } } } @@ -188,6 +191,7 @@ impl WebRTCTransportConfig { setting: se, router: c.router.clone(), factory: Arc::new(Mutex::new(AtomicFactory::new(1000, 1000))), + version: env!("CARGO_PKG_VERSION").to_string(), }; if let Some(nat1toiips) = &c.webrtc.candidates.nat1_to_1ips { @@ -201,8 +205,7 @@ impl WebRTCTransportConfig { w.setting .set_ice_multicast_dns_mode(MulticastDnsMode::Disabled); } - - info!("WebRTCTransport configuration finished"); + Ok(w) } } \ No newline at end of file diff --git a/volcano-sfu/src/rtc/peer.rs b/volcano-sfu/src/rtc/peer.rs index e2ee8a9..908298d 100644 --- a/volcano-sfu/src/rtc/peer.rs +++ b/volcano-sfu/src/rtc/peer.rs @@ -15,7 +15,7 @@ use webrtc::{ ice_candidate::{RTCIceCandidate, RTCIceCandidateInit}, ice_connection_state::RTCIceConnectionState }, peer_connection::{ - configuration::RTCConfiguration, sdp::session_description::RTCSessionDescription, signaling_state::RTCSignalingState + configuration::RTCConfiguration, offer_answer_options::RTCOfferOptions, sdp::session_description::RTCSessionDescription, signaling_state::RTCSignalingState }, }; @@ -119,7 +119,7 @@ impl Peer { } } /// Clean up any open connections - pub async fn clean_up(&self) -> Result<()> { + pub async fn clean_up(&self) { // Takes out mutex peers let subscriber = self.subscriber.lock().await.take(); let publisher = self.publisher.lock().await.take(); @@ -130,48 +130,52 @@ impl Peer { if let Some(p) = publisher { p.close().await; } - - Ok(()) - - // TODO: find out if tracks are removed too - //self.pc.close().await.map_err(Into::into) } + pub fn config(&self) -> Arc { + self.config.clone() + } pub fn id(&self) -> String { self.id.clone() } - pub async fn join(self: &Arc, room_id: String, cfg: JoinConfig) -> Result<()> { + pub async fn join(self: &Arc, room: Arc, cfg: &JoinConfig) -> Result<()> { let id = &self.id; - info!("[{id}] Join to {room_id} requested"); + info!("[{id}] Join to {} requested", room.id); - let room = Room::get(&room_id); *self.room.lock().await = Some(room.clone()); let rtc_config_clone = RTCConfiguration { ice_servers: self.config.configuration.ice_servers.clone(), ..Default::default() }; - let config = WebRTCTransportConfig { + let peer_config = WebRTCTransportConfig { configuration: rtc_config_clone, setting: self.config.setting.clone(), router: self.config.router.clone(), factory: Arc::new(Mutex::new(AtomicFactory::new(1000, 1000))), + version: self.config.version.clone(), }; if !cfg.no_subscribe { let mut inner_subscriber = - Subscriber::new(self.user_id.clone(), self.config.clone()).await?; + Subscriber::new(self.user_id.clone(), &self.config).await?; inner_subscriber.no_auto_subscribe = cfg.no_auto_subscribe; let subscriber = Arc::new(inner_subscriber); let remote_answer_pending_out = self.remote_answer_pending.clone(); + //let remote_answer_pending_out_2 = self.remote_answer_pending.clone(); let negotiation_pending_out = self.negotiation_pending.clone(); + //let negotiation_pending_out_2 = self.negotiation_pending.clone(); let closed_out = self.closed.clone(); + //let closed_out_2 = self.closed.clone(); let sub = Arc::clone(&subscriber); + //let sub_2 = Arc::clone(&subscriber); let on_offer_handler_out = self.on_offer_fn.clone(); + //let on_offer_handler_out_2 = self.on_offer_fn.clone(); let id_clone_out = id.clone(); + //let id_clone_out_2 = id.clone(); subscriber - .register_on_negociate(Box::new(move || { + .register_on_negotiate(Box::new(move |offer_options: Option| { let remote_answer_pending_in = remote_answer_pending_out.clone(); let negotiation_pending_in = negotiation_pending_out.clone(); let closed_in = closed_out.clone(); @@ -179,12 +183,14 @@ impl Peer { let sub_in = sub.clone(); let on_offer_handler_in = on_offer_handler_out.clone(); Box::pin(async move { + debug!("Start negotiation"); if remote_answer_pending_in.load(Ordering::Relaxed) { (*negotiation_pending_in).store(true, Ordering::Relaxed); + debug!("Negotiation set to pending. Reason: Remote answer pending"); return Ok(()); } - let offer = sub_in.create_offer().await?; + let offer = sub_in.create_offer(offer_options).await?; (*remote_answer_pending_in).store(true, Ordering::Relaxed); if let Some(on_offer) = &mut *on_offer_handler_in.lock().await { @@ -198,6 +204,7 @@ impl Peer { }) })) .await; + let on_ice_candidate_out = self.on_ice_candidate_fn.clone(); let closed_out_ = self.closed.clone(); subscriber.register_on_ice_candidate(Box::new(move |candidate| { @@ -228,7 +235,7 @@ impl Peer { if let Some(sub) = &*self.subscriber.lock().await { sub.add_data_channel(&dc.config.label).await?; info!("[Subscriber {}] Trying to offer...", sub.id); - sub.create_offer().await?; + sub.create_offer(None).await?; } } } @@ -236,7 +243,7 @@ impl Peer { let closed_out_1 = self.closed.clone(); let publisher = - Arc::new(Publisher::new(self.user_id.clone(), room.clone(), config).await?); + Arc::new(Publisher::new(self.user_id.clone(), room.clone(), peer_config).await?); publisher.on_ice_candidate(Box::new(move |candidate: Option| { let on_ice_candidate_in = on_ice_candidate_out.clone(); @@ -280,8 +287,11 @@ impl Peer { room.add_peer(self.clone()).await; info!( "[Peer {}] Adds to room {}", - id, room_id + id, room.id ); + + // Send user join event with no tracks + room.join_user(id.to_owned(), Vec::new()).await; if !cfg.no_subscribe { room.subscribe(self.clone()).await; @@ -309,15 +319,26 @@ impl Peer { pub async fn set_remote_description(&self, sdp: RTCSessionDescription) -> Result<()> { if let Some(subscriber) = &*self.subscriber.lock().await { - info!("PeerLocal {} sets remote description", self.id); + info!("[Peer {}] sets remote description", self.id); subscriber.set_remote_description(sdp).await?; self.remote_answer_pending.store(false, Ordering::Relaxed); if self.negotiation_pending.load(Ordering::Relaxed) { self.negotiation_pending.store(false, Ordering::Relaxed); info!("Subscriber negotiate"); - subscriber.negotiate().await?; + subscriber.negotiate(None).await?; } + + // set renegotation method for subscriber + let sub = Arc::clone(&subscriber); + subscriber.pc.on_negotiation_needed(Box::new(move || { + let sub_in = sub.clone(); + Box::pin(async move { + info!("Start renegotiation"); + if let Err(err) = sub_in.negotiate(Some( RTCOfferOptions { voice_activity_detection: true, ice_restart: true })).await { + error!("renegotiate err: {}", err); + } + })})); } else { return Err(Error::ErrNoTransportEstablished.into()); } diff --git a/volcano-sfu/src/rtc/peer/api.rs b/volcano-sfu/src/rtc/peer/api.rs index 6bd223d..635d496 100644 --- a/volcano-sfu/src/rtc/peer/api.rs +++ b/volcano-sfu/src/rtc/peer/api.rs @@ -19,7 +19,7 @@ const MIME_TYPE_VP9: &str = "video/vp9"; const FRAME_MARKING: &str = "urn:ietf:params:rtp-hdrext:framemarking"; /// Initialise a new RTCPeerConnection -pub async fn create_subscriber_connection(cfg: Arc) -> Result> { +pub async fn create_subscriber_connection(cfg: &Arc) -> Result> { // Create a MediaEngine object to configure the supported codec let mut m = MediaEngine::default(); //m.register_default_codecs()?; diff --git a/volcano-sfu/src/rtc/peer/publisher.rs b/volcano-sfu/src/rtc/peer/publisher.rs index 82ec9ef..5f18725 100644 --- a/volcano-sfu/src/rtc/peer/publisher.rs +++ b/volcano-sfu/src/rtc/peer/publisher.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::pin::Pin; use std::future::Future; use std::sync::Arc; @@ -121,6 +122,15 @@ impl Publisher { } pub async fn close(&self) { + let observer = self.room.audio_observer.lock().await; + + // Remove publisher streams from audio observer + let tracks = &*self.tracks.lock().await; + let stream_ids: BTreeSet = tracks.iter().map(|t| t.track.stream_id()).collect(); + for stream_id in &stream_ids { + observer.remove_stream(stream_id).await; + } + self.router.stop().await; if let Err(err) = self.pc.close().await { error!("close err: {}", err); @@ -205,7 +215,7 @@ impl Publisher { if channel.label() == super::subscriber::API_CHANNEL_LABEL { info!("[Publisher {id_in}] API data channel published from client!"); return Box::pin(async move { - room_in.add_api_channel(&id_in).await; + //room_in.add_api_channel(&id_in).await; }); } Box::pin(async move { diff --git a/volcano-sfu/src/rtc/peer/subscriber.rs b/volcano-sfu/src/rtc/peer/subscriber.rs index f3ba769..92e2cc9 100644 --- a/volcano-sfu/src/rtc/peer/subscriber.rs +++ b/volcano-sfu/src/rtc/peer/subscriber.rs @@ -7,6 +7,7 @@ use tokio::time::{sleep, Duration}; use webrtc::data_channel::data_channel_init::RTCDataChannelInit; use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState; use webrtc::ice_transport::ice_gatherer::OnLocalCandidateHdlrFn; +use webrtc::peer_connection::offer_answer_options::RTCOfferOptions; use webrtc::peer_connection::sdp::session_description::RTCSessionDescription; use webrtc::rtcp::source_description::SourceDescription; use webrtc::rtp_transceiver::rtp_codec::RTPCodecType; @@ -38,14 +39,17 @@ pub struct Subscriber { channels: Arc>>>, candidates: Arc>>, on_negotiate: Arc>>, + on_renegotiate: Arc>>, pub no_auto_subscribe: bool, } pub type OnNegotiateFn = - Box Pin> + Send + 'static>>) + Send + Sync>; + Box) -> Pin> + Send + 'static>>) + Send + Sync>; +pub type OnRenegotiateFn = + Box Pin> + Send + 'static>>) + Send + Sync>; impl Subscriber { - pub async fn new(id: String, c: Arc) -> Result { + pub async fn new(id: String, c: &Arc) -> Result { let pc = api::create_subscriber_connection(c).await?; let api_channel = pc.create_data_channel(API_CHANNEL_LABEL, Some(RTCDataChannelInit::default())).await?; info!("[Subscriber {id}] Created data channel `{API_CHANNEL_LABEL}` (awaiting for offer)"); @@ -59,6 +63,7 @@ impl Subscriber { channels: Default::default(), candidates: Default::default(), on_negotiate: Default::default(), + on_renegotiate: Default::default(), no_auto_subscribe: Default::default(), }; subscriber.on_ice_connection_state_change().await; @@ -73,7 +78,7 @@ impl Subscriber { info!("[{}] Created data channel `{}` (awaiting for offer)", self.id, ndc.label()); let tracks_out = self.tracks.clone(); - let ndc_1 = ndc.clone(); + let ndc_1: Arc = ndc.clone(); let ndc_2 = ndc.clone(); ndc.on_open(Box::new(move || { Box::pin(async move { @@ -129,8 +134,8 @@ impl Subscriber { Ok(data_channel) } - pub async fn create_offer(&self) -> Result { - let offer = self.pc.create_offer(None).await?; + pub async fn create_offer(&self, options: Option) -> Result { + let offer = self.pc.create_offer(options).await?; self.pc.set_local_description(offer.clone()).await?; Ok(offer) } @@ -170,11 +175,16 @@ impl Subscriber { self.pc.on_ice_candidate(f) } - pub async fn register_on_negociate(&self, f: OnNegotiateFn) { + pub async fn register_on_negotiate(&self, f: OnNegotiateFn) { let mut handler = self.on_negotiate.lock().await; *handler = Some(f); } + pub async fn register_on_renegotiate(&self, f: OnRenegotiateFn) { + let mut handler = self.on_renegotiate.lock().await; + *handler = Some(f); + } + pub async fn data_channel(&self, label: &String) -> Option> { self.channels.lock().await.get(label).cloned() } @@ -183,11 +193,18 @@ impl Subscriber { self.tracks.lock().await.get(stream_id).cloned() } - pub async fn negotiate(&self) -> Result<()> { - info!("Calling negotitation"); + pub async fn negotiate(&self, offer_options: Option) -> Result<()> { let mut handler = self.on_negotiate.lock().await; if let Some(f) = &mut *handler { - f().await?; + f(offer_options).await?; + } + Ok(()) + } + + pub async fn renegotiate(&self, ice_restart: bool) -> Result<()> { + let mut handler = self.on_renegotiate.lock().await; + if let Some(f) = &mut *handler { + f(ice_restart).await?; } Ok(()) } @@ -268,14 +285,16 @@ impl Subscriber { pub async fn set_remote_description(&self, sdp: RTCSessionDescription) -> Result<()> { self.pc.set_remote_description(sdp).await?; - let candidates = self.candidates.lock().await; + let mut candidates = self.candidates.lock().await; + + info!("[Subscriber {}] ICE candidates ({})", self.id, candidates.len()); for candidate in &*candidates { - self.pc.add_ice_candidate(candidate.clone()).await?; + if let Err(err) = self.pc.add_ice_candidate(candidate.clone()).await { + warn!("add_ice_candidate error: {}", err); + }; } - - self.candidates.lock().await.clear(); - - info!("Answer accepted"); + + candidates.clear(); Ok(()) } } diff --git a/volcano-sfu/src/rtc/room.rs b/volcano-sfu/src/rtc/room.rs index 2939100..bdc2e0e 100644 --- a/volcano-sfu/src/rtc/room.rs +++ b/volcano-sfu/src/rtc/room.rs @@ -1,7 +1,7 @@ -use std::sync::{ +use std::{collections::BTreeMap, fmt::Debug, sync::{ atomic::{AtomicBool, Ordering}, Arc, -}; +}}; use dashmap::{DashMap, DashSet}; use postage::{ @@ -11,19 +11,21 @@ use postage::{ use tokio::sync::Mutex; use ulid::Ulid; use webrtc::{ - data::data_channel::DataChannel, - data_channel::{ + data::data_channel::DataChannel, data_channel::{ data_channel_message::DataChannelMessage, data_channel_state::RTCDataChannelState, RTCDataChannel, - }, track::track_local::{TrackLocal, track_local_static_rtp::TrackLocalStaticRTP}, + }, peer_connection::offer_answer_options::RTCOfferOptions, track::track_local::{track_local_static_rtp::TrackLocalStaticRTP, TrackLocal} }; -use crate::track::{audio_observer::AudioObserver, router::LocalRouter}; +use serde::Serialize; + +use crate::{rtc::config::RouterConfig, track::{audio_observer::AudioObserver, router::LocalRouter}}; use super::peer::Peer; /// Room event which indicates something happened to a peer -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", content = "data")] pub enum RoomEvent { Create(String), Close(String), @@ -32,10 +34,15 @@ pub enum RoomEvent { room_id: String, }, DataChannelMessage(Vec), + RoomInfo(RoomInfo), RemoveTrack { removed_tracks: Vec, room: String, }, + VoiceActivity { + room_id: String, + stream_ids: Vec, + }, UserJoin { room_id: String, user_id: String, @@ -47,6 +54,12 @@ pub enum RoomEvent { }, } +#[derive(Clone, Debug, Serialize)] +pub struct RoomInfo { + pub id: String, + pub users: BTreeMap>, +} + /// Room consisting of clients which can communicate with one another #[derive(Debug)] pub struct Room { @@ -59,17 +72,12 @@ pub struct Room { /// Signalers for this room signalers: Arc>>>, sender: Sender, - participants: DashSet, + //participants: DashSet, pub audio_observer: Arc>, user_tracks: DashMap>, peers: DashMap>, tracks: DashMap>, } -#[derive(Debug, Serialize)] -pub struct RoomInfo { - id: String, - participants: Vec, -} lazy_static! { static ref ROOMS: DashMap> = DashMap::new(); @@ -77,8 +85,12 @@ lazy_static! { impl Room { /// Create a new Room and initialise internal channels and maps - fn new(id: String) -> Arc { + fn new(id: String, router_config: &RouterConfig) -> Arc { let (sender, _dropped) = channel(10); + let audio_threshold = router_config.audio_level_threshold; + let audio_interval = router_config.audio_level_interval; + let audio_filter = router_config.audio_level_filter; + let audio_observer = AudioObserver::new(audio_threshold, audio_interval, audio_filter); Arc::new(Room { closed: Default::default(), @@ -88,8 +100,7 @@ impl Room { signalers: Default::default(), labels: Default::default(), peers: Default::default(), - participants: Default::default(), - audio_observer: Arc::new(Mutex::new(AudioObserver::new(100, 80, 50))), + audio_observer: Arc::new(Mutex::new(audio_observer)), user_tracks: Default::default(), tracks: Default::default(), }) @@ -168,7 +179,10 @@ impl Room { } info!("Data channel negotiation"); - if let Err(err) = subscriber.negotiate().await { + if let Err(err) = subscriber.negotiate(Some(RTCOfferOptions { + ice_restart: true, + voice_activity_detection: true, + })).await { error!("negotiate error:{}", err); } else { info!("Data channel negotiation successful"); @@ -179,28 +193,32 @@ impl Room { pub(crate) async fn add_api_channel(self: &Arc, id: &str) { if let Some(peer) = self.get_peer(id).await { let room_out = self.clone(); - let user_id_out = id.to_owned(); + //let user_id_out = id.to_owned(); if peer.subscriber().await.is_none() { error!("add_api_channel No subscriber available"); return; } let subscriber = peer.subscriber().await.unwrap(); - let user_tracks = self.user_tracks.get(id).map(|tracks| tracks.clone()); + //let user_tracks = self.user_tracks.get(id).map(|tracks| tracks.clone()); subscriber.api_channel().on_open(Box::new( move || { let room_in = room_out.clone(); - let user_id_in = user_id_out.clone(); - let tracks_in = user_tracks.unwrap_or_default(); + //let user_id_in = user_id_out.clone(); + //let tracks_in = user_tracks.unwrap_or_default(); Box::pin(async move { - room_in.join_user(user_id_in, tracks_in).await + //room_in.join_user(user_id_in, tracks_in).await; + info!("[Room {}] API channel opened", room_in.id); + warn!("DataChannelOpen event not implemented"); }) })); let room_id = self.id.clone(); info!("[Room {room_id}] Data channel negotiation"); - if let Err(err) = subscriber.negotiate().await { + if let Err(err) = subscriber.negotiate(None).await { error!("[Room {room_id}] negotiate error: {}", err); } else { - info!("[Room {room_id}] Data channel negotiation successful"); + info!("[Room {room_id}] Negotiation successful"); } + } else { + error!("[Room {}] Unknown peer {id}", self.id); } } @@ -220,13 +238,16 @@ impl Room { } /// Get or create a Room by its ID - pub fn get(id: &str) -> Arc { - if let Some(room) = ROOMS.get(id) { - room.clone() + pub fn get(id: &str) -> Option> { + ROOMS.get(id).map(|room| room.clone()) + } + + pub fn get_or_create(id: &str, router_config: &RouterConfig) -> Arc { + if let Some(room) = Self::get(id) { + room } else { - let room: Arc = Room::new(id.to_owned()); + let room: Arc = Room::new(id.to_owned(), router_config); ROOMS.insert(id.to_string(), room.clone()); - room } } @@ -235,10 +256,7 @@ impl Room { let mut available_rooms = Vec::new(); for id in ids { if let Some(room) = ROOMS.get(id) { - available_rooms.push(RoomInfo { - id: id.clone(), - participants: room.participants.clone().into_iter().collect(), - }) + available_rooms.push(room.get_room_info()) } } available_rooms @@ -322,7 +340,9 @@ impl Room { } pub async fn remove_peer(&self, peer_id: &str) -> usize { - self.peers.remove(peer_id); + if let Some((_, peer)) = self.peers.remove(peer_id) { + peer.clean_up().await; + }; let peer_count = self.peers.len(); peer_count @@ -348,12 +368,20 @@ impl Room { user_id: id.clone(), user_tracks: tracks.clone(), }; - self.send_room_event(ev).await; + self.send_message(ev).await; - // Add tracks to map + // Insert tracks self.user_tracks.insert(id, tracks); } + pub fn get_room_info(&self,) -> RoomInfo { + let user_tracks = self.user_tracks.clone(); + let mut users = BTreeMap::new(); + // Serialize user tracks + user_tracks.into_iter().for_each(|(key, value)| {users.insert(key, value);}); + return RoomInfo { id: self.id.clone(), users }; + } + pub async fn subscribe(self: &Arc, peer: Arc) { info!("Subscribing a new peer"); @@ -377,6 +405,10 @@ impl Room { } } + if let Some(publisher) = peer.publisher().await { + publisher.router().start_audio_observer_task().await; + } + for cur_peer in self.peers.iter() { let cur_id = cur_peer.id(); let peer_id = peer.id(); @@ -401,10 +433,13 @@ impl Room { } info!("Subscribe Negotiate"); - if let Err(err) = peer.subscriber().await.unwrap().negotiate().await { + if let Err(err) = peer.subscriber().await.unwrap().negotiate(None).await { error!("negotiate error: {}", err); } } + + // Offer API data channel to client subscriber + self.add_api_channel(&peer.id()).await; } /// Remove a user from the room pub async fn remove_user(&self, id: &str) { @@ -419,7 +454,8 @@ impl Room { self.close_track(id); } - self.publish(RoomEvent::RemoveTrack { + //self.publish(RoomEvent::RemoveTrack { + self.send_message(RoomEvent::RemoveTrack { room: self.id.clone(), removed_tracks, }) @@ -427,7 +463,7 @@ impl Room { } // Let everyone know we left - self.send_room_event(RoomEvent::UserLeft { + self.send_message(RoomEvent::UserLeft { room_id: self.id.clone(), user_id: id.to_owned(), }) @@ -455,7 +491,7 @@ impl Room { pub async fn remove_track(&self, id: String) { self.close_track(&id); - self.send_room_event(RoomEvent::RemoveTrack { removed_tracks: vec![id], room: self.id.clone() }).await; + self.send_message(RoomEvent::RemoveTrack { removed_tracks: vec![id], room: self.id.clone() }).await; } pub async fn publish_track( @@ -491,8 +527,9 @@ impl Room { // TODO: stop the RTP sender thread and drop } - async fn send_room_event(&self, event: RoomEvent) { - if let Ok(payload) = serde_json::to_string(&event) { + /// Send a serializable message to all peers' subscribers + pub async fn send_message(&self, msg: Message) where Message: Serialize + Debug { + if let Ok(payload) = serde_json::to_string(&msg) { info!("[Room {}] Sending room event: {}", self.id, payload); for peer in self.peers.iter() { if let Some(subscriber) = peer.subscriber().await { @@ -502,7 +539,7 @@ impl Room { } } } else { - error!("Error parsing {:?}", event); + error!("Error parsing {:?}", msg); }; } } diff --git a/volcano-sfu/src/track/audio_observer.rs b/volcano-sfu/src/track/audio_observer.rs index 50a6ba5..b3dddc6 100644 --- a/volcano-sfu/src/track/audio_observer.rs +++ b/volcano-sfu/src/track/audio_observer.rs @@ -10,8 +10,9 @@ pub struct AudioStream { #[derive(Default, Clone, Debug)] pub struct AudioObserver { streams: Arc>>, - expected: i32, - threshold: u8, + pub expected: i32, + pub threshold: u8, + pub interval: i32, previous: Vec, } @@ -19,7 +20,7 @@ impl AudioObserver { /// Creates an audio observer with threshold lower than 128, an interval, and a filter from 0 to 100. /// ## Example /// ``` - /// let observer = AudioObserver::new(100, 80, 50); + /// let observer = AudioObserver::new(70, 80, 50); /// ``` pub fn new(threshold_parameter: u8, interval_parameter: i32, filter_parameter: i32) -> Self { let mut threshold: u8 = threshold_parameter; @@ -35,6 +36,7 @@ impl AudioObserver { } Self { threshold, + interval: interval_parameter, expected: interval_parameter * filter / 2000, ..Default::default() } @@ -47,12 +49,13 @@ impl AudioObserver { }) } - pub async fn remove_stream(&mut self, stream_id: &str) { + pub async fn remove_stream(&self, stream_id: &str) { + debug!("Remove stream {}", stream_id); let mut streams = self.streams.lock().await; streams.retain(|stream| !stream.id.eq(stream_id)); } - /// Observes whether d_bov is higher than threshold for target stream. + /// Observes whether d_bov is higher than threshold for target stream, then it should be ignored. /// If d_bov is lower or equal than treshold, it sums d_bov into target stream. pub async fn observe(&self, stream_id: &str, d_bov: u8) { let mut streams = self.streams.lock().await; @@ -61,6 +64,7 @@ impl AudioObserver { .find(|stream| stream.id.eq(stream_id)); if let Some(stream) = target { + // Active voice level should be lower than threshold if d_bov <= self.threshold { stream.sum += d_bov as i32; stream.total += 1; @@ -85,7 +89,10 @@ impl AudioObserver { for stream in streams.iter_mut() { if stream.total >= self.expected { + debug!("[stream {}] {}/{} (acceptable)", stream.id, stream.total, self.expected); stream_ids.push(stream.id.clone()); + } else { + debug!("[stream {}] {}/{} (not acceptable)", stream.id, stream.total, self.expected); } stream.total = 0; @@ -94,16 +101,23 @@ impl AudioObserver { if self.previous.len() == stream_ids.len() { for idx in 0..self.previous.len() { + // If any stream id is different, reset the previous vector. if self.previous[idx] != stream_ids[idx] { self.previous = stream_ids.clone(); return Some(stream_ids); } } + // If all stream ids are the same, do not reset the previous vector. return None; } self.previous = stream_ids.clone(); Some(stream_ids) } + + /// Returns true if there are no streams. + pub async fn is_empty(&self) -> bool { + self.streams.lock().await.is_empty() + } } diff --git a/volcano-sfu/src/track/downtrack.rs b/volcano-sfu/src/track/downtrack.rs index 1be124b..59b9293 100644 --- a/volcano-sfu/src/track/downtrack.rs +++ b/volcano-sfu/src/track/downtrack.rs @@ -73,7 +73,7 @@ pub struct DownTrackInternal { pub last_ssrc: AtomicU32, pub codec: RTCRtpCodecCapability, pub receiver: Arc, - pub write_stream: Mutex>>, //TrackLocalWriter, + pub write_stream: Mutex>>, //Option, on_bind_handler: Arc>>, #[allow(dead_code)] @@ -223,7 +223,7 @@ impl DownTrackInternal { } if !fwd_pkts.is_empty() { - if let Err(err) = receiver.send_rtcp(fwd_pkts) { + if let Err(err) = receiver.send_rtcp(fwd_pkts).await { log::error!("send_rtcp err:{}", err); } } @@ -253,7 +253,7 @@ impl TrackLocal for DownTrackInternal { let mut payload_type = self.payload_type.lock().await; *payload_type = codec.payload_type; let mut write_stream = self.write_stream.lock().await; - *write_stream = t.write_stream(); + *write_stream = Some(t.write_stream()); let mut mime = self.mime.lock().await; *mime = codec.capability.mime_type.to_lowercase(); self.re_sync.store(true, Ordering::Relaxed); @@ -658,7 +658,7 @@ impl DownTrack { receiver.send_rtcp(vec![Box::new(PictureLossIndication { sender_ssrc: ssrc, media_ssrc, - })])?; + })]).await?; return Ok(()); } @@ -742,7 +742,7 @@ impl DownTrack { receiver.send_rtcp(vec![Box::new(PictureLossIndication { sender_ssrc: ssrc, media_ssrc: ext_packet.packet.header.ssrc, - })])?; + })]).await?; return Ok(()); } diff --git a/volcano-sfu/src/track/receiver.rs b/volcano-sfu/src/track/receiver.rs index 1fa965e..9235042 100644 --- a/volcano-sfu/src/track/receiver.rs +++ b/volcano-sfu/src/track/receiver.rs @@ -7,7 +7,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{self, Sender}; use tokio::sync::Mutex; use webrtc::rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication; use webrtc::rtp::packet::Packet as RTCPacket; @@ -26,8 +26,8 @@ use super::error::{Error, Result}; use super::sequencer::PacketMeta; use super::{modify_vp8_temporal_payload, simulcast}; -pub type RtcpDataReceiver = UnboundedReceiver>>; -pub type RtcpDataSender = UnboundedSender>>; +pub type RtcpDataReceiver = mpsc::Receiver>>; +pub type RtcpDataSender = mpsc::Sender>>; pub type OnCloseHandlerFn = Box Pin + Send + 'static>>) + Send + Sync>; @@ -55,10 +55,10 @@ pub trait Receiver: Send + Sync { -> Result<()>; async fn delete_down_track(&self, layer: usize, id: String); async fn register_on_close(&self, f: OnCloseHandlerFn); - fn send_rtcp(&self, p: Vec>) -> Result<()>; + async fn send_rtcp(&self, p: Vec>) -> Result<()>; fn set_rtcp_channel( &mut self, - sender: Arc>>>, + sender: Arc>>>, ); async fn get_sender_report_time(&self, layer: usize) -> (u32, u64); fn as_any(&self) -> &(dyn Any + Send + Sync); @@ -93,7 +93,7 @@ pub struct WebRTCReceiver { impl WebRTCReceiver { pub async fn new(receiver: Arc, track: Arc, pid: String) -> Self { - let (s, _) = tokio::sync::mpsc::unbounded_channel(); + let (s, _) = tokio::sync::mpsc::channel(1024); Self { peer_id: pid, receiver, @@ -322,7 +322,7 @@ impl Receiver for WebRTCReceiver { *handler = Some(f); } - fn send_rtcp(&self, p: Vec>) -> Result<()> { + async fn send_rtcp(&self, p: Vec>) -> Result<()> { // Checks if first packet is PLI if let Some(packet) = p.get(0) { if packet.as_any().downcast_ref::().is_some() { @@ -342,7 +342,7 @@ impl Receiver for WebRTCReceiver { } } - if self.rtcp_sender.send(p).is_err() { + if self.rtcp_sender.send(p).await.is_err() { return Err(Error::ErrChannelSend); } @@ -351,7 +351,7 @@ impl Receiver for WebRTCReceiver { fn set_rtcp_channel( &mut self, - sender: Arc>>>, + sender: Arc>>>, ) { self.rtcp_sender = sender; } @@ -458,7 +458,7 @@ impl Receiver for WebRTCReceiver { self.send_rtcp(vec![Box::new(PictureLossIndication { sender_ssrc, media_ssrc, - })])?; + })]).await?; } } diff --git a/volcano-sfu/src/track/router.rs b/volcano-sfu/src/track/router.rs index af193ee..d67c746 100644 --- a/volcano-sfu/src/track/router.rs +++ b/volcano-sfu/src/track/router.rs @@ -1,25 +1,28 @@ -use std::{pin::Pin, sync::Arc}; use std::future::Future; +use std::{pin::Pin, sync::Arc}; use dashmap::DashMap; -use tokio::sync::mpsc; -use tokio::sync::{mpsc::{UnboundedReceiver, UnboundedSender}, Mutex}; +use tokio::sync::broadcast::Sender; +use tokio::sync::{broadcast, mpsc}; +use tokio::sync::Mutex; +use tokio::time::{sleep, Duration}; +use webrtc::error::Error as RTCError; use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState; use webrtc::rtcp::packet::Packet as RtcpPacket; -use webrtc::error::Error as RTCError; use webrtc::rtp_transceiver::rtp_codec::{RTCRtpCodecCapability, RTPCodecType}; use webrtc::rtp_transceiver::rtp_receiver::RTCRtpReceiver; use webrtc::rtp_transceiver::rtp_transceiver_direction::RTCRtpTransceiverDirection; use webrtc::rtp_transceiver::{RTCPFeedback, RTCRtpTransceiverInit}; use webrtc::track::track_remote::TrackRemote; -use crate::rtc::peer::subscriber::Subscriber; -use crate::rtc::room::Room; -use crate::{buffer::factory::AtomicFactory, buffer::buffer::BufferIO, rtc::config::RouterConfig}; -use crate::buffer::buffer::Options as BufferOptions; use super::downtrack::{DownTrack, DownTrackInternal}; use super::error::Result; use super::receiver::{Receiver, RtcpDataReceiver, RtcpDataSender, WebRTCReceiver}; +use crate::buffer::buffer::Options as BufferOptions; +use crate::rtc::peer::subscriber::Subscriber; +use crate::rtc::room::{Room, RoomEvent}; +use crate::track::audio_observer::AudioObserver; +use crate::{buffer::buffer::BufferIO, buffer::factory::AtomicFactory, rtc::config::RouterConfig}; pub type RtcpWriterFn = Box< dyn (FnMut( @@ -44,11 +47,11 @@ pub type OnDelReciverTrackFn = Box< >; pub struct LocalRouter { id: String, + audio_observer: Arc>, //twcc: Arc>>, rtcp_sender_channel: Arc, rtcp_receiver_channel: Arc>, - stop_sender_channel: Arc>>, - stop_receiver_channel: Arc>>, + stop_sender_channel: Arc>, config: RouterConfig, receivers: Arc>>>, buffer_factory: AtomicFactory, @@ -60,16 +63,20 @@ pub struct LocalRouter { impl LocalRouter { pub fn new(id: String, room: Arc, config: RouterConfig) -> Self { - let (s, r) = mpsc::unbounded_channel(); - let (sender, receiver) = mpsc::unbounded_channel(); + let (s, r) = mpsc::channel(1024); + let (sender, _) = broadcast::channel(1); + let audio_threshold = config.audio_level_threshold; + let audio_interval = config.audio_level_interval; + let audio_filter = config.audio_level_filter; + let audio_observer = AudioObserver::new(audio_threshold, audio_interval, audio_filter); Self { id, //twcc: Arc::new(Mutex::new(None)), //stats: Arc::new(Mutex::new(HashMap::new())), + audio_observer: Arc::new(Mutex::new(audio_observer)), rtcp_sender_channel: Arc::new(s), rtcp_receiver_channel: Arc::new(Mutex::new(r)), - stop_sender_channel: Arc::new(Mutex::new(sender)), - stop_receiver_channel: Arc::new(Mutex::new(receiver)), + stop_sender_channel: Arc::new(sender), config, receivers: Arc::new(Mutex::new(DashMap::new())), room, @@ -80,7 +87,7 @@ impl LocalRouter { } } - /* + /* fn get_receivers(&self) -> Arc>>> { self.receivers.clone() }*/ @@ -98,7 +105,11 @@ impl LocalRouter { self.id.clone() } - pub async fn add_down_track(&self, subscriber: Arc, receiver: Arc) -> Result>> { + pub async fn add_down_track( + &self, + subscriber: Arc, + receiver: Arc, + ) -> Result>> { let downtracks = subscriber.get_tracks(&receiver.stream_id()).await; // Checks for available tracks if let Some(downtracks_data) = downtracks { @@ -112,7 +123,11 @@ impl LocalRouter { } let codec = receiver.codec(); - subscriber.m.lock().await.register_codec(codec.clone(), receiver.kind())?; + subscriber + .m + .lock() + .await + .register_codec(codec.clone(), receiver.kind())?; let codec_capability = RTCRtpCodecCapability { mime_type: codec.capability.mime_type, clock_rate: codec.capability.clock_rate, @@ -135,14 +150,22 @@ impl LocalRouter { }; // New local down track - let down_track_local = Arc::new(DownTrackInternal::new(codec_capability, receiver.clone(), self.config.max_packet_track).await); - let transceiver = - subscriber.pc.add_transceiver_from_track( + let down_track_local = Arc::new( + DownTrackInternal::new( + codec_capability, + receiver.clone(), + self.config.max_packet_track, + ) + .await, + ); + let transceiver = subscriber + .pc + .add_transceiver_from_track( down_track_local.clone(), Some(RTCRtpTransceiverInit { direction: RTCRtpTransceiverDirection::Sendonly, send_encodings: Vec::new(), - }) + }), ) .await?; // New local track @@ -154,67 +177,79 @@ impl LocalRouter { let s_out = subscriber.clone(); let r_out = receiver.clone(); let down_track_out = down_track_arc.clone(); - down_track_arc.register_on_close(Box::new(move || { - let s_in = s_out.clone(); - let r_in = r_out.clone(); - let transceiver_in = transceiver.clone(); - let down_track_in = down_track_out.clone(); - Box::pin(async move { - if s_in.pc.connection_state() != RTCPeerConnectionState::Closed { - // Remove track from subscriber peer connection - let rv = s_in - .pc - .remove_track(&transceiver_in.sender().await) - .await; - match rv { - Ok(_) => { - info!("Remove DownTrack for {}", &r_in.stream_id()); - s_in.remove_down_track(&r_in.stream_id(), &down_track_in) - .await; - info!("RemoveDownTrack Negotiate"); - if let Err(err) = s_in.negotiate().await { - error!("negotiate err:{} ", err); + down_track_arc + .register_on_close(Box::new(move || { + let s_in = s_out.clone(); + let r_in = r_out.clone(); + let transceiver_in = transceiver.clone(); + let down_track_in = down_track_out.clone(); + Box::pin(async move { + if s_in.pc.connection_state() != RTCPeerConnectionState::Closed { + // Remove track from subscriber peer connection + let rv = s_in.pc.remove_track(&transceiver_in.sender().await).await; + match rv { + Ok(_) => { + info!("Remove DownTrack for {}", &r_in.stream_id()); + s_in.remove_down_track(&r_in.stream_id(), &down_track_in) + .await; + info!("RemoveDownTrack Negotiate"); + if let Err(err) = s_in.negotiate(None).await { + error!("negotiate err:{} ", err); + } } - } - Err(err) => { - if err == RTCError::ErrConnectionClosed { - // return; + Err(err) => { + if err == RTCError::ErrConnectionClosed { + // return; + } } } } - } - })} - )).await; + }) + })) + .await; let s_out_1 = subscriber.clone(); let r_out_1 = receiver.clone(); - down_track_arc.register_on_bind(Box::new(move || { - let s_in = s_out_1.clone(); - let r_in = r_out_1.clone(); + down_track_arc + .register_on_bind(Box::new(move || { + let s_in = s_out_1.clone(); + let r_in = r_out_1.clone(); - Box::pin(async move { - tokio::spawn(async move { - s_in.send_stream_down_track_reports(&r_in.stream_id()).await; - }); - }) - })).await; + Box::pin(async move { + tokio::spawn(async move { + s_in.send_stream_down_track_reports(&r_in.stream_id()).await; + }); + }) + })) + .await; - subscriber.add_down_track(receiver.stream_id(), down_track_arc.clone()).await; - receiver.add_down_track(down_track_arc, self.config.simulcast.best_quality_first).await; + subscriber + .add_down_track(receiver.stream_id(), down_track_arc.clone()) + .await; + receiver + .add_down_track(down_track_arc, self.config.simulcast.best_quality_first) + .await; Ok(None) } - pub async fn add_down_tracks(&self, subscriber: Arc, r: Option>,) -> Result<()> { + pub async fn add_down_tracks( + &self, + subscriber: Arc, + r: Option>, + ) -> Result<()> { if subscriber.no_auto_subscribe { info!("Skipping no auto subscribe"); return Ok(()); } if let Some(receiver) = r { - info!("Add actual downtrack to subscriber, subscriber: {}", subscriber.id); + info!( + "Add actual downtrack to subscriber, subscriber: {}", + subscriber.id + ); self.add_down_track(subscriber.clone(), receiver).await?; - subscriber.negotiate().await?; + subscriber.negotiate(None).await?; return Ok(()); } @@ -231,43 +266,78 @@ impl LocalRouter { for val in recs { self.add_down_track(subscriber.clone(), val.clone()).await?; } - subscriber.negotiate().await?; + subscriber.negotiate(None).await?; } Ok(()) } - pub async fn add_receiver(&self, receiver: Arc, track: Arc, track_id: String, stream_id: String) -> (Arc, bool) { - info!("add_receiver -> track {}, stream: {}", track.id(), stream_id); + pub async fn add_receiver( + self: &Arc, + receiver: Arc, + track: Arc, + track_id: String, + stream_id: String, + ) -> (Arc, bool) { + info!( + "add_receiver -> track {}, stream: {}", + track.id(), + stream_id + ); let mut published = false; let buffer = self.buffer_factory.get_or_new_buffer(track.ssrc()).await; let sender = self.rtcp_sender_channel.clone(); - buffer.register_on_feedback(Box::new(move |packets: Vec>| { - let sender_in = Arc::clone(&sender); - Box::pin(async move { - if let Err(err) = sender_in.send(packets) { - error!("send err: {}", err); - } - }) - })).await; + buffer + .register_on_feedback(Box::new( + move |packets: Vec>| { + let sender_in = Arc::clone(&sender); + Box::pin(async move { + if let Err(err) = sender_in.send(packets).await { + error!("send err: {}", err); + } + }) + }, + )) + .await; match track.kind() { RTPCodecType::Audio => { - let room_out = self.room.clone(); + let router_out = self.clone(); let stream_id_out = stream_id.clone(); - buffer.register_on_audio_level(Box::new(move |level| { - let room_in = room_out.clone(); - let stream_id_in = stream_id_out.clone(); - Box::pin(async move { - room_in.audio_observer.lock().await.observe(&stream_id_in, level).await; - }) - })).await; - self.room.audio_observer.lock().await.add_stream(stream_id).await; - }, + buffer + .register_on_audio_level(Box::new(move |voice, level| { + let router_in = router_out.clone(); + let stream_id_in = stream_id_out.clone(); + Box::pin(async move { + if !voice { + debug!("Skip observation"); + return; + } + router_in + .audio_observer + .lock() + .await + .observe(&stream_id_in, level) + .await; + }) + })) + .await; + debug!( + "[Room {}] add stream {} to audio observer", + self.room.id, stream_id + ); + let router_2 = self.clone(); + router_2 + .audio_observer + .lock() + .await + .add_stream(stream_id) + .await; + } RTPCodecType::Video => { debug!("Video tracking not implemented"); - }, - _ => {}, + } + _ => {} } // TODO: implement twcc @@ -280,13 +350,16 @@ impl LocalRouter { //let stats_out = Arc::clone(&self.stats); let buffer_out = Arc::clone(&buffer); let with_status = self.config.with_stats; - rtcp_reader.lock().await.register_on_packet(Box::new(move |packet: Vec| { - let buffer_in = Arc::clone(&buffer_out); - Box::pin(async move { - let mut buf = &packet[..]; - let pkts_result = webrtc::rtcp::packet::unmarshal(&mut buf)?; - for pkt in pkts_result { - if let Some(description) = + rtcp_reader + .lock() + .await + .register_on_packet(Box::new(move |packet: Vec| { + let buffer_in = Arc::clone(&buffer_out); + Box::pin(async move { + let mut buf = &packet[..]; + let pkts_result = webrtc::rtcp::packet::unmarshal(&mut buf)?; + for pkt in pkts_result { + if let Some(description) = pkt.as_any() .downcast_ref::() { @@ -308,16 +381,18 @@ impl LocalRouter { // TODO: update stats } } - } - Ok(()) - }) - })).await; + } + Ok(()) + }) + })) + .await; let receivers = self.receivers.lock().await; let result_receiver; match receivers.get(&track.id()) { - Some(r)=>result_receiver = r.clone(), - None=>{ - let mut rv = WebRTCReceiver::new(receiver.clone(), track.clone(), self.id.clone()).await; + Some(r) => result_receiver = r.clone(), + None => { + let mut rv = + WebRTCReceiver::new(receiver.clone(), track.clone(), self.id.clone()).await; rv.set_rtcp_channel(self.rtcp_sender_channel.clone()); let recv_kind = rv.kind(); let room_out = self.room.clone(); @@ -327,13 +402,16 @@ impl LocalRouter { let stream_id_in = stream_id.clone(); Box::pin(async move { if recv_kind == RTPCodecType::Audio { - room_in.audio_observer - .lock().await - .remove_stream(&stream_id_in).await; - } + room_in + .audio_observer + .lock() + .await + .remove_stream(&stream_id_in) + .await; } - ) - })).await; + }) + })) + .await; result_receiver = Arc::new(rv); receivers.insert(track_id, result_receiver.clone()); published = true; @@ -367,7 +445,7 @@ impl LocalRouter { let buffer_clone = buffer.clone(); tokio::spawn(async move { let mut b = vec![0u8; 1500]; - + while let Ok((pkt, _)) = track.read(&mut b).await { if let Err(err) = buffer_clone.write(pkt).await { error!("write error: {}", err); @@ -379,14 +457,54 @@ impl LocalRouter { (result_receiver, published) } + pub async fn start_audio_observer_task(&self) { + let mut stop_receiver = self.stop_sender_channel.subscribe(); + let id_out = self.id.clone(); + let observer = self.audio_observer.clone(); + let interval_ms = observer.lock().await.interval as u64; + let interval = Duration::from_millis(interval_ms); + let room_out = self.room.clone(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = sleep(interval) => { + let is_empty = { + observer.lock().await.is_empty().await + }; + + if is_empty { + info!("Continue"); + continue; + } + + let streams = { + observer.lock().await.calc().await + }; + + if let Some(streams) = streams { + info!("Streams {:?}", streams); + room_out.send_message(RoomEvent::VoiceActivity { room_id: room_out.id.clone(), stream_ids: streams }).await; + } + } + _ = stop_receiver.recv() => { + info!("[Router {}] Stopping audio observer task", id_out); + break; + } + } + } + }); + info!("Audio observer task started"); + } + pub async fn set_rtcp_writer(&self, writer: RtcpWriterFn) { let mut handler = self.rtcp_writer_handler.lock().await; *handler = Some(writer); } pub async fn send_rtcp(&self) { + let mut stop = self.stop_sender_channel.subscribe(); + let mut rtcp_receiver = self.rtcp_receiver_channel.lock().await; loop { - let mut rtcp_receiver = self.rtcp_receiver_channel.lock().await; - let mut stop_receiver = self.stop_receiver_channel.lock().await; + //let mut rtcp_receiver = self.rtcp_receiver_channel.lock().await; tokio::select! { data = rtcp_receiver.recv() => { if let Some(val) = data{ @@ -395,7 +513,7 @@ impl LocalRouter { } } } - _data = stop_receiver.recv() => { + _data = stop.recv() => { info!("Stop receiver signal. Exiting loop"); return ; } @@ -403,8 +521,8 @@ impl LocalRouter { } } pub async fn stop(&self) { - if let Err(err) = self.stop_sender_channel.lock().await.send(()) { + if let Err(err) = self.stop_sender_channel.send(()) { error!("stop err: {}", err); } } -} \ No newline at end of file +} diff --git a/volcano-sfu/src/turn.rs b/volcano-sfu/src/turn.rs index b988be7..d3fc94d 100644 --- a/volcano-sfu/src/turn.rs +++ b/volcano-sfu/src/turn.rs @@ -8,10 +8,6 @@ use webrtc::util::vnet::net::*; pub const TURN_MIN_PORT: u16 = 32768; pub const TURN_MAX_PORT: u16 = 36863; -// 4096 port range -pub const ICE_MIN_PORT: u16 = 36864; -pub const ICE_MAX_PORT: u16 = 40959; - #[derive(Clone, Default, Deserialize)] pub(super) struct TurnAuth { #[serde(rename = "credentials")] @@ -56,9 +52,10 @@ impl AuthHandler for CustomAuthHandler { ) -> Result, Error> { debug!("realm val: {}", realm); if let Some(val) = self.users_map.get(&username.to_string()) { + debug!("username accepted: {}", username); return Ok(val.clone()); } - + error!("Invalid username"); Err(Error::ErrNilConn) } } @@ -68,7 +65,7 @@ pub async fn init_turn_server( auth: Option>, ) -> anyhow::Result { let conn = Arc::new(UdpSocket::bind(conf.address.clone()).await?); - info!("UDP listening {}...", conn.local_addr()?); + info!("TURN server listening {}...", conn.local_addr()?); let mut new_auth: Option> = auth; @@ -114,7 +111,7 @@ pub async fn init_turn_server( relay_addr_generator: Box::new(RelayAddressGeneratorRanges { min_port, max_port, - max_retries: 1, + max_retries: 20, relay_address: IpAddr::from_str(addr[0])?, address: "0.0.0.0".to_owned(), net: Arc::new(Net::new(Some(NetConfig::default()))), @@ -129,5 +126,6 @@ pub async fn init_turn_server( alloc_close_notify: None, }) .await?; + info!("TURN server started"); Ok(turn_server) }