diff --git a/Cargo.lock b/Cargo.lock index b3839baa..d9dbc7dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1236,6 +1236,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "is_sorted" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357376465c37db3372ef6a00585d336ed3d0f11d4345eef77ebcb05865392b21" + [[package]] name = "itertools" version = "0.10.5" @@ -1415,6 +1421,8 @@ dependencies = [ "gstreamer-app", "gstreamer-rtsp", "gstreamer-rtsp-server", + "is_sorted", + "itertools", "lazy_static", "log", "neolink_core", diff --git a/Cargo.toml b/Cargo.toml index c00d20ee..fc30a56c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,8 @@ gstreamer = "0.20.3" gstreamer-app = "0.20.0" gstreamer-rtsp = "0.20.0" gstreamer-rtsp-server = { version = "0.20.3", features = ["v1_16"] } +is_sorted = "0.1.1" +itertools = "0.10.5" lazy_static = "1.4.0" log = { version = "0.4.17", features = [ "release_max_level_debug" ] } neolink_core = { path = "crates/core", version = "0.5.10" } diff --git a/crates/core/src/bc_protocol/connection/discovery.rs b/crates/core/src/bc_protocol/connection/discovery.rs index 93549565..076928ca 100644 --- a/crates/core/src/bc_protocol/connection/discovery.rs +++ b/crates/core/src/bc_protocol/connection/discovery.rs @@ -58,7 +58,7 @@ struct UidLookupResults { pub(crate) struct Discovery {} -const MTU: u32 = 1030; +const MTU: u32 = 1350; lazy_static! 
{ static ref P2P_RELAY_HOSTNAMES: [&'static str; 10] = [ "p2p.reolink.com", diff --git a/crates/core/src/bc_protocol/connection/udpsource.rs b/crates/core/src/bc_protocol/connection/udpsource.rs index d22c8844..bebd9105 100644 --- a/crates/core/src/bc_protocol/connection/udpsource.rs +++ b/crates/core/src/bc_protocol/connection/udpsource.rs @@ -28,7 +28,7 @@ use tokio_util::{ udp::UdpFramed, }; -const MTU: usize = 1030; +const MTU: usize = 1350; const UDPDATA_HEADER_SIZE: usize = 20; pub(crate) struct UdpSource { diff --git a/sample_config.toml b/sample_config.toml index d1879a85..4fb6735a 100644 --- a/sample_config.toml +++ b/sample_config.toml @@ -75,6 +75,22 @@ address = "192.168.1.187:9000" # # print_format = "None" +# Internally neolink uses a buffer to improve delivery of frames at their actual times +# A large buffer can work even for large interruptions in the connection with the camera +# A small buffer will reduce the latency introduced +# buffer_size: 100 + +# Over long periods of time the camera buffer can exhaust with no means of recovery +# to combat this a time stretching strategy is employed where the stream is slowed +# as the buffer is exhausted and sped up when full +# this can keep the camera and rtsp streams in sync and reduce buffer exhaustions +# +# HOWEVER if there are real latency issues (such as too high a bitrate) +# then smoothing can result in a stream appearing in slow motion +# +# you can disable and enable this feature here +# use_smoothing: true + [[cameras]] name = "storage shed" diff --git a/src/config.rs b/src/config.rs index 78b3c8ac..2cfd870b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -102,6 +102,22 @@ pub(crate) struct CameraConfig { #[serde(default = "default_update_time", alias = "time")] pub(crate) update_time: bool, + + #[validate(range( + min = 10, + max = 500, + message = "Invalid buffer size", + code = "buffer_size" + ))] + #[serde(default = "default_buffer_size", alias = "size", alias = "buffer")] + pub(crate)
buffer_size: usize, + + #[serde( + default = "default_smoothing", + alias = "smoothing", + alias = "stretching" + )] + pub(crate) use_smoothing: bool, } #[derive(Debug, Deserialize, Validate, Clone)] @@ -239,6 +255,14 @@ fn default_pause() -> PauseConfig { } } +fn default_smoothing() -> bool { + true +} + +fn default_buffer_size() -> usize { + 100 +} + pub(crate) static RESERVED_NAMES: &[&str] = &["anyone", "anonymous"]; fn validate_username(name: &str) -> Result<(), ValidationError> { if name.trim().is_empty() { diff --git a/src/rtsp/gst/factory.rs b/src/rtsp/gst/factory.rs index 298048ac..6c4426aa 100644 --- a/src/rtsp/gst/factory.rs +++ b/src/rtsp/gst/factory.rs @@ -41,14 +41,16 @@ glib::wrapper! { impl Default for NeoMediaFactory { fn default() -> Self { - Self::new() + Self::new(100, true) } } impl NeoMediaFactory { - pub(crate) fn new() -> Self { + pub(crate) fn new(buffer_size: usize, use_smoothing: bool) -> Self { let factory = Object::new::(); factory.set_shared(false); + factory.imp().shared.set_buffer_size(buffer_size); + factory.imp().shared.set_use_smoothing(use_smoothing); // factory.set_do_retransmission(false); // Can't use as the method is missing on the 32bit docker gst dll factory.set_launch("videotestsrc pattern=\"snow\" ! video/x-raw,width=896,height=512,framerate=25/1 ! textoverlay name=\"inittextoverlay\" text=\"Stream not Ready\" valignment=top halignment=left font-desc=\"Sans, 32\" ! jpegenc ! 
rtpjpegpay name=pay0"); factory.set_suspend_mode(RTSPSuspendMode::None); @@ -120,6 +122,8 @@ pub(crate) enum FactoryCommand { BcMedia(BcMedia), ClearBuffer, JumpToLive, + Pause, + Resume, } pub(crate) struct NeoMediaFactoryImpl { @@ -149,6 +153,7 @@ impl Default for NeoMediaFactoryImpl { shared.clone(), ReceiverStream::new(datarx), ReceiverStream::new(rx_clientsender), + 100, ); threads.spawn(async move { loop { @@ -265,7 +270,10 @@ impl NeoMediaFactoryImpl { bin.remove(&element)?; } - let mut client_data = NeoMediaSender::new(); + let mut client_data = NeoMediaSender::new( + self.shared.get_buffer_size(), + self.shared.get_use_smoothing(), + ); // Now contruct the actual ones match ( diff --git a/src/rtsp/gst/sender.rs b/src/rtsp/gst/sender.rs index f038130e..f2401a30 100644 --- a/src/rtsp/gst/sender.rs +++ b/src/rtsp/gst/sender.rs @@ -8,6 +8,8 @@ use futures::{ use gstreamer::{prelude::*, ClockTime}; use gstreamer_app::AppSrc; pub use gstreamer_rtsp_server::gio::{TlsAuthenticationMode, TlsCertificate}; +use is_sorted::IsSorted; +use itertools::Itertools; use log::*; use neolink_core::bcmedia::model::*; use std::{ @@ -29,48 +31,94 @@ use crate::rtsp::Spring; type FrameTime = i64; -const BUFFER_SIZE: usize = 300; - #[derive(Debug, Clone)] struct Stamped { time: FrameTime, - data: Arc, + data: Vec>, } -#[derive(Default, Debug)] +#[derive(Debug)] struct NeoBuffer { buf: VecDeque, + max_size: usize, } impl NeoBuffer { - fn push(&mut self, item: Stamped) { - // Sort time - // debug!("sorting"); - let mut sorting_vec = vec![]; - let time = item.time; - while self - .buf - .back() - .map(|back| back.time > time) - .unwrap_or(false) - { - sorting_vec.push(self.buf.pop_back().unwrap()); + fn new(max_size: usize) -> Self { + Self { + buf: Default::default(), + max_size, } - sorting_vec.push(item); + } + + fn push(&mut self, media: Arc) { + let frame_time = match media.as_ref() { + BcMedia::Iframe(data) => Some(data.microseconds as FrameTime), + BcMedia::Pframe(data) => 
Some(data.microseconds as FrameTime), + _ => None, + }; + if let Some(frame_time) = frame_time { + let mut sorting_vec = vec![]; + let time = frame_time; + while self + .buf + .back() + .map(|back| back.time >= time) + .unwrap_or(false) + { + sorting_vec.push(self.buf.pop_back().unwrap()); + } + if sorting_vec + .last() + .map(|last| last.time == frame_time) + .unwrap_or(false) + { + sorting_vec.last_mut().unwrap().data.push(media); + } else { + sorting_vec.push(Stamped { + time, + data: vec![media], + }); + } + + while let Some(sorted_item) = sorting_vec.pop() { + debug!("Pushing frame with time: {}", sorted_item.time); + self.buf.push_back(sorted_item); + } - for sorted_item in sorting_vec.drain(..) { - // debug!("Pushing frame with time: {}", sorted_item.time); - self.buf.push_back(sorted_item); + debug_assert!( + IsSorted::is_sorted(&mut self.buf.iter().map(|stamped| stamped.time)), + "{:?}", + self.buf + .iter() + .map(|stamped| stamped.time) + .collect::>() + ); + } else if let Some(last) = self.buf.back_mut() { + last.data.push(media); } - while self.buf.len() > BUFFER_SIZE { + + while self.buf.len() > self.max_size { if self.buf.pop_front().is_none() { break; } } } + fn prep_size(&self) -> usize { + self.max_size * 2 / 3 + } + + fn live_size(&self) -> usize { + self.max_size / 3 + } + pub(crate) fn ready(&self) -> bool { - self.buf.len() > BUFFER_SIZE * 2 / 3 + self.buf.len() > self.prep_size() + } + + pub(crate) fn ready_play(&self) -> bool { + self.buf.len() > self.live_size() } // fn last_iframe_time(&self) -> Option { @@ -100,14 +148,14 @@ impl NeoBuffer { // .map(|i| i as FrameTime) // } - fn start_time(&self) -> Option { - let (fronts, backs) = self.buf.as_slices(); - fronts - .iter() - .chain(&mut backs.iter()) - .map(|frame| frame.time) - .next() - } + // fn start_time(&self) -> Option { + // let (fronts, backs) = self.buf.as_slices(); + // fronts + // .iter() + // .chain(&mut backs.iter()) + // .map(|frame| frame.time) + // .next() + // } fn 
end_time(&self) -> Option { let (fronts, backs) = self.buf.as_slices(); @@ -163,6 +211,7 @@ impl NeoMediaSenders { shared: Arc, data_source: ReceiverStream, client_source: ReceiverStream, + buffer_size: usize, ) -> Self { Self { data_source, @@ -170,7 +219,7 @@ impl NeoMediaSenders { shared, uid: Default::default(), client_data: Default::default(), - buffer: Default::default(), + buffer: NeoBuffer::new(buffer_size), } } @@ -245,20 +294,17 @@ impl NeoMediaSenders { _ => self.buffer.end_time().unwrap_or(0), }; - let data = Stamped { - time, - data: Arc::new(data), - }; - for client_data in self.client_data.values_mut() { - client_data.add_data(data.clone()).await?; - } + let data = Arc::new(data); let end_time = self.buffer.end_time(); - let frame_time = data.time; + let frame_time = time; // Ocassionally the camera will make a jump in timestamps of about 15s (on sub 9s on main) // This could mean that it runs on some fixed sized buffer if let Some(end_time) = end_time { - let delta_time = frame_time - end_time - -1; + let delta_frame = (self.buffer.buf.back().unwrap().time + - self.buffer.buf.front().unwrap().time) + / self.buffer.buf.len() as i64; + let delta_time = frame_time - end_time - delta_frame; let delta_duration = Duration::from_micros(delta_time.unsigned_abs()); if delta_duration > Duration::from_secs(1) { debug!( @@ -267,17 +313,26 @@ impl NeoMediaSenders { ); debug!("Adjusting master: {}", self.buffer.buf.len()); for frame in self.buffer.buf.iter_mut() { + let old_frame_time = frame.time; frame.time = frame.time.saturating_add(delta_time); - debug!(" - New frame time: {} -> {}", frame.time, frame_time); + debug!( + " - New frame time: {} -> {} (target {})", + old_frame_time, frame.time, frame_time + ); } for (_, client) in self.client_data.iter_mut() { - client.buffer.buf.clear(); - client.inited = false; + for frame in client.buffer.buf.iter_mut() { + frame.time = frame.time.saturating_add(delta_time); + } + client.start_time.mod_value(delta_time as 
f64); } } } + for client_data in self.client_data.values_mut() { + client_data.add_data(data.clone()).await?; + } self.buffer.push(data); self.shared @@ -367,6 +422,7 @@ impl NeoMediaSenders { } async fn update(&mut self) -> AnyResult<()> { + self.buffer.max_size = self.shared.get_buffer_size(); self.init_clients().await?; self.process_client_commands().await?; self.process_client_update().await?; @@ -389,6 +445,15 @@ impl NeoMediaSenders { Some(v) = self.data_source.next() => { match v { FactoryCommand::BcMedia(media) => { + let frame_time = match &media { + BcMedia::Iframe(data) => Some(Duration::from_micros(data.microseconds as u64)), + BcMedia::Pframe(data) => Some(Duration::from_micros(data.microseconds as u64)), + _ => None, + }; + if let Some(frame_time) = frame_time { + debug!("Got frame at {:?}", frame_time); + } + self.handle_new_data(media).await?; }, FactoryCommand::ClearBuffer => { @@ -399,7 +464,18 @@ impl NeoMediaSenders { // Set them into the non init state // This will make them wait for the // buffer to be enough then jump to live - client.inited = false; + let _ = client.jump_to_live().await; + } + }, + FactoryCommand::Pause => { + for client in self.client_data.values_mut() { + client.playing = false; + } + }, + FactoryCommand::Resume => { + for client in self.client_data.values_mut() { + client.playing = true; + let _ = client.jump_to_live().await; } }, } @@ -432,25 +508,29 @@ pub(super) struct NeoMediaSender { command_sender: Sender, inited: bool, playing: bool, + refilling: bool, + use_smoothing: bool, } impl NeoMediaSender { - pub(super) fn new() -> Self { + pub(super) fn new(buffer_size: usize, use_smoothing: bool) -> Self { let (tx, rx) = channel(30); Self { start_time: Spring::new(0.0, 0.0, 10.0), live_offset: 0, - buffer: NeoBuffer::default(), + buffer: NeoBuffer::new(buffer_size), vid: None, aud: None, command_reciever: rx, command_sender: tx, inited: false, playing: true, + refilling: false, + use_smoothing, } } - async fn 
add_data(&mut self, data: Stamped) -> AnyResult<()> { + async fn add_data(&mut self, data: Arc) -> AnyResult<()> { self.buffer.push(data); Ok(()) } @@ -467,41 +547,71 @@ impl NeoMediaSender { self.command_sender.clone() } - fn target_live(&self) -> Option { - let target_idx = BUFFER_SIZE / 3; - if self.buffer.buf.len() >= target_idx { - let target_frame = self.buffer.buf.len().saturating_sub(target_idx); - self.buffer.buf.get(target_frame).map(|frame| frame.time) - } else { + fn target_live_for(buffer: &NeoBuffer) -> Option { + let target_idx = buffer.live_size(); + let stamps = buffer.buf.iter().map(|item| item.time).collect::>(); + + if stamps.len() >= target_idx { + let target_frame = stamps.len().saturating_sub(target_idx); + stamps.get(target_frame).copied() + } else if stamps.len() > 5 { // Approximate it's location - let fraction = target_idx as f64 / self.buffer.buf.len() as f64; - if let (Some(st), Some(et)) = (self.buffer.start_time(), self.buffer.end_time()) { + let fraction = target_idx as f64 / stamps.len() as f64; + if let (Some(st), Some(et)) = (stamps.first(), stamps.last()) { Some(et - ((et - st) as f64 * fraction) as FrameTime) } else { None } + } else { + debug!("Not enough timestamps for target live: {:?}", stamps); + if let Some(st) = stamps.first() { + debug!("Setting to 1s behind first frame in buffer"); + Some(st - Duration::from_secs(1).as_micros() as FrameTime) + } else { + None + } } } - async fn jump_to_live(&mut self) -> AnyResult<()> { - if self.inited { - let target_time = self.target_live(); + fn target_live(&self) -> Option { + Self::target_live_for(&self.buffer) + } - if let Some(target_time) = target_time { - if let Some(et) = self.buffer.end_time() { - debug!( - "Minimum Latency: {:?}", - Duration::from_micros(et.saturating_sub(target_time).max(0) as u64) - ); - } - let runtime = self.get_runtime().unwrap_or(0); + async fn jump_to_live(&mut self) -> AnyResult<()> { + let target_time = self.target_live(); - 
self.start_time.reset_to((target_time - runtime) as f64); + if let Some(target_time) = target_time { + if let Some(et) = self.buffer.end_time() { + debug!( + "Buffer stamps: {:?}", + self.buffer + .buf + .iter() + .fold(Vec::::new(), |mut acc, item| { + if let Some(last) = acc.last() { + if *last < item.time { + acc.push(item.time); + } + } else { + acc.push(item.time); + } + acc + }) + ); debug!( - "Jumped to live: New start time: {:?}", - Duration::from_micros(self.start_time.value_u64()), + "Minimum Latency: {:?} ({:?} - {:?})", + Duration::from_micros(et.saturating_sub(target_time).max(0) as u64), + Duration::from_micros(et.max(0) as u64), + Duration::from_micros(target_time.max(0) as u64), ); } + let runtime = self.get_runtime().unwrap_or(0); + + self.start_time.reset_to((target_time - runtime) as f64); + debug!( + "Jumped to live: New start time: {:?}", + Duration::from_micros(self.start_time.value_u64()), + ); } Ok(()) @@ -510,8 +620,10 @@ impl NeoMediaSender { async fn update_starttime(&mut self) -> AnyResult<()> { self.start_time.update().await; - if let (Some(runtime), Some(target_time)) = (self.get_runtime(), self.target_live()) { - self.start_time.set_target((target_time - runtime) as f64); + if self.use_smoothing { + if let (Some(runtime), Some(target_time)) = (self.get_runtime(), self.target_live()) { + self.start_time.set_target((target_time - runtime) as f64); + } } Ok(()) } @@ -519,24 +631,9 @@ impl NeoMediaSender { async fn seek( &mut self, _original_runtime: Option, - target_runtime: FrameTime, - master_buffer: &NeoBuffer, + _target_runtime: FrameTime, + _master_buffer: &NeoBuffer, ) -> AnyResult<()> { - if let Some(current_runtime) = self.get_runtime() { - self.live_offset = self - .live_offset - .saturating_add(target_runtime.saturating_sub(current_runtime)); - trace!("Old runtime: {}", current_runtime); - trace!("Target runtime: {}", target_runtime); - trace!("Offset: {}", self.live_offset); - trace!("New runtime: {:?}", self.get_runtime()); - 
if let Some(new_buftime) = self.get_buftime() { - self.buffer.buf.clear(); - for frame in master_buffer.buf.iter().filter(|f| f.time >= new_buftime) { - self.buffer.buf.push_back(frame.clone()); - } - } - } self.jump_to_live().await?; Ok(()) } @@ -570,30 +667,31 @@ impl NeoMediaSender { async fn initialise(&mut self, buffer: &NeoBuffer) -> AnyResult<()> { if !self.inited && buffer.ready() { - // Minimum buffer - self.inited = true; - - // Split buffer into 2/3 of preprocess - // and one third for the future buffer - let split_idx = buffer.buf.len() * 2 / 3; - let rebuf = buffer.buf.iter().cloned().collect::>(); - let (preprocess, buffer) = rebuf.split_at(split_idx); - let start_ms = buffer - .first() - .map(|data| data.time as f64) - .expect("Buffer should have a start time"); - self.start_time.reset_to(start_ms); - - // Send preprocess now - self.send_buffers(preprocess).await?; - debug!("Preprocessed"); - // Send these later - for frame in buffer.iter() { - self.buffer.push(frame.clone()); - } - debug!("Buffer filled"); + if let Some(target_time) = Self::target_live_for(buffer) { + // Minimum buffer + self.inited = true; + self.buffer.buf.clear(); - self.jump_to_live().await?; + let mut buffer_iter = buffer.buf.iter().cloned(); + let preprocess = buffer_iter + .take_while_ref(|item| item.time < target_time) + .collect::>(); + let mut buffer = buffer_iter.collect::>(); + self.start_time.reset_to(target_time as f64); + + // Send preprocess now + self.send_buffers(preprocess.as_slice()).await?; + debug!("Preprocessed"); + // Send these later + for frame in buffer.drain(..) 
{ + self.buffer.buf.push_back(frame); + } + debug!("Buffer filled"); + + self.jump_to_live().await?; + } else { + debug!("Buffer not ready to init: {}", buffer.buf.len()); + } } else if !self.inited { debug!("Buffer not ready to init: {}", buffer.buf.len()); } @@ -611,6 +709,10 @@ impl NeoMediaSender { .saturating_add(self.live_offset) .max(0), ); + // debug!("base_time: {:?}", base_time); + // debug!("time: {:?}", time); + // debug!("runtime: {:?}", runtime); + // debug!("Final runtime: {:?}", res); trace!( "Runtime: {:?}, Offset: {:?}, Offseted Runtime: {:?}", runtime, @@ -638,44 +740,68 @@ impl NeoMediaSender { } async fn update(&mut self) -> AnyResult<()> { - self.update_starttime().await?; - let mut buffers = vec![]; - if !self - .vid - .as_ref() - .map(|x| x.pads().iter().all(|pad| pad.is_linked())) - .unwrap_or(false) - { - return Err(anyhow!("Vid src is closed")); - } - if self.buffer.buf.len() < 4 { - error!("Buffer exhausted. Not enough data from Camera."); - } else { - trace!("Buffer size: {}", self.buffer.buf.len()); + if self.buffer.buf.len() >= self.buffer.max_size * 9 / 10 { + debug!("Buffer overfull"); + self.jump_to_live().await?; } - const LATENCY: FrameTime = Duration::from_millis(250).as_micros() as FrameTime; - if let Some(buftime) = self.get_buftime().map(|i| i.saturating_add(LATENCY)) { - // debug!("Update: buftime: {}", buftime); - while self - .buffer - .buf - .front() - .map(|data| data.time <= buftime) + if self.refilling && self.buffer.ready_play() { + self.refilling = false; + self.jump_to_live().await?; + } else if self.refilling { + debug!( + "Refilling: {}/{} ({:.2}%)", + self.buffer.buf.len(), + self.buffer.live_size(), + self.buffer.buf.len() as f32 / (self.buffer.live_size()) as f32 * 100.0 + ); + } else if !self.refilling && self.inited && self.playing { + self.update_starttime().await?; + // Check app src is live + if !self + .vid + .as_ref() + .map(|x| x.pads().iter().all(|pad| pad.is_linked())) .unwrap_or(false) { - 
tokio::task::yield_now().await; - match self.buffer.buf.pop_front() { - Some(frame) => { - buffers.push(frame); + return Err(anyhow!("Vid src is closed")); + } + + // Check if buffers are ok + if self.buffer.buf.len() <= self.buffer.max_size / 10 { + warn!( + "Buffer exhausted. Not enough data from Camera. Pausing RTSP until refilled." + ); + info!("Try reducing your Max Bitrate using the offical app"); + self.refilling = true; + } else { + debug!("Buffer size: {}", self.buffer.buf.len()); + } + + // Send buffers + let mut buffers = vec![]; + if let Some(buftime) = self.get_buftime() { + // debug!("Update: buftime: {}", buf time); + while self + .buffer + .buf + .front() + .map(|data| data.time <= buftime) + .unwrap_or(false) + { + tokio::task::yield_now().await; + match self.buffer.buf.pop_front() { + Some(frame) => { + buffers.push(frame); + } + None => break, } - None => break, } } - } - tokio::task::yield_now().await; - // collect certain frames - self.send_buffers(&buffers).await?; + tokio::task::yield_now().await; + // collect certain frames + self.send_buffers(&buffers).await?; + } Ok(()) } @@ -684,18 +810,18 @@ impl NeoMediaSender { if medias.is_empty() { return Ok(()); } - if self.inited && self.playing { + tokio::task::yield_now().await; + let mut vid_buffers: Vec<(FrameTime, Vec)> = vec![]; + let mut aud_buffers: Vec<(FrameTime, Vec)> = vec![]; + for media_sets in medias.iter() { tokio::task::yield_now().await; - let mut vid_buffers: Vec<(FrameTime, Vec)> = vec![]; - let mut aud_buffers: Vec<(FrameTime, Vec)> = vec![]; - for media in medias.iter() { - tokio::task::yield_now().await; - let buffer = match media.data.as_ref() { + for media in media_sets.data.iter() { + let buffer = match media.as_ref() { BcMedia::Iframe(_) | BcMedia::Pframe(_) => Some(&mut vid_buffers), BcMedia::Aac(_) | BcMedia::Adpcm(_) => Some(&mut aud_buffers), _ => None, }; - let data = match media.data.as_ref() { + let data = match media.as_ref() { BcMedia::Iframe(data) => 
Some(&data.data), BcMedia::Pframe(data) => Some(&data.data), BcMedia::Aac(data) => Some(&data.data), @@ -703,7 +829,7 @@ impl NeoMediaSender { _ => None, }; if let (Some(data), Some(buffer)) = (data, buffer) { - let next_time = media.time; + let next_time = media_sets.time; if let Some(last) = buffer.last_mut() { let last_time = last.0; if next_time == last_time { @@ -716,131 +842,126 @@ impl NeoMediaSender { } } } - tokio::task::yield_now().await; - tokio::try_join!( - async { - if !vid_buffers.is_empty() { - // debug!("Sending video buffers: {}", vid_buffers.len()); - if let Some(appsrc) = self.vid.clone() { - let buffers = { - let mut buffers = - gstreamer::BufferList::new_sized(vid_buffers.len()); - { - let buffers_ref = buffers.get_mut().unwrap(); - for (time, buf) in vid_buffers.drain(..) { - tokio::task::yield_now().await; - let runtime = self.buftime_to_runtime(time); - let _actual_runtime = self - .get_runtime() - .map(|i| Duration::from_micros(i as u64)); - // debug!( - // " - Sending vid frame at time {} ({:?} Expect: {:?})", - // time, - // Duration::from_micros(runtime as u64), - // actual_runtime - // ); - - let gst_buf = { - let mut gst_buf = - gstreamer::Buffer::with_size(buf.len()).unwrap(); - { - let gst_buf_mut = gst_buf.get_mut().unwrap(); - - let time = ClockTime::from_useconds( - runtime.try_into().unwrap(), - ); - gst_buf_mut.set_dts(time); - let mut gst_buf_data = - gst_buf_mut.map_writable().unwrap(); - gst_buf_data.copy_from_slice(buf.as_slice()); - } - gst_buf - }; - buffers_ref.add(gst_buf); - } - } - buffers - }; - - let res = tokio::task::spawn_blocking(move || { - // debug!(" - Pushing buffer: {}", buffers.len()); - appsrc - .push_buffer_list(buffers.copy()) - .map(|_| ()) - .map_err(|_| anyhow!("Could not push buffer to appsrc")) - }) - .await; - match &res { - Err(e) => { - debug!("Paniced on send buffer list: {:?}", e); - } - Ok(Err(e)) => { - debug!("Failed to send buffer list: {:?}", e); + } + tokio::task::yield_now().await; + 
tokio::try_join!( + async { + if !vid_buffers.is_empty() { + // debug!("Sending video buffers: {}", vid_buffers.len()); + if let Some(appsrc) = self.vid.clone() { + let buffers = { + let mut buffers = gstreamer::BufferList::new_sized(vid_buffers.len()); + { + let buffers_ref = buffers.get_mut().unwrap(); + for (time, buf) in vid_buffers.drain(..) { + tokio::task::yield_now().await; + let runtime = self.buftime_to_runtime(time); + debug!( + " - Sending vid frame at time {} ({:?} Expect: {:?})", + time, + Duration::from_micros(runtime as u64), + self.get_runtime().map(|i| Duration::from_micros(i as u64)) + ); + + let gst_buf = { + let mut gst_buf = + gstreamer::Buffer::with_size(buf.len()).unwrap(); + { + let gst_buf_mut = gst_buf.get_mut().unwrap(); + + let time = ClockTime::from_useconds( + runtime.try_into().unwrap(), + ); + gst_buf_mut.set_dts(time); + let mut gst_buf_data = + gst_buf_mut.map_writable().unwrap(); + gst_buf_data.copy_from_slice(buf.as_slice()); + } + gst_buf + }; + buffers_ref.add(gst_buf); } - Ok(Ok(_)) => {} - }; - res??; - } + } + buffers + }; + + let res = tokio::task::spawn_blocking(move || { + // debug!(" - Pushing buffer: {}", buffers.len()); + appsrc + .push_buffer_list(buffers.copy()) + .map(|_| ()) + .map_err(|_| anyhow!("Could not push buffer to appsrc")) + }) + .await; + match &res { + Err(e) => { + debug!("Paniced on send buffer list: {:?}", e); + } + Ok(Err(e)) => { + debug!("Failed to send buffer list: {:?}", e); + } + Ok(Ok(_)) => {} + }; + res??; } - AnyResult::Ok(()) - }, - async { - if !aud_buffers.is_empty() { - if let Some(appsrc) = self.aud.clone() { - let buffers = { - let mut buffers = - gstreamer::BufferList::new_sized(aud_buffers.len()); - { - let buffers_ref = buffers.get_mut().unwrap(); - for (time, buf) in aud_buffers.drain(..) 
{ - tokio::task::yield_now().await; - let runtime = self.buftime_to_runtime(time); - - let gst_buf = { - let mut gst_buf = - gstreamer::Buffer::with_size(buf.len()).unwrap(); - { - let gst_buf_mut = gst_buf.get_mut().unwrap(); - - let time = ClockTime::from_useconds( - runtime.try_into().unwrap(), - ); - gst_buf_mut.set_dts(time); - let mut gst_buf_data = - gst_buf_mut.map_writable().unwrap(); - gst_buf_data.copy_from_slice(buf.as_slice()); - } - gst_buf - }; - buffers_ref.add(gst_buf); - } - } - buffers - }; - - let res = tokio::task::spawn_blocking(move || { - appsrc - .push_buffer_list(buffers.copy()) - .map(|_| ()) - .map_err(|_| anyhow!("Could not push buffer to appsrc")) - }) - .await; - match &res { - Err(e) => { - debug!("Paniced on send buffer list: {:?}", e); - } - Ok(Err(e)) => { - debug!("Failed to send buffer list: {:?}", e); + } + AnyResult::Ok(()) + }, + async { + if !aud_buffers.is_empty() { + if let Some(appsrc) = self.aud.clone() { + let buffers = { + let mut buffers = gstreamer::BufferList::new_sized(aud_buffers.len()); + { + let buffers_ref = buffers.get_mut().unwrap(); + for (time, buf) in aud_buffers.drain(..) 
{ + tokio::task::yield_now().await; + let runtime = self.buftime_to_runtime(time); + + let gst_buf = { + let mut gst_buf = + gstreamer::Buffer::with_size(buf.len()).unwrap(); + { + let gst_buf_mut = gst_buf.get_mut().unwrap(); + + let time = ClockTime::from_useconds( + runtime.try_into().unwrap(), + ); + gst_buf_mut.set_dts(time); + let mut gst_buf_data = + gst_buf_mut.map_writable().unwrap(); + gst_buf_data.copy_from_slice(buf.as_slice()); + } + gst_buf + }; + buffers_ref.add(gst_buf); } - Ok(Ok(_)) => {} - }; - res??; - } + } + buffers + }; + + let res = tokio::task::spawn_blocking(move || { + appsrc + .push_buffer_list(buffers.copy()) + .map(|_| ()) + .map_err(|_| anyhow!("Could not push buffer to appsrc")) + }) + .await; + match &res { + Err(e) => { + debug!("Paniced on send buffer list: {:?}", e); + } + Ok(Err(e)) => { + debug!("Failed to send buffer list: {:?}", e); + } + Ok(Ok(_)) => {} + }; + res??; } - AnyResult::Ok(()) } - )?; - } + AnyResult::Ok(()) + } + )?; Ok(()) } } diff --git a/src/rtsp/gst/server.rs b/src/rtsp/gst/server.rs index 61091454..52728b29 100644 --- a/src/rtsp/gst/server.rs +++ b/src/rtsp/gst/server.rs @@ -52,8 +52,12 @@ impl NeoRtspServer { self.imp().get_sender(tag).await } - pub(crate) async fn create_stream>(&self, tag: U) -> AnyResult<()> { - self.imp().create_stream(tag).await + pub(crate) async fn create_stream>( + &self, + tag: U, + config: &CameraConfig, + ) -> AnyResult<()> { + self.imp().create_stream(tag, config).await } #[allow(dead_code)] @@ -302,6 +306,26 @@ impl NeoRtspServer { Err(anyhow!("No such tag")) } } + + // Pause on all senders of a tag + pub(crate) async fn pause>(&self, tag: T) -> AnyResult<()> { + if let Some(sender) = self.imp().get_sender(tag).await { + sender.send(FactoryCommand::Pause).await?; + Ok(()) + } else { + Err(anyhow!("No such tag")) + } + } + + // Resume on all senders of a tag + pub(crate) async fn resume>(&self, tag: T) -> AnyResult<()> { + if let Some(sender) = self.imp().get_sender(tag).await 
{ + sender.send(FactoryCommand::Resume).await?; + Ok(()) + } else { + Err(anyhow!("No such tag")) + } + } } unsafe impl Send for NeoRtspServer {} @@ -330,12 +354,16 @@ impl ObjectSubclass for NeoRtspServerImpl { } impl NeoRtspServerImpl { - pub(crate) async fn create_stream>(&self, tag: U) -> AnyResult<()> { + pub(crate) async fn create_stream>( + &self, + tag: U, + config: &CameraConfig, + ) -> AnyResult<()> { let key = tag.into(); match self.medias.write().await.entry(key.clone()) { Entry::Occupied(_occ) => {} Entry::Vacant(vac) => { - let media = NeoMediaFactory::new(); + let media = NeoMediaFactory::new(config.buffer_size, config.use_smoothing); let thread_media = media.clone(); self.threads .write() diff --git a/src/rtsp/gst/shared.rs b/src/rtsp/gst/shared.rs index 685695fa..ad466b72 100644 --- a/src/rtsp/gst/shared.rs +++ b/src/rtsp/gst/shared.rs @@ -1,7 +1,7 @@ //! Data shared between the various //! components that manage a media stream pub use gstreamer_rtsp_server::gio::{TlsAuthenticationMode, TlsCertificate}; -use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::RwLock; #[derive(Copy, Clone, Debug, PartialEq, Eq)] @@ -23,6 +23,8 @@ pub(super) struct NeoMediaShared { pub(super) aud_format: RwLock, pub(super) number_of_clients: AtomicUsize, pub(super) buffer_ready: AtomicBool, + pub(super) buffer_size: AtomicUsize, + pub(super) use_smoothing: AtomicBool, } impl Default for NeoMediaShared { @@ -32,6 +34,26 @@ impl Default for NeoMediaShared { aud_format: RwLock::new(AudFormats::Unknown), number_of_clients: AtomicUsize::new(0), buffer_ready: AtomicBool::new(false), + buffer_size: AtomicUsize::new(100), + use_smoothing: AtomicBool::new(false), } } } + +impl NeoMediaShared { + pub(super) fn get_buffer_size(&self) -> usize { + self.buffer_size.load(Ordering::Relaxed) + } + + pub(super) fn set_buffer_size(&self, new_size: usize) { + self.buffer_size.store(new_size, Ordering::Relaxed) + } 
+ + pub(super) fn get_use_smoothing(&self) -> bool { + self.use_smoothing.load(Ordering::Relaxed) + } + + pub(super) fn set_use_smoothing(&self, new_value: bool) { + self.use_smoothing.store(new_value, Ordering::Relaxed) + } +} diff --git a/src/rtsp/mod.rs b/src/rtsp/mod.rs index 97330081..dcb8ef06 100644 --- a/src/rtsp/mod.rs +++ b/src/rtsp/mod.rs @@ -279,6 +279,12 @@ async fn camera_main(camera: Camera) -> Result<(), CameraFailureKi }.with_context(|| format!("{}: Error while streaming", name)) .map_err(CameraFailureKind::Retry)?; + tags.iter() + .map(|tag| rtsp_thread.pause(tag)) + .collect::>() + .collect::>() + .await; + let paused = streaming .stop() .await @@ -346,5 +352,11 @@ async fn camera_main(camera: Camera) -> Result<(), CameraFailureKi .await .with_context(|| format!("{}: Could not start stream", name)) .map_err(CameraFailureKind::Retry)?; + + tags.iter() + .map(|tag| rtsp_thread.resume(tag)) + .collect::>() + .collect::>() + .await; } } diff --git a/src/rtsp/spring.rs b/src/rtsp/spring.rs index 4eaf09e6..5cf4de8c 100644 --- a/src/rtsp/spring.rs +++ b/src/rtsp/spring.rs @@ -64,6 +64,12 @@ impl Spring { self.target } + #[allow(dead_code)] + pub(crate) fn mod_value(&mut self, delta: f64) { + self.target += delta; + self.value += delta; + } + pub(crate) fn reset_to(&mut self, target: f64) { self.target = target; self.value = target; diff --git a/src/rtsp/states/connected.rs b/src/rtsp/states/connected.rs index 4fb580db..e7e03366 100644 --- a/src/rtsp/states/connected.rs +++ b/src/rtsp/states/connected.rs @@ -36,4 +36,13 @@ impl Camera { }, }) } + + #[allow(unused)] + pub(crate) async fn join(&self) -> Result<()> { + self.state + .camera + .join() + .await + .map_err(|e| anyhow::anyhow!("Camera join error: {:?}", e)) + } } diff --git a/src/rtsp/states/loggedin.rs b/src/rtsp/states/loggedin.rs index 5b194b2a..fef2b53b 100644 --- a/src/rtsp/states/loggedin.rs +++ b/src/rtsp/states/loggedin.rs @@ -96,4 +96,13 @@ impl Camera { pub(crate) fn get_camera(&self) 
-> &BcCamera { &self.state.camera } + + #[allow(unused)] + pub(crate) async fn join(&self) -> Result<()> { + self.state + .camera + .join() + .await + .map_err(|e| anyhow::anyhow!("Camera join error: {:?}", e)) + } } diff --git a/src/rtsp/states/shared.rs b/src/rtsp/states/shared.rs index 29062a40..22409f5c 100644 --- a/src/rtsp/states/shared.rs +++ b/src/rtsp/states/shared.rs @@ -85,7 +85,7 @@ impl Shared { pub(crate) async fn setup_streams(&self) -> Result<()> { for stream in self.streams.iter() { let tag = self.get_tag_for_stream(stream); - self.rtsp.create_stream(&tag).await?; + self.rtsp.create_stream(&tag, &self.config).await?; self.rtsp .add_permitted_roles(&tag, &self.permitted_users) .await?; diff --git a/src/rtsp/states/streaming.rs b/src/rtsp/states/streaming.rs index 1560a411..44b53f67 100644 --- a/src/rtsp/states/streaming.rs +++ b/src/rtsp/states/streaming.rs @@ -100,20 +100,26 @@ impl Camera { } pub(crate) async fn join(&self) -> Result<()> { - let mut locked_threads = self.state.set.write().await; - while let Some(res) = locked_threads.join_next().await { - match res { - Err(e) => { - locked_threads.abort_all(); - return Err(e.into()); - } - Ok(Err(e)) => { - locked_threads.abort_all(); - return Err(e); + tokio::select! { + v = async { + let mut locked_threads = self.state.set.write().await; + while let Some(res) = locked_threads.join_next().await { + match res { + Err(e) => { + locked_threads.abort_all(); + return Err(e.into()); + } + Ok(Err(e)) => { + locked_threads.abort_all(); + return Err(e); + } + Ok(Ok(())) => {} + } } - Ok(Ok(())) => {} - } - } + Ok(()) + } => v, + v = self.state.camera.join() => v.map_err(|e| anyhow!("Camera join error: {:?}", e)), + }?; Ok(()) }