Skip to content

Commit

Permalink
Align batch logger API closer to logging
Browse files Browse the repository at this point in the history
`publish_batch` accepts a mutable reference to an option instead of a
mutable reference to a container.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
  • Loading branch information
antiguru committed Jan 13, 2025
1 parent 77f96ae commit 56ec0d7
Showing 1 changed file with 8 additions and 7 deletions.
15 changes: 8 additions & 7 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ pub type TimelyProgressLogger = crate::logging_core::Logger<TimelyProgressEventB
use std::time::Duration;
use columnar::Columnar;
use serde::{Deserialize, Serialize};

use crate::Container;
use crate::container::CapacityContainerBuilder;
use crate::dataflow::operators::capture::{Event, EventPusher};

/// Logs events as a timely stream, with progress statements.
pub struct BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>> {
// None when the logging stream is closed
pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
time: Duration,
event_pusher: P,
_phantom: ::std::marker::PhantomData<T>,
_phantom: ::std::marker::PhantomData<C>,
}

impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>> {
impl<P, C> BatchLogger<P, C> where P: EventPusher<Duration, C>, C: Container {
/// Creates a new batch logger.
pub fn new(event_pusher: P) -> Self {
BatchLogger {
Expand All @@ -35,8 +36,8 @@ impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>>
}
}
/// Publishes a batch of logged events and advances the capability.
pub fn publish_batch(&mut self, &time: &Duration, data: &mut Vec<(Duration, T)>) {
if !data.is_empty() {
pub fn publish_batch(&mut self, &time: &Duration, data: &mut Option<C>) {
if let Some(data) = data {
self.event_pusher.push(Event::Messages(self.time, std::mem::take(data)));
}
if self.time < time {
Expand All @@ -47,7 +48,7 @@ impl<T, P> BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>>
self.time = time;
}
}
impl<T, P> Drop for BatchLogger<T, P> where P: EventPusher<Duration, Vec<(Duration, T)>> {
impl<P, C> Drop for BatchLogger<P, C> where P: EventPusher<Duration, C> {
fn drop(&mut self) {
self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
}
Expand Down

0 comments on commit 56ec0d7

Please sign in to comment.