diff --git a/crates/store/re_video/examples/frames.rs b/crates/store/re_video/examples/frames.rs index 663b74df8d07..a27ae350681c 100644 --- a/crates/store/re_video/examples/frames.rs +++ b/crates/store/re_video/examples/frames.rs @@ -27,6 +27,10 @@ fn main() { let video = std::fs::read(video_path).expect("failed to read video"); let video = re_video::VideoData::load_mp4(&video).expect("failed to load video"); + let sync_decoder = Box::new( + re_video::decode::av1::SyncDav1dDecoder::new().expect("Failed to start AV1 decoder"), + ); + println!( "{} {}x{}", video.gops.len(), @@ -38,14 +42,16 @@ fn main() { progress.enable_steady_tick(Duration::from_millis(100)); let frames = Arc::new(Mutex::new(Vec::new())); - let mut decoder = re_video::decode::av1::Decoder::new("debug_name".to_owned(), { + let on_output = { let frames = frames.clone(); let progress = progress.clone(); move |frame| { progress.inc(1); frames.lock().push(frame); } - }); + }; + let mut decoder = + re_video::decode::AsyncDecoder::new("debug_name".to_owned(), sync_decoder, on_output); let start = Instant::now(); for sample in &video.samples { diff --git a/crates/store/re_video/src/decode/async_decoder.rs b/crates/store/re_video/src/decode/async_decoder.rs new file mode 100644 index 000000000000..bdabde53a2c6 --- /dev/null +++ b/crates/store/re_video/src/decode/async_decoder.rs @@ -0,0 +1,163 @@ +use std::sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, +}; + +use crossbeam::channel::{unbounded, Receiver, Sender}; + +use super::{Chunk, Frame, OutputCallback, Result, SyncDecoder}; + +enum Command { + Chunk(Chunk), + Flush { on_done: Sender<()> }, + Reset, + Stop, +} + +#[derive(Clone)] +struct Comms { + /// Set when it is time to die + should_stop: Arc<AtomicBool>, + + /// Incremented on each call to [`AsyncDecoder::reset`]. + /// Decremented each time the decoder thread receives [`Command::Reset`]. + num_outstanding_resets: Arc<AtomicU64>, +} + +impl Default for Comms { + fn default() -> Self { + Self { + should_stop: Arc::new(AtomicBool::new(false)), + num_outstanding_resets: Arc::new(AtomicU64::new(0)), + } + } +} + +/// Runs a [`SyncDecoder`] in a background thread, for non-blocking video decoding. +pub struct AsyncDecoder { + /// Where the decoding happens + _thread: std::thread::JoinHandle<()>, + + /// Commands sent to the decoder thread. + command_tx: Sender<Command>, + + /// Instant communication to the decoder thread (circumventing the command queue). + comms: Comms, +} + +impl AsyncDecoder { + pub fn new( + debug_name: String, + mut sync_decoder: Box<dyn SyncDecoder + Send>, + on_output: impl Fn(Result<Frame>) + Send + Sync + 'static, + ) -> Self { + re_tracing::profile_function!(); + + let (command_tx, command_rx) = unbounded(); + let comms = Comms::default(); + + let thread = std::thread::Builder::new() + .name("av1_decoder".into()) + .spawn({ + let comms = comms.clone(); + move || { + econtext::econtext_data!("Video", debug_name.clone()); + + decoder_thread(sync_decoder.as_mut(), &comms, &command_rx, &on_output); + re_log::debug!("Closing decoder thread for {debug_name}"); + } + }) + .expect("failed to spawn decoder thread"); + + Self { + _thread: thread, + command_tx, + comms, + } + } + + // NOTE: The interface is all `&mut self` to avoid certain types of races. + pub fn decode(&mut self, chunk: Chunk) { + re_tracing::profile_function!(); + self.command_tx.send(Command::Chunk(chunk)).ok(); + } + + /// Resets the decoder. + /// + /// This does not block, all chunks sent to `decode` before this point will be discarded. + // NOTE: The interface is all `&mut self` to avoid certain types of races. + pub fn reset(&mut self) { + re_tracing::profile_function!(); + + // Increment resets first… + self.comms + .num_outstanding_resets + .fetch_add(1, Ordering::Release); + + // …so it is visible on the decoder thread when it gets the `Reset` command. + self.command_tx.send(Command::Reset).ok(); + } + + /// Blocks until all pending frames have been decoded. + // NOTE: The interface is all `&mut self` to avoid certain types of races. + pub fn flush(&mut self) { + re_tracing::profile_function!(); + let (tx, rx) = crossbeam::channel::bounded(0); + self.command_tx.send(Command::Flush { on_done: tx }).ok(); + rx.recv().ok(); + } +} + +impl Drop for AsyncDecoder { + fn drop(&mut self) { + re_tracing::profile_function!(); + + // Set `should_stop` first… + self.comms.should_stop.store(true, Ordering::Release); + + // …so it is visible on the decoder thread when it gets the `Stop` command. + self.command_tx.send(Command::Stop).ok(); + + // NOTE: we don't block here. The decoder thread will finish soon enough. + } +} + +fn decoder_thread( + decoder: &mut dyn SyncDecoder, + comms: &Comms, + command_rx: &Receiver<Command>, + on_output: &OutputCallback, +) { + #![allow(clippy::debug_assert_with_mut_call)] + + while let Ok(command) = command_rx.recv() { + if comms.should_stop.load(Ordering::Acquire) { + re_log::debug!("Should stop"); + return; + } + + // If we're waiting for a reset we should ignore all other commands until we receive it. + let has_outstanding_reset = 0 < comms.num_outstanding_resets.load(Ordering::Acquire); + + match command { + Command::Chunk(chunk) => { + if !has_outstanding_reset { + decoder.submit_chunk(&comms.should_stop, chunk, on_output); + } + } + Command::Flush { on_done } => { + on_done.send(()).ok(); + } + Command::Reset => { + decoder.reset(); + comms.num_outstanding_resets.fetch_sub(1, Ordering::Release); + } + Command::Stop => { + re_log::debug!("Stop"); + return; + } + } + } + + re_log::debug!("Disconnected"); +} diff --git a/crates/store/re_video/src/decode/av1.rs b/crates/store/re_video/src/decode/av1.rs index 44dc2a2203a3..320d523535ea 100644 --- a/crates/store/re_video/src/decode/av1.rs +++ b/crates/store/re_video/src/decode/av1.rs @@ -1,210 +1,52 @@ //! AV1 support. -use std::sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, - Arc, -}; +use std::sync::atomic::{AtomicBool, Ordering}; -use crossbeam::channel::{unbounded, Receiver, Sender}; use dav1d::{PixelLayout, PlanarImageComponent}; use crate::Time; -use super::{Chunk, Frame, PixelFormat}; +use super::{Chunk, Error, Frame, OutputCallback, PixelFormat, Result, SyncDecoder}; -#[derive(thiserror::Error, Debug)] -pub enum Error { - /// An error occrurred during initialization of the decoder. - /// - /// No further output will come. - #[error("Error initializing decoder: {0}")] - Initialization(dav1d::Error), - - #[error("Decoding error: {0}")] - Dav1d(dav1d::Error), -} - -pub type Result<T = (), E = Error> = std::result::Result<T, E>; - -enum Command { - Chunk(Chunk), - Flush { on_done: Sender<()> }, - Reset, - Stop, -} - -#[derive(Clone)] -struct Comms { - /// Set when it is time to die - should_stop: Arc<AtomicBool>, - - /// Incremented on each call to [`Decoder::reset`]. - /// Decremented each time the decoder thread receives [`Command::Reset`]. - num_outstanding_resets: Arc<AtomicU64>, +pub struct SyncDav1dDecoder { + decoder: dav1d::Decoder, } -impl Default for Comms { - fn default() -> Self { - Self { - should_stop: Arc::new(AtomicBool::new(false)), - num_outstanding_resets: Arc::new(AtomicU64::new(0)), - } - } -} - -/// AV1 software decoder. -pub struct Decoder { - /// Where the decoding happens - _thread: std::thread::JoinHandle<()>, - - /// Commands sent to the decoder thread. - command_tx: Sender<Command>, - - /// Instant communication to the decoder thread (circumventing the command queue). - comms: Comms, -} - -impl Decoder { - pub fn new( - debug_name: String, - on_output: impl Fn(Result<Frame>) + Send + Sync + 'static, - ) -> Self { +impl SyncDav1dDecoder { + pub fn new() -> Result<Self> { re_tracing::profile_function!(); - let (command_tx, command_rx) = unbounded(); - let comms = Comms::default(); - - let thread = std::thread::Builder::new() - .name("av1_decoder".into()) - .spawn({ - let comms = comms.clone(); - move || { - econtext::econtext_data!("Video", debug_name.clone()); - decoder_thread(&comms, &command_rx, &on_output); - re_log::debug!("Closing decoder thread for {debug_name}"); - } - }) - .expect("failed to spawn decoder thread"); - - Self { - _thread: thread, - command_tx, - comms, - } - } - // NOTE: The interface is all `&mut self` to avoid certain types of races. - pub fn decode(&mut self, chunk: Chunk) { - re_tracing::profile_function!(); - self.command_tx.send(Command::Chunk(chunk)).ok(); - } + // See https://videolan.videolan.me/dav1d/structDav1dSettings.html for settings docs + let mut settings = dav1d::Settings::new(); - /// Resets the decoder. - /// - /// This does not block, all chunks sent to `decode` before this point will be discarded. - // NOTE: The interface is all `&mut self` to avoid certain types of races. - pub fn reset(&mut self) { - re_tracing::profile_function!(); + // Prioritize delivering video frames, not error messages. + settings.set_strict_std_compliance(false); - // Increment resets first… - self.comms - .num_outstanding_resets - .fetch_add(1, Ordering::Release); + // Set to 1 for low-latency decoding. + settings.set_max_frame_delay(1); - // …so it is visible on the decoder thread when it gets the `Reset` command. - self.command_tx.send(Command::Reset).ok(); - } + let decoder = dav1d::Decoder::with_settings(&settings)?; - /// Blocks until all pending frames have been decoded. - // NOTE: The interface is all `&mut self` to avoid certain types of races. - pub fn flush(&mut self) { - re_tracing::profile_function!(); - let (tx, rx) = crossbeam::channel::bounded(0); - self.command_tx.send(Command::Flush { on_done: tx }).ok(); - rx.recv().ok(); + Ok(Self { decoder }) } } -impl Drop for Decoder { - fn drop(&mut self) { +impl SyncDecoder for SyncDav1dDecoder { + fn submit_chunk(&mut self, should_stop: &AtomicBool, chunk: Chunk, on_output: &OutputCallback) { re_tracing::profile_function!(); - - // Set `should_stop` first… - self.comms.should_stop.store(true, Ordering::Release); - - // …so it is visible on the decoder thread when it gets the `Stop` command. - self.command_tx.send(Command::Stop).ok(); - - // NOTE: we don't block here. The decoder thread will finish soon enough. + submit_chunk(&mut self.decoder, chunk, on_output); + output_frames(should_stop, &mut self.decoder, on_output); } -} - -type OutputCallback = dyn Fn(Result<Frame>) + Send + Sync; - -fn create_decoder() -> Result<dav1d::Decoder, dav1d::Error> { - re_tracing::profile_function!(); - - // See https://videolan.videolan.me/dav1d/structDav1dSettings.html for settings docs - let mut settings = dav1d::Settings::new(); - - // Prioritize delivering video frames, not error messages. - settings.set_strict_std_compliance(false); - - // Set to 1 for low-latency decoding. - settings.set_max_frame_delay(1); - dav1d::Decoder::with_settings(&settings) -} - -fn decoder_thread(comms: &Comms, command_rx: &Receiver<Command>, on_output: &OutputCallback) { - #![allow(clippy::debug_assert_with_mut_call)] - - let mut decoder = match create_decoder() { - Err(err) => { - on_output(Err(Error::Initialization(err))); - return; - } - Ok(decoder) => decoder, - }; - - while let Ok(command) = command_rx.recv() { - if comms.should_stop.load(Ordering::Acquire) { - re_log::debug!("Should stop"); - return; - } - - // If we're waiting for a reset we should ignore all other commands until we receive it. - let has_outstanding_reset = 0 < comms.num_outstanding_resets.load(Ordering::Acquire); - - match command { - Command::Chunk(chunk) => { - if !has_outstanding_reset { - submit_chunk(&mut decoder, chunk, on_output); - // NOTE: in all video's I've tested, there is one frame per chunk - let _num_frames = output_frames(&comms.should_stop, &mut decoder, on_output); - } - } - Command::Flush { on_done } => { - debug_assert!(matches!(decoder.get_picture(), Err(dav1d::Error::Again)), - "There should be no pending pictures, since we output them directly after submitting a chunk."); - - on_done.send(()).ok(); - } - Command::Reset => { - decoder.flush(); + /// Clear and reset everything + fn reset(&mut self) { + re_tracing::profile_function!(); - debug_assert!(matches!(decoder.get_picture(), Err(dav1d::Error::Again)), - "There should be no pending pictures, since we output them directly after submitting a chunk."); + self.decoder.flush(); - comms.num_outstanding_resets.fetch_sub(1, Ordering::Release); - } - Command::Stop => { - re_log::debug!("Stop"); - return; - } - } + debug_assert!(matches!(self.decoder.get_picture(), Err(dav1d::Error::Again)), + "There should be no pending pictures, since we output them directly after submitting a chunk."); } - - re_log::debug!("Disconnected"); } fn submit_chunk(decoder: &mut dav1d::Decoder, chunk: Chunk, on_output: &OutputCallback) { diff --git a/crates/store/re_video/src/decode/mod.rs b/crates/store/re_video/src/decode/mod.rs index 3fa0da48803e..5b15fc30a2d8 100644 --- a/crates/store/re_video/src/decode/mod.rs +++ b/crates/store/re_video/src/decode/mod.rs @@ -4,8 +4,39 @@ #[cfg(not(target_arch = "wasm32"))] pub mod av1; +#[cfg(not(target_arch = "wasm32"))] +pub mod async_decoder; + +#[cfg(not(target_arch = "wasm32"))] +pub use async_decoder::AsyncDecoder; + +use std::sync::atomic::AtomicBool; + use crate::Time; +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[cfg(feature = "av1")] + #[cfg(not(target_arch = "wasm32"))] + #[error("dav1d: {0}")] + Dav1d(#[from] dav1d::Error), +} + +pub type Result<T = (), E = Error> = std::result::Result<T, E>; + +pub type OutputCallback = dyn Fn(Result<Frame>) + Send + Sync; + +/// Blocking decoder of video chunks. +pub trait SyncDecoder { + /// Submit some work and read the results. + /// + /// Stop early if `should_stop` is `true` or turns `true`. + fn submit_chunk(&mut self, should_stop: &AtomicBool, chunk: Chunk, on_output: &OutputCallback); + + /// Clear and reset everything + fn reset(&mut self) {} +} + /// One chunk of encoded video data; usually one frame. pub struct Chunk { pub data: Vec<u8>, diff --git a/crates/store/re_video/src/lib.rs b/crates/store/re_video/src/lib.rs index 106bc84f0222..a1d6340fde3f 100644 --- a/crates/store/re_video/src/lib.rs +++ b/crates/store/re_video/src/lib.rs @@ -7,10 +7,6 @@ pub use decode::{Chunk, Frame, PixelFormat}; pub use demux::{Config, Sample, VideoData, VideoLoadError}; pub use re_mp4::{TrackId, TrackKind}; -#[cfg(feature = "av1")] -#[cfg(not(target_arch = "wasm32"))] -pub use decode::av1; - use ordered_float::OrderedFloat; #[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] diff --git a/crates/viewer/re_renderer/src/video/decoder/mod.rs b/crates/viewer/re_renderer/src/video/decoder/mod.rs index 06270b5c68f4..6f19ff5c40f8 100644 --- a/crates/viewer/re_renderer/src/video/decoder/mod.rs +++ b/crates/viewer/re_renderer/src/video/decoder/mod.rs @@ -1,9 +1,8 @@ #[cfg(target_arch = "wasm32")] mod web; -#[cfg(feature = "video_av1")] #[cfg(not(target_arch = "wasm32"))] -mod native_av1; +mod native_decoder; use std::{ops::Range, sync::Arc, time::Duration}; @@ -138,7 +137,10 @@ impl VideoDecoder { if cfg!(debug_assertions) { return Err(DecodingError::NoNativeDebug); // because debug builds of rav1d are so slow } else { - let decoder = native_av1::Av1VideoDecoder::new(debug_name)?; + let av1_decoder = re_video::decode::av1::SyncDav1dDecoder::new() + .map_err(|err| DecodingError::StartDecoder(err.to_string()))?; + + let decoder = native_decoder::NativeDecoder::new(debug_name, Box::new(av1_decoder))?; return Ok(Self::from_chunk_decoder(render_ctx, data, decoder)); }; } else { diff --git a/crates/viewer/re_renderer/src/video/decoder/native_av1.rs b/crates/viewer/re_renderer/src/video/decoder/native_decoder.rs similarity index 90% rename from crates/viewer/re_renderer/src/video/decoder/native_av1.rs rename to crates/viewer/re_renderer/src/video/decoder/native_decoder.rs index c7c7bc9b78b9..2e5176692524 100644 --- a/crates/viewer/re_renderer/src/video/decoder/native_av1.rs +++ b/crates/viewer/re_renderer/src/video/decoder/native_decoder.rs @@ -18,14 +18,17 @@ struct DecoderOutput { error: Option<TimedDecodingError>, } -/// Native AV1 decoder -pub struct Av1VideoDecoder { - decoder: re_video::av1::Decoder, +/// Native video decoder +pub struct NativeDecoder { + decoder: re_video::decode::AsyncDecoder, decoder_output: Arc<Mutex<DecoderOutput>>, } -impl Av1VideoDecoder { - pub fn new(debug_name: String) -> Result<Self, DecodingError> { +impl NativeDecoder { + pub fn new( + debug_name: String, + sync_decoder: Box<dyn re_video::decode::SyncDecoder + Send>, + ) -> Result<Self, DecodingError> { re_tracing::profile_function!(); let decoder_output = Arc::new(Mutex::new(DecoderOutput::default())); @@ -33,7 +36,7 @@ impl Av1VideoDecoder { let on_output = { let decoder_output = decoder_output.clone(); let debug_name = debug_name.clone(); - move |frame: re_video::av1::Result<Frame>| match frame { + move |frame: re_video::decode::Result<Frame>| match frame { Ok(frame) => { re_log::trace!("Decoded frame at {:?}", frame.timestamp); let mut output = decoder_output.lock(); @@ -52,7 +55,8 @@ impl Av1VideoDecoder { } } }; - let decoder = re_video::av1::Decoder::new(debug_name, on_output); + + let decoder = re_video::decode::AsyncDecoder::new(debug_name, sync_decoder, on_output); Ok(Self { decoder, @@ -61,7 +65,7 @@ impl Av1VideoDecoder { } } -impl VideoChunkDecoder for Av1VideoDecoder { +impl VideoChunkDecoder for NativeDecoder { /// Start decoding the given chunk. fn decode(&mut self, chunk: Chunk, _is_keyframe: bool) -> Result<(), DecodingError> { self.decoder.decode(chunk); diff --git a/crates/viewer/re_renderer/src/video/mod.rs b/crates/viewer/re_renderer/src/video/mod.rs index a3e43a99a1eb..5cc072758ce2 100644 --- a/crates/viewer/re_renderer/src/video/mod.rs +++ b/crates/viewer/re_renderer/src/video/mod.rs @@ -13,6 +13,9 @@ use crate::{resource_managers::GpuTexture2D, RenderContext}; // TODO(jan, andreas): These errors are for the most part specific to the web decoder right now. #[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] pub enum DecodingError { + #[error("Failed to start decoder")] + StartDecoder(String), + #[error("The decoder is lagging behind")] EmptyBuffer,