Skip to content

Commit

Permalink
feat(imap-tasks): Introduce Resolver (#172)
Browse files Browse the repository at this point in the history
* feat: init Scheduler::run_task fn

* ci: fix check

* feat: ensure that Scheduler is Send

* fix: removed task utilities from Scheduler

* fix: adjust code after review

* feat: move run task to a dedicated Resolver

* fix: adjust resolver-related namings
  • Loading branch information
soywod authored May 14, 2024
1 parent 5798e56 commit dec350a
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 18 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ stream = ["dep:rustls", "dep:tokio", "dep:tokio-rustls"]
[dependencies]
bounded-static = "0.5.0"
bytes = "1.5.0"
imap-codec = { version = "2.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] }
imap-types = { version = "2.0.0" }
imap-codec = { version = "2.0.0", features = ["starttls", "quirk_crlf_relaxed", "bounded-static", "ext_condstore_qresync", "ext_login_referrals", "ext_mailbox_referrals", "ext_id", "ext_sort_thread", "ext_binary", "ext_metadata", "ext_uidplus"] }
imap-types = { version = "2.0.0", features = ["starttls", "ext_condstore_qresync", "ext_login_referrals", "ext_mailbox_referrals", "ext_id", "ext_sort_thread", "ext_binary", "ext_metadata", "ext_uidplus"] }
rustls = { version = "0.23.1", optional = true }
thiserror = "1.0.49"
tokio = { version = "1.32.0", optional = true, features = ["io-util", "macros", "net"] }
Expand All @@ -23,7 +23,7 @@ tracing = "0.1.40"
[dev-dependencies]
rand = "0.8.5"
tag-generator = { path = "tag-generator" }
tokio = { version = "1.32.0", features = ["rt", "sync"] }
tokio = { version = "1.37.0", features = ["full"] }

[workspace]
resolver = "2"
Expand Down
4 changes: 4 additions & 0 deletions tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ imap-flow = { path = ".." }
imap-types = "2.0.0"
tag-generator = { path = "../tag-generator" }
thiserror = "1.0.58"
tracing = "0.1.40"

[dev-dependencies]
static_assertions = "1.1.0"
tokio = { version = "1.37.0", features = ["full"] }
47 changes: 47 additions & 0 deletions tasks/examples/client-resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use imap_flow::{
client::{ClientFlow, ClientFlowOptions},
stream::Stream,
};
use tasks::{
resolver::Resolver,
tasks::{authenticate::AuthenticateTask, capability::CapabilityTask, logout::LogoutTask},
};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() {
let stream = TcpStream::connect("127.0.0.1:12345").await.unwrap();
let mut stream = Stream::insecure(stream);
let client = ClientFlow::new(ClientFlowOptions::default());
let mut resolver = Resolver::new(client);

let capability = stream
.progress(resolver.resolve(CapabilityTask::new()))
.await
.unwrap()
.unwrap();

println!("pre-auth capability: {capability:?}");

let capability = stream
.progress(resolver.resolve(AuthenticateTask::plain("alice", "pa²²w0rd", true)))
.await
.unwrap()
.unwrap();

println!("maybe post-auth capability: {capability:?}");

let capability = stream
.progress(resolver.resolve(CapabilityTask::new()))
.await
.unwrap()
.unwrap();

println!("post-auth capability: {capability:?}");

stream
.progress(resolver.resolve(LogoutTask::default()))
.await
.unwrap()
.unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ async fn main() {
let client = ClientFlow::new(ClientFlowOptions::default());
let mut scheduler = Scheduler::new(client);

let handle1 = scheduler.enqueue_task(CapabilityTask::default());
let capability_handle = scheduler.enqueue_task(CapabilityTask::default());

loop {
match stream.progress(&mut scheduler).await.unwrap() {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(capability) = handle1.resolve(&mut token) {
println!("handle1: {capability:?}");
if let Some(capability) = capability_handle.resolve(&mut token) {
println!("capability: {capability:?}");
break;
}
}
Expand All @@ -36,18 +36,18 @@ async fn main() {
}
}

let handle2 = scheduler.enqueue_task(AuthenticateTask::plain("alice", "pa²²w0rd", true));
let handle3 = scheduler.enqueue_task(LogoutTask::default());
let auth_handle = scheduler.enqueue_task(AuthenticateTask::plain("alice", "pa²²w0rd", true));
let logout_handle = scheduler.enqueue_task(LogoutTask::default());

loop {
match stream.progress(&mut scheduler).await.unwrap() {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(auth) = handle2.resolve(&mut token) {
println!("handle2: {auth:?}");
if let Some(auth) = auth_handle.resolve(&mut token) {
println!("auth: {auth:?}");
}

if let Some(logout) = handle3.resolve(&mut token) {
println!("handle3: {logout:?}");
if let Some(logout) = logout_handle.resolve(&mut token) {
println!("logout: {logout:?}");
break;
}
}
Expand Down
24 changes: 18 additions & 6 deletions tasks/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod resolver;
pub mod tasks;

use std::{
Expand Down Expand Up @@ -26,11 +27,11 @@ use thiserror::Error;
/// and move out uninteresting responses (returning `Some(...)`).
///
/// If no active task is interested in a given response, we call this response "unsolicited".
pub trait Task: 'static {
pub trait Task: Send + 'static {
/// Output of the task.
///
/// Returned in [`Self::process_tagged`].
type Output;
type Output: Any + Send;

/// Returns the [`CommandBody`] to issue for this task.
///
Expand Down Expand Up @@ -342,6 +343,8 @@ pub enum SchedulerError {
/// It's better to halt the execution to avoid damage.
#[error("unexpected tag in command completion result")]
UnexpectedTaggedResponse(Tagged<'static>),
#[error("unexpected BYE response")]
UnexpectedByeResponse(Bye<'static>),
}

#[derive(Eq)]
Expand Down Expand Up @@ -398,7 +401,7 @@ impl<T: Task> TaskHandle<T> {
#[derive(Debug)]
pub struct TaskToken {
handle: ClientFlowCommandHandle,
output: Option<Box<dyn Any>>,
output: Option<Box<dyn Any + Send>>,
}

// -------------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -432,7 +435,7 @@ where
///
/// * doesn't have an associated type and uses [`Any`] in [`Self::process_tagged`]
/// * is an object-safe "subset" of [`Task`]
trait TaskAny {
trait TaskAny: Send {
fn process_data(&mut self, data: Data<'static>) -> Option<Data<'static>>;

fn process_untagged(&mut self, status_body: StatusBody<'static>)
Expand All @@ -450,7 +453,7 @@ trait TaskAny {

fn process_bye(&mut self, bye: Bye<'static>) -> Option<Bye<'static>>;

fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any>;
fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send>;
}

impl<T> TaskAny for T
Expand Down Expand Up @@ -487,7 +490,16 @@ where
}

/// Returns [`Any`] instead of [`Task::Output`].
fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any> {
fn process_tagged(self: Box<Self>, status_body: StatusBody<'static>) -> Box<dyn Any + Send> {
Box::new(T::process_tagged(*self, status_body))
}
}

#[cfg(test)]
mod tests {
use static_assertions::assert_impl_all;

use super::Scheduler;

assert_impl_all!(Scheduler: Send);
}
87 changes: 87 additions & 0 deletions tasks/src/resolver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use imap_flow::{client::ClientFlow, Flow, FlowInterrupt};
use imap_types::response::{Response, Status};
use tracing::warn;

use crate::{Scheduler, SchedulerError, SchedulerEvent, Task, TaskHandle};

/// The resolver is a scheduler than manages one task at a time.
pub struct Resolver {
scheduler: Scheduler,
}

impl Flow for Resolver {
type Event = SchedulerEvent;
type Error = SchedulerError;

fn enqueue_input(&mut self, bytes: &[u8]) {
self.scheduler.enqueue_input(bytes);
}

fn progress(&mut self) -> Result<Self::Event, FlowInterrupt<Self::Error>> {
self.scheduler.progress()
}
}

impl Resolver {
/// Create a new resolver.
pub fn new(flow: ClientFlow) -> Self {
Self {
scheduler: Scheduler::new(flow),
}
}

/// Enqueue a [`Task`] for immediate resolution.
pub fn resolve<T: Task>(&mut self, task: T) -> ResolvingTask<T> {
let handle = self.scheduler.enqueue_task(task);

ResolvingTask {
resolver: self,
handle,
}
}
}

pub struct ResolvingTask<'a, T: Task> {
resolver: &'a mut Resolver,
handle: TaskHandle<T>,
}

impl<T: Task> Flow for ResolvingTask<'_, T> {
type Event = T::Output;
type Error = SchedulerError;

fn enqueue_input(&mut self, bytes: &[u8]) {
self.resolver.enqueue_input(bytes);
}

fn progress(&mut self) -> Result<Self::Event, FlowInterrupt<Self::Error>> {
loop {
match self.resolver.progress()? {
SchedulerEvent::TaskFinished(mut token) => {
if let Some(output) = self.handle.resolve(&mut token) {
break Ok(output);
} else {
warn!(?token, "received unexpected task token")
}
}
SchedulerEvent::Unsolicited(unsolicited) => {
if let Response::Status(Status::Bye(bye)) = unsolicited {
let err = SchedulerError::UnexpectedByeResponse(bye);
break Err(FlowInterrupt::Error(err));
} else {
warn!(?unsolicited, "received unsolicited");
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use static_assertions::assert_impl_all;

use super::Resolver;

assert_impl_all!(Resolver: Send);
}

0 comments on commit dec350a

Please sign in to comment.