From b6a36f8d63f2437df503ebd6c4a91f08b5045a82 Mon Sep 17 00:00:00 2001 From: Jacob Su Date: Thu, 28 Mar 2024 09:50:28 +0800 Subject: [PATCH] fix RTMP publish single AAC from ffmpeg client. --- library/bytesio/src/bytesio.rs | 19 +++++++--- library/container/flv/src/demuxer.rs | 5 ++- library/container/flv/src/muxer.rs | 42 ++++++++++++++++++-- protocol/hls/src/flv2hls.rs | 4 ++ protocol/httpflv/src/httpflv.rs | 57 ++++++++++++++++++++++++++-- protocol/rtmp/src/cache/mod.rs | 8 ++-- 6 files changed, 118 insertions(+), 17 deletions(-) diff --git a/library/bytesio/src/bytesio.rs b/library/bytesio/src/bytesio.rs index 99862519..4ec66250 100644 --- a/library/bytesio/src/bytesio.rs +++ b/library/bytesio/src/bytesio.rs @@ -33,7 +33,11 @@ pub struct UdpIO { impl UdpIO { pub async fn new(remote_domain: String, remote_port: u16, local_port: u16) -> Option { - let remote_address = format!("{remote_domain}:{remote_port}"); + let remote_address = if remote_domain == "localhost" { + format!("127.0.0.1:{remote_port}") + } else { + format!("{remote_domain}:{remote_port}") + }; log::info!("remote address: {}", remote_address); let local_address = format!("0.0.0.0:{local_port}"); if let Ok(local_socket) = UdpSocket::bind(local_address).await { @@ -41,10 +45,13 @@ impl UdpIO { if let Err(err) = local_socket.connect(remote_socket_addr).await { log::info!("connect to remote udp socket error: {}", err); } + + return Some(Self { + socket: local_socket, + }); + } else { + log::error!("remote_address parse error: {:?}", remote_address); } - return Some(Self { - socket: local_socket, - }); } None @@ -75,7 +82,7 @@ impl TNetIO for UdpIO { Ok(data) => data, Err(err) => Err(BytesIOError { value: BytesIOErrorValue::TimeoutError(err), - }) + }), } } @@ -120,7 +127,7 @@ impl TNetIO for TcpIO { Ok(data) => data, Err(err) => Err(BytesIOError { value: BytesIOErrorValue::TimeoutError(err), - }) + }), } } diff --git a/library/container/flv/src/demuxer.rs b/library/container/flv/src/demuxer.rs index e56efe23..c7022783 100644 --- a/library/container/flv/src/demuxer.rs +++ b/library/container/flv/src/demuxer.rs @@ -185,7 +185,10 @@ impl FlvAudioTagDemuxer { if tag_header.sound_format == SoundFormat::AAC as u8 { match tag_header.aac_packet_type { aac_packet_type::AAC_SEQHDR => { - self.aac_processor.audio_specific_config_load()?; + if self.aac_processor.bytes_reader.len() >= 2 { + self.aac_processor.audio_specific_config_load()?; + } + return Ok(FlvDemuxerAudioData::new()); } aac_packet_type::AAC_RAW => { diff --git a/library/container/flv/src/muxer.rs b/library/container/flv/src/muxer.rs index 187017e3..54d83c48 100644 --- a/library/container/flv/src/muxer.rs +++ b/library/container/flv/src/muxer.rs @@ -3,7 +3,7 @@ use { bytesio::bytes_writer::BytesWriter, }; -const FLV_HEADER: [u8; 9] = [ +const FLV_HEADER_AV: [u8; 9] = [ 0x46, // 'F' 0x4c, //'L' 0x56, //'V' @@ -11,6 +11,30 @@ const FLV_HEADER: [u8; 9] = [ 0x05, //00000101 audio tag and video tag 0x00, 0x00, 0x00, 0x09, //flv header size ]; // 9 +const FLV_HEADER_JUST_AUDIO: [u8; 9] = [ + 0x46, // 'F' + 0x4c, //'L' + 0x56, //'V' + 0x01, //version + 0x04, //00000101 audio tag and video tag + 0x00, 0x00, 0x00, 0x09, //flv header size +]; // 9 +const FLV_HEADER_JUST_VIDEO: [u8; 9] = [ + 0x46, // 'F' + 0x4c, //'L' + 0x56, //'V' + 0x01, //version + 0x01, //00000101 audio tag and video tag + 0x00, 0x00, 0x00, 0x09, //flv header size +]; // 9 +const FLV_HEADER_EMPTY_AV: [u8; 9] = [ + 0x46, // 'F' + 0x4c, //'L' + 0x56, //'V' + 0x01, //version + 0x00, //00000101 audio tag and video tag + 0x00, 0x00, 0x00, 0x09, //flv header size +]; // 9 pub const HEADER_LENGTH: u32 = 11; pub struct FlvMuxer { pub writer: BytesWriter, @@ -29,8 +53,20 @@ impl FlvMuxer { } } - pub fn write_flv_header(&mut self) -> Result<(), FlvMuxerError> { - self.writer.write(&FLV_HEADER)?; + pub fn write_flv_header( + &mut self, + has_audio: bool, + has_video: bool, + ) -> Result<(), FlvMuxerError> { + if has_audio && has_video { + self.writer.write(&FLV_HEADER_AV)?; + } else if has_audio { + self.writer.write(&FLV_HEADER_JUST_AUDIO)?; + } else if has_video { + self.writer.write(&FLV_HEADER_JUST_VIDEO)?; + } else { + self.writer.write(&FLV_HEADER_EMPTY_AV)?; + } Ok(()) } diff --git a/protocol/hls/src/flv2hls.rs b/protocol/hls/src/flv2hls.rs index c9e6f5c5..818688e3 100644 --- a/protocol/hls/src/flv2hls.rs +++ b/protocol/hls/src/flv2hls.rs @@ -137,6 +137,10 @@ impl Flv2HlsRemuxer { dts = data.dts; pid = self.audio_pid; payload.extend_from_slice(&data.data[..]); + + if dts - self.last_ts_dts >= self.duration * 1000 { + self.need_new_segment = true; + } } _ => return Ok(()), } diff --git a/protocol/httpflv/src/httpflv.rs b/protocol/httpflv/src/httpflv.rs index a6d490a2..b27ced80 100644 --- a/protocol/httpflv/src/httpflv.rs +++ b/protocol/httpflv/src/httpflv.rs @@ -26,6 +26,10 @@ pub struct HttpFlv { muxer: FlvMuxer, + has_audio: bool, + has_video: bool, + has_send_header: bool, + event_producer: StreamHubEventSender, data_receiver: FrameDataReceiver, /* now used for subscriber session */ @@ -52,6 +56,9 @@ impl HttpFlv { app_name, stream_name, muxer: FlvMuxer::new(), + has_audio: false, + has_video: false, + has_send_header: false, data_receiver, statistic_data_sender: None, event_producer, @@ -70,14 +77,56 @@ impl HttpFlv { } pub async fn send_media_stream(&mut self) -> Result<(), HttpFLvError> { - self.muxer.write_flv_header()?; - self.muxer.write_previous_tag_size(0)?; - - self.flush_response_data()?; let mut retry_count = 0; + + let mut max_av_frame_num_to_guess_av = 0; + // the first av frames are sequence configs, must be cached; + let mut cached_frames = Vec::new(); //write flv body loop { if let Some(data) = self.data_receiver.recv().await { + if !self.has_send_header { + max_av_frame_num_to_guess_av += 1; + + match data { + FrameData::Audio { + timestamp: _, + data: _, + } => { + self.has_audio = true; + cached_frames.push(data); + } + FrameData::Video { + timestamp: _, + data: _, + } => { + self.has_video = true; + cached_frames.push(data); + } + FrameData::MetaData { + timestamp: _, + data: _, + } => cached_frames.push(data), + _ => {} + } + + if (self.has_audio && self.has_video) || max_av_frame_num_to_guess_av > 10 { + self.has_send_header = true; + self.muxer + .write_flv_header(self.has_audio, self.has_video)?; + self.muxer.write_previous_tag_size(0)?; + + self.flush_response_data()?; + + for frame in &cached_frames { + self.write_flv_tag(frame.clone())?; + } + cached_frames.clear(); + } + + continue; + } + if let Err(err) = self.write_flv_tag(data) { if let HttpFLvErrorValue::MpscSendError(err_in) = &err.value { if err_in.is_disconnected() { diff --git a/protocol/rtmp/src/cache/mod.rs b/protocol/rtmp/src/cache/mod.rs index 05867f9e..6a5d05ae 100644 --- a/protocol/rtmp/src/cache/mod.rs +++ b/protocol/rtmp/src/cache/mod.rs @@ -33,7 +33,6 @@ pub struct Cache { impl Cache { pub fn new(gop_num: usize, statistic_data_sender: Option) -> Self { - Cache { metadata: metadata::MetaData::new(), metadata_timestamp: 0, @@ -78,7 +77,10 @@ impl Cache { let mut reader = BytesReader::new(chunk_body.clone()); let tag_header = AudioTagHeader::unmarshal(&mut reader)?; - if tag_header.sound_format == define::SoundFormat::AAC as u8 + let remain_bytes = reader.extract_remaining_bytes(); + + if remain_bytes.len() >= 2 + && tag_header.sound_format == define::SoundFormat::AAC as u8 && tag_header.aac_packet_type == define::aac_packet_type::AAC_SEQHDR { self.audio_seq = chunk_body.clone(); @@ -88,7 +90,7 @@ impl Cache { let mut aac_processor = Mpeg4AacProcessor::default(); let aac = aac_processor - .extend_data(reader.extract_remaining_bytes()) + .extend_data(remain_bytes) .audio_specific_config_load()?; let statistic_audio_codec = StatisticData::AudioCodec {