diff --git a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs index da9bf8c206..b967923ffc 100644 --- a/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs +++ b/crates/bifrost/src/loglets/local_loglet/log_store_writer.rs @@ -9,15 +9,17 @@ // by the Apache License, Version 2.0. use std::sync::Arc; +use std::time::Duration; use bytes::{Bytes, BytesMut}; +use futures::StreamExt as FutureStreamExt; use metrics::histogram; use restate_rocksdb::{IoMode, Priority, RocksDb}; use rocksdb::{BoundColumnFamily, WriteBatch}; use smallvec::SmallVec; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::StreamExt; +use tokio_stream::StreamExt as TokioStreamExt; use tracing::{debug, error, trace, warn}; use restate_core::{cancellation_watcher, task_center, ShutdownError, TaskKind}; @@ -73,6 +75,8 @@ impl LogStoreWriter { ) -> Result { // big enough to allows a second full batch to queue up while the existing one is being processed let batch_size = std::cmp::max(1, updateable.load().writer_batch_commit_count); + // leave twice as much space in the channel to ensure we can enqueue up to a full batch in + // the backlog while we process this one. let (sender, receiver) = mpsc::channel(batch_size * 2); task_center().spawn_child( @@ -82,9 +86,19 @@ impl LogStoreWriter { async move { let opts = updateable.load(); let batch_size = std::cmp::max(1, opts.writer_batch_commit_count); - let batch_duration = opts.writer_batch_commit_duration.into(); - let receiver = - ReceiverStream::new(receiver).chunks_timeout(batch_size, batch_duration); + let batch_duration: Duration = opts.writer_batch_commit_duration.into(); + // We don't want to use chunks_timeout if time-based batching is disabled, why? + // because even if duration is zero, tokio's timer resolution is 1ms which means + // that we will delay every batch by 1ms for no reason. 
+ let receiver = if batch_duration == Duration::ZERO { + ReceiverStream::new(receiver) + .ready_chunks(batch_size) + .boxed() + } else { + ReceiverStream::new(receiver) + .chunks_timeout(batch_size, batch_duration) + .boxed() + }; tokio::pin!(receiver); loop { @@ -93,7 +107,7 @@ impl LogStoreWriter { _ = cancellation_watcher() => { break; } - Some(cmds) = receiver.next() => { + Some(cmds) = TokioStreamExt::next(&mut receiver) => { let opts = updateable.load(); self.handle_commands(opts, cmds).await; } @@ -143,6 +157,8 @@ impl LogStoreWriter { } } + // todo: future optimization. pre-merge all updates within a batch before writing + // the merge to rocksdb. if let Some(logstate_updates) = command.log_state_updates { Self::update_log_state( &metadata_cf, diff --git a/crates/types/src/config/bifrost.rs b/crates/types/src/config/bifrost.rs index f24b4602d7..5466886447 100644 --- a/crates/types/src/config/bifrost.rs +++ b/crates/types/src/config/bifrost.rs @@ -95,7 +95,7 @@ impl Default for LocalLogletOptions { batch_wal_flushes: true, sync_wal_before_ack: true, writer_batch_commit_count: 500, - writer_batch_commit_duration: Duration::from_nanos(5).into(), + writer_batch_commit_duration: Duration::ZERO.into(), } } }