diff --git a/Cargo.lock b/Cargo.lock index 0f8bc5da..e3135876 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -371,6 +371,8 @@ checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" [[package]] name = "bytesio" version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff19234db41a12c2dedbfda4b113b7e4bc39a3dba3ce29ff0ce258cad0a54129" dependencies = [ "async-trait", "byteorder", @@ -386,9 +388,7 @@ dependencies = [ [[package]] name = "bytesio" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff19234db41a12c2dedbfda4b113b7e4bc39a3dba3ce29ff0ce258cad0a54129" +version = "0.3.2" dependencies = [ "async-trait", "byteorder", @@ -448,6 +448,7 @@ dependencies = [ "js-sys", "num-integer", "num-traits", + "serde", "time 0.1.44", "wasm-bindgen", "winapi", @@ -506,7 +507,7 @@ dependencies = [ [[package]] name = "commonlib" -version = "0.1.0" +version = "0.1.1" dependencies = [ "anyhow", "chrono", @@ -821,7 +822,7 @@ dependencies = [ [[package]] name = "env_logger_extend" -version = "0.1.2" +version = "0.1.3" dependencies = [ "anyhow", "chrono", @@ -1130,7 +1131,7 @@ version = "0.2.0" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "failure", "log", ] @@ -1143,7 +1144,7 @@ checksum = "c8f343949d0d608d23ba6ebf2d79337ef188749bae7143c187da2851e80bc171" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytesio 0.3.1", "failure", "log", ] @@ -1320,6 +1321,7 @@ dependencies = [ "axum 0.7.4", "byteorder", "bytes", + "chrono", "commonlib", "failure", "futures", @@ -2259,7 +2261,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytesio 0.3.1", "chrono", "failure", "h264-decoder 0.2.1", @@ -2283,7 +2285,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "chrono", "commonlib", "failure", @@ -2662,7 +2664,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytesio 0.3.1", "chrono", "failure", "indexmap 1.9.3", @@ -2682,7 +2684,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "chrono", "failure", "indexmap 1.9.3", @@ -3779,7 +3781,7 @@ version = "0.3.0" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "failure", "h264-decoder 0.2.0", "indexmap 1.9.3", @@ -3795,7 +3797,7 @@ checksum = "99aa4657821f6191a4640e7f7566f8aece2efd6269845403b520dd26266a7577" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytesio 0.3.1", "failure", "h264-decoder 0.2.1", "log", @@ -3834,7 +3836,7 @@ version = "0.2.0" dependencies = [ "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "failure", ] @@ -3846,7 +3848,7 @@ dependencies = [ "base64", "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "chrono", "commonlib", "failure", @@ -3856,6 +3858,7 @@ dependencies = [ "lazy_static", "log", "rand 0.8.5", + "serde_json", "streamhub 0.1.1", "tokio", ] @@ -3868,7 +3871,7 @@ dependencies = [ "audiopus", "byteorder", "bytes", - "bytesio 0.3.1", + "bytesio 0.3.2", "commonlib", "failure", "fdk-aac", diff --git a/application/xiu/CHANGELOG.md b/application/xiu/CHANGELOG.md index cb30e77a..00fd4e3f 100644 --- a/application/xiu/CHANGELOG.md +++ b/application/xiu/CHANGELOG.md @@ -9,6 
+9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.12.5] +- Support querying more detailed statistic data by adding two new HTTP APIs. +- Fix publishing RTSP stream error caused by network problem. by @bailb +- Fix the bug that stopping the playback of RTSP stream leads to push(publish) failure. +- Upgrade failure library. + ## [0.12.4] - Fix the failure in generating Docker images. diff --git a/application/xiu/Cargo.toml b/application/xiu/Cargo.toml index 4cf275f3..6429e13f 100644 --- a/application/xiu/Cargo.toml +++ b/application/xiu/Cargo.toml @@ -15,7 +15,7 @@ serde_derive = "1.0" serde = { version = "1.0.101", optional = true, features = ["derive"] } anyhow = "^1.0" log = "0.4.0" -failure = "0.1.1" +failure = "0.1.8" clap = "4.1.4" libc = "0.2.139" serde_json = { version = "1", default-features = false, features = [ diff --git a/application/xiu/src/api.rs b/application/xiu/src/api.rs index 0fbfc4dc..de31deed 100644 --- a/application/xiu/src/api.rs +++ b/application/xiu/src/api.rs @@ -1,22 +1,45 @@ use { anyhow::Result, axum::{ + extract::Query, routing::{get, post}, Json, Router, }, serde::Deserialize, + serde_json::Value, std::sync::Arc, - streamhub::{define, define::StreamHubEventSender, utils::Uuid}, - { - tokio, - tokio::sync::{mpsc, oneshot}, + streamhub::{ + define::{self, StreamHubEventSender}, + stream::StreamIdentifier, + utils::Uuid, }, + tokio::{self, sync::oneshot}, }; +#[derive(serde::Serialize)] +struct ApiResponse { + error_code: i32, + desp: String, + data: T, +} + // the input to our `KickOffClient` handler #[derive(Deserialize)] struct KickOffClient { - id: String, + uuid: String, +} + +#[derive(Deserialize, Debug)] +struct QueryWholeStreamsParams { + // query top N by subscriber's count. + top: Option, +} + +#[derive(Deserialize)] +struct QueryStream { + identifier: StreamIdentifier, + // if specify uuid, then query the stream by uuid and filter no used data. + uuid: Option, } #[derive(Clone)] @@ -28,50 +51,89 @@ impl ApiService { async fn root(&self) -> String { String::from( "Usage of xiu http api: - ./get_stream_status(get) get audio and video stream statistic information. - ./kick_off_client(post) kick off client by publish/subscribe id.\n", + ./api/query_whole_streams(get) query whole streams' information or top streams' information. + ./api/query_stream(post) query stream information by identifier and uuid. 
+ ./api/kick_off_client(post) kick off client by publish/subscribe id.\n", ) } - async fn get_stream_status(&self) -> Result { - let (data_sender, mut data_receiver) = mpsc::unbounded_channel(); - let (size_sender, size_receiver) = oneshot::channel(); + async fn query_whole_streams( + &self, + params: QueryWholeStreamsParams, + ) -> Json> { + log::info!("query_whole_streams: {:?}", params); + let (result_sender, result_receiver) = oneshot::channel(); let hub_event = define::StreamHubEvent::ApiStatistic { - data_sender, - size_sender, + top_n: params.top, + identifier: None, + uuid: None, + result_sender, }; if let Err(err) = self.channel_event_producer.send(hub_event) { log::error!("send api event error: {}", err); } - let mut data = Vec::new(); - match size_receiver.await { - Ok(size) => { - if size == 0 { - return Ok(String::from("no stream data")); - } - loop { - if let Some(stream_statistics) = data_receiver.recv().await { - data.push(stream_statistics); - } - if data.len() == size { - break; - } - } + + match result_receiver.await { + Ok(dat_val) => { + let api_response = ApiResponse { + error_code: 0, + desp: String::from("succ"), + data: dat_val, + }; + Json(api_response) } Err(err) => { - log::error!("start_api_service recv size error: {}", err); + let api_response = ApiResponse { + error_code: -1, + desp: String::from("failed"), + data: serde_json::json!(err.to_string()), + }; + Json(api_response) } } + } + + async fn query_stream(&self, stream: QueryStream) -> Json> { + let uuid = if let Some(uid) = stream.uuid { + Uuid::from_str2(&uid) + } else { + None + }; - if let Ok(data) = serde_json::to_string(&data) { - return Ok(data); + let (result_sender, result_receiver) = oneshot::channel(); + let hub_event = define::StreamHubEvent::ApiStatistic { + top_n: None, + identifier: Some(stream.identifier), + uuid, + result_sender, + }; + + if let Err(err) = self.channel_event_producer.send(hub_event) { + log::error!("send api event error: {}", err); } - Ok(String::from("")) + match result_receiver.await { + Ok(dat_val) => { + let api_response = ApiResponse { + error_code: 0, + desp: String::from("succ"), + data: dat_val, + }; + Json(api_response) + } + Err(err) => { + let api_response = ApiResponse { + error_code: -1, + desp: String::from("failed"), + data: serde_json::json!(err.to_string()), + }; + Json(api_response) + } + } } async fn kick_off_client(&self, id: KickOffClient) -> Result { - let id_result = Uuid::from_str2(&id.id); + let id_result = Uuid::from_str2(&id.uuid); if let Some(id) = id_result { let hub_event = define::StreamHubEvent::ApiKickClient { id }; @@ -93,17 +155,19 @@ pub async fn run(producer: StreamHubEventSender, port: usize) { let api_root = api.clone(); let root = move || async move { api_root.root().await }; - let get_status = api.clone(); - let status = move || async move { - match get_status.get_stream_status().await { - Ok(response) => response, - Err(_) => "error".to_owned(), - } + let api_query_streams = api.clone(); + let query_streams = move |Query(params): Query| async move { + api_query_streams.query_whole_streams(params).await + }; + + let api_query_stream = api.clone(); + let query_stream = move |Json(stream): Json| async move { + api_query_stream.query_stream(stream).await }; - let kick_off = api.clone(); - let kick = move |Json(id): Json| async move { - match kick_off.kick_off_client(id).await { + let api_kick_off = api.clone(); + let kick_off = move |Json(id): Json| async move { + match api_kick_off.kick_off_client(id).await { Ok(response) => 
response, Err(_) => "error".to_owned(), } @@ -111,8 +175,9 @@ pub async fn run(producer: StreamHubEventSender, port: usize) { let app = Router::new() .route("/", get(root)) - .route("/get_stream_status", get(status)) - .route("/kick_off_client", post(kick)); + .route("/api/query_whole_streams", get(query_streams)) + .route("/api/query_stream", post(query_stream)) + .route("/api/kick_off_client", post(kick_off)); log::info!("Http api server listening on http://0.0.0.0:{}", port); axum::Server::bind(&([0, 0, 0, 0], port as u16).into()) diff --git a/confs/local/flv.Cargo.toml b/confs/local/flv.Cargo.toml index 3e455013..956ba59a 100644 --- a/confs/local/flv.Cargo.toml +++ b/confs/local/flv.Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" serde = { version = "1.0", features = ["derive", "rc"] } log = "0.4" diff --git a/confs/local/h264.Cargo.toml b/confs/local/h264.Cargo.toml index 7170f937..876093f7 100644 --- a/confs/local/h264.Cargo.toml +++ b/confs/local/h264.Cargo.toml @@ -13,6 +13,6 @@ repository = "https://github.com/harlanc/xiu" byteorder = "1.4.2" bytes = "1.0.0" log = "0.4" -failure = "0.1.1" +failure = "0.1.8" bytesio = { path = "../../bytesio/" } diff --git a/confs/local/hls.Cargo.toml b/confs/local/hls.Cargo.toml index de28e0bc..9acdd6c7 100644 --- a/confs/local/hls.Cargo.toml +++ b/confs/local/hls.Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" axum = { version = "0.7.4" } tokio-util = { version = "0.6.5", features = ["codec"] } diff --git a/confs/local/httpflv.Cargo.toml b/confs/local/httpflv.Cargo.toml index c90c2dbb..e2da0043 100644 --- a/confs/local/httpflv.Cargo.toml +++ b/confs/local/httpflv.Cargo.toml @@ -13,15 +13,16 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" axum = { version = "0.7.4" } futures = "0.3" +chrono = "0.4" streamhub = { path = "../../library/streamhub/" } xflv = { path = "../../library/container/flv/" } -rtmp = { path = "../rtmp/" } #"0.0.4" commonlib = { path = "../../library/common/" } + [dependencies.tokio] version = "1.4.0" default-features = false diff --git a/confs/local/mpegts.Cargo.toml b/confs/local/mpegts.Cargo.toml index da5ede52..675c60fb 100644 --- a/confs/local/mpegts.Cargo.toml +++ b/confs/local/mpegts.Cargo.toml @@ -13,5 +13,5 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" bytesio = { path = "../../bytesio/" } diff --git a/confs/local/rtmp.Cargo.toml b/confs/local/rtmp.Cargo.toml index 8992a5f7..3593e9a4 100644 --- a/confs/local/rtmp.Cargo.toml +++ b/confs/local/rtmp.Cargo.toml @@ -14,7 +14,7 @@ edition = "2018" byteorder = "1.4.2" bytes = "1.0.0" rand = "0.3" -failure = "0.1.1" +failure = "0.1.8" hmac = "0.11.0" sha2 = "0.9" # uuid = { version = "0.6.5", features = ["v4"] } diff --git a/confs/local/rtsp.Cargo.toml b/confs/local/rtsp.Cargo.toml index f52a73d6..b08431f5 100644 --- a/confs/local/rtsp.Cargo.toml +++ b/confs/local/rtsp.Cargo.toml @@ -11,7 +11,7 @@ byteorder = "1.4.2" tokio = "1.4.0" bytes = "1.0.0" log = "0.4" -failure = "0.1.1" +failure = "0.1.8" http = "0.2.9" indexmap = "1.9.3" lazy_static = "1.4.0" @@ -19,7 +19,12 @@ chrono = "0.4" async-trait = "0.1.70" base64 = "0.21.2" hex = "0.4.3" +serde_json = { version = "1", default-features = false, features = [ + "alloc", + "raw_value", + "std", +] } bytesio = { 
path = "../../library/bytesio/" } streamhub = { path = "../../library/streamhub/" } -commonlib = { path = "../../library/common/" } \ No newline at end of file +commonlib = { path = "../../library/common/" } diff --git a/confs/local/streamhub.Cargo.toml b/confs/local/streamhub.Cargo.toml index f1ec3850..769a8b5c 100644 --- a/confs/local/streamhub.Cargo.toml +++ b/confs/local/streamhub.Cargo.toml @@ -10,14 +10,15 @@ repository = "https://github.com/harlanc/xiu" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -failure = "0.1.1" +failure = "0.1.8" byteorder = "1.4.2" bytes = "1.0.0" rand = "0.8" log = "0.4" -chrono = "0.4" +chrono = { version = "0.4", features = ["serde"] } indexmap = "1.9.3" -reqwest = "0.11.14" +#use vendored feature to enable cross compile for openssl +reqwest = { version = "0.11.24", features = ["native-tls-vendored"] } async-trait = "0.1.70" serde_json = { version = "1", default-features = false, features = [ "alloc", diff --git a/confs/local/webrtc.Cargo.toml b/confs/local/webrtc.Cargo.toml index 7718dec8..87c3722b 100644 --- a/confs/local/webrtc.Cargo.toml +++ b/confs/local/webrtc.Cargo.toml @@ -15,7 +15,7 @@ http = "0.2.9" byteorder = "1.4.2" bytes = "1.0.0" tokio = "1.4.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" webrtc = "0.8.0" async-trait = "0.1.70" diff --git a/confs/local/xiu.Cargo.toml b/confs/local/xiu.Cargo.toml index a80459a0..75108be8 100644 --- a/confs/local/xiu.Cargo.toml +++ b/confs/local/xiu.Cargo.toml @@ -15,7 +15,7 @@ serde_derive = "1.0" serde = { version = "1.0.101", optional = true, features = ["derive"] } anyhow = "^1.0" log = "0.4.0" -failure = "0.1.1" +failure = "0.1.8" clap = "4.1.4" libc = "0.2.139" serde_json = { version = "1", default-features = false, features = [ diff --git a/confs/online/common.Cargo.toml b/confs/online/common.Cargo.toml index c940b211..518be33a 100644 --- a/confs/online/common.Cargo.toml +++ b/confs/online/common.Cargo.toml @@ -1,6 +1,6 @@ [package] name = "commonlib" -version = "0.1.0" +version = "0.1.1" authors = ["HarlanC "] edition = "2018" description = "a common library for xiu project." @@ -14,7 +14,7 @@ anyhow = "^1.0" env_logger = "0.10.0" job_scheduler_ng = "2.0.4" chrono = "0.4" -failure = "0.1.1" +failure = "0.1.8" log = "0.4.0" indexmap = "1.9.3" md5 = "0.7.0" diff --git a/confs/online/flv.Cargo.toml b/confs/online/flv.Cargo.toml index 4b8d6f31..6a2305e6 100644 --- a/confs/online/flv.Cargo.toml +++ b/confs/online/flv.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "xflv" description = "flv library." -version = "0.4.1" +version = "0.4.2" authors = ["HarlanC "] description = "a h264 decoder" @@ -13,7 +13,7 @@ repository = "https://github.com/harlanc/xiu" byteorder = "1.4.2" bytes = "1.0.0" log = "0.4" -failure = "0.1.1" +failure = "0.1.8" -bytesio = "0.3.1" +bytesio = "0.3.2" diff --git a/confs/online/hls.Cargo.toml b/confs/online/hls.Cargo.toml index 0ee9749a..a3ef2ea9 100644 --- a/confs/online/hls.Cargo.toml +++ b/confs/online/hls.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "hls" description = "hls library." 
-version = "0.5.2" +version = "0.5.3" authors = ["HarlanC "] @@ -15,7 +15,7 @@ byteorder = "1.4.2" tokio = "1.4.0" bytes = "1.0.0" log = "0.4" -failure = "0.1.1" +failure = "0.1.8" http = "0.2.9" indexmap = "1.9.3" lazy_static = "1.4.0" @@ -24,6 +24,6 @@ async-trait = "0.1.70" base64 = "0.21.2" hex = "0.4.3" -bytesio = "0.3.1" -streamhub = "0.2.1" -commonlib = "0.1.0" +bytesio = "0.3.2" +streamhub = "0.2.2" +commonlib = "0.1.1" diff --git a/confs/online/streamhub.Cargo.toml b/confs/online/streamhub.Cargo.toml index 5b17bc9b..ef2cc141 100644 --- a/confs/online/streamhub.Cargo.toml +++ b/confs/online/streamhub.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "streamhub" description = "It receives streams from publishers(rtmp/rtsp etc.) and send streams to subscribers(rtmp/rtsp/httpflv/hls)" -version = "0.2.1" +version = "0.2.2" edition = "2021" authors = ["HarlanC "] license = "MIT" @@ -10,7 +10,7 @@ repository = "https://github.com/harlanc/xiu" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -failure = "0.1.1" +failure = "0.1.8" byteorder = "1.4.2" bytes = "1.0.0" rand = "0.8" @@ -27,8 +27,8 @@ serde_json = { version = "1", default-features = false, features = [ ] } serde = { version = "1.0", features = ["derive", "rc"] } -xflv = "0.4.1" -bytesio = "0.3.1" +xflv = "0.4.2" +bytesio = "0.3.2" [dependencies.tokio] version = "1.4.0" diff --git a/confs/online/webrtc.Cargo.toml b/confs/online/webrtc.Cargo.toml index 3990e04b..3dbf1358 100644 --- a/confs/online/webrtc.Cargo.toml +++ b/confs/online/webrtc.Cargo.toml @@ -1,6 +1,6 @@ [package] name = "xwebrtc" -version = "0.3.1" +version = "0.3.2" description = "A whip/whep library." edition = "2021" authors = ["HarlanC "] @@ -15,14 +15,14 @@ http = "0.2.9" byteorder = "1.4.2" bytes = "1.0.0" tokio = "1.4.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" webrtc = "0.10.1" async-trait = "0.1.70" fdk-aac = "0.6.0" audiopus = "0.3.0-rc.0" -bytesio = "0.3.1" -streamhub = "0.2.1" -xflv = "0.4.1" -commonlib = "0.1.0" +bytesio = "0.3.2" +streamhub = "0.2.2" +xflv = "0.4.2" +commonlib = "0.1.1" diff --git a/confs/online/xiu.Cargo.toml b/confs/online/xiu.Cargo.toml index 95825eb8..08e6d2ba 100644 --- a/confs/online/xiu.Cargo.toml +++ b/confs/online/xiu.Cargo.toml @@ -1,7 +1,7 @@ [package] name = "xiu" description = "A powerful live server by Rust ." -version = "0.12.4" +version = "0.12.5" authors = ["HarlanC + +## [Unreleased] - ReleaseDate + +## [0.3.2] - 2021-03-15 +- Upgrade failure library. + +## [0.3.1] +- Remove no used "\n" for error message. +- Fix read timeout for UDP/TCP IO. + +## [0.3.0] +- Support Udp. + +## [0.2.0] +- Add bits reader and writer. + +## [0.1.27] +- Add functions. + + + + + + + + + diff --git a/library/bytesio/Cargo.toml b/library/bytesio/Cargo.toml index 99e1352a..4403fb6f 100644 --- a/library/bytesio/Cargo.toml +++ b/library/bytesio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bytesio" -version = "0.3.1" +version = "0.3.2" authors = ["HarlanC "] edition = "2018" description = "a network io library using tokio." 
@@ -13,7 +13,7 @@ repository = "https://github.com/harlanc/xiu" byteorder = "1.4.2" bytes = "1.0.0" rand = "0.3" -failure = "0.1.1" +failure = "0.1.8" tokio-util = { version = "0.6.5", features = ["codec"] } futures = "0.3.5" tokio-stream = { version = "0.1" } diff --git a/library/bytesio/README.md b/library/bytesio/README.md index 3029063c..68f132ba 100644 --- a/library/bytesio/README.md +++ b/library/bytesio/README.md @@ -1,11 +1,2 @@ A network bytes io library. -## v0.1.27 -Add functions. -## v0.2.0 -Add bits reader and writer. -## v0.3.0 -Support Udp. -## v0.3.1 -- Remove no used "\n" for error message. -- Fix read timeout for UDP/TCP IO. \ No newline at end of file diff --git a/library/codec/h264/CHANGELOG.md b/library/codec/h264/CHANGELOG.md new file mode 100644 index 00000000..a5367493 --- /dev/null +++ b/library/codec/h264/CHANGELOG.md @@ -0,0 +1,32 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + + + +## [Unreleased] - ReleaseDate + +## [0.2.2] - 2021-03-15 +- Upgrade failure library. + +## [0.2.1] +- Remove no used "\n" for error message. +- Fix sps parse error. + +## [0.2.0] +- Reference bytesio v0.3.0. + +## [0.1.0] +- Add sps parser. + + + + + + + + + diff --git a/library/codec/h264/Cargo.toml b/library/codec/h264/Cargo.toml index 7170f937..876093f7 100644 --- a/library/codec/h264/Cargo.toml +++ b/library/codec/h264/Cargo.toml @@ -13,6 +13,6 @@ repository = "https://github.com/harlanc/xiu" byteorder = "1.4.2" bytes = "1.0.0" log = "0.4" -failure = "0.1.1" +failure = "0.1.8" bytesio = { path = "../../bytesio/" } diff --git a/library/codec/h264/README.md b/library/codec/h264/README.md index bd412723..6ba2e701 100644 --- a/library/codec/h264/README.md +++ b/library/codec/h264/README.md @@ -1,10 +1,4 @@ A h264 decoder library. -## v0.1.0 -- Add sps parser. -## v0.2.0 -- Reference bytesio v0.3.0. -## v0.2.1 -- Remove no used "\n" for error message. -- Fix sps parse error. + diff --git a/library/common/CHANGELOG.md b/library/common/CHANGELOG.md new file mode 100644 index 00000000..3428a165 --- /dev/null +++ b/library/common/CHANGELOG.md @@ -0,0 +1,28 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + + + +## [Unreleased] - ReleaseDate + +## [0.1.1] - 2021-03-15 +- Upgrade failure library. + +## [0.1.0] +- The first version including HTTP/auth logics. + + + + + + + + + + + + diff --git a/library/common/Cargo.toml b/library/common/Cargo.toml index c940b211..518be33a 100644 --- a/library/common/Cargo.toml +++ b/library/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "commonlib" -version = "0.1.0" +version = "0.1.1" authors = ["HarlanC "] edition = "2018" description = "a common library for xiu project." @@ -14,7 +14,7 @@ anyhow = "^1.0" env_logger = "0.10.0" job_scheduler_ng = "2.0.4" chrono = "0.4" -failure = "0.1.1" +failure = "0.1.8" log = "0.4.0" indexmap = "1.9.3" md5 = "0.7.0" diff --git a/library/common/README.md b/library/common/README.md index bfd5bc80..d1467966 100644 --- a/library/common/README.md +++ b/library/common/README.md @@ -1,4 +1 @@ -A common library. - -## v0.1.0 -- The first version including HTTP/auth logics. \ No newline at end of file +A common library. 
\ No newline at end of file diff --git a/library/container/flv/CHANGELOG.md b/library/container/flv/CHANGELOG.md index 799be4ca..c19db6e3 100644 --- a/library/container/flv/CHANGELOG.md +++ b/library/container/flv/CHANGELOG.md @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.4.2] - 2021-03-15 +- Upgrade failure library. + ## [0.4.1] - 2021-02-28 - Move amf0 mod from RTMP library to this library diff --git a/library/container/flv/Cargo.toml b/library/container/flv/Cargo.toml index 3f8427d4..f0fc7629 100644 --- a/library/container/flv/Cargo.toml +++ b/library/container/flv/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" serde = { version = "1.0", features = ["derive", "rc"] } log = "0.4" indexmap = "1.9.3" diff --git a/library/container/flv/src/define.rs b/library/container/flv/src/define.rs index 76ed98a5..30f09425 100644 --- a/library/container/flv/src/define.rs +++ b/library/container/flv/src/define.rs @@ -5,6 +5,7 @@ use serde::Serialize; pub enum SoundFormat { #[default] AAC = 10, + OPUS = 13, } pub mod aac_packet_type { diff --git a/library/container/mpegts/CHANGELOG.md b/library/container/mpegts/CHANGELOG.md new file mode 100644 index 00000000..21879b26 --- /dev/null +++ b/library/container/mpegts/CHANGELOG.md @@ -0,0 +1,44 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + + + +## [Unreleased] - ReleaseDate + +## [0.2.2] - 2021-03-15 +- Upgrade failure library. + +## [0.2.1] +- Remove no used "\n" for error message. + +## [0.2.0] +- Reference bytesio v0.3.0. + +## [0.1.1] +- Refactor codes. + +## [0.1.0] +- Fix the error chain. +- Remove compile warnings. + +## [0.0.2] +- Remove no used dependences. + +## [0.0.1] +- Support mux H264/AAC data to ts format. + + + + + + + + + + + + diff --git a/library/container/mpegts/Cargo.toml b/library/container/mpegts/Cargo.toml index da5ede52..675c60fb 100644 --- a/library/container/mpegts/Cargo.toml +++ b/library/container/mpegts/Cargo.toml @@ -13,5 +13,5 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" bytesio = { path = "../../bytesio/" } diff --git a/library/container/mpegts/README.md b/library/container/mpegts/README.md index 889ea0da..9a6af4a3 100644 --- a/library/container/mpegts/README.md +++ b/library/container/mpegts/README.md @@ -1,17 +1,5 @@ A mpegts library. -## v0.0.1 -Support mux H264/AAC data to ts format. -## v0.0.2 -Remove no used dependences. -## v0.1.0 -- Fix the error chain. -- Remove compile warnings. -## v0.1.1 -- Refactor codes. -## v0.2.0 -- Reference bytesio v0.3.0. -## v0.2.1 -- Remove no used "\n" for error message. + diff --git a/library/logger/CHANGELOG.md b/library/logger/CHANGELOG.md new file mode 100644 index 00000000..6185a179 --- /dev/null +++ b/library/logger/CHANGELOG.md @@ -0,0 +1,33 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + + + +## [Unreleased] - ReleaseDate + +## [0.1.3] - 2021-03-15 +- Upgrade failure library. + +## [0.1.2] +- Remove build warnings. + +## [0.1.1] +- Refactor codes; + +## [0.1.0] +- The first version. 
+ + + + + + + + + + + diff --git a/library/logger/Cargo.toml b/library/logger/Cargo.toml index ad7841ec..2d066af1 100644 --- a/library/logger/Cargo.toml +++ b/library/logger/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "env_logger_extend" -version = "0.1.2" +version = "0.1.3" authors = ["HarlanC "] edition = "2018" description = "a logger library." @@ -14,5 +14,5 @@ anyhow = "^1.0" env_logger = "0.10.0" job_scheduler_ng = "2.0.4" chrono = "0.4" -failure = "0.1.1" +failure = "0.1.8" log = "0.4.0" diff --git a/library/streamhub/CHANGELOG.md b/library/streamhub/CHANGELOG.md index 8f86fb6e..cc77ae35 100644 --- a/library/streamhub/CHANGELOG.md +++ b/library/streamhub/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.2.2] - 2021-03-15 +- Upgrade failure library. +- Support querying more detailed statistic data. + ## [0.2.1] - 2021-02-28 - Refactor api_kick_off_client of streamhub to simplify the process; - Rename struct name. diff --git a/library/streamhub/Cargo.toml b/library/streamhub/Cargo.toml index ae168970..769a8b5c 100644 --- a/library/streamhub/Cargo.toml +++ b/library/streamhub/Cargo.toml @@ -10,15 +10,15 @@ repository = "https://github.com/harlanc/xiu" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -failure = "0.1.1" +failure = "0.1.8" byteorder = "1.4.2" bytes = "1.0.0" rand = "0.8" log = "0.4" -chrono = "0.4" +chrono = { version = "0.4", features = ["serde"] } indexmap = "1.9.3" #use vendored feature to enable cross compile for openssl -reqwest = {version = "0.11.24",features = ["native-tls-vendored"]} +reqwest = { version = "0.11.24", features = ["native-tls-vendored"] } async-trait = "0.1.70" serde_json = { version = "1", default-features = false, features = [ "alloc", diff --git a/library/streamhub/src/define.rs b/library/streamhub/src/define.rs index 8689533d..1b987dd7 100644 --- a/library/streamhub/src/define.rs +++ b/library/streamhub/src/define.rs @@ -1,8 +1,12 @@ +use chrono::{DateTime, Local}; +use serde_json::Value; +use xflv::define::{AacProfile, AvcCodecId, AvcLevel, AvcProfile, SoundFormat}; + use crate::utils; use { super::errors::StreamHubError, - crate::statistics::StreamStatistics, + crate::statistics::StatisticsStream, crate::stream::StreamIdentifier, async_trait::async_trait, bytes::BytesMut, @@ -158,18 +162,31 @@ pub type StreamHubEventReceiver = mpsc::UnboundedReceiver; pub type BroadcastEventSender = broadcast::Sender; pub type BroadcastEventReceiver = broadcast::Receiver; -pub type TransmitterEventSender = mpsc::UnboundedSender; -pub type TransmitterEventReceiver = mpsc::UnboundedReceiver; +pub type TransceiverEventSender = mpsc::UnboundedSender; +pub type TransceiverEventReceiver = mpsc::UnboundedReceiver; + +pub type StatisticDataSender = mpsc::UnboundedSender; +pub type StatisticDataReceiver = mpsc::UnboundedReceiver; -pub type AvStatisticSender = mpsc::UnboundedSender; -pub type AvStatisticReceiver = mpsc::UnboundedReceiver; +pub type StatisticStreamSender = mpsc::UnboundedSender; +pub type StatisticStreamReceiver = mpsc::UnboundedReceiver; -pub type StreamStatisticSizeSender = oneshot::Sender; -pub type StreamStatisticSizeReceiver = oneshot::Receiver; +pub type StatisticApiResultSender = oneshot::Sender; +pub type StatisticApiResultReceiver = oneshot::Receiver; -pub type SubEventExecuteResultSender = oneshot::Sender>; -pub type PubEventExecuteResultSender = - oneshot::Sender, Option), StreamHubError>>; +pub 
type SubEventExecuteResultSender = + oneshot::Sender), StreamHubError>>; +pub type PubEventExecuteResultSender = oneshot::Sender< + Result< + ( + Option, + Option, + Option, + ), + StreamHubError, + >, +>; +pub type TransceiverEventExecuteResultSender = oneshot::Sender; #[async_trait] pub trait TStreamHandler: Send + Sync { @@ -178,7 +195,7 @@ pub trait TStreamHandler: Send + Sync { sender: DataSender, sub_type: SubscribeType, ) -> Result<(), StreamHubError>; - async fn get_statistic_data(&self) -> Option; + async fn get_statistic_data(&self) -> Option; async fn send_information(&self, sender: InformationSender); } @@ -234,8 +251,10 @@ pub enum StreamHubEvent { }, #[serde(skip_serializing)] ApiStatistic { - data_sender: AvStatisticSender, - size_sender: StreamStatisticSizeSender, + top_n: Option, + identifier: Option, + uuid: Option, + result_sender: StatisticApiResultSender, }, #[serde(skip_serializing)] ApiKickClient { id: Uuid }, @@ -248,10 +267,11 @@ pub enum StreamHubEvent { } #[derive(Debug)] -pub enum TransmitterEvent { +pub enum TransceiverEvent { Subscribe { sender: DataSender, info: SubscriberInfo, + result_sender: TransceiverEventExecuteResultSender, }, UnSubscribe { info: SubscriberInfo, @@ -259,14 +279,15 @@ pub enum TransmitterEvent { UnPublish {}, Api { - sender: AvStatisticSender, + sender: StatisticStreamSender, + uuid: Option, }, Request { sender: InformationSender, }, } -impl fmt::Display for TransmitterEvent { +impl fmt::Display for TransceiverEvent { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "{:?}", *self) } @@ -281,3 +302,43 @@ pub enum BroadcastEvent { Subscribe { identifier: StreamIdentifier }, UnSubscribe { identifier: StreamIdentifier }, } + +pub enum StatisticData { + AudioCodec { + sound_format: SoundFormat, + profile: AacProfile, + samplerate: u32, + channels: u8, + }, + VideoCodec { + codec: AvcCodecId, + profile: AvcProfile, + level: AvcLevel, + width: u32, + height: u32, + }, + Audio { + uuid: Option, + data_size: usize, + aac_packet_type: u8, + duration: usize, + }, + Video { + uuid: Option, + data_size: usize, + frame_count: usize, + is_key_frame: Option, + duration: usize, + }, + Publisher { + id: Uuid, + remote_addr: String, + start_time: DateTime, + }, + Subscriber { + id: Uuid, + remote_addr: String, + sub_type: SubscribeType, + start_time: DateTime, + }, +} diff --git a/library/streamhub/src/errors.rs b/library/streamhub/src/errors.rs index 16399008..0239dde4 100644 --- a/library/streamhub/src/errors.rs +++ b/library/streamhub/src/errors.rs @@ -1,6 +1,8 @@ use bytesio::bytes_errors::BytesReadError; use bytesio::bytes_errors::BytesWriteError; use failure::Backtrace; +use serde_json::error::Error; +use tokio::sync::oneshot::error::RecvError; use {failure::Fail, std::fmt}; #[derive(Debug, Fail)] @@ -25,6 +27,10 @@ pub enum StreamHubErrorValue { BytesWriteError(BytesWriteError), #[fail(display = "not correct data sender type")] NotCorrectDataSenderType, + #[fail(display = "Tokio oneshot recv error")] + RecvError(RecvError), + #[fail(display = "Serde json error")] + SerdeError(Error), } #[derive(Debug)] pub struct StreamHubError { @@ -63,6 +69,22 @@ impl From for StreamHubError { } } +impl From for StreamHubError { + fn from(error: RecvError) -> Self { + StreamHubError { + value: StreamHubErrorValue::RecvError(error), + } + } +} + +impl From for StreamHubError { + fn from(error: Error) -> Self { + StreamHubError { + value: StreamHubErrorValue::SerdeError(error), + } + } +} + // impl From for ChannelError { // fn from(error: 
CacheError) -> Self { // ChannelError { diff --git a/library/streamhub/src/lib.rs b/library/streamhub/src/lib.rs index 7ceaa5ab..156c8970 100644 --- a/library/streamhub/src/lib.rs +++ b/library/streamhub/src/lib.rs @@ -1,4 +1,11 @@ -use define::{FrameDataReceiver, PacketDataReceiver, PacketDataSender}; +use define::{ + FrameDataReceiver, PacketDataReceiver, PacketDataSender, StatisticData, StatisticDataReceiver, + StatisticDataSender, +}; +use serde_json::{json, Value}; +use statistics::{StatisticSubscriber, StatisticsStream}; +use tokio::sync::oneshot; +use xflv::define::aac_packet_type; use crate::define::PacketData; @@ -12,11 +19,10 @@ pub mod utils; use { crate::notify::Notifier, define::{ - AvStatisticSender, BroadcastEvent, BroadcastEventReceiver, BroadcastEventSender, - DataReceiver, DataSender, FrameData, FrameDataSender, Information, StreamHubEvent, - StreamHubEventReceiver, StreamHubEventSender, StreamStatisticSizeSender, SubscribeType, - SubscriberInfo, TStreamHandler, TransmitterEvent, TransmitterEventReceiver, - TransmitterEventSender, + BroadcastEvent, BroadcastEventReceiver, BroadcastEventSender, DataReceiver, DataSender, + FrameData, FrameDataSender, Information, StreamHubEvent, StreamHubEventReceiver, + StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, TransceiverEvent, + TransceiverEventReceiver, TransceiverEventSender, }, errors::{StreamHubError, StreamHubErrorValue}, std::collections::HashMap, @@ -26,35 +32,103 @@ use { utils::Uuid, }; -//receive data from ChannelsManager and send to players/subscribers -pub struct Transmitter { +//Receive audio data/video data/meta data/media info from a publisher and send to players/subscribers +//Receive statistic information from a publisher and send to api callers. +pub struct StreamDataTransceiver { //used for receiving Audio/Video data from publishers data_receiver: DataReceiver, //used for receiving event - event_receiver: TransmitterEventReceiver, + event_receiver: TransceiverEventReceiver, //used for sending audio/video frame data to players/subscribers id_to_frame_sender: Arc>>, //used for sending audio/video packet data to players/subscribers id_to_packet_sender: Arc>>, + //publisher and subscribers use this sender to submit statistical data + statistic_data_sender: StatisticDataSender, + //used for receiving statistical data from publishers and subscribers + statistic_data_receiver: StatisticDataReceiver, + //The publisher and subscribers's statistics data of a stream need to be aggregated and sent to the caller as needed. 
+ statistic_data: Arc>, + //a hander implement by protocols, such as rtmp, webrtc, http-flv, hls stream_handler: Arc, } -impl Transmitter { +impl StreamDataTransceiver { fn new( data_receiver: DataReceiver, - event_receiver: UnboundedReceiver, + event_receiver: UnboundedReceiver, + identifier: StreamIdentifier, h: Arc, ) -> Self { + let (statistic_data_sender, statistic_data_receiver) = mpsc::unbounded_channel(); Self { data_receiver, event_receiver, + statistic_data_sender, + statistic_data_receiver, id_to_frame_sender: Arc::new(Mutex::new(HashMap::new())), id_to_packet_sender: Arc::new(Mutex::new(HashMap::new())), stream_handler: h, + statistic_data: Arc::new(Mutex::new(StatisticsStream::new(identifier))), } } - pub async fn receive_frame_data_loop( + async fn receive_frame_data( + data: Option, + frame_senders: &Arc>>, + ) { + if let Some(val) = data { + match val { + FrameData::MetaData { + timestamp: _, + data: _, + } => {} + FrameData::Audio { timestamp, data } => { + let data = FrameData::Audio { + timestamp, + data: data.clone(), + }; + + for (_, v) in frame_senders.lock().await.iter() { + if let Err(audio_err) = v.send(data.clone()).map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendAudioError, + }) { + log::error!("Transmiter send error: {}", audio_err); + } + } + } + FrameData::Video { timestamp, data } => { + let data = FrameData::Video { + timestamp, + data: data.clone(), + }; + for (_, v) in frame_senders.lock().await.iter() { + if let Err(video_err) = v.send(data.clone()).map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendVideoError, + }) { + log::error!("Transmiter send error: {}", video_err); + } + } + } + FrameData::MediaInfo { + media_info: info_value, + } => { + let data = FrameData::MediaInfo { + media_info: info_value, + }; + for (_, v) in frame_senders.lock().await.iter() { + if let Err(media_err) = v.send(data.clone()).map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendVideoError, + }) { + log::error!("Transmiter send error: {}", media_err); + } + } + } + } + } + } + + async fn receive_frame_data_loop( mut exit: broadcast::Receiver<()>, mut receiver: FrameDataReceiver, frame_senders: Arc>>, @@ -63,54 +137,7 @@ impl Transmitter { loop { tokio::select! 
{ data = receiver.recv() => { - if let Some(val) = data { - match val { - FrameData::MetaData { - timestamp: _, - data: _, - } => {} - FrameData::Audio { timestamp, data } => { - let data = FrameData::Audio { - timestamp, - data: data.clone(), - }; - - for (_, v) in frame_senders.lock().await.iter() { - if let Err(audio_err) = v.send(data.clone()).map_err(|_| StreamHubError { - value: StreamHubErrorValue::SendAudioError, - }) { - log::error!("Transmiter send error: {}", audio_err); - } - } - } - FrameData::Video { timestamp, data } => { - let data = FrameData::Video { - timestamp, - data: data.clone(), - }; - for (_, v) in frame_senders.lock().await.iter() { - if let Err(video_err) = v.send(data.clone()).map_err(|_| StreamHubError { - value: StreamHubErrorValue::SendVideoError, - }) { - log::error!("Transmiter send error: {}", video_err); - } - } - } - FrameData::MediaInfo { media_info: info_value } => { - let data = FrameData::MediaInfo { - media_info: info_value - }; - for (_, v) in frame_senders.lock().await.iter() { - if let Err(media_err) = v.send(data.clone()).map_err(|_| StreamHubError { - value: StreamHubErrorValue::SendVideoError, - }) { - log::error!("Transmiter send error: {}", media_err); - } - } - - } - } - } + Self::receive_frame_data(data, &frame_senders).await; } _ = exit.recv()=>{ break; @@ -120,7 +147,44 @@ impl Transmitter { }); } - pub async fn receive_packet_data_loop( + async fn receive_packet_data( + data: Option, + packet_senders: &Arc>>, + ) { + if let Some(val) = data { + match val { + PacketData::Audio { timestamp, data } => { + let data = PacketData::Audio { + timestamp, + data: data.clone(), + }; + + for (_, v) in packet_senders.lock().await.iter() { + if let Err(audio_err) = v.send(data.clone()).map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendAudioError, + }) { + log::error!("Transmiter send error: {}", audio_err); + } + } + } + PacketData::Video { timestamp, data } => { + let data = PacketData::Video { + timestamp, + data: data.clone(), + }; + for (_, v) in packet_senders.lock().await.iter() { + if let Err(video_err) = v.send(data.clone()).map_err(|_| StreamHubError { + value: StreamHubErrorValue::SendVideoError, + }) { + log::error!("Transmiter send error: {}", video_err); + } + } + } + } + } + } + + async fn receive_packet_data_loop( mut exit: broadcast::Receiver<()>, mut receiver: PacketDataReceiver, packet_senders: Arc>>, @@ -129,59 +193,188 @@ impl Transmitter { loop { tokio::select! 
{ data = receiver.recv() => { - if let Some(val) = data { - match val { - - PacketData::Audio { timestamp, data } => { - let data = PacketData::Audio { - timestamp, - data: data.clone(), - }; - - for (_, v) in packet_senders.lock().await.iter() { - if let Err(audio_err) = v.send(data.clone()).map_err(|_| StreamHubError { - value: StreamHubErrorValue::SendAudioError, - }) { - log::error!("Transmiter send error: {}", audio_err); - } - } - } - PacketData::Video { timestamp, data } => { - let data = PacketData::Video { - timestamp, - data: data.clone(), - }; - for (_, v) in packet_senders.lock().await.iter() { - if let Err(video_err) = v.send(data.clone()).map_err(|_| StreamHubError { - value: StreamHubErrorValue::SendVideoError, - }) { - log::error!("Transmiter send error: {}", video_err); - } - } - } + Self::receive_packet_data(data, &packet_senders).await; + } + _ = exit.recv()=>{ + break; + } + } + } + }); + } + + async fn receive_statistics_data( + data: Option, + statistics_data: &Arc>, + ) { + if let Some(val) = data { + match val { + StatisticData::Audio { + uuid, + data_size, + aac_packet_type, + duration: _, + } => { + if let Some(uid) = uuid { + { + let subscriber = &mut statistics_data.lock().await.subscribers; + if let Some(sub) = subscriber.get_mut(&uid) { + sub.send_bytes += data_size; + } + } + statistics_data.lock().await.total_send_bytes += data_size; + } else { + match aac_packet_type { + aac_packet_type::AAC_RAW => { + let audio_data = &mut statistics_data.lock().await.publisher.audio; + audio_data.recv_bytes += data_size; } + aac_packet_type::AAC_SEQHDR => {} + _ => {} } + statistics_data.lock().await.total_recv_bytes += data_size; } - _ = exit.recv()=>{ + } + StatisticData::Video { + uuid, + data_size, + frame_count, + is_key_frame, + duration: _, + } => { + //if it is a subscriber, we need to update the send_bytes + if let Some(uid) = uuid { + { + let subscriber = &mut statistics_data.lock().await.subscribers; + if let Some(sub) = subscriber.get_mut(&uid) { + sub.send_bytes += data_size; + sub.total_send_bytes += data_size; + } + } + + statistics_data.lock().await.total_send_bytes += data_size; + } + //if it is a publisher, we need to update the recv_bytes + else { + let stat_data = &mut statistics_data.lock().await; + stat_data.total_recv_bytes += data_size; + stat_data.publisher.video.recv_bytes += data_size; + stat_data.publisher.video.recv_frame_count += frame_count; + stat_data.publisher.recv_bytes += data_size; + if let Some(is_key) = is_key_frame { + if is_key { + stat_data.publisher.video.gop = + stat_data.publisher.video.recv_frame_count_for_gop; + stat_data.publisher.video.recv_frame_count_for_gop = 1; + } else { + stat_data.publisher.video.recv_frame_count_for_gop += frame_count; + } + } + } + } + StatisticData::AudioCodec { + sound_format, + profile, + samplerate, + channels, + } => { + let audio_codec_data = &mut statistics_data.lock().await.publisher.audio; + audio_codec_data.sound_format = sound_format; + audio_codec_data.profile = profile; + audio_codec_data.samplerate = samplerate; + audio_codec_data.channels = channels; + } + StatisticData::VideoCodec { + codec, + profile, + level, + width, + height, + } => { + let video_codec_data = &mut statistics_data.lock().await.publisher.video; + video_codec_data.codec = codec; + video_codec_data.profile = profile; + video_codec_data.level = level; + video_codec_data.width = width; + video_codec_data.height = height; + } + StatisticData::Publisher { + id, + remote_addr, + start_time, + } => { + let publisher = 
&mut statistics_data.lock().await.publisher; + publisher.id = id; + publisher.remote_address = remote_addr; + + publisher.start_time = start_time; + } + StatisticData::Subscriber { + id, + remote_addr, + sub_type, + start_time, + } => { + let subscriber = &mut statistics_data.lock().await.subscribers; + let sub = StatisticSubscriber { + id, + remote_address: remote_addr, + sub_type, + start_time, + send_bitrate: 0, + send_bytes: 0, + total_send_bytes: 0, + }; + subscriber.insert(id, sub); + } + } + } + } + + async fn receive_statistics_data_loop( + mut exit_receive: broadcast::Receiver<()>, + exit_caclulate: broadcast::Receiver<()>, + mut receiver: StatisticDataReceiver, + statistics_data: Arc>, + ) { + let mut statistic_calculate = + statistics::StatisticsCaculate::new(statistics_data.clone(), exit_caclulate); + tokio::spawn(async move { statistic_calculate.start().await }); + + tokio::spawn(async move { + loop { + tokio::select! { + data = receiver.recv() => + { + Self::receive_statistics_data(data, &statistics_data).await; + } + _ = exit_receive.recv()=>{ break; } } } }); } - pub async fn receive_event_loop( + + async fn receive_event_loop( stream_handler: Arc, exit: broadcast::Sender<()>, - mut receiver: TransmitterEventReceiver, + mut receiver: TransceiverEventReceiver, packet_senders: Arc>>, frame_senders: Arc>>, + statistic_sender: StatisticDataSender, + statistics_data: Arc>, ) { tokio::spawn(async move { loop { if let Some(val) = receiver.recv().await { match val { - TransmitterEvent::Subscribe { sender, info } => { + TransceiverEvent::Subscribe { + sender, + info, + result_sender, + } => { if let Err(err) = stream_handler .send_prior_data(sender.clone(), info.sub_type) .await @@ -201,31 +394,52 @@ impl Transmitter { packet_senders.lock().await.insert(info.id, packet_sender); } } - } - TransmitterEvent::UnSubscribe { info } => match info.sub_type { - SubscribeType::PlayerRtp | SubscribeType::PlayerWebrtc => { - packet_senders.lock().await.remove(&info.id); + + if let Err(err) = result_sender.send(statistic_sender.clone()) { + log::error!( + "receive_event_loop:send statistic send err :{:?} ", + err + ) } - _ => { - frame_senders.lock().await.remove(&info.id); + + let mut statistics_data = statistics_data.lock().await; + statistics_data.subscriber_count += 1; + } + TransceiverEvent::UnSubscribe { info } => { + match info.sub_type { + SubscribeType::PlayerRtp | SubscribeType::PlayerWebrtc => { + packet_senders.lock().await.remove(&info.id); + } + _ => { + frame_senders.lock().await.remove(&info.id); + } } - }, - TransmitterEvent::UnPublish {} => { + let mut statistics_data = statistics_data.lock().await; + let subscribers = &mut statistics_data.subscribers; + subscribers.remove(&info.id); + + statistics_data.subscriber_count -= 1; + } + TransceiverEvent::UnPublish {} => { if let Err(err) = exit.send(()) { log::error!("TransmitterEvent::UnPublish send error: {}", err); } break; } - TransmitterEvent::Api { sender } => { - if let Some(avstatistic_data) = - stream_handler.get_statistic_data().await - { - if let Err(err) = sender.send(avstatistic_data) { - log::info!("Transmitter send avstatistic data err: {}", err); - } + TransceiverEvent::Api { sender, uuid } => { + log::info!("api: stream identifier: {:?}", uuid); + let statistic_data = if let Some(uid) = uuid { + statistics_data.lock().await.query_by_uuid(uid) + } else { + log::info!("api2: stream identifier: {:?}", statistics_data); + statistics_data.lock().await.clone() + }; + + if let Err(err) = sender.send(statistic_data) { + 
log::info!("Transmitter send avstatistic data err: {}", err); } } - TransmitterEvent::Request { sender } => { + TransceiverEvent::Request { sender } => { stream_handler.send_information(sender).await; } } @@ -255,30 +469,44 @@ impl Transmitter { .await; } + Self::receive_statistics_data_loop( + tx.subscribe(), + tx.subscribe(), + self.statistic_data_receiver, + self.statistic_data.clone(), + ) + .await; + Self::receive_event_loop( self.stream_handler, tx, self.event_receiver, self.id_to_packet_sender, self.id_to_frame_sender, + self.statistic_data_sender, + self.statistic_data.clone(), ) .await; Ok(()) } + + pub fn get_statistics_data_sender(&self) -> StatisticDataSender { + self.statistic_data_sender.clone() + } } pub struct StreamsHub { - //app_name to stream_name to producer - streams: HashMap, + //stream identifier to transceiver event sender + streams: HashMap, //construct UnSubscribe and UnPublish event from Subscribe and Publish event to kick off client un_pub_sub_events: HashMap, - //event is consumed in Channels, produced from other rtmp sessions + //event is consumed in Stream hub, produced from other protocol sessions hub_event_receiver: StreamHubEventReceiver, - //event is produced from other rtmp sessions + //event is produced from other protocol sessions hub_event_sender: StreamHubEventSender, - //client_event_producer: client_event_producer - client_event_producer: BroadcastEventSender, + // + client_event_sender: BroadcastEventSender, //The rtmp static push/pull and the hls transfer is triggered actively, //add a control switches separately. rtmp_push_enabled: bool, @@ -301,7 +529,7 @@ impl StreamsHub { un_pub_sub_events: HashMap::new(), hub_event_receiver: event_consumer, hub_event_sender: event_producer, - client_event_producer: client_producer, + client_event_sender: client_producer, rtmp_push_enabled: false, rtmp_pull_enabled: false, rtmp_remuxer_enabled: false, @@ -334,7 +562,7 @@ impl StreamsHub { } pub fn get_client_event_consumer(&mut self) -> BroadcastEventReceiver { - self.client_event_producer.subscribe() + self.client_event_sender.subscribe() } pub async fn event_loop(&mut self) { @@ -397,14 +625,14 @@ impl StreamsHub { .publish(identifier.clone(), receiver, stream_handler) .await { - Ok(()) => { + Ok(statistic_data_sender) => { if let Some(notifier) = &self.notifier { notifier.on_publish_notify(event_serialize_str).await; } self.un_pub_sub_events .insert(info.id, StreamHubEvent::UnPublish { identifier, info }); - Ok((frame_sender, packet_sender)) + Ok((frame_sender, packet_sender, Some(statistic_data_sender))) } Err(err) => { log::error!("event_loop Publish err: {}", err); @@ -470,14 +698,14 @@ impl StreamsHub { }; let rv = match self.subscribe(&identifier, info_clone, sender).await { - Ok(()) => { + Ok(statistic_data_sender) => { if let Some(notifier) = &self.notifier { notifier.on_play_notify(event_serialize_str).await; } self.un_pub_sub_events .insert(sub_id, StreamHubEvent::UnSubscribe { identifier, info }); - Ok(receiver) + Ok((receiver, Some(statistic_data_sender))) } Err(err) => { log::error!("event_loop Subscribe error: {}", err); @@ -498,10 +726,21 @@ impl StreamsHub { } StreamHubEvent::ApiStatistic { - data_sender, - size_sender, + top_n, + identifier, + uuid, + result_sender, } => { - if let Err(err) = self.api_statistic(data_sender, size_sender) { + log::info!("api_statistic1: stream identifier: {:?}", identifier); + let result = match self.api_statistic(top_n, identifier, uuid).await { + Ok(rv) => rv, + Err(err) => { + log::error!("event_loop api 
error: {}", err); + json!(err.to_string()) + } + }; + + if let Err(err) = result_sender.send(result) { log::error!("event_loop api error: {}", err); } } @@ -525,7 +764,7 @@ impl StreamsHub { sender: mpsc::UnboundedSender, ) -> Result<(), StreamHubError> { if let Some(producer) = self.streams.get_mut(identifier) { - let event = TransmitterEvent::Request { sender }; + let event = TransceiverEvent::Request { sender }; log::info!("Request: stream identifier: {}", identifier); producer.send(event).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, @@ -534,32 +773,65 @@ impl StreamsHub { Ok(()) } - fn api_statistic( + async fn api_statistic( &mut self, - data_sender: AvStatisticSender, - size_sender: StreamStatisticSizeSender, - ) -> Result<(), StreamHubError> { - let mut stream_count: usize = 0; - for v in self.streams.values() { - stream_count += 1; - if let Err(err) = v.send(TransmitterEvent::Api { - sender: data_sender.clone(), - }) { - log::error!("TransmitterEvent api send data err: {}", err); - return Err(StreamHubError { + top_n: Option, + identifier: Option, + uuid: Option, + ) -> Result { + if self.streams.is_empty() { + return Ok(json!({})); + } + log::info!("api_statistic: stream identifier: {:?}", identifier); + let (stream_sender, mut stream_receiver) = mpsc::unbounded_channel(); + + let mut stream_count: usize = 1; + + if let Some(identifier) = identifier { + if let Some(event_sender) = self.streams.get_mut(&identifier) { + let event = TransceiverEvent::Api { + sender: stream_sender.clone(), + uuid, + }; + log::info!("api_statistic: stream identifier: {}", identifier); + event_sender.send(event).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, - }); + })?; + } + } else { + stream_count = self.streams.len(); + for v in self.streams.values() { + if let Err(err) = v.send(TransceiverEvent::Api { + sender: stream_sender.clone(), + uuid, + }) { + log::error!("TransmitterEvent api send data err: {}", err); + return Err(StreamHubError { + value: StreamHubErrorValue::SendError, + }); + } } } - if let Err(err) = size_sender.send(stream_count) { - log::error!("TransmitterEvent api send size err: {}", err); - return Err(StreamHubError { - value: StreamHubErrorValue::SendError, - }); + let mut data = Vec::new(); + + loop { + log::info!("api_statistic: stream count: {}", stream_count); + if let Some(stream_statistics) = stream_receiver.recv().await { + data.push(stream_statistics); + } + if data.len() == stream_count { + break; + } } - Ok(()) + if let Some(topn) = top_n { + data.sort_by(|a, b| b.subscriber_count.cmp(&a.subscriber_count)); + let top_streams: Vec = data.into_iter().take(topn).collect(); + return Ok(serde_json::to_value(top_streams)?); + } + + Ok(serde_json::to_value(data)?) 
} fn api_kick_off_client(&mut self, uid: Uuid) -> Result<(), StreamHubError> { @@ -608,18 +880,20 @@ impl StreamsHub { identifer: &StreamIdentifier, sub_info: SubscriberInfo, sender: DataSender, - ) -> Result<(), StreamHubError> { - if let Some(producer) = self.streams.get_mut(identifer) { - let event = TransmitterEvent::Subscribe { + ) -> Result { + if let Some(event_sender) = self.streams.get_mut(identifer) { + let (result_sender, result_receiver) = oneshot::channel(); + let event = TransceiverEvent::Subscribe { sender, info: sub_info, + result_sender, }; log::info!("subscribe: stream identifier: {}", identifer); - producer.send(event).map_err(|_| StreamHubError { + event_sender.send(event).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; - return Ok(()); + return Ok(result_receiver.await?); } if self.rtmp_pull_enabled { @@ -630,7 +904,7 @@ impl StreamsHub { }; //send subscribe info to pull clients - self.client_event_producer + self.client_event_sender .send(client_event) .map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, @@ -650,7 +924,7 @@ impl StreamsHub { match self.streams.get_mut(identifer) { Some(producer) => { log::info!("unsubscribe....:{}", identifer); - let event = TransmitterEvent::UnSubscribe { info: sub_info }; + let event = TransceiverEvent::UnSubscribe { info: sub_info }; producer.send(event).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; @@ -672,48 +946,50 @@ impl StreamsHub { identifier: StreamIdentifier, receiver: DataReceiver, handler: Arc, - ) -> Result<(), StreamHubError> { + ) -> Result { if self.streams.get(&identifier).is_some() { return Err(StreamHubError { value: StreamHubErrorValue::Exists, }); } - let (event_publisher, event_consumer) = mpsc::unbounded_channel(); - let transmitter = Transmitter::new(receiver, event_consumer, handler); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); + let transceiver = + StreamDataTransceiver::new(receiver, event_receiver, identifier.clone(), handler); + let statistic_data_sender = transceiver.get_statistics_data_sender(); let identifier_clone = identifier.clone(); - if let Err(err) = transmitter.run().await { + if let Err(err) = transceiver.run().await { log::error!( - "transmiter run error, idetifier: {}, error: {}", + "transceiver run error, idetifier: {}, error: {}", identifier_clone, err, ); } else { - log::info!("transmiter exits: idetifier: {}", identifier_clone); + log::info!("transceiver run success, idetifier: {}", identifier_clone); } - self.streams.insert(identifier.clone(), event_publisher); + self.streams.insert(identifier.clone(), event_sender); if self.rtmp_push_enabled || self.hls_enabled || self.rtmp_remuxer_enabled { let client_event = BroadcastEvent::Publish { identifier }; //send publish info to push clients - self.client_event_producer + self.client_event_sender .send(client_event) .map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; } - Ok(()) + Ok(statistic_data_sender) } fn unpublish(&mut self, identifier: &StreamIdentifier) -> Result<(), StreamHubError> { match self.streams.get_mut(identifier) { Some(producer) => { - let event = TransmitterEvent::UnPublish {}; + let event = TransceiverEvent::UnPublish {}; producer.send(event).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; diff --git a/library/streamhub/src/statistics/avstatistics.rs b/library/streamhub/src/statistics/avstatistics.rs deleted file mode 100644 index ba67e676..00000000 --- 
a/library/streamhub/src/statistics/avstatistics.rs +++ /dev/null @@ -1,130 +0,0 @@ -use crate::stream::StreamIdentifier; - -use { - super::StreamStatistics, - std::{sync::Arc, time::Duration}, - tokio::{ - sync::{ - mpsc, - mpsc::{Receiver, Sender}, - Mutex, - }, - time, - }, - xflv::{ - define, - define::{aac_packet_type, AvcCodecId, SoundFormat}, - mpeg4_aac::Mpeg4Aac, - mpeg4_avc::Mpeg4Avc, - }, -}; - -pub struct AvStatistics { - /*used to calculate video bitrate */ - video_bytes: Arc>, - /*used to calculate audio bitrate */ - audio_bytes: Arc>, - //used to calculate frame rate - frame_count: Arc>, - //used to calculate GOP - gop_frame_count: Arc>, - stream_statistics: Arc>, - pub sender: Sender, -} - -impl AvStatistics { - pub fn new(identifier: StreamIdentifier) -> Self { - let (s, _): (Sender, Receiver) = mpsc::channel(1); - Self { - video_bytes: Arc::new(Mutex::new(0.0)), - audio_bytes: Arc::new(Mutex::new(0.0)), - frame_count: Arc::new(Mutex::new(0)), - gop_frame_count: Arc::new(Mutex::new(0)), - stream_statistics: Arc::new(Mutex::new(StreamStatistics::new(identifier))), - sender: s, - } - } - - pub async fn notify_audio_codec_info(&mut self, codec_info: &Mpeg4Aac) { - let audio_info = &mut self.stream_statistics.lock().await.audio; - audio_info.profile = define::u8_2_aac_profile(codec_info.object_type); - audio_info.samplerate = codec_info.sampling_frequency; - audio_info.sound_format = SoundFormat::AAC; - audio_info.channels = codec_info.channels; - } - - pub async fn notify_video_codec_info(&mut self, codec_info: &Mpeg4Avc) { - let video_info = &mut self.stream_statistics.lock().await.video; - video_info.codec = AvcCodecId::H264; - video_info.profile = define::u8_2_avc_profile(codec_info.profile); - video_info.level = define::u8_2_avc_level(codec_info.level); - video_info.height = codec_info.height; - video_info.width = codec_info.width; - } - - pub async fn notify_audio_statistics_info(&mut self, data_size: usize, aac_packet_type: u8) { - match aac_packet_type { - aac_packet_type::AAC_RAW => { - *self.audio_bytes.lock().await += data_size as f32; - } - aac_packet_type::AAC_SEQHDR => {} - _ => {} - } - } - - pub async fn notify_video_statistics_info(&mut self, data_size: usize, is_key_frame: bool) { - *self.video_bytes.lock().await += data_size as f32; - *self.frame_count.lock().await += 1; - if is_key_frame { - let video_info = &mut self.stream_statistics.lock().await.video; - video_info.gop = *self.gop_frame_count.lock().await; - *self.gop_frame_count.lock().await = 0; - } else { - *self.gop_frame_count.lock().await += 1; - } - } - - pub fn start(&mut self) { - let mut interval = time::interval(Duration::from_secs(1)); - - let video_bytes_clone = self.video_bytes.clone(); - let audio_bytes_clone = self.audio_bytes.clone(); - let frame_count_clone = self.frame_count.clone(); - let stream_statistics_clone = self.stream_statistics.clone(); - - let (s, mut r): (Sender, Receiver) = mpsc::channel(1); - self.sender = s; - - tokio::spawn(async move { - loop { - tokio::select! 
{ - _ = interval.tick() => { - { - let stream_statistics = &mut stream_statistics_clone.lock().await; - let audio_info = &mut stream_statistics.audio; - audio_info.bitrate = *audio_bytes_clone.lock().await * 8.0/1000.0; - - let video_info = &mut stream_statistics.video; - video_info.bitrate = *video_bytes_clone.lock().await * 8.0/1000.0; - video_info.frame_rate = *frame_count_clone.lock().await; - } - *video_bytes_clone.lock().await = 0.0; - *audio_bytes_clone.lock().await = 0.0; - *frame_count_clone.lock().await = 0; - // if let Ok(strinfo) = serde_json::to_string(&*stream_statistics_clone.lock().await) { - // // log::info!("stream_info: {strinfo}"); - // } - }, - _ = r.recv() =>{ - log::info!("avstatistics shutting down"); - return - }, - } - } - }); - } - - pub async fn get_avstatistic_data(&self) -> StreamStatistics { - self.stream_statistics.lock().await.clone() - } -} diff --git a/library/streamhub/src/statistics/mod.rs b/library/streamhub/src/statistics/mod.rs index 935c9761..6ef5d4c5 100644 --- a/library/streamhub/src/statistics/mod.rs +++ b/library/streamhub/src/statistics/mod.rs @@ -1,40 +1,79 @@ -pub mod avstatistics; - use { super::stream::StreamIdentifier, + crate::{define::SubscribeType, utils::Uuid}, + chrono::{DateTime, Local}, serde::Serialize, + std::{collections::HashMap, sync::Arc, time::Duration}, + tokio::{ + sync::{broadcast::Receiver, Mutex}, + time, + }, xflv::define::{AacProfile, AvcCodecId, AvcLevel, AvcProfile, SoundFormat}, }; #[derive(Debug, Clone, Serialize, Default)] pub struct VideoInfo { - codec: AvcCodecId, - profile: AvcProfile, - level: AvcLevel, - width: u32, - height: u32, + pub codec: AvcCodecId, + pub profile: AvcProfile, + pub level: AvcLevel, + pub width: u32, + pub height: u32, + /*used for calculating the bitrate*/ + #[serde(skip_serializing)] + pub recv_bytes: usize, #[serde(rename = "bitrate(kbits/s)")] - bitrate: f32, - frame_rate: usize, - gop: usize, + pub bitrate: usize, + /*used for calculating the frame rate*/ + #[serde(skip_serializing)] + pub recv_frame_count: usize, + pub frame_rate: usize, + /*used for calculating the GOP*/ + #[serde(skip_serializing)] + pub recv_frame_count_for_gop: usize, + pub gop: usize, } #[derive(Debug, Clone, Serialize, Default)] pub struct AudioInfo { - sound_format: SoundFormat, - profile: AacProfile, - samplerate: u32, - channels: u8, + pub sound_format: SoundFormat, + pub profile: AacProfile, + pub samplerate: u32, + pub channels: u8, + /*used for calculating the bitrate*/ + #[serde(skip_serializing)] + pub recv_bytes: usize, #[serde(rename = "bitrate(kbits/s)")] - bitrate: f32, + pub bitrate: usize, +} +#[derive(Debug, Clone, Serialize, Default)] +pub struct StatisticsStream { + /*publisher information */ + pub publisher: StatisticPublisher, + /*subscriber information */ + pub subscribers: HashMap<Uuid, StatisticSubscriber>, + /*How many clients are subscribing to this stream.*/ + pub subscriber_count: usize, + /*calculate upstream traffic, now equals audio and video traffic received by this server*/ + pub total_recv_bytes: usize, + /*calculate downstream traffic, now equals audio and video traffic sent to all subscribers*/ + pub total_send_bytes: usize, } #[derive(Debug, Clone, Serialize, Default)] -pub struct StreamStatistics { +pub struct StatisticPublisher { + pub id: Uuid, identifier: StreamIdentifier, + pub start_time: DateTime<Local>, + pub video: VideoInfo, pub audio: AudioInfo, + pub remote_address: String, + /*used for calculating the recv_bitrate*/ + #[serde(skip_serializing)] + pub recv_bytes: usize, + /*the bitrate at which the server receives
streaming data*/ + #[serde(rename = "recv_bitrate(kbits/s)")] + pub recv_bitrate: usize, } -impl StreamStatistics { +impl StatisticPublisher { pub fn new(identifier: StreamIdentifier) -> Self { Self { identifier, @@ -42,3 +81,98 @@ impl StreamStatistics { } } } +#[derive(Debug, Clone, Serialize)] +pub struct StatisticSubscriber { + pub id: Uuid, + pub start_time: DateTime<Local>, + pub remote_address: String, + pub sub_type: SubscribeType, + /*used for calculating the send_bitrate*/ + #[serde(skip_serializing)] + pub send_bytes: usize, + /*the bitrate at which the server sends streaming data to a client*/ + #[serde(rename = "send_bitrate(kbits/s)")] + pub send_bitrate: usize, + #[serde(rename = "total_send_bytes(kbits/s)")] + pub total_send_bytes: usize, +} + +impl StatisticsStream { + pub fn new(identifier: StreamIdentifier) -> Self { + Self { + publisher: StatisticPublisher::new(identifier), + ..Default::default() + } + } + + fn get_publisher(&self) -> StatisticsStream { + let mut statistic_stream = self.clone(); + statistic_stream.subscribers.clear(); + statistic_stream + } + + fn get_subscriber(&self, uuid: Uuid) -> StatisticsStream { + let mut statistic_stream = self.clone(); + statistic_stream.subscribers.retain(|&id, _| uuid == id); + statistic_stream + } + + pub fn query_by_uuid(&self, uuid: Uuid) -> StatisticsStream { + if uuid == self.publisher.id { + self.get_publisher() + } else { + self.get_subscriber(uuid) + } + } +} + +pub struct StatisticsCaculate { + stream: Arc<Mutex<StatisticsStream>>, + exit: Receiver<()>, +} + +impl StatisticsCaculate { + pub fn new(stream: Arc<Mutex<StatisticsStream>>, exit: Receiver<()>) -> Self { + Self { stream, exit } + } + + async fn caculate(&mut self) { + let stream_statistics_clone = &mut self.stream.lock().await; + + stream_statistics_clone.publisher.video.bitrate = + stream_statistics_clone.publisher.video.recv_bytes * 8 / 5000; + stream_statistics_clone.publisher.video.recv_bytes = 0; + + stream_statistics_clone.publisher.video.frame_rate = + stream_statistics_clone.publisher.video.recv_frame_count / 5; + stream_statistics_clone.publisher.video.recv_frame_count = 0; + + stream_statistics_clone.publisher.audio.bitrate = + stream_statistics_clone.publisher.audio.recv_bytes * 8 / 5000; + stream_statistics_clone.publisher.audio.recv_bytes = 0; + + stream_statistics_clone.publisher.recv_bitrate = + stream_statistics_clone.publisher.recv_bytes * 8 / 5000; + stream_statistics_clone.publisher.recv_bytes = 0; + + for (_, subscriber) in stream_statistics_clone.subscribers.iter_mut() { + subscriber.send_bitrate = subscriber.send_bytes * 8 / 5000; + subscriber.send_bytes = 0; + } + } + pub async fn start(&mut self) { + let mut interval = time::interval(Duration::from_secs(5)); + + loop { + tokio::select!
{ + _ = interval.tick() => { + self.caculate().await; + }, + _ = self.exit.recv() => { + log::info!("avstatistics shutting down"); + return + }, + } + } + } +} diff --git a/library/streamhub/src/stream.rs b/library/streamhub/src/stream.rs index 38321c1d..0c0a161a 100644 --- a/library/streamhub/src/stream.rs +++ b/library/streamhub/src/stream.rs @@ -1,17 +1,18 @@ -use serde::Serialize; +use serde::{Deserialize, Serialize}; use std::fmt; -#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Default)] +#[derive(Debug, Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Default)] pub enum StreamIdentifier { #[default] Unkonwn, + #[serde(rename = "rtmp")] Rtmp { app_name: String, stream_name: String, }, - Rtsp { - stream_path: String, - }, + #[serde(rename = "rtsp")] + Rtsp { stream_path: String }, + #[serde(rename = "webrtc")] WebRTC { app_name: String, stream_name: String, diff --git a/library/streamhub/src/utils.rs b/library/streamhub/src/utils.rs index 6005c11d..6c6dd621 100644 --- a/library/streamhub/src/utils.rs +++ b/library/streamhub/src/utils.rs @@ -1,4 +1,5 @@ use rand::Rng; +use serde::{Serialize, Serializer}; use std::fmt; use std::time::SystemTime; @@ -33,6 +34,15 @@ fn u8_to_enum(digit: u8) -> RandomDigitCount { } } +impl Serialize for Uuid { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} + impl Uuid { pub fn from_str2(uuid: &str) -> Option { let length = uuid.len(); @@ -109,6 +119,8 @@ impl fmt::Display for Uuid { #[cfg(test)] mod tests { + + use super::Uuid; #[test] @@ -117,6 +129,10 @@ mod tests { let s = id.to_string(); + let serialized = serde_json::to_string(&id).unwrap(); + + println!("serialized:{serialized}"); + println!("{s}"); if let Some(u) = Uuid::from_str2(&s) { diff --git a/protocol/hls/CHANGELOG.md b/protocol/hls/CHANGELOG.md index df232463..4a6de328 100644 --- a/protocol/hls/CHANGELOG.md +++ b/protocol/hls/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.5.3] - 2021-03-15 +- Upgrade failure library. +- Some changes for statistics feature. + ## [0.5.2] - 2021-02-28 - Refactor: remove the dependecy of HLS on RTMP. diff --git a/protocol/hls/Cargo.toml b/protocol/hls/Cargo.toml index 908c0c86..54779c38 100644 --- a/protocol/hls/Cargo.toml +++ b/protocol/hls/Cargo.toml @@ -13,7 +13,7 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" axum = { version = "0.7.4" } tokio-util = { version = "0.6.5", features = ["codec"] } diff --git a/protocol/hls/src/flv_data_receiver.rs b/protocol/hls/src/flv_data_receiver.rs index ff24f5ff..c923d263 100644 --- a/protocol/hls/src/flv_data_receiver.rs +++ b/protocol/hls/src/flv_data_receiver.rs @@ -128,7 +128,7 @@ impl FlvDataReceiver { }); } - let receiver = event_result_receiver.await??.frame_receiver.unwrap(); + let receiver = event_result_receiver.await??.0.frame_receiver.unwrap(); self.data_consumer = receiver; diff --git a/protocol/httpflv/CHANGELOG.md b/protocol/httpflv/CHANGELOG.md index b7b00ffd..7da78ad2 100644 --- a/protocol/httpflv/CHANGELOG.md +++ b/protocol/httpflv/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.4.3] - 2021-03-15 +- Upgrade failure library. +- Support querying more detailed statistic data. + ## [0.4.2] - 2021-02-28 - Refactor: remove the dependency of HTTP-FLV on RTMP. 
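
Note (illustrative, not part of the change): the 5000 divisor in StatisticsCaculate::caculate, in the library/streamhub/src/statistics/mod.rs hunk above, folds three factors into one integer division. Bytes counted over the 5-second tick set by time::interval(Duration::from_secs(5)) are converted to bits (*8), scaled to kilobits (/1000), and averaged over the window (/5), so bytes * 8 / 5000 yields kbit/s; frame_rate = recv_frame_count / 5 averages the same window. A minimal sketch of that arithmetic, with a hypothetical helper name and made-up values:

    // Kilobits per second from a byte count accumulated over a sampling window.
    // Mirrors the `recv_bytes * 8 / 5000` expressions in the diff above, where
    // the window is fixed at 5 seconds.
    fn kbits_per_sec(bytes_in_window: usize, window_secs: usize) -> usize {
        bytes_in_window * 8 / (1000 * window_secs)
    }

    fn main() {
        // 625_000 bytes received in a 5 s window -> 1000 kbit/s,
        // the same result as 625_000 * 8 / 5000.
        assert_eq!(kbits_per_sec(625_000, 5), 1_000);
        println!("ok");
    }
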
diff --git a/protocol/httpflv/Cargo.toml b/protocol/httpflv/Cargo.toml index cb644a35..e2da0043 100644 --- a/protocol/httpflv/Cargo.toml +++ b/protocol/httpflv/Cargo.toml @@ -13,10 +13,11 @@ edition = "2018" [dependencies] byteorder = "1.4.2" bytes = "1.0.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" axum = { version = "0.7.4" } futures = "0.3" +chrono = "0.4" streamhub = { path = "../../library/streamhub/" } xflv = { path = "../../library/container/flv/" } diff --git a/protocol/httpflv/src/httpflv.rs b/protocol/httpflv/src/httpflv.rs index 251715e4..a6d490a2 100644 --- a/protocol/httpflv/src/httpflv.rs +++ b/protocol/httpflv/src/httpflv.rs @@ -1,3 +1,4 @@ +use streamhub::define::{StatisticData, StatisticDataSender}; use tokio::sync::oneshot; use { super::{ @@ -26,7 +27,9 @@ pub struct HttpFlv { muxer: FlvMuxer, event_producer: StreamHubEventSender, - data_consumer: FrameDataReceiver, + data_receiver: FrameDataReceiver, + /* now used for subscriber session */ + statistic_data_sender: Option, http_response_data_producer: HttpResponseDataProducer, subscriber_id: Uuid, request_url: String, @@ -42,14 +45,15 @@ impl HttpFlv { request_url: String, remote_addr: SocketAddr, ) -> Self { - let (_, data_consumer) = mpsc::unbounded_channel(); + let (_, data_receiver) = mpsc::unbounded_channel(); let subscriber_id = Uuid::new(RandomDigitCount::Four); Self { app_name, stream_name, muxer: FlvMuxer::new(), - data_consumer, + data_receiver, + statistic_data_sender: None, event_producer, http_response_data_producer, subscriber_id, @@ -73,7 +77,7 @@ impl HttpFlv { let mut retry_count = 0; //write flv body loop { - if let Some(data) = self.data_consumer.recv().await { + if let Some(data) = self.data_receiver.recv().await { if let Err(err) = self.write_flv_tag(data) { if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value { if err_in.is_disconnected() { @@ -100,8 +104,37 @@ impl HttpFlv { pub fn write_flv_tag(&mut self, channel_data: FrameData) -> Result<(), HttpFLvError> { let (common_data, common_timestamp, tag_type) = match channel_data { - FrameData::Audio { timestamp, data } => (data, timestamp, tag_type::AUDIO), - FrameData::Video { timestamp, data } => (data, timestamp, tag_type::VIDEO), + FrameData::Audio { timestamp, data } => { + if let Some(sender) = &self.statistic_data_sender { + let statistic_audio_data = StatisticData::Audio { + uuid: Some(self.subscriber_id), + aac_packet_type: 1, + data_size: data.len(), + duration: 0, + }; + if let Err(err) = sender.send(statistic_audio_data) { + log::error!("send statistic data err: {}", err); + } + } + + (data, timestamp, tag_type::AUDIO) + } + FrameData::Video { timestamp, data } => { + if let Some(sender) = &self.statistic_data_sender { + let statistic_video_data = StatisticData::Video { + uuid: Some(self.subscriber_id), + frame_count: 1, + is_key_frame: None, + data_size: data.len(), + duration: 0, + }; + if let Err(err) = sender.send(statistic_video_data) { + log::error!("send statistic data err: {}", err); + } + } + + (data, timestamp, tag_type::VIDEO) + } FrameData::MetaData { timestamp, data } => { //remove @setDataFrame from RTMP's metadata let mut amf_writer: Amf0Writer = Amf0Writer::new(); @@ -194,8 +227,22 @@ impl HttpFlv { }); } - let receiver = event_result_receiver.await??.frame_receiver.unwrap(); - self.data_consumer = receiver; + let result_receiver = event_result_receiver.await??; + let receiver = result_receiver.0.frame_receiver.unwrap(); + self.data_receiver = receiver; + self.statistic_data_sender = result_receiver.1; + + if 
let Some(sender) = &self.statistic_data_sender { + let statistic_subscriber = StatisticData::Subscriber { + id: self.subscriber_id, + remote_addr: self.remote_addr.to_string(), + start_time: chrono::Local::now(), + sub_type: SubscribeType::PlayerHttpFlv, + }; + if let Err(err) = sender.send(statistic_subscriber) { + log::error!("send statistic_subscriber err: {}", err); + } + } Ok(()) } diff --git a/protocol/rtmp/CHANGELOG.md b/protocol/rtmp/CHANGELOG.md index 0a60c8e9..8ed9e3a3 100644 --- a/protocol/rtmp/CHANGELOG.md +++ b/protocol/rtmp/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.6.3] - 2021-03-15 +- Upgrade failure library. +- Support querying more detailed statistic data. + ## [0.6.2] - 2021-02-29 - Move the amf0 mod to FLV library. diff --git a/protocol/rtmp/Cargo.toml b/protocol/rtmp/Cargo.toml index 4f766e90..2144bc4c 100644 --- a/protocol/rtmp/Cargo.toml +++ b/protocol/rtmp/Cargo.toml @@ -14,7 +14,7 @@ edition = "2018" byteorder = "1.4.2" bytes = "1.0.0" rand = "0.3" -failure = "0.1.1" +failure = "0.1.8" hmac = "0.11.0" sha2 = "0.9" # uuid = { version = "0.6.5", features = ["v4"] } diff --git a/protocol/rtmp/src/cache/mod.rs b/protocol/rtmp/src/cache/mod.rs index b43e060f..05867f9e 100644 --- a/protocol/rtmp/src/cache/mod.rs +++ b/protocol/rtmp/src/cache/mod.rs @@ -9,9 +9,7 @@ use { errors::CacheError, gop::Gop, std::collections::VecDeque, - streamhub::define::FrameData, - streamhub::statistics::avstatistics::AvStatistics, - streamhub::stream::StreamIdentifier, + streamhub::define::{FrameData, StatisticData, StatisticDataSender}, xflv::{ define, flv_tag_header::{AudioTagHeader, VideoTagHeader}, @@ -30,23 +28,13 @@ pub struct Cache { audio_seq: BytesMut, audio_timestamp: u32, gops: Gops, - pub av_statistics: AvStatistics, -} - -impl Drop for Cache { - #[allow(unused_must_use)] - fn drop(&mut self) { - self.av_statistics.sender.send(true); - } + statistic_data_sender: Option, } impl Cache { - pub fn new(app_name: String, stream_name: String, gop_num: usize) -> Self { - let identifier = StreamIdentifier::Rtmp { - app_name, - stream_name, - }; - let mut cache = Cache { + pub fn new(gop_num: usize, statistic_data_sender: Option) -> Self { + + Cache { metadata: metadata::MetaData::new(), metadata_timestamp: 0, video_seq: BytesMut::new(), @@ -54,10 +42,8 @@ impl Cache { audio_seq: BytesMut::new(), audio_timestamp: 0, gops: Gops::new(gop_num), - av_statistics: AvStatistics::new(identifier), - }; - cache.av_statistics.start(); - cache + statistic_data_sender, + } } //, values: Vec @@ -98,18 +84,36 @@ impl Cache { self.audio_seq = chunk_body.clone(); self.audio_timestamp = timestamp; - let mut aac_processor = Mpeg4AacProcessor::default(); - let aac = aac_processor - .extend_data(reader.extract_remaining_bytes()) - .audio_specific_config_load()?; - self.av_statistics - .notify_audio_codec_info(&aac.mpeg4_aac) - .await; + if let Some(statistic_data_sender) = &self.statistic_data_sender { + let mut aac_processor = Mpeg4AacProcessor::default(); + + let aac = aac_processor + .extend_data(reader.extract_remaining_bytes()) + .audio_specific_config_load()?; + + let statistic_audio_codec = StatisticData::AudioCodec { + sound_format: define::SoundFormat::AAC, + profile: define::u8_2_aac_profile(aac.mpeg4_aac.object_type), + samplerate: aac.mpeg4_aac.sampling_frequency, + channels: aac.mpeg4_aac.channels, + }; + if let Err(err) = statistic_data_sender.send(statistic_audio_codec) { + log::error!("send 
statistic_data err: {}", err); + } + } } - self.av_statistics - .notify_audio_statistics_info(chunk_body.len(), tag_header.aac_packet_type) - .await; + if let Some(statistic_data_sender) = &self.statistic_data_sender { + let statistic_audio_data = StatisticData::Audio { + uuid: None, + data_size: chunk_body.len(), + aac_packet_type: tag_header.aac_packet_type, + duration: 0, + }; + if let Err(err) = statistic_data_sender.send(statistic_audio_data) { + log::error!("send statistic_data err: {}", err); + } + } Ok(()) } @@ -151,21 +155,39 @@ impl Cache { self.gops.save_frame_data(channel_data, is_key_frame); if is_key_frame && tag_header.avc_packet_type == define::avc_packet_type::AVC_SEQHDR { - let mut avc_processor = Mpeg4AvcProcessor::default(); - avc_processor.decoder_configuration_record_load(&mut reader)?; - - self.av_statistics - .notify_video_codec_info(&avc_processor.mpeg4_avc) - .await; - self.video_seq = chunk_body.clone(); self.video_timestamp = timestamp; - } - self.av_statistics - .notify_video_statistics_info(chunk_body.len(), is_key_frame) - .await; + if let Some(statistic_data_sender) = &self.statistic_data_sender { + let mut avc_processor = Mpeg4AvcProcessor::default(); + avc_processor.decoder_configuration_record_load(&mut reader)?; + + let statistic_video_codec = StatisticData::VideoCodec { + codec: define::AvcCodecId::H264, + profile: define::u8_2_avc_profile(avc_processor.mpeg4_avc.profile), + level: define::u8_2_avc_level(avc_processor.mpeg4_avc.level), + width: avc_processor.mpeg4_avc.width, + height: avc_processor.mpeg4_avc.height, + }; + if let Err(err) = statistic_data_sender.send(statistic_video_codec) { + log::error!("send statistic_data err: {}", err); + } + } + } + if let Some(statistic_data_sender) = &self.statistic_data_sender { + let statistic_video_data = StatisticData::Video { + uuid: None, + data_size: chunk_body.len(), + frame_count: 1, + is_key_frame: Some(is_key_frame), + duration: 0, + }; + + if let Err(err) = statistic_data_sender.send(statistic_video_data) { + log::error!("send statistic_data err: {}", err); + } + } Ok(()) } diff --git a/protocol/rtmp/src/relay/push_client.rs b/protocol/rtmp/src/relay/push_client.rs index 5adcfd32..97c00c67 100644 --- a/protocol/rtmp/src/relay/push_client.rs +++ b/protocol/rtmp/src/relay/push_client.rs @@ -53,8 +53,8 @@ impl PushClient { stream, ClientType::Publish, self.address.clone(), - app_name.clone(), - stream_name.clone(), + app_name, + stream_name, self.channel_event_producer.clone(), 0, ); diff --git a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs index 743d751c..b55f47e6 100644 --- a/protocol/rtmp/src/remuxer/rtsp2rtmp.rs +++ b/protocol/rtmp/src/remuxer/rtsp2rtmp.rs @@ -31,7 +31,6 @@ pub struct Rtsp2RtmpRemuxerSession { app_name: String, stream_name: String, - publishe_id: Uuid, //RTSP data_receiver: FrameDataReceiver, stream_path: String, @@ -73,7 +72,7 @@ impl Rtsp2RtmpRemuxerSession { event_producer: event_producer.clone(), subscribe_id: Uuid::new(RandomDigitCount::Four), - publishe_id: Uuid::new(RandomDigitCount::Four), + video_clock_rate: 1000, audio_clock_rate: 1000, base_audio_timestamp: 0, @@ -93,30 +92,20 @@ impl Rtsp2RtmpRemuxerSession { pub async fn publish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .publish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.publishe_id, - 0, - ) + .publish_to_channels(self.app_name.clone(), self.stream_name.clone(), 0) .await?; Ok(()) } pub async fn unpublish_rtmp(&mut self) -> 
Result<(), RtmpRemuxerError> { self.rtmp_handler - .unpublish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.publishe_id, - ) + .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) .await?; Ok(()) } pub async fn subscribe_rtsp(&mut self) -> Result<(), RtmpRemuxerError> { let (event_result_sender, event_result_receiver) = oneshot::channel(); - let sub_info = SubscriberInfo { id: self.subscribe_id, sub_type: SubscribeType::PlayerRtmp, @@ -141,7 +130,7 @@ impl Rtsp2RtmpRemuxerSession { }); } - let receiver = event_result_receiver.await??; + let receiver = event_result_receiver.await??.0; self.data_receiver = receiver.frame_receiver.unwrap(); Ok(()) } diff --git a/protocol/rtmp/src/remuxer/whip2rtmp.rs b/protocol/rtmp/src/remuxer/whip2rtmp.rs index fdcf7683..80684800 100644 --- a/protocol/rtmp/src/remuxer/whip2rtmp.rs +++ b/protocol/rtmp/src/remuxer/whip2rtmp.rs @@ -31,7 +31,6 @@ pub struct Whip2RtmpRemuxerSession { app_name: String, stream_name: String, - publishe_id: Uuid, //WHIP data_receiver: FrameDataReceiver, @@ -84,7 +83,6 @@ impl Whip2RtmpRemuxerSession { event_producer: event_producer.clone(), subscribe_id: Uuid::new(RandomDigitCount::Four), - publishe_id: Uuid::new(RandomDigitCount::Four), video_clock_rate: 1000, audio_clock_rate: 1000, base_audio_timestamp: 0, @@ -107,23 +105,14 @@ impl Whip2RtmpRemuxerSession { pub async fn publish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .publish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.publishe_id, - 1, - ) + .publish_to_channels(self.app_name.clone(), self.stream_name.clone(), 1) .await?; Ok(()) } pub async fn unpublish_rtmp(&mut self) -> Result<(), RtmpRemuxerError> { self.rtmp_handler - .unpublish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.publishe_id, - ) + .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) .await?; Ok(()) } @@ -156,7 +145,7 @@ impl Whip2RtmpRemuxerSession { }); } - let receiver = event_result_receiver.await??; + let receiver = event_result_receiver.await??.0; self.data_receiver = receiver.frame_receiver.unwrap(); Ok(()) } diff --git a/protocol/rtmp/src/session/client_session.rs b/protocol/rtmp/src/session/client_session.rs index ecbd7970..b285cadd 100644 --- a/protocol/rtmp/src/session/client_session.rs +++ b/protocol/rtmp/src/session/client_session.rs @@ -29,8 +29,6 @@ use { std::sync::Arc, //crate::utils::print::print, streamhub::define::StreamHubEventSender, - streamhub::utils::RandomDigitCount, - streamhub::utils::Uuid, tokio::{net::TcpStream, sync::Mutex}, xflv::amf0::Amf0ValueType, }; @@ -78,10 +76,6 @@ pub struct ClientSession { //stream name with parameters raw_stream_name: String, stream_name: String, - /* Used to mark the subscriber's the data producer - in channels and delete it from map when unsubscribe - is called. 
*/ - session_id: Uuid, state: ClientSessionState, client_type: ClientType, sub_app_name: Option, @@ -110,8 +104,6 @@ impl ClientSession { let tcp_io: Box = Box::new(TcpIO::new(stream)); let net_io = Arc::new(Mutex::new(tcp_io)); - let subscriber_id = Uuid::new(RandomDigitCount::Four); - let packetizer = if client_type == ClientType::Publish { Some(ChunkPacketizer::new(Arc::clone(&net_io))) } else { @@ -130,7 +122,6 @@ impl ClientSession { app_name, raw_stream_name, stream_name, - session_id: subscriber_id, state: ClientSessionState::Handshake, client_type, sub_app_name: None, @@ -527,18 +518,13 @@ impl ClientSession { (&self.sub_app_name, &self.sub_stream_name) { self.common - .subscribe_from_channels( - app_name.clone(), - stream_name.clone(), - self.session_id, - ) + .subscribe_from_channels(app_name.clone(), stream_name.clone()) .await?; } else { self.common .subscribe_from_channels( self.app_name.clone(), self.stream_name.clone(), - self.session_id, ) .await?; } @@ -550,7 +536,6 @@ impl ClientSession { .publish_to_channels( self.app_name.clone(), self.stream_name.clone(), - self.session_id, self.gop_num, ) .await? diff --git a/protocol/rtmp/src/session/common.rs b/protocol/rtmp/src/session/common.rs index ced302d3..a2265544 100644 --- a/protocol/rtmp/src/session/common.rs +++ b/protocol/rtmp/src/session/common.rs @@ -1,4 +1,4 @@ -use streamhub::define::DataSender; +use streamhub::define::{DataSender, StatisticData, StatisticDataSender}; use tokio::sync::oneshot; use { @@ -27,7 +27,7 @@ use { SubscriberInfo, TStreamHandler, }, errors::{StreamHubError, StreamHubErrorValue}, - statistics::StreamStatistics, + statistics::StatisticsStream, stream::StreamIdentifier, utils::Uuid, }, @@ -35,6 +35,10 @@ use { }; pub struct Common { + /* Used to mark the subscriber's the data producer + in channels and delete it from map when unsubscribe + is called. */ + session_id: Uuid, //only Server Subscriber or Client Publisher needs to send out trunck data. 
packetizer: Option, @@ -49,6 +53,8 @@ pub struct Common { /*request URL from client*/ pub request_url: String, pub stream_handler: Arc, + /* now used for subscriber session */ + statistic_data_sender: Option, } impl Common { @@ -62,6 +68,7 @@ impl Common { let (init_producer, init_consumer) = mpsc::unbounded_channel(); Self { + session_id: Uuid::new(streamhub::utils::RandomDigitCount::Four), packetizer, data_sender: init_producer, @@ -72,6 +79,7 @@ impl Common { remote_addr, request_url: String::default(), stream_handler: Arc::new(RtmpStreamHandler::new()), + statistic_data_sender: None, //cache: None, } } @@ -81,10 +89,37 @@ impl Common { if let Some(data) = self.data_receiver.recv().await { match data { FrameData::Audio { timestamp, data } => { + let data_size = data.len(); self.send_audio(data, timestamp).await?; + + if let Some(sender) = &self.statistic_data_sender { + let statistic_audio_data = StatisticData::Audio { + uuid: Some(self.session_id), + aac_packet_type: 1, + data_size, + duration: 0, + }; + if let Err(err) = sender.send(statistic_audio_data) { + log::error!("send statistic_data err: {}", err); + } + } } FrameData::Video { timestamp, data } => { + let data_size = data.len(); self.send_video(data, timestamp).await?; + + if let Some(sender) = &self.statistic_data_sender { + let statistic_video_data = StatisticData::Video { + uuid: Some(self.session_id), + frame_count: 1, + data_size, + is_key_frame: None, + duration: 0, + }; + if let Err(err) = sender.send(statistic_video_data) { + log::error!("send statistic_data err: {}", err); + } + } } FrameData::MetaData { timestamp, data } => { self.send_metadata(data, timestamp).await?; @@ -243,7 +278,7 @@ impl Common { Ok(()) } - fn get_subscriber_info(&mut self, sub_id: Uuid) -> SubscriberInfo { + fn get_subscriber_info(&mut self) -> SubscriberInfo { let remote_addr = if let Some(addr) = self.remote_addr { addr.to_string() } else { @@ -256,7 +291,7 @@ impl Common { }; SubscriberInfo { - id: sub_id, + id: self.session_id, /*rtmp local client subscribe from local rtmp session and publish(relay) the rtmp steam to remote RTMP server*/ sub_type, @@ -268,7 +303,7 @@ impl Common { } } - fn get_publisher_info(&mut self, sub_id: Uuid) -> PublisherInfo { + fn get_publisher_info(&mut self) -> PublisherInfo { let remote_addr = if let Some(addr) = self.remote_addr { addr.to_string() } else { @@ -281,7 +316,7 @@ impl Common { }; PublisherInfo { - id: sub_id, + id: self.session_id, pub_type, pub_data_type: streamhub::define::PubDataType::Frame, notify_info: NotifyInfo { @@ -296,13 +331,12 @@ impl Common { &mut self, app_name: String, stream_name: String, - sub_id: Uuid, ) -> Result<(), SessionError> { log::info!( "subscribe_from_channels, app_name: {} stream_name: {} subscribe_id: {}", app_name, stream_name, - sub_id + self.session_id ); let identifier = StreamIdentifier::Rtmp { @@ -314,7 +348,7 @@ impl Common { let subscribe_event = StreamHubEvent::Subscribe { identifier, - info: self.get_subscriber_info(sub_id), + info: self.get_subscriber_info(), result_sender: event_result_sender, }; let rv = self.event_producer.send(subscribe_event); @@ -325,8 +359,24 @@ impl Common { }); } - let recv = event_result_receiver.await??; - self.data_receiver = recv.frame_receiver.unwrap(); + let result = event_result_receiver.await??; + self.data_receiver = result.0.frame_receiver.unwrap(); + + let statistic_data_sender: Option = result.1; + + if let Some(sender) = &statistic_data_sender { + let statistic_subscriber = StatisticData::Subscriber { + id: 
self.session_id, + remote_addr: self.remote_addr.unwrap().to_string(), + start_time: chrono::Local::now(), + sub_type: SubscribeType::PlayerRtmp, + }; + if let Err(err) = sender.send(statistic_subscriber) { + log::error!("send statistic_subscriber err: {}", err); + } + } + + self.statistic_data_sender = statistic_data_sender; Ok(()) } @@ -335,7 +385,6 @@ impl Common { &mut self, app_name: String, stream_name: String, - sub_id: Uuid, ) -> Result<(), SessionError> { let identifier = StreamIdentifier::Rtmp { app_name, @@ -344,7 +393,7 @@ impl Common { let subscribe_event = StreamHubEvent::UnSubscribe { identifier, - info: self.get_subscriber_info(sub_id), + info: self.get_subscriber_info(), }; if let Err(err) = self.event_producer.send(subscribe_event) { log::error!("unsubscribe_from_channels err {}", err); @@ -358,20 +407,18 @@ impl Common { &mut self, app_name: String, stream_name: String, - pub_id: Uuid, gop_num: usize, ) -> Result<(), SessionError> { - self.stream_handler - .set_cache(Cache::new(app_name.clone(), stream_name.clone(), gop_num)) - .await; - let (event_result_sender, event_result_receiver) = oneshot::channel(); + let info = self.get_publisher_info(); + let remote_addr = info.notify_info.remote_addr.clone(); + let publish_event = StreamHubEvent::Publish { identifier: StreamIdentifier::Rtmp { - app_name, - stream_name, + app_name: app_name.clone(), + stream_name: stream_name.clone(), }, - info: self.get_publisher_info(pub_id), + info, stream_handler: self.stream_handler.clone(), result_sender: event_result_sender, }; @@ -384,6 +431,23 @@ impl Common { let result = event_result_receiver.await??; self.data_sender = result.0.unwrap(); + + let statistic_data_sender: Option = result.2; + + if let Some(sender) = &statistic_data_sender { + let statistic_publisher = StatisticData::Publisher { + id: self.session_id, + remote_addr, + start_time: chrono::Local::now(), + }; + if let Err(err) = sender.send(statistic_publisher) { + log::error!("send statistic_publisher err: {}", err); + } + } + + self.stream_handler + .set_cache(Cache::new(gop_num, statistic_data_sender)) + .await; Ok(()) } @@ -391,7 +455,6 @@ impl Common { &mut self, app_name: String, stream_name: String, - pub_id: Uuid, ) -> Result<(), SessionError> { log::info!( "unpublish_to_channels, app_name:{}, stream_name:{}", @@ -403,7 +466,7 @@ impl Common { app_name: app_name.clone(), stream_name: stream_name.clone(), }, - info: self.get_publisher_info(pub_id), + info: self.get_publisher_info(), }; match self.event_producer.send(unpublish_event) { @@ -496,16 +559,19 @@ impl TStreamHandler for RtmpStreamHandler { }; if let Some(cache) = &mut *self.cache.lock().await { if let Some(meta_body_data) = cache.get_metadata() { + log::info!("send_prior_data: meta_body_data: "); sender.send(meta_body_data).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; } if let Some(audio_seq_data) = cache.get_audio_seq() { + log::info!("send_prior_data: audio_seq_data: ",); sender.send(audio_seq_data).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; } if let Some(video_seq_data) = cache.get_video_seq() { + log::info!("send_prior_data: video_seq_data:"); sender.send(video_seq_data).map_err(|_| StreamHubError { value: StreamHubErrorValue::SendError, })?; @@ -531,10 +597,10 @@ impl TStreamHandler for RtmpStreamHandler { Ok(()) } - async fn get_statistic_data(&self) -> Option { - if let Some(cache) = &mut *self.cache.lock().await { - return Some(cache.av_statistics.get_avstatistic_data().await); - } + async fn 
get_statistic_data(&self) -> Option { + //if let Some(cache) = &mut *self.cache.lock().await { + // return Some(cache.av_statistics.get_avstatistic_data().await); + //} None } diff --git a/protocol/rtmp/src/session/server_session.rs b/protocol/rtmp/src/session/server_session.rs index 2bd3d592..521ac232 100644 --- a/protocol/rtmp/src/session/server_session.rs +++ b/protocol/rtmp/src/session/server_session.rs @@ -29,10 +29,7 @@ use { commonlib::auth::Auth, indexmap::IndexMap, std::{sync::Arc, time::Duration}, - streamhub::{ - define::StreamHubEventSender, - utils::{RandomDigitCount, Uuid}, - }, + streamhub::define::StreamHubEventSender, tokio::{net::TcpStream, sync::Mutex}, xflv::amf0::Amf0ValueType, }; @@ -57,10 +54,6 @@ pub struct ServerSession { state: ServerSessionState, bytesio_data: BytesMut, has_remaing_data: bool, - /* Used to mark the subscriber's the data producer - in channels and delete it from map when unsubscribe - is called. */ - pub session_id: Uuid, connect_properties: ConnectProperties, pub common: Common, /*configure how many gops will be cached.*/ @@ -99,7 +92,7 @@ impl ServerSession { SessionType::Server, remote_addr, ), - session_id: Uuid::new(RandomDigitCount::Four), + bytesio_data: BytesMut::new(), has_remaing_data: false, connect_properties: ConnectProperties::default(), @@ -169,11 +162,7 @@ impl ServerSession { } Err(err) => { self.common - .unpublish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.session_id, - ) + .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) .await?; return Err(SessionError { @@ -205,11 +194,7 @@ impl ServerSession { Err(err) => { if let UnpackErrorValue::CannotParse = err.value { self.common - .unpublish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.session_id, - ) + .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) .await?; return Err(err)?; } @@ -225,11 +210,7 @@ impl ServerSession { Ok(_) => {} Err(err) => { self.common - .unsubscribe_from_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.session_id, - ) + .unsubscribe_from_channels(self.app_name.clone(), self.stream_name.clone()) .await?; return Err(err); } @@ -502,11 +483,7 @@ impl ServerSession { stream_id: &f64, ) -> Result<(), SessionError> { self.common - .unpublish_to_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.session_id, - ) + .unpublish_to_channels(self.app_name.clone(), self.stream_name.clone()) .await?; let mut netstream = NetStreamWriter::new(Arc::clone(&self.io)); @@ -664,11 +641,7 @@ impl ServerSession { /*Now it can update the request url*/ self.common.request_url = self.get_request_url(raw_stream_name); self.common - .subscribe_from_channels( - self.app_name.clone(), - self.stream_name.clone(), - self.session_id, - ) + .subscribe_from_channels(self.app_name.clone(), self.stream_name.clone()) .await?; self.state = ServerSessionState::Play; @@ -755,7 +728,6 @@ impl ServerSession { .publish_to_channels( self.app_name.clone(), self.stream_name.clone(), - self.session_id, self.gop_num, ) .await?; diff --git a/protocol/rtsp/CHANGELOG.md b/protocol/rtsp/CHANGELOG.md index 423fb3f7..cd528d90 100644 --- a/protocol/rtsp/CHANGELOG.md +++ b/protocol/rtsp/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.2.2] - 2021-03-15 +- Upgrade failure library. +- Some changes for statistics feature. +- Fix publishing RTSP stream error caused by network. 
+- Fix the issue that stopping the playback of RTSP stream leads to push(publish) failure. + ## [0.2.1] - 2021-02-29 - Reference the new version streamhub library. diff --git a/protocol/rtsp/Cargo.toml b/protocol/rtsp/Cargo.toml index f52a73d6..93e29799 100644 --- a/protocol/rtsp/Cargo.toml +++ b/protocol/rtsp/Cargo.toml @@ -11,7 +11,7 @@ byteorder = "1.4.2" tokio = "1.4.0" bytes = "1.0.0" log = "0.4" -failure = "0.1.1" +failure = "0.1.8" http = "0.2.9" indexmap = "1.9.3" lazy_static = "1.4.0" @@ -19,6 +19,11 @@ chrono = "0.4" async-trait = "0.1.70" base64 = "0.21.2" hex = "0.4.3" +serde_json = { version = "1", default-features = false, features = [ + "alloc", + "raw_value", + "std", +] } bytesio = { path = "../../library/bytesio/" } streamhub = { path = "../../library/streamhub/" } diff --git a/protocol/rtsp/src/session/mod.rs b/protocol/rtsp/src/session/mod.rs index 87fed351..b3fa44f5 100644 --- a/protocol/rtsp/src/session/mod.rs +++ b/protocol/rtsp/src/session/mod.rs @@ -6,6 +6,7 @@ use crate::global_trait::Unmarshal; use crate::rtp::define::ANNEXB_NALU_START_CODE; use crate::rtp::utils::Marshal as RtpMarshal; + use commonlib::http::HttpRequest as RtspRequest; use commonlib::http::HttpResponse as RtspResponse; use commonlib::http::Marshal as RtspMarshal; @@ -56,7 +57,7 @@ use streamhub::{ StreamHubEvent, StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, }, errors::{StreamHubError, StreamHubErrorValue}, - statistics::StreamStatistics, + statistics::StatisticsStream, stream::StreamIdentifier, utils::{RandomDigitCount, Uuid}, }; @@ -71,6 +72,7 @@ pub struct RtspServerSession { tracks: HashMap, sdp: Sdp, pub session_id: Option, + session_type: define::SessionType, stream_handler: Arc, event_producer: StreamHubEventSender, @@ -130,6 +132,7 @@ impl RtspServerSession { tracks: HashMap::new(), sdp: Sdp::default(), session_id: None, + session_type: define::SessionType::Server, event_producer, stream_handler: Arc::new(RtspStreamHandler::new()), auth, @@ -190,7 +193,7 @@ impl RtspServerSession { let mut retry_count = 0; loop { // TODO(all) : shoud check if have '\r\n\r\n' firstly. - let data = self.reader.extract_remaining_bytes(); + let data = self.reader.get_remaining_bytes(); if let Some(rtsp_request_data) = RtspRequest::unmarshal(std::str::from_utf8(&data)?) { // TCP packet sticking issue, if have content_length in header. // should check the body @@ -210,14 +213,13 @@ impl RtspServerSession { } retry_count += 1; let data_recv = self.io.lock().await.read().await?; - // re-push the previous data to reader firstly. 
- self.reader.extend_from_slice(&data); self.reader.extend_from_slice(&data_recv[..]); continue; } } } rtsp_request = rtsp_request_data; + self.reader.extract_remaining_bytes(); } else { log::error!("corrupted rtsp message={}", std::str::from_utf8(&data)?); return Ok(()); @@ -239,8 +241,8 @@ impl RtspServerSession { self.handle_setup(&rtsp_request).await?; } rtsp_method_name::PLAY => { - if self.handle_play(&rtsp_request).await.is_err() { - self.unsubscribe_from_stream_hub(rtsp_request.uri.path)?; + if let Err(err) = self.handle_play(&rtsp_request).await { + log::info!("handle_play error: {}", err); } } rtsp_method_name::RECORD => { @@ -516,9 +518,11 @@ impl RtspServerSession { self.send_response(&response).await?; + self.session_type = define::SessionType::Client; + let (event_result_sender, event_result_receiver) = oneshot::channel(); - let publish_event = StreamHubEvent::Subscribe { + let subscribe_event = StreamHubEvent::Subscribe { identifier: StreamIdentifier::Rtsp { stream_path: rtsp_request.uri.path.clone(), }, @@ -526,13 +530,13 @@ impl RtspServerSession { result_sender: event_result_sender, }; - if self.event_producer.send(publish_event).is_err() { + if self.event_producer.send(subscribe_event).is_err() { return Err(SessionError { value: SessionErrorValue::StreamHubEventSendErr, }); } - let mut receiver = event_result_receiver.await??.frame_receiver.unwrap(); + let mut receiver = event_result_receiver.await??.0.frame_receiver.unwrap(); let mut retry_times = 0; loop { @@ -582,20 +586,6 @@ impl RtspServerSession { } } - pub fn unsubscribe_from_stream_hub(&mut self, stream_path: String) -> Result<(), SessionError> { - let identifier = StreamIdentifier::Rtsp { stream_path }; - - let subscribe_event = StreamHubEvent::UnSubscribe { - identifier, - info: self.get_subscriber_info(), - }; - if let Err(err) = self.event_producer.send(subscribe_event) { - log::error!("unsubscribe_from_stream_hub err {}", err); - } - - Ok(()) - } - async fn handle_record(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { if let Some(range_str) = rtsp_request.headers.get(&String::from("Range")) { if let Some(range) = RtspRange::unmarshal(range_str) { @@ -616,27 +606,40 @@ impl RtspServerSession { } fn handle_teardown(&mut self, rtsp_request: &RtspRequest) -> Result<(), SessionError> { - let stream_path = &rtsp_request.uri.path; - let unpublish_event = StreamHubEvent::UnPublish { - identifier: StreamIdentifier::Rtsp { - stream_path: stream_path.clone(), - }, - info: self.get_publisher_info(), + let identifier = StreamIdentifier::Rtsp { + stream_path: rtsp_request.uri.path.clone(), + }; + + let event = match self.session_type { + define::SessionType::Client => { + log::info!("handle_teardown: client"); + + StreamHubEvent::UnSubscribe { + identifier, + info: self.get_subscriber_info(), + } + } + define::SessionType::Server => { + log::info!("handle_teardown: server"); + StreamHubEvent::UnPublish { + identifier, + info: self.get_publisher_info(), + } + } }; - let rv = self.event_producer.send(unpublish_event); + let event_json_str = serde_json::to_string(&event).unwrap(); + + let rv = self.event_producer.send(event); match rv { - Err(_) => { - log::error!("unpublish_to_channels error.stream_name: {}", stream_path); + Err(err) => { + log::error!("handle_teardown: send event error: {err} for event: {event_json_str}"); Err(SessionError { value: SessionErrorValue::StreamHubEventSendErr, }) } Ok(()) => { - log::info!( - "unpublish_to_channels successfully.stream name: {}", - stream_path - ); + 
log::info!("handle_teardown: send event success: {event_json_str}"); Ok(()) } } @@ -866,7 +869,7 @@ impl TStreamHandler for RtspStreamHandler { Ok(()) } - async fn get_statistic_data(&self) -> Option { + async fn get_statistic_data(&self) -> Option { None } diff --git a/protocol/webrtc/CHANGELOG.md b/protocol/webrtc/CHANGELOG.md index a6d27096..13b06e69 100644 --- a/protocol/webrtc/CHANGELOG.md +++ b/protocol/webrtc/CHANGELOG.md @@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] - ReleaseDate +## [0.3.2] - 2021-03-15 +- Upgrade failure library. +- Some changes for statistics feature. + ## [0.3.1] - 2021-02-29 - Update the opus library from opus-rs to audiopus for cross compile。 diff --git a/protocol/webrtc/Cargo.toml b/protocol/webrtc/Cargo.toml index 7057b8af..65feff15 100644 --- a/protocol/webrtc/Cargo.toml +++ b/protocol/webrtc/Cargo.toml @@ -15,7 +15,7 @@ http = "0.2.9" byteorder = "1.4.2" bytes = "1.0.0" tokio = "1.4.0" -failure = "0.1.1" +failure = "0.1.8" log = "0.4" webrtc = "0.10.1" async-trait = "0.1.70" diff --git a/protocol/webrtc/src/session/mod.rs b/protocol/webrtc/src/session/mod.rs index ca1a6f4a..26ae9d0c 100644 --- a/protocol/webrtc/src/session/mod.rs +++ b/protocol/webrtc/src/session/mod.rs @@ -5,7 +5,7 @@ use streamhub::{ StreamHubEventSender, SubscribeType, SubscriberInfo, TStreamHandler, }, errors::StreamHubError, - statistics::StreamStatistics, + statistics::StatisticsStream, stream::StreamIdentifier, utils::{RandomDigitCount, Uuid}, }; @@ -367,7 +367,7 @@ impl WebRTCServerSession { }); } - let receiver = event_result_receiver.await??.packet_receiver.unwrap(); + let receiver = event_result_receiver.await??.0.packet_receiver.unwrap(); let (pc_state_sender, mut pc_state_receiver) = broadcast::channel(1); @@ -565,7 +565,7 @@ impl TStreamHandler for WebRTCStreamHandler { ) -> Result<(), StreamHubError> { Ok(()) } - async fn get_statistic_data(&self) -> Option { + async fn get_statistic_data(&self) -> Option { None }
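
Note (illustrative, not part of the change): across the RTMP session, HTTP-FLV session and RTMP cache hunks above, the new reporting path follows one pattern: a session keeps an Option<StatisticDataSender> taken from the subscribe/publish result, and for every audio or video frame it sends a StatisticData sample carrying the payload size (and, for subscribers, the session uuid); a failed send is only logged and never interrupts the media path. A condensed, self-contained sketch of that pattern, with the enum trimmed to two variants and the types stood in locally (assumes tokio with the rt and macros features; this is not the streamhub crate's actual API surface):

    use tokio::sync::mpsc;

    // Local stand-ins for the streamhub types referenced in the diff.
    #[allow(dead_code)]
    #[derive(Debug)]
    enum StatisticData {
        Audio { data_size: usize },
        Video { data_size: usize, is_key_frame: Option<bool> },
    }
    type StatisticDataSender = mpsc::UnboundedSender<StatisticData>;

    // Mirrors the `if let Some(sender) = &self.statistic_data_sender { .. }`
    // blocks: reporting is best-effort and send errors are only logged.
    fn report_video(sender: &Option<StatisticDataSender>, payload: &[u8], key_frame: bool) {
        if let Some(sender) = sender {
            let sample = StatisticData::Video {
                data_size: payload.len(),
                is_key_frame: Some(key_frame),
            };
            if let Err(err) = sender.send(sample) {
                eprintln!("send statistic_data err: {err}");
            }
        }
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        report_video(&Some(tx), &[0u8; 1024], true);
        // On the hub side, the transceiver folds samples like this one into the
        // per-stream StatisticsStream that the new HTTP APIs expose.
        println!("{:?}", rx.recv().await);
    }
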