diff --git a/protocol/rtmp/src/chunk/mod.rs b/protocol/rtmp/src/chunk/mod.rs index 81b828ca..759b9609 100644 --- a/protocol/rtmp/src/chunk/mod.rs +++ b/protocol/rtmp/src/chunk/mod.rs @@ -23,6 +23,16 @@ impl ChunkBasicHeader { } } +#[derive(Eq, PartialEq, Debug, Clone)] +pub enum ExtendTimestampType { + //There is no extended timestamp + NONE, + //The extended timestamp field is read in format 0 chunk. + FORMAT0, + //The extended timestamp field is read in format 1 or 2 chunk. + FORMAT12, +} + //5.3.1.2 #[derive(Eq, PartialEq, Debug, Clone)] pub struct ChunkMessageHeader { @@ -38,10 +48,10 @@ pub struct ChunkMessageHeader { // NOTE: this value should be reset to 0 when the current chunk type is 0. pub timestamp_delta: u32, // This field will be set for type 0,1,2 .If the timestamp/timestamp delta >= 0xFFFFFF - // then set this value to true else set it to false. + // then set this value to FORMAT0/FORMAT12 else set it to NONE. // Note that when the chunk format is 3, this value will be inherited from // the most recent chunk 0, 1, or 2 chunk.(5.3.1.3 Extended Timestamp). - pub is_extended_timestamp: bool, + pub extended_timestamp_type: ExtendTimestampType, } impl ChunkMessageHeader { @@ -52,7 +62,7 @@ impl ChunkMessageHeader { msg_type_id, msg_streamd_id: msg_stream_id, timestamp_delta: 0, - is_extended_timestamp: false, + extended_timestamp_type: ExtendTimestampType::NONE, } } } diff --git a/protocol/rtmp/src/chunk/packetizer.rs b/protocol/rtmp/src/chunk/packetizer.rs index a4f9be0c..e3e79812 100644 --- a/protocol/rtmp/src/chunk/packetizer.rs +++ b/protocol/rtmp/src/chunk/packetizer.rs @@ -1,7 +1,7 @@ use { super::{ define::CHUNK_SIZE, errors::PackError, ChunkBasicHeader, ChunkHeader, ChunkInfo, - ChunkMessageHeader, + ChunkMessageHeader, ExtendTimestampType, }, byteorder::{BigEndian, LittleEndian}, bytesio::{bytes_writer::AsyncBytesWriter, bytesio::TNetIO}, @@ -23,18 +23,17 @@ pub struct ChunkPacketizer { max_chunk_size: usize, //bytes: Cursor>, writer: AsyncBytesWriter, - //save timestamp need to be write for chunk - timestamp: u32, + //save extended timestamp need to be write for chunk + extended_timestamp: Option, } impl ChunkPacketizer { pub fn new(io: Arc>>) -> Self { Self { csid_2_chunk_header: HashMap::new(), - //chunk_info: ChunkInfo::new(), writer: AsyncBytesWriter::new(io), max_chunk_size: CHUNK_SIZE as usize, - timestamp: 0, + extended_timestamp: None, } } fn zip_chunk_header(&mut self, chunk_info: &mut ChunkInfo) -> Result { @@ -96,23 +95,41 @@ impl ChunkPacketizer { basic_header: &ChunkBasicHeader, message_header: &mut ChunkMessageHeader, ) -> Result<(), PackError> { - self.timestamp = if basic_header.format == 0 { - message_header.timestamp - } else { - message_header.timestamp_delta - }; - - let timestamp = if self.timestamp >= 0xFFFFFF { - message_header.is_extended_timestamp = true; - 0xFFFFFF - } else { - message_header.is_extended_timestamp = false; - self.timestamp + let message_header_timestamp: u32; + (self.extended_timestamp, message_header_timestamp) = match basic_header.format { + 0 => { + if message_header.timestamp >= 0xFFFFFF { + message_header.extended_timestamp_type = ExtendTimestampType::FORMAT0; + (Some(message_header.timestamp), 0xFFFFFF) + } else { + (None, message_header.timestamp) + } + } + 1 | 2 => { + if message_header.timestamp_delta >= 0xFFFFFF { + //if use the format1,2's extended timestamp, there may be a problem for + //av timestamp. + log::warn!( + "Now use extended timestamp for format {}, the value is: {}", + basic_header.format, + message_header.timestamp_delta + ); + message_header.extended_timestamp_type = ExtendTimestampType::FORMAT12; + (Some(message_header.timestamp_delta), 0xFFFFFF) + } else { + (None, message_header.timestamp_delta) + } + } + _ => { + //should not be here + (None, 0) + } }; match basic_header.format { 0 => { - self.writer.write_u24::(timestamp)?; + self.writer + .write_u24::(message_header_timestamp)?; self.writer .write_u24::(message_header.msg_length)?; self.writer.write_u8(message_header.msg_type_id)?; @@ -120,13 +137,15 @@ impl ChunkPacketizer { .write_u32::(message_header.msg_streamd_id)?; } 1 => { - self.writer.write_u24::(timestamp)?; + self.writer + .write_u24::(message_header_timestamp)?; self.writer .write_u24::(message_header.msg_length)?; self.writer.write_u8(message_header.msg_type_id)?; } 2 => { - self.writer.write_u24::(timestamp)?; + self.writer + .write_u24::(message_header_timestamp)?; } _ => {} } @@ -154,10 +173,11 @@ impl ChunkPacketizer { chunk_info.basic_header.format, chunk_info.basic_header.chunk_stream_id, )?; + self.write_message_header(&chunk_info.basic_header, &mut chunk_info.message_header)?; - if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(self.timestamp)?; + if let Some(extended_timestamp) = self.extended_timestamp { + self.write_extened_timestamp(extended_timestamp)?; } let mut cur_payload_size: usize; @@ -175,8 +195,9 @@ impl ChunkPacketizer { if whole_payload_size > 0 { self.write_basic_header(3, chunk_info.basic_header.chunk_stream_id)?; - if chunk_info.message_header.is_extended_timestamp { - self.write_extened_timestamp(self.timestamp)?; + + if let Some(extended_timestamp) = self.extended_timestamp { + self.write_extened_timestamp(extended_timestamp)?; } } } diff --git a/protocol/rtmp/src/chunk/unpacketizer.rs b/protocol/rtmp/src/chunk/unpacketizer.rs index c0403a4f..2618f47c 100644 --- a/protocol/rtmp/src/chunk/unpacketizer.rs +++ b/protocol/rtmp/src/chunk/unpacketizer.rs @@ -2,7 +2,7 @@ use { super::{ define, errors::{UnpackError, UnpackErrorValue}, - ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, + ChunkBasicHeader, ChunkInfo, ChunkMessageHeader, ExtendTimestampType, }, crate::messages::define::msg_type_id, byteorder::{BigEndian, LittleEndian}, @@ -356,7 +356,7 @@ impl ChunkUnpacketizer { //1, or 2 chunk for the same chunk stream ID indicated the presence of //an extended timestamp field. 5.3.1.3) if self.current_chunk_info.basic_header.format != 3 { - self.current_message_header().is_extended_timestamp = false; + self.current_message_header().extended_timestamp_type = ExtendTimestampType::NONE; } match self.current_chunk_info.basic_header.format { @@ -410,7 +410,8 @@ impl ChunkUnpacketizer { } if self.current_message_header().timestamp >= 0xFFFFFF { - self.current_message_header().is_extended_timestamp = true; + self.current_message_header().extended_timestamp_type = + ExtendTimestampType::FORMAT0; } } /*****************************************************************/ @@ -460,7 +461,8 @@ impl ChunkUnpacketizer { } if self.current_message_header().timestamp_delta >= 0xFFFFFF { - self.current_message_header().is_extended_timestamp = true; + self.current_message_header().extended_timestamp_type = + ExtendTimestampType::FORMAT12; } } /************************************************/ @@ -481,7 +483,8 @@ impl ChunkUnpacketizer { self.reader.read_u24::()?; if self.current_message_header().timestamp_delta >= 0xFFFFFF { - self.current_message_header().is_extended_timestamp = true; + self.current_message_header().extended_timestamp_type = + ExtendTimestampType::FORMAT12; } } @@ -495,32 +498,24 @@ impl ChunkUnpacketizer { } pub fn read_extended_timestamp(&mut self) -> Result { - let mut extended_timestamp: u32 = 0; - - if self.current_message_header().is_extended_timestamp { - extended_timestamp = self.reader.read_u32::()?; - } - - let cur_format_id = self.current_chunk_info.basic_header.format; - - match cur_format_id { - 0 => { - if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp = extended_timestamp; - } + //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, + //1, or 2 chunk for the same chunk stream ID indicated the presence of + //an extended timestamp field. + match self.current_message_header().extended_timestamp_type { + //the current fortmat type can be 0 or 3 + ExtendTimestampType::FORMAT0 => { + self.current_message_header().timestamp = self.reader.read_u32::()?; } - 1 | 2 | 3 => { - //The extended timestamp field is present in Type 3 chunks when the most recent Type 0, - //1, or 2 chunk for the same chunk stream ID indicated the presence of - //an extended timestamp field. - if self.current_message_header().is_extended_timestamp { - self.current_message_header().timestamp_delta = extended_timestamp; - } + //the current fortmat type can be 1,2 or 3 + ExtendTimestampType::FORMAT12 => { + self.current_message_header().timestamp_delta = + self.reader.read_u32::()?; } - - _ => {} + ExtendTimestampType::NONE => {} } + //compute the abs timestamp + let cur_format_id = self.current_chunk_info.basic_header.format; if cur_format_id == 1 || cur_format_id == 2 || (cur_format_id == 3 && self.current_chunk_info.payload.len() == 0)