Skip to content

Commit

Permalink
fix: Switch to futures-0.3 channels (#6283)
Browse files Browse the repository at this point in the history
* Use future03 channel

Signed-off-by: ktf <krunotf@gmail.com>

* Fuse channel

Signed-off-by: ktf <krunotf@gmail.com>

* Bump

Signed-off-by: ktf <krunotf@gmail.com>

* Add bench futures channel

Signed-off-by: ktf <krunotf@gmail.com>

* Fmt

Signed-off-by: ktf <krunotf@gmail.com>

Co-authored-by: Jesse Szwedko <jesse@szwedko.me>
  • Loading branch information
ktff and jszwedko authored Feb 7, 2021
1 parent c6c8732 commit 85c0a29
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 11 deletions.
28 changes: 27 additions & 1 deletion benches/isolated_buffering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use criterion::{criterion_group, BatchSize, Criterion, SamplingMode, Throughput}
use futures::{
compat::{Future01CompatExt, Stream01CompatExt},
stream::BoxStream,
StreamExt,
SinkExt, StreamExt,
};
use futures01::{stream, Sink, Stream};
use tempfile::tempdir;
Expand Down Expand Up @@ -70,6 +70,32 @@ fn benchmark_buffers(c: &mut Criterion) {
);
});

group.bench_function("channels/futures", |b| {
b.iter_batched(
|| {
let rt = runtime();

let (writer, mut reader) = futures::channel::mpsc::channel(100);

let read_handle = rt.spawn(async move { while reader.next().await.is_some() {} });

(rt, writer, read_handle)
},
|(mut rt, mut writer, read_handle)| {
let write_handle = rt.spawn(async move {
let mut stream = random_events(line_size).take(num_lines as u64).compat();
while let Some(e) = stream.next().await {
writer.send(e).await.unwrap();
}
});

rt.block_on(write_handle).unwrap();
rt.block_on(read_handle).unwrap();
},
BatchSize::SmallInput,
);
});

group.bench_function("channels/tokio", |b| {
b.iter_batched(
|| {
Expand Down
9 changes: 5 additions & 4 deletions src/buffers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{config::Resource, sink::BoundedSink, Event};
use crate::{config::Resource, Event};
#[cfg(feature = "leveldb")]
use futures::compat::{Sink01CompatExt, Stream01CompatExt};
use futures::{Sink, Stream};
use futures::{channel::mpsc, Sink, SinkExt, Stream};
use futures01::task::AtomicTask;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
Expand All @@ -16,7 +16,6 @@ use std::{
};
#[cfg(feature = "leveldb")]
use tokio::stream::StreamExt;
use tokio::sync::mpsc;

#[cfg(feature = "leveldb")]
pub mod disk;
Expand Down Expand Up @@ -72,7 +71,9 @@ impl BufferInputCloner {
pub fn get(&self) -> Box<dyn Sink<Event, Error = ()> + Send> {
match self {
BufferInputCloner::Memory(tx, when_full) => {
let inner = BoundedSink::new(tx.clone());
let inner = tx
.clone()
.sink_map_err(|error| error!(message = "Sender error.", %error));
if when_full == &WhenFull::DropNewest {
Box::new(DropWhenFull::new(inner))
} else {
Expand Down
9 changes: 3 additions & 6 deletions src/topology/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use std::{
sync::{Arc, Mutex},
};
use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
use tokio::{
sync::mpsc,
time::{timeout, Duration},
};
use tokio::time::{timeout, Duration};

pub struct Pieces {
pub inputs: HashMap<String, (buffers::BufferInputCloner, Vec<String>)>,
Expand Down Expand Up @@ -57,7 +54,7 @@ pub async fn build_pieces(
.iter()
.filter(|(name, _)| diff.sources.contains_new(&name))
{
let (tx, rx) = mpsc::channel(1000);
let (tx, rx) = tokio::sync::mpsc::channel(1000);
let pipeline = Pipeline::from_sender(tx, vec![]);

let typetag = source.source_type();
Expand Down Expand Up @@ -116,7 +113,7 @@ pub async fn build_pieces(
Ok(transform) => transform,
};

let (input_tx, input_rx) = mpsc::channel(100);
let (input_tx, input_rx) = futures::channel::mpsc::channel(100);
let input_tx = buffers::BufferInputCloner::Memory(input_tx, buffers::WhenFull::Block);

let (output, control) = Fanout::new();
Expand Down
1 change: 1 addition & 0 deletions src/transforms/util/runtime_transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ where
Box::pin(
input_rx
.map(Message::Process)
.fuse()
.into_future()
.map(move |(first, rest)| {
// The first message is always `Message::Init`.
Expand Down

0 comments on commit 85c0a29

Please sign in to comment.