Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion_stream): improved stream developer experience #81

Merged
merged 2 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions swiftide/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tracing = { version = "0.1.40", features = ["log"] }
strum = "0.26.2"
strum_macros = "0.26.4"
num_cpus = "1.16.0"
pin-project-lite = "0.2"

# Integrations
async-openai = { version = "0.23.2", optional = true }
Expand Down
50 changes: 30 additions & 20 deletions swiftide/src/ingestion/ingestion_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Default for IngestionPipeline {
/// Creates a default `IngestionPipeline` with an empty stream, no storage, and a concurrency level equal to the number of CPUs.
fn default() -> Self {
Self {
stream: Box::pin(futures_util::stream::empty()),
stream: IngestionStream::empty(),
storage: Default::default(),
concurrency: num_cpus::get(),
}
Expand All @@ -47,7 +47,7 @@ impl IngestionPipeline {
pub fn from_loader(loader: impl Loader + 'static) -> Self {
let stream = loader.into_stream();
Self {
stream: stream.boxed(),
stream,
..Default::default()
}
}
Expand Down Expand Up @@ -95,7 +95,8 @@ impl IngestionPipeline {
}
.instrument(span)
})
.boxed();
.boxed()
.into();
self
}

Expand All @@ -120,7 +121,8 @@ impl IngestionPipeline {
async move { transformer.transform_node(node).await }.instrument(span)
})
.try_buffer_unordered(concurrency)
.boxed();
.boxed()
.into();

self
}
Expand Down Expand Up @@ -154,7 +156,7 @@ impl IngestionPipeline {
})
.try_buffer_unordered(concurrency) // First get the streams from each future
.try_flatten_unordered(concurrency) // Then flatten all the streams back into one
.boxed();
.boxed().into();
self
}

Expand All @@ -180,7 +182,8 @@ impl IngestionPipeline {
})
.try_buffer_unordered(concurrency)
.try_flatten_unordered(concurrency)
.boxed();
.boxed()
.into();

self
}
Expand Down Expand Up @@ -210,7 +213,7 @@ impl IngestionPipeline {
})
.try_buffer_unordered(self.concurrency)
.try_flatten_unordered(self.concurrency)
.boxed();
.boxed().into();
} else {
self.stream = self
.stream
Expand All @@ -222,7 +225,8 @@ impl IngestionPipeline {
async move { storage.store(node).await }.instrument(span)
})
.try_buffer_unordered(self.concurrency)
.boxed();
.boxed()
.into();
}

self
Expand All @@ -232,7 +236,9 @@ impl IngestionPipeline {
///
/// Useful for rate limiting the ingestion pipeline. Uses tokio_stream::StreamExt::throttle internally which has a granularity of 1ms.
pub fn throttle(mut self, duration: impl Into<Duration>) -> Self {
self.stream = tokio_stream::StreamExt::throttle(self.stream, duration.into()).boxed();
self.stream = tokio_stream::StreamExt::throttle(self.stream, duration.into())
.boxed()
.into();
self
}

Expand All @@ -249,7 +255,8 @@ impl IngestionPipeline {
Err(_e) => None,
}
})
.boxed();
.boxed()
.into();
self
}

Expand All @@ -260,7 +267,8 @@ impl IngestionPipeline {
self.stream = self
.stream
.inspect(|result| tracing::debug!("Processing result: {:?}", result))
.boxed();
.boxed()
.into();
self
}

Expand All @@ -271,7 +279,8 @@ impl IngestionPipeline {
self.stream = self
.stream
.inspect_err(|e| tracing::error!("Error processing node: {:?}", e))
.boxed();
.boxed()
.into();
self
}

Expand All @@ -282,7 +291,8 @@ impl IngestionPipeline {
self.stream = self
.stream
.inspect_ok(|node| tracing::debug!("Processed node: {:?}", node))
.boxed();
.boxed()
.into();
self
}

Expand Down Expand Up @@ -333,7 +343,6 @@ mod tests {
use super::*;
use crate::ingestion::IngestionNode;
use crate::traits::*;
use futures_util::stream;
use mockall::Sequence;

/// Tests a simple run of the ingestion pipeline.
Expand All @@ -351,7 +360,7 @@ mod tests {
.expect_into_stream()
.times(1)
.in_sequence(&mut seq)
.returning(|| Box::pin(stream::iter(vec![Ok(IngestionNode::default())])));
.returning(|| vec![Ok(IngestionNode::default())].into());

transformer.expect_transform_node().returning(|mut node| {
node.chunk = "transformed".to_string();
Expand All @@ -363,7 +372,7 @@ mod tests {
.expect_batch_transform()
.times(1)
.in_sequence(&mut seq)
.returning(|nodes| Box::pin(stream::iter(nodes.into_iter().map(Ok))));
.returning(|nodes| IngestionStream::iter(nodes.into_iter().map(Ok)));
batch_transformer.expect_concurrency().returning(|| None);

chunker
Expand All @@ -377,7 +386,7 @@ mod tests {
node.chunk = format!("transformed_chunk_{}", i);
nodes.push(Ok(node));
}
Box::pin(stream::iter(nodes))
nodes.into()
});
chunker.expect_concurrency().returning(|| None);

Expand Down Expand Up @@ -409,7 +418,7 @@ mod tests {
.expect_into_stream()
.times(1)
.in_sequence(&mut seq)
.returning(|| Box::pin(stream::iter(vec![Ok(IngestionNode::default())])));
.returning(|| vec![Ok(IngestionNode::default())].into());
transformer
.expect_transform_node()
.returning(|_node| Err(anyhow::anyhow!("Error transforming node")));
Expand All @@ -435,11 +444,12 @@ mod tests {
.times(1)
.in_sequence(&mut seq)
.returning(|| {
Box::pin(stream::iter(vec![
vec![
Ok(IngestionNode::default()),
Ok(IngestionNode::default()),
Ok(IngestionNode::default()),
]))
]
.into()
});
transformer
.expect_transform_node()
Expand Down
98 changes: 61 additions & 37 deletions swiftide/src/ingestion/ingestion_stream.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,68 @@
#![allow(clippy::from_over_into)]
#![cfg(not(tarpaulin_include))]
//! This module defines the `IngestionStream` type, which is used for handling asynchronous streams of `IngestionNode` items in the ingestion pipeline.
//!
//! The `IngestionStream` type is a pinned, boxed, dynamically-dispatched stream that yields `Result<IngestionNode>` items. This type is essential for managing
//! and processing large volumes of data asynchronously, ensuring efficient and scalable ingestion workflows.

use anyhow::Result;
use futures_util::stream::Stream;
use futures_util::stream::{self, Stream};
use pin_project_lite::pin_project;
use std::pin::Pin;

use super::IngestionNode;

/// A type alias for a pinned, boxed, dynamically-dispatched stream of `IngestionNode` items.
///
/// This type is used in the ingestion pipeline to handle asynchronous streams of data. Each item in the stream is a `Result<IngestionNode>`,
/// allowing for error handling during the ingestion process. The `Send` trait is implemented to ensure that the stream can be safely sent
/// across threads, enabling concurrent processing.
///
/// # Type Definition
/// - `Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>`
///
/// # Components
/// - `Pin`: Ensures that the memory location of the stream is fixed, which is necessary for certain asynchronous operations.
/// - `Box<dyn Stream<Item = Result<IngestionNode>>>`: A heap-allocated, dynamically-dispatched stream that yields `Result<IngestionNode>` items.
/// - `Send`: Ensures that the stream can be sent across thread boundaries, facilitating concurrent processing.
///
/// # Usage
/// The `IngestionStream` type is typically used in the ingestion pipeline to process data asynchronously. It allows for efficient handling
/// of large volumes of data by leveraging Rust's asynchronous capabilities.
///
/// # Error Handling
/// Each item in the stream is a `Result<IngestionNode>`, which means that errors can be propagated and handled during the ingestion process.
/// This design allows for robust error handling and ensures that the ingestion pipeline can gracefully handle failures.
///
/// # Performance Considerations
/// The use of `Pin` and `Box` ensures that the stream's memory location is fixed and heap-allocated, respectively. This design choice is
/// crucial for asynchronous operations that require stable memory addresses. Additionally, the `Send` trait enables concurrent processing,
/// which can significantly improve performance in multi-threaded environments.
///
/// # Edge Cases
/// - The stream may yield errors (`Err` variants) instead of valid `IngestionNode` items. These errors should be handled appropriately
/// to ensure the robustness of the ingestion pipeline.
/// - The stream must be pinned to ensure that its memory location remains fixed, which is necessary for certain asynchronous operations.

pub type IngestionStream = Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>;
pub use futures_util::{StreamExt, TryStreamExt};

// `pin_project!` generates a safe `.project()` method so `poll_next` can
// access `inner` as `Pin<&mut ...>` without hand-written `unsafe` code.
pin_project! {
    /// An asynchronous stream of `IngestionNode` items.
    ///
    /// Wraps an internal stream of `Result<IngestionNode>` items.
    ///
    /// Streams, iterators and vectors of `Result<IngestionNode>` can be converted into an `IngestionStream`.
    pub struct IngestionStream {
        // Structurally pinned: the projection yields `Pin<&mut ...>` for this
        // field, which is what the inner stream's `poll_next` requires.
        #[pin]
        inner: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>,
    }
}

impl Stream for IngestionStream {
    type Item = Result<IngestionNode>;

    /// Polls the wrapped stream for its next item.
    ///
    /// This is a pure delegation: the pin projection gives us pinned mutable
    /// access to `inner`, and the boxed stream does all the real work.
    fn poll_next(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        self.project().inner.poll_next(cx)
    }
}

/// Converts an eagerly-collected batch of results into a stream.
///
/// Implemented as `From` rather than `Into`: the standard blanket impl
/// (`impl<T, U: From<T>> Into<U> for T`) then provides `Into` for free, so
/// existing `.into()` call sites keep working while the idiomatic direction
/// (clippy's `from_over_into`) is satisfied.
impl From<Vec<Result<IngestionNode>>> for IngestionStream {
    fn from(nodes: Vec<Result<IngestionNode>>) -> Self {
        IngestionStream::iter(nodes)
    }
}

/// Wraps an already-boxed, pinned stream without re-boxing it.
///
/// Implemented as `From` rather than a hand-written `Into` (clippy's
/// `from_over_into`): the blanket impl supplies `Into`, so the many
/// `.boxed().into()` call sites in the pipeline are unaffected.
impl From<Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>> for IngestionStream {
    fn from(stream: Pin<Box<dyn Stream<Item = Result<IngestionNode>> + Send>>) -> Self {
        Self { inner: stream }
    }
}

impl IngestionStream {
    /// Returns a stream that immediately completes without yielding any items.
    pub fn empty() -> Self {
        Self {
            inner: Box::pin(stream::empty()),
        }
    }

    /// Builds a stream from any iterable of `Result<IngestionNode>` values.
    ///
    /// The `'static` bound on `I` guarantees the iterator either owns its data
    /// or borrows only `'static` data, so it cannot dangle: the boxed stream
    /// takes ownership of the iterator and keeps it alive for its whole life.
    pub fn iter<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = Result<IngestionNode>> + Send + 'static,
        <I as IntoIterator>::IntoIter: Send,
    {
        Self {
            inner: Box::pin(stream::iter(iter)),
        }
    }
}
7 changes: 3 additions & 4 deletions swiftide/src/integrations/qdrant/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use anyhow::Result;
use async_trait::async_trait;
use futures_util::{stream, StreamExt};

use crate::{
ingestion::{IngestionNode, IngestionStream},
Expand Down Expand Up @@ -82,7 +81,7 @@ impl Persist for Qdrant {
.collect::<Result<Vec<_>>>();

if points.is_err() {
return stream::iter(vec![Err(points.unwrap_err())]).boxed();
return vec![Err(points.unwrap_err())].into();
}

let points = points.unwrap();
Expand All @@ -93,9 +92,9 @@ impl Persist for Qdrant {
.await;

if result.is_ok() {
stream::iter(nodes.into_iter().map(Ok)).boxed()
IngestionStream::iter(nodes.into_iter().map(Ok))
} else {
stream::iter(vec![Err(result.unwrap_err())]).boxed()
vec![Err(result.unwrap_err())].into()
}
}
}
9 changes: 4 additions & 5 deletions swiftide/src/integrations/redis/persist.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use anyhow::{Context as _, Result};
use async_trait::async_trait;
use futures_util::{stream, StreamExt};

use crate::{
ingestion::{IngestionNode, IngestionStream},
Expand Down Expand Up @@ -58,7 +57,7 @@ impl Persist for Redis {
.collect::<Result<Vec<_>>>();

if args.is_err() {
return stream::iter(vec![Err(args.unwrap_err())]).boxed();
return vec![Err(args.unwrap_err())].into();
}

let args = args.unwrap();
Expand All @@ -70,12 +69,12 @@ impl Persist for Redis {
.context("Error persisting to redis");

if result.is_ok() {
stream::iter(nodes.into_iter().map(Ok)).boxed()
IngestionStream::iter(nodes.into_iter().map(Ok))
} else {
stream::iter(vec![Err(result.unwrap_err())]).boxed()
IngestionStream::iter([Err(result.unwrap_err())])
}
} else {
stream::iter(vec![Err(anyhow::anyhow!("Failed to connect to Redis"))]).boxed()
IngestionStream::iter([Err(anyhow::anyhow!("Failed to connect to Redis"))])
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions swiftide/src/loaders/file_loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{ingestion::IngestionNode, ingestion::IngestionStream, Loader};
use futures_util::{stream, StreamExt};
use std::path::PathBuf;

/// The `FileLoader` struct is responsible for loading files from a specified directory,
Expand Down Expand Up @@ -103,7 +102,7 @@ impl Loader for FileLoader {
})
});

stream::iter(file_paths).boxed()
IngestionStream::iter(file_paths)
}
}

Expand Down
Loading