diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a8eda6e --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +/target +/Cargo.lock + +# IntelliJ, CLion, RustRover, ... +.idea + +# direnv (https://direnv.net/) +.envrc +.direnv diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..a77a1c5 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,40 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] - YYYY-MM-DD + +### Added + +* Created README, CHANGELOG, badges, rustfmt.toml, ... +* Created project board +* Setup CI: Check, Build, Lint, Audit, Coverage, ... +* Licensed everything as "APACHE OR MIT" +* `imap-flow` + * Implemented literal handling, handles, events, and examples + * Implemented AUTHENTICATE and IDLE + * Implemented a self-test, and tested against a few providers +* `proxy` + * Implemented argument processing and configuration + * Smoke tested against a few providers (and a few MUAs) + * Provided a README + * Supported capabilities are ... + * AUTH={PLAIN,LOGIN,XOAUTH2,ScramSha1,ScramSha256} + * SASL-IR + * QUOTA* + * MOVE + * LITERAL+/LITERAL- + * UNSELECT + * ID + * IDLE + * Use ALPN==imap +* `imap-tasks` prototype + * Designed `Task`s trait + * Implemented `Task` for a few commands + * Implemented a task scheduler/manager +* `tag-generator` + +[Unreleased]: https://github.com/duesee/imap-flow/compare/0a89b5e180ad7dfd3d67d1184370fa1028ea92b4...HEAD diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..dd521f4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "imap-client" +version = "0.1.0" +edition = "2021" +license = "MIT" + +[dependencies] +imap-flow = { git = "https://github.com/soywod/imap-flow", branch = "into-inner-stream" } +imap-types = "2" +once_cell = "1" +rustls-native-certs = "0.7.0" +tag-generator = { git = "https://github.com/duesee/imap-flow" } +thiserror = "1.0.50" +tokio = { version = "1.37.0", features = ["net", "time"] } +tokio-rustls = "0.26.0" +tracing = "0.1.40" + +[dev-dependencies] +tokio = { version = "1.37.0", features = ["full"] } + +[patch.crates-io] +imap-codec = { git = "https://github.com/duesee/imap-codec" } +imap-types = { git = "https://github.com/duesee/imap-codec" } diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..a8f240c --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 soywod + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a5307aa --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# imap-client + +*Work In Progress* diff --git a/deny.toml b/deny.toml new file mode 100644 index 0000000..3b00fa4 --- /dev/null +++ b/deny.toml @@ -0,0 +1,17 @@ +[sources] +unknown-registry = "deny" +unknown-git = "deny" + +allow-git = [ + "https://github.com/duesee/imap-codec", +] + +[licenses] +allow = [ "Apache-2.0", "BSD-3-Clause", "MIT", "Unicode-DFS-2016", "ISC", "OpenSSL" ] + +[[licenses.clarify]] +name = "ring" +expression = "MIT AND ISC AND OpenSSL" +license-files = [ + { path = "LICENSE", hash = 0xbd0eed23 } +] diff --git a/flake.lock b/flake.lock new file mode 100644 index 0000000..b8ff390 --- /dev/null +++ b/flake.lock @@ -0,0 +1,83 @@ +{ + "nodes": { + "fenix": { + "inputs": { + "nixpkgs": [ + "nixpkgs" + ], + "rust-analyzer-src": "rust-analyzer-src" + }, + "locked": { + "lastModified": 1715063087, + "narHash": "sha256-cktPkcCmJ2sR0V/FaWEuCWmKuGPbwoMltih/EfF0mXg=", + "owner": "nix-community", + "repo": "fenix", + "rev": "f8f16c1f2c83bea4e51e6522d988ec8bfcc8420e", + "type": "github" + }, + "original": { + "owner": "nix-community", + "repo": "fenix", + "type": "github" + } + }, + "flake-compat": { + "flake": false, + "locked": { + "lastModified": 1696426674, + "narHash": "sha256-kvjfFW7WAETZlt09AgDn1MrtKzP7t90Vf7vypd3OL1U=", + "owner": "edolstra", + "repo": "flake-compat", + "rev": "0f9255e01c2351cc7d116c072cb317785dd33b33", + "type": "github" + }, + "original": { + "owner": "edolstra", + "repo": "flake-compat", + "type": "github" + } + }, + "nixpkgs": { + "locked": { + "lastModified": 1714971268, + "narHash": "sha256-IKwMSwHj9+ec660l+I4tki/1NRoeGpyA2GdtdYpAgEw=", + "owner": "nixos", + "repo": "nixpkgs", + "rev": "27c13997bf450a01219899f5a83bd6ffbfc70d3c", + "type": "github" + }, + "original": { + "owner": "nixos", + "ref": "nixos-23.11", + "repo": "nixpkgs", + "type": "github" + } + }, + "root": { + "inputs": { + "fenix": "fenix", + "flake-compat": "flake-compat", + "nixpkgs": "nixpkgs" + } + }, + "rust-analyzer-src": { + "flake": false, + "locked": { + "lastModified": 1714936835, + "narHash": "sha256-M+PpgfRMBfHo8Jb2ou/s3maAZbps0XnuHXQU9Hv9vL0=", + "owner": "rust-lang", + "repo": "rust-analyzer", + "rev": "c4618fe14d39992fbbb85c2d6cad028a232c13d2", + "type": "github" + }, + "original": { + "owner": "rust-lang", + "ref": "nightly", + "repo": "rust-analyzer", + "type": "github" + } + } + }, + "root": "root", + "version": 7 +} diff --git a/flake.nix b/flake.nix new file mode 100644 index 0000000..c0cf0cc --- /dev/null +++ b/flake.nix @@ -0,0 +1,53 @@ +{ + inputs = { + nixpkgs.url = "github:nixos/nixpkgs/nixos-23.11"; + + # The rustup equivalent for Nix. + fenix = { + url = "github:nix-community/fenix"; + inputs.nixpkgs.follows = "nixpkgs"; + }; + + # Allows non-flakes users to still be able to `nix-shell` based on + # `shell.nix` instead of this `flake.nix`. + flake-compat = { + url = "github:edolstra/flake-compat"; + flake = false; + }; + }; + + outputs = { self, nixpkgs, fenix, ... }: + let + inherit (nixpkgs) lib; + + eachSupportedSystem = lib.genAttrs supportedSystems; + supportedSystems = [ + "x86_64-linux" + "aarch64-linux" + "x86_64-darwin" + "aarch64-darwin" + ]; + + mkDevShells = system: + let + pkgs = import nixpkgs { inherit system; }; + + # get the rust toolchain from the rustup + # `rust-toolchain.toml` configuration file + rust-toolchain = fenix.packages.${system}.fromToolchainFile { + file = ./rust-toolchain.toml; + sha256 = "opUgs6ckUQCyDxcB9Wy51pqhd0MPGHUVbwRKKPGiwZU="; + }; + + in + { + default = pkgs.mkShell { + buildInputs = [ rust-toolchain ]; + }; + }; + + in + { + devShells = eachSupportedSystem mkDevShells; + }; +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..3ccec48 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,4 @@ +[toolchain] +channel = "stable" +profile = "default" +components = [ "rust-src", "rust-analyzer" ] diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..2cbf0d3 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,3 @@ +format_code_in_doc_comments=true +group_imports="StdExternalCrate" +imports_granularity="Crate" diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..9d07c80 --- /dev/null +++ b/shell.nix @@ -0,0 +1,13 @@ +# Compatiblity file for non-flake Nix users. +# +# +(import + ( + let lock = builtins.fromJSON (builtins.readFile ./flake.lock); in + fetchTarball { + url = lock.nodes.flake-compat.locked.url or "https://github.com/edolstra/flake-compat/archive/${lock.nodes.flake-compat.locked.rev}.tar.gz"; + sha256 = lock.nodes.flake-compat.locked.narHash; + } + ) + { src = ./.; } +).shellNix diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..1e2c992 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1336 @@ +pub mod tasks; + +use std::{cmp::Ordering, collections::HashMap, num::NonZeroU32, sync::Arc, time::Duration}; + +use imap_flow::{ + client::{ClientFlow, ClientFlowError, ClientFlowEvent, ClientFlowOptions}, + stream::{Stream, StreamError}, +}; +use imap_types::{ + auth::AuthMechanism, + bounded_static::IntoBoundedStatic, + command::{Command, CommandBody}, + core::{IString, Literal, LiteralMode, NString, QuotedChar, Tag, Vec1}, + error::ValidationError, + extensions::{ + binary::{Literal8, LiteralOrLiteral8}, + sort::{SortCriterion, SortKey}, + thread::{Thread, ThreadingAlgorithm}, + }, + fetch::{MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName}, + flag::{Flag, FlagNameAttribute, StoreType}, + mailbox::{ListMailbox, Mailbox}, + response::{Capability, Code, Status, Tagged}, + search::SearchKey, + sequence::SequenceSet, +}; +use once_cell::sync::Lazy; +use tasks::{ + resolver::Resolver, + tasks::{ + append::{AppendTask, PostAppendCheckTask, PostAppendNoOpTask}, + appenduid::AppendUidTask, + authenticate::AuthenticateTask, + capability::CapabilityTask, + check::CheckTask, + copy::CopyTask, + create::CreateTask, + delete::DeleteTask, + expunge::ExpungeTask, + fetch::{FetchFirstTask, FetchTask}, + id::IdTask, + list::ListTask, + noop::NoOpTask, + r#move::MoveTask, + search::SearchTask, + select::{SelectDataUnvalidated, SelectTask}, + sort::SortTask, + starttls::StartTlsTask, + store::StoreTask, + thread::ThreadTask, + TaskError, + }, + SchedulerError, SchedulerEvent, Task, +}; +use thiserror::Error; +use tokio::net::TcpStream; +use tokio_rustls::{ + rustls::{pki_types::ServerName, ClientConfig, RootCertStore}, + TlsConnector, TlsStream, +}; +use tracing::{debug, trace, warn}; + +static ROOT_CERT_STORE: Lazy = Lazy::new(|| { + let mut root_store = RootCertStore::empty(); + + for cert in rustls_native_certs::load_native_certs().unwrap() { + root_store.add(cert).unwrap(); + } + + root_store +}); + +#[derive(Debug, Error)] +pub enum ClientError { + #[error("cannot connect to server using TCP")] + ConnectTcp(#[source] tokio::io::Error), + #[error("cannot connect to server using TLS")] + ConnectTls(#[source] tokio::io::Error), + + #[error("stream error")] + Stream(#[from] StreamError), + #[error("validation error")] + Validation(#[from] ValidationError), + + #[error("cannot receive greeting from server")] + ReceiveGreeting(#[source] StreamError), + + #[error("cannot resolve IMAP task")] + ResolveTask(#[from] TaskError), +} + +pub struct Client { + host: String, + stream: Stream, + resolver: Resolver, + capabilities: Vec1>, + idle_timeout: Duration, +} + +/// Client constructors. +/// +/// This section defines 3 public constructors for [`Client`]: +/// `insecure`, `tls` and `starttls`. +impl Client { + /// Creates an insecure client, using TCP. + /// + /// This constructor creates a client based on an raw + /// [`TcpStream`], receives greeting then saves server + /// capabilities. + pub async fn insecure(host: impl ToString, port: u16) -> Result { + let mut client = Self::tcp(host, port).await?; + + if !client.receive_greeting().await? { + client.refresh_capabilities().await?; + } + + Ok(client) + } + + /// Creates a secure client, using SSL/TLS. + /// + /// This constructor creates an client based on a secure + /// [`TcpStream`] wrapped into a [`TlsStream`], receives greeting + /// then saves server capabilities. + pub async fn tls(host: impl ToString, port: u16) -> Result { + let tcp = Self::tcp(host, port).await?; + Self::upgrade_tls(tcp, false).await + } + + /// Creates a secure client, using STARTTLS. + /// + /// This constructor creates an insecure client based on a raw + /// [`TcpStream`], receives greeting, wraps the [`TcpStream`] into + /// a secured [`TlsStream`] then saves server capabilities. + pub async fn starttls(host: impl ToString, port: u16) -> Result { + let tcp = Self::tcp(host, port).await?; + Self::upgrade_tls(tcp, true).await + } + + /// Creates an insecure client based on a raw [`TcpStream`]. + /// + /// This function is internally used by public constructors + /// `insecure`, `tls` and `starttls`. + async fn tcp(host: impl ToString, port: u16) -> Result { + let host = host.to_string(); + + let tcp_stream = TcpStream::connect((host.as_str(), port)) + .await + .map_err(ClientError::ConnectTcp)?; + + let stream = Stream::insecure(tcp_stream); + + let mut flow_opts = ClientFlowOptions::default(); + flow_opts.crlf_relaxed = true; + + let flow = ClientFlow::new(flow_opts); + let resolver = Resolver::new(flow); + + Ok(Self { + host, + stream, + resolver, + capabilities: Vec1::from(Capability::Imap4Rev1), + idle_timeout: Duration::from_secs(5 * 60), // 5 min + }) + } + + /// Turns an insecure client into a secure one. + /// + /// The flow changes depending on the `starttls` parameter: + /// + /// If `true`: receives greeting, sends STARTTLS command, upgrades + /// to TLS then force-refreshes server capabilities. + /// + /// If `false`: upgrades straight to TLS, receives greeting then + /// refreshes server capabilities if needed. + async fn upgrade_tls(mut self, starttls: bool) -> Result { + if starttls { + self.receive_greeting().await?; + let _ = self + .stream + .progress(self.resolver.resolve(StartTlsTask::new())) + .await; + } + + let mut config = ClientConfig::builder() + .with_root_certificates(ROOT_CERT_STORE.clone()) + .with_no_client_auth(); + + // See + config.alpn_protocols = vec![b"imap".to_vec()]; + + let connector = TlsConnector::from(Arc::new(config)); + let dnsname = ServerName::try_from(self.host.clone()).unwrap(); + + let tls_stream = connector + .connect(dnsname, TcpStream::from(self.stream)) + .await + .map_err(ClientError::ConnectTls)?; + + self.stream = Stream::tls(TlsStream::Client(tls_stream)); + + if starttls || !self.receive_greeting().await? { + self.refresh_capabilities().await?; + } + + Ok(self) + } + + /// Receives server greeting. + /// + /// Returns `true` if server capabilities were found in the + /// greeting, otherwise `false`. This boolean is internally used + /// to determine if server capabilities need to be explicitly + /// requested or not. + async fn receive_greeting(&mut self) -> Result { + let evt = self + .stream + .progress(&mut self.resolver) + .await + .map_err(ClientError::ReceiveGreeting)?; + + if let SchedulerEvent::GreetingReceived(greeting) = evt { + if let Some(Code::Capability(capabilities)) = greeting.code { + self.capabilities = capabilities; + return Ok(true); + } + } + + Ok(false) + } +} + +/// Client getters and setters. +/// +/// This section defines helpers to easily manipulate the client's +/// parameters and data. +impl Client { + pub fn get_idle_timeout(&self) -> &Duration { + &self.idle_timeout + } + + pub fn set_idle_timeout(&mut self, timeout: Duration) { + self.idle_timeout = timeout; + } + + pub fn set_some_idle_timeout(&mut self, timeout: Option) { + if let Some(timeout) = timeout { + self.set_idle_timeout(timeout) + } + } + + pub fn with_idle_timeout(mut self, timeout: Duration) -> Self { + self.set_idle_timeout(timeout); + self + } + + pub fn with_some_idle_timeout(mut self, timeout: Option) -> Self { + self.set_some_idle_timeout(timeout); + self + } + + /// Returns the server capabilities. + /// + /// This function does not *fetch* capabilities from server, it + /// just returns capabilities saved during the creation of this + /// client (using [`Client::insecure`], [`Client::tls`] or + /// [`Client::starttls`]). + pub fn capabilities(&self) -> &Vec1> { + &self.capabilities + } + + /// Returns the server capabilities, as an iterator. + /// + /// Same as [`Client::capabilities`], but just returns an iterator + /// instead. + pub fn capabilities_iter(&self) -> impl Iterator> + '_ { + self.capabilities().as_ref().iter() + } + + /// Returns supported authentication mechanisms, as an iterator. + pub fn supported_auth_mechanisms(&self) -> impl Iterator> + '_ { + self.capabilities_iter().filter_map(|capability| { + if let Capability::Auth(mechanism) = capability { + Some(mechanism) + } else { + None + } + }) + } + + /// Returns `true` if the given authentication mechanism is + /// supported by the server. + pub fn supports_auth_mechanism(&self, mechanism: AuthMechanism<'static>) -> bool { + self.capabilities_iter().any(|capability| { + if let Capability::Auth(m) = capability { + m == &mechanism + } else { + false + } + }) + } + + /// Returns `true` if the `SASL-IR` extension is supported by the + /// server. + pub fn ext_sasl_ir_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::SaslIr)) + } + + /// Returns `true` if the `ID` extension is supported by the + /// server. + pub fn ext_id_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::Id)) + } + + /// Returns `true` if the `UIDPLUS` extension is supported by the + /// server. + pub fn ext_uidplus_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::UidPlus)) + } + + /// Returns `true` if the `SORT` extension is supported by the + /// server. + pub fn ext_sort_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::Sort(_))) + } + + /// Returns `true` if the `THREAD` extension is supported by the + /// server. + pub fn ext_thread_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::Thread(_))) + } + + /// Returns `true` if the `IDLE` extension is supported by the + /// server. + pub fn ext_idle_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::Idle)) + } + + /// Returns `true` if the `BINARY` extension is supported by the + /// server. + pub fn ext_binary_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::Binary)) + } + + /// Returns `true` if the `MOVE` extension is supported by the + /// server. + pub fn ext_move_supported(&self) -> bool { + self.capabilities_iter() + .any(|c| matches!(c, Capability::Move)) + } +} + +/// Client low-level API. +/// +/// This section defines the low-level API of the client, by exposing +/// convenient wrappers around [`Task`]s. They do not contain any +/// logic. +impl Client { + /// Resolves the given [`Task`]. + pub async fn resolve(&mut self, task: T) -> Result { + Ok(self.stream.progress(self.resolver.resolve(task)).await?) + } + + /// Creates a new mailbox. + pub async fn create( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + let mbox = mailbox.try_into()?.into_static(); + Ok(self.resolve(CreateTask::new(mbox)).await??) + } + + /// Lists mailboxes. + pub async fn list( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + mailbox_wildcard: impl TryInto, Error = ValidationError>, + ) -> Result< + Vec<( + Mailbox<'static>, + Option, + Vec>, + )>, + ClientError, + > { + let mbox = mailbox.try_into()?.into_static(); + let mbox_wcard = mailbox_wildcard.try_into()?.into_static(); + Ok(self.resolve(ListTask::new(mbox, mbox_wcard)).await??) + } + + /// Selects the given mailbox. + pub async fn select( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result { + let mbox = mailbox.try_into()?.into_static(); + Ok(self.resolve(SelectTask::new(mbox)).await??) + } + + /// Selects the given mailbox in read-only mode. + pub async fn examine( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result { + let mbox = mailbox.try_into()?.into_static(); + Ok(self.resolve(SelectTask::read_only(mbox)).await??) + } + + /// Expunges the selected mailbox. + /// + /// A mailbox needs to be selected before, otherwise this function + /// will fail. + pub async fn expunge(&mut self) -> Result, ClientError> { + Ok(self.resolve(ExpungeTask::new()).await??) + } + + /// Deletes the given mailbox. + pub async fn delete( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + let mbox = mailbox.try_into()?.into_static(); + Ok(self.resolve(DeleteTask::new(mbox)).await??) + } + + /// Searches messages matching the given criteria. + async fn _search( + &mut self, + criteria: impl IntoIterator>, + uid: bool, + ) -> Result, ClientError> { + let criteria: Vec<_> = criteria + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + + let criteria = if criteria.is_empty() { + Vec1::from(SearchKey::All) + } else { + Vec1::try_from(criteria).unwrap() + }; + + Ok(self + .resolve(SearchTask::new(criteria).with_uid(uid)) + .await??) + } + + /// Searches messages matching the given criteria. + /// + /// This function returns sequence numbers, if you need UID see + /// [`Client::uid_search`]. + pub async fn search( + &mut self, + criteria: impl IntoIterator>, + ) -> Result, ClientError> { + self._search(criteria, false).await + } + + /// Searches messages matching the given criteria. + /// + /// This function returns UIDs, if you need sequence numbers see + /// [`Client::search`]. + pub async fn uid_search( + &mut self, + criteria: impl IntoIterator>, + ) -> Result, ClientError> { + self._search(criteria, true).await + } + + /// Searches messages matching the given search criteria, sorted + /// by the given sort criteria. + async fn _sort( + &mut self, + sort_criteria: impl IntoIterator, + search_criteria: impl IntoIterator>, + uid: bool, + ) -> Result, ClientError> { + let sort: Vec<_> = sort_criteria.into_iter().collect(); + let sort = if sort.is_empty() { + Vec1::from(SortCriterion { + reverse: true, + key: SortKey::Date, + }) + } else { + Vec1::try_from(sort).unwrap() + }; + + let search: Vec<_> = search_criteria + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + let search = if search.is_empty() { + Vec1::from(SearchKey::All) + } else { + Vec1::try_from(search).unwrap() + }; + + Ok(self + .resolve(SortTask::new(sort, search).with_uid(uid)) + .await??) + } + + /// Searches messages matching the given search criteria, sorted + /// by the given sort criteria. + /// + /// This function returns sequence numbers, if you need UID see + /// [`Client::uid_sort`]. + pub async fn sort( + &mut self, + sort_criteria: impl IntoIterator, + search_criteria: impl IntoIterator>, + ) -> Result, ClientError> { + self._sort(sort_criteria, search_criteria, false).await + } + + /// Searches messages matching the given search criteria, sorted + /// by the given sort criteria. + /// + /// This function returns UIDs, if you need sequence numbers see + /// [`Client::sort`]. + pub async fn uid_sort( + &mut self, + sort_criteria: impl IntoIterator, + search_criteria: impl IntoIterator>, + ) -> Result, ClientError> { + self._sort(sort_criteria, search_criteria, true).await + } + + async fn _thread( + &mut self, + algorithm: ThreadingAlgorithm<'_>, + search_criteria: impl IntoIterator>, + uid: bool, + ) -> Result, ClientError> { + let alg = algorithm.into_static(); + + let search: Vec<_> = search_criteria + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + let search = if search.is_empty() { + Vec1::from(SearchKey::All) + } else { + Vec1::try_from(search).unwrap() + }; + + Ok(self + .resolve(ThreadTask::new(alg, search).with_uid(uid)) + .await??) + } + + pub async fn thread( + &mut self, + algorithm: ThreadingAlgorithm<'_>, + search_criteria: impl IntoIterator>, + ) -> Result, ClientError> { + self._thread(algorithm, search_criteria, false).await + } + + pub async fn uid_thread( + &mut self, + algorithm: ThreadingAlgorithm<'_>, + search_criteria: impl IntoIterator>, + ) -> Result, ClientError> { + self._thread(algorithm, search_criteria, true).await + } + + async fn _store( + &mut self, + sequence_set: SequenceSet, + kind: StoreType, + flags: impl IntoIterator>, + uid: bool, + ) -> Result>>, ClientError> { + let flags: Vec<_> = flags + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + + Ok(self + .resolve(StoreTask::new(sequence_set, kind, flags).with_uid(uid)) + .await??) + } + + pub async fn store( + &mut self, + sequence_set: SequenceSet, + kind: StoreType, + flags: impl IntoIterator>, + ) -> Result>>, ClientError> { + self._store(sequence_set, kind, flags, false).await + } + + pub async fn uid_store( + &mut self, + sequence_set: SequenceSet, + kind: StoreType, + flags: impl IntoIterator>, + ) -> Result>>, ClientError> { + self._store(sequence_set, kind, flags, true).await + } + + async fn _silent_store( + &mut self, + sequence_set: SequenceSet, + kind: StoreType, + flags: impl IntoIterator>, + uid: bool, + ) -> Result<(), ClientError> { + let flags: Vec<_> = flags + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + + let task = StoreTask::new(sequence_set, kind, flags) + .with_uid(uid) + .silent(); + + Ok(self.resolve(task).await??) + } + + pub async fn silent_store( + &mut self, + sequence_set: SequenceSet, + kind: StoreType, + flags: impl IntoIterator>, + ) -> Result<(), ClientError> { + self._silent_store(sequence_set, kind, flags, false).await + } + + pub async fn uid_silent_store( + &mut self, + sequence_set: SequenceSet, + kind: StoreType, + flags: impl IntoIterator>, + ) -> Result<(), ClientError> { + self._silent_store(sequence_set, kind, flags, true).await + } + + pub async fn post_append_noop(&mut self) -> Result, ClientError> { + Ok(self.resolve(PostAppendNoOpTask::new()).await??) + } + + pub async fn post_append_check(&mut self) -> Result, ClientError> { + Ok(self.resolve(PostAppendCheckTask::new()).await??) + } + + async fn _fetch_first( + &mut self, + id: NonZeroU32, + items: MacroOrMessageDataItemNames<'_>, + uid: bool, + ) -> Result>, ClientError> { + let items = items.into_static(); + + Ok(self + .resolve(FetchFirstTask::new(id, items).with_uid(uid)) + .await??) + } + + pub async fn fetch_first( + &mut self, + id: NonZeroU32, + items: MacroOrMessageDataItemNames<'_>, + ) -> Result>, ClientError> { + self._fetch_first(id, items, false).await + } + + pub async fn uid_fetch_first( + &mut self, + id: NonZeroU32, + items: MacroOrMessageDataItemNames<'_>, + ) -> Result>, ClientError> { + self._fetch_first(id, items, true).await + } + + async fn _copy( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + uid: bool, + ) -> Result<(), ClientError> { + let mbox = mailbox.try_into()?.into_static(); + + Ok(self + .resolve(CopyTask::new(sequence_set, mbox).with_uid(uid)) + .await??) + } + + pub async fn copy( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + self._copy(sequence_set, mailbox, false).await + } + + pub async fn uid_copy( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + self._copy(sequence_set, mailbox, true).await + } + + async fn _move( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + uid: bool, + ) -> Result<(), ClientError> { + let mbox = mailbox.try_into()?.into_static(); + + Ok(self + .resolve(MoveTask::new(sequence_set, mbox).with_uid(uid)) + .await??) + } + + pub async fn r#move( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + self._move(sequence_set, mailbox, false).await + } + + pub async fn uid_move( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + self._move(sequence_set, mailbox, true).await + } + + /// Executes the `CHECK` command. + pub async fn check(&mut self) -> Result<(), ClientError> { + Ok(self.resolve(CheckTask::new()).await??) + } + + /// Executes the `NOOP` command. + pub async fn noop(&mut self) -> Result<(), ClientError> { + Ok(self.resolve(NoOpTask::new()).await??) + } +} + +/// Client medium-level API. +/// +/// This section defines the medium-level API of the client (based on +/// the low-level one), by exposing helpers that update client state +/// and use a small amount of logic (mostly conditional code depending +/// on available server capabilities). +impl Client { + /// Fetches server capabilities, then saves them. + pub async fn refresh_capabilities(&mut self) -> Result<(), ClientError> { + self.capabilities = self.resolve(CapabilityTask::new()).await??; + Ok(()) + } + + /// Authenticates the user using the given [`AuthenticateTask`]. + /// + /// This function also refreshes capabilities (either from the + /// task output or from explicit request). + async fn authenticate(&mut self, task: AuthenticateTask) -> Result<(), ClientError> { + match self.resolve(task).await?? { + Some(capabilities) => { + self.capabilities = capabilities; + } + None => { + self.refresh_capabilities().await?; + } + }; + + Ok(()) + } + + /// Authenticates the user using the `PLAIN` mechanism. + pub async fn authenticate_plain( + &mut self, + login: impl AsRef, + password: impl AsRef, + ) -> Result<(), ClientError> { + self.authenticate(AuthenticateTask::plain( + login.as_ref(), + password.as_ref(), + self.ext_sasl_ir_supported(), + )) + .await + } + + /// Authenticates the user using the `XOAUTH2` mechanism. + pub async fn authenticate_xoauth2( + &mut self, + login: impl AsRef, + token: impl AsRef, + ) -> Result<(), ClientError> { + self.authenticate(AuthenticateTask::xoauth2( + login.as_ref(), + token.as_ref(), + self.ext_sasl_ir_supported(), + )) + .await + } + + /// Authenticates the user using the `OAUTHBEARER` mechanism. + pub async fn authenticate_oauthbearer( + &mut self, + user: impl AsRef, + host: impl AsRef, + port: u16, + token: impl AsRef, + ) -> Result<(), ClientError> { + self.authenticate(AuthenticateTask::oauthbearer( + user.as_ref(), + host.as_ref(), + port, + token.as_ref(), + self.ext_sasl_ir_supported(), + )) + .await + } + + /// Exchanges client/server ids. + /// + /// If the server does not support the `ID` extension, this + /// function has no effect. + pub async fn id( + &mut self, + params: Option, NString<'static>)>>, + ) -> Result, NString<'static>)>>, ClientError> { + Ok(if self.ext_id_supported() { + self.resolve(IdTask::new(params)).await?? + } else { + warn!("IMAP ID extension not supported, skipping"); + None + }) + } + + pub async fn append( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + flags: impl IntoIterator>, + message: impl AsRef<[u8]>, + ) -> Result, ClientError> { + let mbox = mailbox.try_into()?.into_static(); + + let flags: Vec<_> = flags + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + + let msg = to_static_literal(message, self.ext_binary_supported())?; + + Ok(self + .resolve(AppendTask::new(mbox, msg).with_flags(flags)) + .await??) + } + + pub async fn appenduid( + &mut self, + mailbox: impl TryInto, Error = ValidationError>, + flags: impl IntoIterator>, + message: impl AsRef<[u8]>, + ) -> Result, ClientError> { + let mbox = mailbox.try_into()?.into_static(); + + let flags: Vec<_> = flags + .into_iter() + .map(IntoBoundedStatic::into_static) + .collect(); + + let msg = to_static_literal(message, self.ext_binary_supported())?; + + Ok(self + .resolve(AppendUidTask::new(mbox, msg).with_flags(flags)) + .await??) + } +} + +/// Client high-level API. +/// +/// This section defines the high-level API of the client (based on +/// the low and medium ones), by exposing opinionated helpers. They +/// contain more logic, and make use of fallbacks depending on +/// available server capabilities. +impl Client { + async fn _fetch( + &mut self, + sequence_set: SequenceSet, + items: MacroOrMessageDataItemNames<'_>, + uid: bool, + ) -> Result>>, ClientError> { + let mut items = match items { + MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(), + MacroOrMessageDataItemNames::MessageDataItemNames(items) => items.into_static(), + }; + + if uid { + items.push(MessageDataItemName::Uid); + } + + let seq_map = self + .resolve(FetchTask::new(sequence_set, items.into()).with_uid(uid)) + .await??; + + if uid { + let mut uid_map = HashMap::new(); + + for (seq, items) in seq_map { + let uid = items.as_ref().iter().find_map(|item| { + if let MessageDataItem::Uid(uid) = item { + Some(*uid) + } else { + None + } + }); + + match uid { + Some(uid) => { + uid_map.insert(uid, items); + } + None => { + warn!(?seq, "cannot get message uid, skipping it"); + } + } + } + + Ok(uid_map) + } else { + Ok(seq_map) + } + } + + pub async fn fetch( + &mut self, + sequence_set: SequenceSet, + items: MacroOrMessageDataItemNames<'_>, + ) -> Result>>, ClientError> { + self._fetch(sequence_set, items, false).await + } + + pub async fn uid_fetch( + &mut self, + sequence_set: SequenceSet, + items: MacroOrMessageDataItemNames<'_>, + ) -> Result>>, ClientError> { + self._fetch(sequence_set, items, true).await + } + + async fn _sort_or_fallback( + &mut self, + sort_criteria: impl IntoIterator + Clone, + search_criteria: impl IntoIterator>, + fetch_items: MacroOrMessageDataItemNames<'_>, + uid: bool, + ) -> Result>>, ClientError> { + let mut fetch_items = match fetch_items { + MacroOrMessageDataItemNames::Macro(m) => m.expand().into_static(), + MacroOrMessageDataItemNames::MessageDataItemNames(items) => items, + }; + + if uid { + fetch_items.push(MessageDataItemName::Uid); + } + + if self.ext_sort_supported() { + let fetch_items = MacroOrMessageDataItemNames::MessageDataItemNames(fetch_items); + + let ids = self.sort(sort_criteria, search_criteria).await?; + + let mut fetches = self + ._fetch(ids.clone().try_into().unwrap(), fetch_items, uid) + .await?; + + let items = ids.into_iter().flat_map(|id| fetches.remove(&id)).collect(); + + Ok(items) + } else { + warn!("IMAP SORT extension not supported, using fallback"); + let ids = self._search(search_criteria, uid).await?; + + sort_criteria + .clone() + .into_iter() + .filter_map(|criterion| match criterion.key { + SortKey::Arrival => Some(MessageDataItemName::InternalDate), + SortKey::Cc => Some(MessageDataItemName::Envelope), + SortKey::Date => Some(MessageDataItemName::Envelope), + SortKey::From => Some(MessageDataItemName::Envelope), + SortKey::Size => Some(MessageDataItemName::Rfc822Size), + SortKey::Subject => Some(MessageDataItemName::Envelope), + SortKey::To => Some(MessageDataItemName::Envelope), + SortKey::DisplayFrom => None, + SortKey::DisplayTo => None, + }) + .for_each(|item| { + if !fetch_items.contains(&item) { + fetch_items.push(item) + } + }); + + let mut fetches: Vec<_> = self + ._fetch(ids.try_into().unwrap(), fetch_items.into(), uid) + .await? + .into_values() + .collect(); + + fetches.sort_by(|a, b| { + for criterion in sort_criteria.clone().into_iter() { + let mut cmp = cmp_fetch_items(&criterion.key, a, b); + + if criterion.reverse { + cmp = cmp.reverse(); + } + + if cmp.is_ne() { + return cmp; + } + } + + cmp_fetch_items(&SortKey::Date, a, b) + }); + + Ok(fetches) + } + } + + pub async fn sort_or_fallback( + &mut self, + sort_criteria: impl IntoIterator + Clone, + search_criteria: impl IntoIterator>, + fetch_items: MacroOrMessageDataItemNames<'_>, + ) -> Result>>, ClientError> { + self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, false) + .await + } + + pub async fn uid_sort_or_fallback( + &mut self, + sort_criteria: impl IntoIterator + Clone, + search_criteria: impl IntoIterator>, + fetch_items: MacroOrMessageDataItemNames<'_>, + ) -> Result>>, ClientError> { + self._sort_or_fallback(sort_criteria, search_criteria, fetch_items, true) + .await + } + + pub async fn appenduid_or_fallback( + &mut self, + mailbox: impl TryInto, Error = ValidationError> + Clone, + flags: impl IntoIterator>, + message: impl AsRef<[u8]>, + ) -> Result, ClientError> { + if self.ext_uidplus_supported() { + Ok(self + .appenduid(mailbox, flags, message) + .await? + .map(|(uid, _)| uid)) + } else { + warn!("IMAP UIDPLUS extension not supported, using fallback"); + + // If the mailbox is currently selected, the normal new + // message actions SHOULD occur. Specifically, the server + // SHOULD notify the client immediately via an untagged + // EXISTS response. If the server does not do so, the + // client MAY issue a NOOP command (or failing that, a + // CHECK command) after one or more APPEND commands. + // + // + self.select(mailbox.clone()).await?; + + let seq = match self.append(mailbox, flags, message).await? { + Some(seq) => seq, + None => match self.post_append_noop().await? { + Some(seq) => seq, + None => self + .post_append_check() + .await? + .ok_or(ClientError::ResolveTask(TaskError::MissingData( + "APPENDUID: seq".into(), + )))?, + }, + }; + + let uid = self + .search(Vec1::from(SearchKey::SequenceSet(seq.try_into().unwrap()))) + .await? + .into_iter() + .next(); + + Ok(uid) + } + } + + async fn _move_or_fallback( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + uid: bool, + ) -> Result<(), ClientError> { + if self.ext_move_supported() { + self._move(sequence_set, mailbox, uid).await + } else { + warn!("IMAP MOVE extension not supported, using fallback"); + self._copy(sequence_set.clone(), mailbox, uid).await?; + self._silent_store(sequence_set, StoreType::Add, Some(Flag::Deleted), uid) + .await?; + self.expunge().await?; + Ok(()) + } + } + + pub async fn move_or_fallback( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + self._move_or_fallback(sequence_set, mailbox, false).await + } + + pub async fn uid_move_or_fallback( + &mut self, + sequence_set: SequenceSet, + mailbox: impl TryInto, Error = ValidationError>, + ) -> Result<(), ClientError> { + self._move_or_fallback(sequence_set, mailbox, true).await + } + + pub fn enqueue_idle(&mut self) -> Tag<'static> { + let tag = self.resolver.scheduler.tag_generator.generate(); + + self.resolver.scheduler.flow.enqueue_command(Command { + tag: tag.clone(), + body: CommandBody::Idle, + }); + + tag.into_static() + } + + #[tracing::instrument(name = "idle", skip_all)] + pub async fn idle(&mut self, tag: Tag<'static>) -> Result<(), StreamError> { + debug!("starting the main loop"); + + loop { + let progress = self.stream.progress(&mut self.resolver.scheduler.flow); + match tokio::time::timeout(self.idle_timeout, progress).await { + Err(_) => { + debug!("timed out, sending done command…"); + self.resolver.scheduler.flow.set_idle_done(); + } + Ok(Err(err)) => { + break Err(err); + } + Ok(Ok(ClientFlowEvent::IdleCommandSent { .. })) => { + debug!("command sent"); + } + Ok(Ok(ClientFlowEvent::IdleAccepted { .. })) => { + debug!("command accepted, entering idle mode"); + } + Ok(Ok(ClientFlowEvent::IdleRejected { status, .. })) => { + warn!("command rejected, aborting: {status:?}"); + break Ok(()); + } + Ok(Ok(ClientFlowEvent::IdleDoneSent { .. })) => { + debug!("done command sent"); + } + Ok(Ok(ClientFlowEvent::DataReceived { data })) => { + debug!("received data, sending done command…"); + trace!("{data:#?}"); + self.resolver.scheduler.flow.set_idle_done(); + } + Ok(Ok(ClientFlowEvent::StatusReceived { + status: + Status::Tagged(Tagged { + tag: ref got_tag, .. + }), + })) if *got_tag == tag => { + debug!("received tagged response, exiting"); + break Ok(()); + } + Ok(event) => { + debug!("received unknown event, ignoring: {event:?}"); + } + } + } + } + + #[tracing::instrument(name = "idle/done", skip_all)] + pub async fn idle_done( + &mut self, + tag: Tag<'static>, + ) -> Result<(), StreamError> { + self.resolver.scheduler.flow.set_idle_done(); + + loop { + let progress = self + .stream + .progress(&mut self.resolver.scheduler.flow) + .await?; + + match progress { + ClientFlowEvent::IdleDoneSent { .. } => { + debug!("done command sent"); + } + ClientFlowEvent::StatusReceived { + status: + Status::Tagged(Tagged { + tag: ref got_tag, .. + }), + } if *got_tag == tag => { + debug!("received tagged response, exiting"); + break Ok(()); + } + event => { + debug!("received unknown event, ignoring: {event:?}"); + } + } + } + } +} + +fn cmp_fetch_items( + criterion: &SortKey, + a: &Vec1, + b: &Vec1, +) -> Ordering { + use MessageDataItem::*; + + match &criterion { + SortKey::Arrival => { + let a = a.as_ref().iter().find_map(|a| { + if let InternalDate(dt) = a { + Some(dt.as_ref()) + } else { + None + } + }); + + let b = b.as_ref().iter().find_map(|b| { + if let InternalDate(dt) = b { + Some(dt.as_ref()) + } else { + None + } + }); + + a.cmp(&b) + } + SortKey::Date => { + let a = a.as_ref().iter().find_map(|a| { + if let Envelope(envelope) = a { + envelope.date.0.as_ref().map(AsRef::as_ref) + } else { + None + } + }); + + let b = b.as_ref().iter().find_map(|b| { + if let Envelope(envelope) = b { + envelope.date.0.as_ref().map(AsRef::as_ref) + } else { + None + } + }); + + a.cmp(&b) + } + SortKey::Size => { + let a = a.as_ref().iter().find_map(|a| { + if let Rfc822Size(size) = a { + Some(size) + } else { + None + } + }); + + let b = b.as_ref().iter().find_map(|b| { + if let Rfc822Size(size) = b { + Some(size) + } else { + None + } + }); + + a.cmp(&b) + } + SortKey::Subject => { + let a = a.as_ref().iter().find_map(|a| { + if let Envelope(envelope) = a { + envelope.subject.0.as_ref().map(AsRef::as_ref) + } else { + None + } + }); + + let b = b.as_ref().iter().find_map(|b| { + if let Envelope(envelope) = b { + envelope.subject.0.as_ref().map(AsRef::as_ref) + } else { + None + } + }); + + a.cmp(&b) + } + // FIXME: Address missing Ord derive in imap-types + SortKey::Cc | SortKey::From | SortKey::To | SortKey::DisplayFrom | SortKey::DisplayTo => { + Ordering::Equal + } + } +} + +fn to_static_literal( + message: impl AsRef<[u8]>, + ext_binary_supported: bool, +) -> Result, ValidationError> { + let message = if ext_binary_supported { + LiteralOrLiteral8::Literal8(Literal8 { + data: message.as_ref().into(), + mode: LiteralMode::Sync, + }) + } else { + warn!("IMAP BINARY extension not supported, using fallback"); + Literal::validate(message.as_ref())?; + LiteralOrLiteral8::Literal(Literal::unvalidated(message.as_ref())) + }; + + Ok(message.into_static()) +} diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs new file mode 100644 index 0000000..5257336 --- /dev/null +++ b/src/tasks/mod.rs @@ -0,0 +1,535 @@ +pub mod resolver; +pub mod tasks; + +use std::{ + any::Any, + collections::VecDeque, + fmt::{Debug, Formatter}, + marker::PhantomData, +}; + +use imap_flow::{ + client::{ClientFlow, ClientFlowCommandHandle, ClientFlowError, ClientFlowEvent}, + Flow, FlowInterrupt, +}; +use imap_types::{ + auth::AuthenticateData, + command::{Command, CommandBody}, + core::Tag, + response::{ + Bye, CommandContinuationRequest, Data, Greeting, Response, Status, StatusBody, Tagged, + }, +}; +use tag_generator::TagGenerator; +use thiserror::Error; + +/// Tells how a specific IMAP [`Command`] is processed. +/// +/// Most `process_` trait methods consume interesting responses (returning `None`), +/// and move out uninteresting responses (returning `Some(...)`). +/// +/// If no active task is interested in a given response, we call this response "unsolicited". +pub trait Task: Send + 'static { + /// Output of the task. + /// + /// Returned in [`Self::process_tagged`]. + type Output: Any + Send; + + /// Returns the [`CommandBody`] to issue for this task. + /// + /// Note: The [`Scheduler`] will tag the [`CommandBody`] creating a complete [`Command`]. + fn command_body(&self) -> CommandBody<'static>; + + /// Process data response. + fn process_data(&mut self, data: Data<'static>) -> Option> { + // Default: Don't process server data + Some(data) + } + + /// Process untagged response. + fn process_untagged( + &mut self, + status_body: StatusBody<'static>, + ) -> Option> { + // Default: Don't process untagged status + Some(status_body) + } + + /// Process command continuation request response. + fn process_continuation_request( + &mut self, + continuation: CommandContinuationRequest<'static>, + ) -> Option> { + // Default: Don't process command continuation request response + Some(continuation) + } + + /// Process command continuation request response (during authenticate). + fn process_continuation_request_authenticate( + &mut self, + continuation: CommandContinuationRequest<'static>, + ) -> Result, CommandContinuationRequest<'static>> { + // Default: Don't process command continuation request response (during authenticate) + Err(continuation) + } + + /// Process bye response. + fn process_bye(&mut self, bye: Bye<'static>) -> Option> { + // Default: Don't process bye + Some(bye) + } + + /// Process command completion result response. + /// + /// The [`Scheduler`] already chooses the corresponding response by tag. + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output; +} + +/// Scheduler managing enqueued tasks and routing incoming responses to active tasks. +pub struct Scheduler { + pub flow: ClientFlow, + waiting_tasks: TaskMap, + active_tasks: TaskMap, + pub tag_generator: TagGenerator, +} + +impl Scheduler { + /// Create a new scheduler. + pub fn new(flow: ClientFlow) -> Self { + Self { + flow, + waiting_tasks: Default::default(), + active_tasks: Default::default(), + tag_generator: TagGenerator::new(), + } + } + + /// Enqueue a [`Task`]. + pub fn enqueue_task(&mut self, task: T) -> TaskHandle + where + T: Task, + { + let tag = self.tag_generator.generate(); + + let cmd = { + let body = task.command_body(); + Command { + tag: tag.clone(), + body, + } + }; + + let handle = self.flow.enqueue_command(cmd); + + self.waiting_tasks.push_back(handle, tag, Box::new(task)); + + TaskHandle::new(handle) + } + + pub fn enqueue_input(&mut self, bytes: &[u8]) { + self.flow.enqueue_input(bytes); + } + + /// Progress the connection returning the next event. + pub fn progress(&mut self) -> Result> { + loop { + let event = match self.flow.progress() { + Ok(event) => event, + Err(FlowInterrupt::Io(io)) => return Err(FlowInterrupt::Io(io)), + Err(FlowInterrupt::Error(err)) => { + return Err(FlowInterrupt::Error(SchedulerError::Flow(err))) + } + }; + + match event { + ClientFlowEvent::GreetingReceived { greeting } => { + return Ok(SchedulerEvent::GreetingReceived(greeting)); + } + ClientFlowEvent::CommandSent { handle, .. } => { + // This `unwrap` can't fail because `waiting_tasks` contains all unsent `Commands`. + let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap(); + self.active_tasks.push_back(handle, tag, task); + } + ClientFlowEvent::CommandRejected { handle, status, .. } => { + let body = match status { + Status::Tagged(Tagged { body, .. }) => body, + _ => unreachable!(), + }; + + // This `unwrap` can't fail because `active_tasks` contains all in-progress `Commands`. + let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap(); + + let output = Some(task.process_tagged(body)); + + return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output })); + } + ClientFlowEvent::AuthenticateStarted { handle } => { + let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap(); + self.active_tasks.push_back(handle, tag, task); + } + ClientFlowEvent::AuthenticateContinuationRequestReceived { + handle, + continuation_request, + } => { + let task = self.active_tasks.get_task_by_handle_mut(handle).unwrap(); + + let continuation = + task.process_continuation_request_authenticate(continuation_request); + + match continuation { + Ok(data) => { + self.flow.set_authenticate_data(data).unwrap(); + } + Err(continuation) => { + return Ok(SchedulerEvent::Unsolicited( + Response::CommandContinuationRequest(continuation), + )); + } + } + } + ClientFlowEvent::AuthenticateStatusReceived { handle, status, .. } => { + let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap(); + + let body = match status { + Status::Untagged(_) => unreachable!(), + Status::Tagged(tagged) => tagged.body, + Status::Bye(_) => unreachable!(), + }; + + let output = Some(task.process_tagged(body)); + + return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output })); + } + ClientFlowEvent::DataReceived { data } => { + if let Some(data) = + trickle_down(data, self.active_tasks.tasks_mut(), |task, data| { + task.process_data(data) + }) + { + return Ok(SchedulerEvent::Unsolicited(Response::Data(data))); + } + } + ClientFlowEvent::ContinuationRequestReceived { + continuation_request, + } => { + if let Some(continuation) = trickle_down( + continuation_request, + self.active_tasks.tasks_mut(), + |task, continuation_request| { + task.process_continuation_request(continuation_request) + }, + ) { + return Ok(SchedulerEvent::Unsolicited( + Response::CommandContinuationRequest(continuation), + )); + } + } + ClientFlowEvent::StatusReceived { status } => match status { + Status::Untagged(body) => { + if let Some(body) = + trickle_down(body, self.active_tasks.tasks_mut(), |task, body| { + task.process_untagged(body) + }) + { + return Ok(SchedulerEvent::Unsolicited(Response::Status( + Status::Untagged(body), + ))); + } + } + Status::Bye(bye) => { + if let Some(bye) = + trickle_down(bye, self.active_tasks.tasks_mut(), |task, bye| { + task.process_bye(bye) + }) + { + return Ok(SchedulerEvent::Unsolicited(Response::Status(Status::Bye( + bye, + )))); + } + } + Status::Tagged(Tagged { tag, body }) => { + let Some((handle, _, task)) = self.active_tasks.remove_by_tag(&tag) else { + return Err(FlowInterrupt::Error( + SchedulerError::UnexpectedTaggedResponse(Tagged { tag, body }), + )); + }; + + let output = Some(task.process_tagged(body)); + + return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output })); + } + }, + ClientFlowEvent::IdleCommandSent { handle, .. } => { + // This `unwrap` can't fail because `waiting_tasks` contains all unsent `Commands`. + let (handle, tag, task) = self.waiting_tasks.remove_by_handle(handle).unwrap(); + self.active_tasks.push_back(handle, tag, task); + } + ClientFlowEvent::IdleAccepted { .. } => { + println!("IDLE accepted!"); + } + ClientFlowEvent::IdleRejected { handle, status, .. } => { + let body = match status { + Status::Tagged(Tagged { body, .. }) => body, + _ => unreachable!(), + }; + + // This `unwrap` can't fail because `active_tasks` contains all in-progress `Commands`. + let (_, _, task) = self.active_tasks.remove_by_handle(handle).unwrap(); + + let output = Some(task.process_tagged(body)); + + return Ok(SchedulerEvent::TaskFinished(TaskToken { handle, output })); + } + ClientFlowEvent::IdleDoneSent { .. } => { + println!("IDLE done!"); + } + } + } + } +} + +impl Flow for Scheduler { + type Event = SchedulerEvent; + type Error = SchedulerError; + + fn enqueue_input(&mut self, bytes: &[u8]) { + self.enqueue_input(bytes); + } + + fn progress(&mut self) -> Result> { + self.progress() + } +} + +#[derive(Default)] +struct TaskMap { + tasks: VecDeque<(ClientFlowCommandHandle, Tag<'static>, Box)>, +} + +impl TaskMap { + fn push_back( + &mut self, + handle: ClientFlowCommandHandle, + tag: Tag<'static>, + task: Box, + ) { + self.tasks.push_back((handle, tag, task)); + } + + fn get_task_by_handle_mut( + &mut self, + handle: ClientFlowCommandHandle, + ) -> Option<&mut Box> { + self.tasks + .iter_mut() + .find_map(|(current_handle, _, task)| (handle == *current_handle).then_some(task)) + } + + fn tasks_mut(&mut self) -> impl Iterator> { + self.tasks.iter_mut().map(|(_, _, task)| task) + } + + fn remove_by_handle( + &mut self, + handle: ClientFlowCommandHandle, + ) -> Option<(ClientFlowCommandHandle, Tag<'static>, Box)> { + let index = self + .tasks + .iter() + .position(|(current_handle, _, _)| handle == *current_handle)?; + self.tasks.remove(index) + } + + fn remove_by_tag( + &mut self, + tag: &Tag, + ) -> Option<(ClientFlowCommandHandle, Tag<'static>, Box)> { + let index = self + .tasks + .iter() + .position(|(_, current_tag, _)| tag == current_tag)?; + self.tasks.remove(index) + } +} + +#[derive(Debug)] +pub enum SchedulerEvent { + GreetingReceived(Greeting<'static>), + TaskFinished(TaskToken), + Unsolicited(Response<'static>), +} + +#[derive(Debug, Error)] +pub enum SchedulerError { + /// Flow error. + #[error("flow error")] + Flow(#[from] ClientFlowError), + /// Unexpected tag in command completion result. + /// + /// The scheduler received a tag that cannot be matched to an active command. + /// This could be due to a severe implementation error in the scheduler, + /// the server, or anything in-between, really. + /// + /// It's better to halt the execution to avoid damage. + #[error("unexpected tag in command completion result")] + UnexpectedTaggedResponse(Tagged<'static>), + #[error("unexpected BYE response")] + UnexpectedByeResponse(Bye<'static>), +} + +#[derive(Eq)] +pub struct TaskHandle { + handle: ClientFlowCommandHandle, + _t: PhantomData, +} + +impl TaskHandle { + fn new(handle: ClientFlowCommandHandle) -> Self { + Self { + handle, + _t: Default::default(), + } + } + + /// Try resolving the task invalidating the token. + /// + /// The token is invalidated iff the return value is `Some`. + pub fn resolve(&self, token: &mut TaskToken) -> Option { + if token.handle != self.handle { + return None; + } + + let output = token.output.take()?; + let output = output.downcast::().unwrap(); + + Some(*output) + } +} + +impl Debug for TaskHandle { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("TaskHandle") + .field("handle", &self.handle) + .finish() + } +} + +impl Clone for TaskHandle { + fn clone(&self) -> Self { + *self + } +} + +impl Copy for TaskHandle {} + +impl PartialEq for TaskHandle { + fn eq(&self, other: &Self) -> bool { + self.handle == other.handle + } +} + +#[derive(Debug)] +pub struct TaskToken { + handle: ClientFlowCommandHandle, + output: Option>, +} + +// ------------------------------------------------------------------------------------------------- + +/// Move `trickle` from consumer to consumer until the first consumer doesn't hand it back. +/// +/// If none of the consumers is interested in `trickle`, give it back. +fn trickle_down(trickle: T, consumers: I, f: F) -> Option +where + I: Iterator, + F: Fn(&mut I::Item, T) -> Option, +{ + let mut trickle = Some(trickle); + + for mut consumer in consumers { + if let Some(trickle_) = trickle { + trickle = f(&mut consumer, trickle_); + + if trickle.is_none() { + break; + } + } + } + + trickle +} + +// ------------------------------------------------------------------------------------------------- + +/// Helper trait that ... +/// +/// * doesn't have an associated type and uses [`Any`] in [`Self::process_tagged`] +/// * is an object-safe "subset" of [`Task`] +trait TaskAny: Send { + fn process_data(&mut self, data: Data<'static>) -> Option>; + + fn process_untagged(&mut self, status_body: StatusBody<'static>) + -> Option>; + + fn process_continuation_request( + &mut self, + continuation_request: CommandContinuationRequest<'static>, + ) -> Option>; + + fn process_continuation_request_authenticate( + &mut self, + continuation_request: CommandContinuationRequest<'static>, + ) -> Result, CommandContinuationRequest<'static>>; + + fn process_bye(&mut self, bye: Bye<'static>) -> Option>; + + fn process_tagged(self: Box, status_body: StatusBody<'static>) -> Box; +} + +impl TaskAny for T +where + T: Task, +{ + fn process_data(&mut self, data: Data<'static>) -> Option> { + T::process_data(self, data) + } + + fn process_untagged( + &mut self, + status_body: StatusBody<'static>, + ) -> Option> { + T::process_untagged(self, status_body) + } + + fn process_continuation_request( + &mut self, + continuation_request: CommandContinuationRequest<'static>, + ) -> Option> { + T::process_continuation_request(self, continuation_request) + } + + fn process_continuation_request_authenticate( + &mut self, + continuation_request: CommandContinuationRequest<'static>, + ) -> Result, CommandContinuationRequest<'static>> { + T::process_continuation_request_authenticate(self, continuation_request) + } + + fn process_bye(&mut self, bye: Bye<'static>) -> Option> { + T::process_bye(self, bye) + } + + /// Returns [`Any`] instead of [`Task::Output`]. + fn process_tagged(self: Box, status_body: StatusBody<'static>) -> Box { + Box::new(T::process_tagged(*self, status_body)) + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::Scheduler; + + assert_impl_all!(Scheduler: Send); +} diff --git a/src/tasks/resolver.rs b/src/tasks/resolver.rs new file mode 100644 index 0000000..ec869e8 --- /dev/null +++ b/src/tasks/resolver.rs @@ -0,0 +1,90 @@ +use imap_flow::{client::ClientFlow, Flow, FlowInterrupt}; +use imap_types::response::{Response, Status}; +use tracing::{debug, warn}; + +use super::{Scheduler, SchedulerError, SchedulerEvent, Task, TaskHandle}; + +/// The resolver is a scheduler than manages one task at a time. +pub struct Resolver { + pub scheduler: Scheduler, +} + +impl Resolver { + /// Create a new resolver. + pub fn new(flow: ClientFlow) -> Self { + Self { + scheduler: Scheduler::new(flow), + } + } + + /// Enqueue a [`Task`] for immediate resolution. + pub fn resolve(&mut self, task: T) -> ResolvingTask { + let handle = self.scheduler.enqueue_task(task); + + ResolvingTask { + resolver: self, + handle, + } + } +} + +impl Flow for Resolver { + type Event = SchedulerEvent; + type Error = SchedulerError; + + fn enqueue_input(&mut self, bytes: &[u8]) { + self.scheduler.enqueue_input(bytes); + } + + fn progress(&mut self) -> Result> { + self.scheduler.progress() + } +} + +pub struct ResolvingTask<'a, T: Task> { + resolver: &'a mut Resolver, + handle: TaskHandle, +} + +impl Flow for ResolvingTask<'_, T> { + type Event = T::Output; + type Error = SchedulerError; + + fn enqueue_input(&mut self, bytes: &[u8]) { + self.resolver.enqueue_input(bytes); + } + + fn progress(&mut self) -> Result> { + loop { + match self.resolver.progress()? { + SchedulerEvent::GreetingReceived(greeting) => { + debug!("received greeting: {greeting:?}"); + } + SchedulerEvent::TaskFinished(mut token) => { + if let Some(output) = self.handle.resolve(&mut token) { + break Ok(output); + } else { + warn!(?token, "received unexpected task token") + } + } + SchedulerEvent::Unsolicited(unsolicited) => { + if let Response::Status(Status::Bye(bye)) = unsolicited { + let err = SchedulerError::UnexpectedByeResponse(bye); + break Err(FlowInterrupt::Error(err)); + } else { + warn!(?unsolicited, "received unsolicited"); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use static_assertions::assert_impl_all; + + use super::Resolver; + + assert_impl_all!(Resolver: Send); +} diff --git a/src/tasks/tasks/append.rs b/src/tasks/tasks/append.rs new file mode 100644 index 0000000..fee03bc --- /dev/null +++ b/src/tasks/tasks/append.rs @@ -0,0 +1,179 @@ +use imap_types::{ + command::CommandBody, + datetime::DateTime, + extensions::binary::LiteralOrLiteral8, + flag::Flag, + mailbox::Mailbox, + response::{Data, StatusBody, StatusKind}, +}; +use tracing::warn; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct AppendTask { + mailbox: Mailbox<'static>, + flags: Vec>, + date: Option, + message: LiteralOrLiteral8<'static>, + output: Option, +} + +impl AppendTask { + pub fn new(mailbox: Mailbox<'static>, message: LiteralOrLiteral8<'static>) -> Self { + Self { + mailbox, + flags: Default::default(), + date: Default::default(), + message, + output: Default::default(), + } + } + + pub fn set_flags(&mut self, flags: Vec>) { + self.flags = flags; + } + + pub fn add_flag(&mut self, flag: Flag<'static>) { + self.flags.push(flag); + } + + pub fn with_flags(mut self, flags: Vec>) -> Self { + self.set_flags(flags); + self + } + + pub fn with_flag(mut self, flag: Flag<'static>) -> Self { + self.add_flag(flag); + self + } + + pub fn set_date(&mut self, date: DateTime) { + self.date = Some(date); + } + + pub fn with_date(mut self, date: DateTime) -> Self { + self.set_date(date); + self + } +} + +impl Task for AppendTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Append { + mailbox: self.mailbox.clone(), + flags: self.flags.clone(), + date: self.date.clone(), + message: self.message.clone(), + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + // In case the mailbox is already selected, we should receive + // an `EXISTS` response. + if let Data::Exists(seq) = data { + if self.output.is_some() { + warn!("received duplicate APPEND EXISTS data"); + } + self.output = Some(seq); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} + +/// Special [`NoOpTask`](super::noop::NoOpTask) that captures `EXISTS` +/// responses. +/// +/// This task should be used whenever [`AppendTask`] does not return +/// the number of messages in the mailbox the appended message +/// resides. +#[derive(Clone, Debug, Default)] +pub struct PostAppendNoOpTask { + output: Option, +} + +impl PostAppendNoOpTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for PostAppendNoOpTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Noop + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Exists(seq) = data { + self.output = Some(seq); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} + +/// Special [`CheckTask`](super::check::CheckTask) that captures +/// `EXISTS` responses. +/// +/// This task should be used whenever [`AppendTask`] and +/// [`PostAppendNoOpTask`] do not return the number of messages in the +/// mailbox the appended message resides. +#[derive(Clone, Debug, Default)] +pub struct PostAppendCheckTask { + output: Option, +} + +impl PostAppendCheckTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for PostAppendCheckTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Check + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Exists(seq) = data { + self.output = Some(seq); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/appenduid.rs b/src/tasks/tasks/appenduid.rs new file mode 100644 index 0000000..11b4e70 --- /dev/null +++ b/src/tasks/tasks/appenduid.rs @@ -0,0 +1,86 @@ +use std::num::NonZeroU32; + +use imap_types::{ + command::CommandBody, + datetime::DateTime, + extensions::binary::LiteralOrLiteral8, + flag::Flag, + mailbox::Mailbox, + response::{Code, StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct AppendUidTask { + mailbox: Mailbox<'static>, + flags: Vec>, + date: Option, + message: LiteralOrLiteral8<'static>, +} + +impl AppendUidTask { + pub fn new(mailbox: Mailbox<'static>, message: LiteralOrLiteral8<'static>) -> Self { + Self { + mailbox, + flags: Default::default(), + date: Default::default(), + message, + } + } + + pub fn set_flags(&mut self, flags: Vec>) { + self.flags = flags; + } + + pub fn add_flag(&mut self, flag: Flag<'static>) { + self.flags.push(flag); + } + + pub fn with_flags(mut self, flags: Vec>) -> Self { + self.set_flags(flags); + self + } + + pub fn with_flag(mut self, flag: Flag<'static>) -> Self { + self.add_flag(flag); + self + } + + pub fn set_date(&mut self, date: DateTime) { + self.date = Some(date); + } + + pub fn with_date(mut self, date: DateTime) -> Self { + self.set_date(date); + self + } +} + +impl Task for AppendUidTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Append { + mailbox: self.mailbox.clone(), + flags: self.flags.clone(), + date: self.date.clone(), + message: self.message.clone(), + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => { + if let Some(Code::AppendUid { uid, uid_validity }) = status_body.code { + Ok(Some((uid, uid_validity))) + } else { + Ok(None) + } + } + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/authenticate.rs b/src/tasks/tasks/authenticate.rs new file mode 100644 index 0000000..686abcd --- /dev/null +++ b/src/tasks/tasks/authenticate.rs @@ -0,0 +1,136 @@ +use std::borrow::Cow; + +use imap_types::{ + auth::{AuthMechanism, AuthenticateData}, + command::CommandBody, + core::Vec1, + response::{Capability, Code, CommandContinuationRequest, Data, StatusBody, StatusKind}, + secret::Secret, + utils::escape_byte_string, +}; +use tracing::error; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct AuthenticateTask { + /// Authentication mechanism. + /// + /// Note: Currently used for `AUTH=PLAIN`, `AUTH=OAUTHBEARER` and `AUTH=XOAUTH2`. + /// Invariants need to be enforced through constructors. + mechanism: AuthMechanism<'static>, + /// Static authentication data. + /// + /// Note: Currently used for `AUTH=PLAIN`, `AUTH=OAUTHBEARER` and `AUTH=XOAUTH2`. + line: Option>, + /// Does the server support SASL's initial response? + ir: bool, + output: Option>>, +} + +impl AuthenticateTask { + pub fn plain(login: &str, passwd: &str, ir: bool) -> Self { + let line = format!("\x00{login}\x00{passwd}"); + + Self { + mechanism: AuthMechanism::Plain, + line: Some(line.into_bytes()), + ir, + output: None, + } + } + + pub fn oauthbearer(user: &str, host: &str, port: u16, token: &str, ir: bool) -> Self { + let line = + format!("n,a={user},\x01host={host}\x01port={port}\x01auth=Bearer {token}\x01\x01"); + + Self { + mechanism: AuthMechanism::OAuthBearer, + line: Some(line.into_bytes()), + ir, + output: None, + } + } + + pub fn xoauth2(user: &str, token: &str, ir: bool) -> Self { + let line = format!("user={user}\x01auth=Bearer {token}\x01\x01"); + + Self { + mechanism: AuthMechanism::XOAuth2, + line: Some(line.into_bytes()), + ir, + output: None, + } + } +} + +impl Task for AuthenticateTask { + type Output = Result>>, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Authenticate { + mechanism: self.mechanism.clone(), + initial_response: if self.ir { + // TODO: command_body must only be called once... hm... + Some(Secret::new(Cow::Owned(self.line.clone().unwrap()))) + } else { + None + }, + } + } + + // Capabilities may (unfortunately) be found in a data response. + // See https://github.com/modern-email/defects/issues/18 + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Capability(capabilities) = data { + self.output = Some(capabilities); + None + } else { + Some(data) + } + } + + fn process_continuation_request_authenticate( + &mut self, + continuation: CommandContinuationRequest<'static>, + ) -> Result, CommandContinuationRequest<'static>> { + let cancel = || match self.mechanism { + AuthMechanism::XOAuth2 => { + let err = match continuation { + CommandContinuationRequest::Basic(data) => data.text().to_string(), + CommandContinuationRequest::Base64(data) => escape_byte_string(data.as_ref()), + }; + + error!("cannot authenticate using XOAUTH2 mechanism: {err}"); + + AuthenticateData::r#continue(vec![]) + } + _ => AuthenticateData::Cancel, + }; + + if self.ir { + return Ok(cancel()); + } + + match self.line.take() { + Some(data) => Ok(AuthenticateData::r#continue(data)), + None => Ok(cancel()), + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok( + // Capabilities may be found in the status body of tagged response. + if let Some(Code::Capability(capabilities)) = status_body.code { + Some(capabilities) + } else { + self.output + }, + ), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/capability.rs b/src/tasks/tasks/capability.rs new file mode 100644 index 0000000..f0b1d89 --- /dev/null +++ b/src/tasks/tasks/capability.rs @@ -0,0 +1,70 @@ +use imap_types::{ + command::CommandBody, + core::Vec1, + response::{Capability, Code, Data, StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct CapabilityTask { + /// We use this as scratch space. + output: Option>>, +} + +impl CapabilityTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for CapabilityTask { + type Output = Result>, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Capability + } + + // Capabilities may be found in a data response. + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Capability(capabilities) = data { + self.output = Some(capabilities); + None + } else { + Some(data) + } + } + + // Capabilities may (unfortunately) be found in a data response. + // See https://github.com/modern-email/defects/issues/18 + fn process_untagged( + &mut self, + status_body: StatusBody<'static>, + ) -> Option> { + if let Some(Code::Capability(capabilities)) = status_body.code { + self.output = Some(capabilities); + None + } else { + Some(status_body) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => match self.output { + Some(capabilities) => Ok(capabilities), + None => { + // Capabilities may be found in the status body of tagged response. + if let Some(Code::Capability(capabilities)) = status_body.code { + Ok(capabilities) + } else { + Err(TaskError::MissingData("CAPABILITY".into())) + } + } + }, + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/check.rs b/src/tasks/tasks/check.rs new file mode 100644 index 0000000..530678b --- /dev/null +++ b/src/tasks/tasks/check.rs @@ -0,0 +1,32 @@ +use imap_types::{ + command::CommandBody, + response::{StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct CheckTask; + +impl CheckTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for CheckTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Check + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/copy.rs b/src/tasks/tasks/copy.rs new file mode 100644 index 0000000..c0c7ebb --- /dev/null +++ b/src/tasks/tasks/copy.rs @@ -0,0 +1,54 @@ +use imap_types::{ + command::CommandBody, + mailbox::Mailbox, + response::{StatusBody, StatusKind}, + sequence::SequenceSet, +}; + +use super::TaskError; +use crate::Task; + +pub struct CopyTask { + sequence_set: SequenceSet, + mailbox: Mailbox<'static>, + uid: bool, +} + +impl CopyTask { + pub fn new(sequence_set: SequenceSet, mailbox: Mailbox<'static>) -> Self { + Self { + sequence_set, + mailbox, + uid: true, + } + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Task for CopyTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Copy { + sequence_set: self.sequence_set.clone(), + mailbox: self.mailbox.clone(), + uid: self.uid, + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/create.rs b/src/tasks/tasks/create.rs new file mode 100644 index 0000000..f0dfdb6 --- /dev/null +++ b/src/tasks/tasks/create.rs @@ -0,0 +1,37 @@ +use imap_types::{ + command::CommandBody, + mailbox::Mailbox, + response::{StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct CreateTask { + mailbox: Mailbox<'static>, +} + +impl CreateTask { + pub fn new(mailbox: Mailbox<'static>) -> Self { + Self { mailbox } + } +} + +impl Task for CreateTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Create { + mailbox: self.mailbox.clone(), + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/delete.rs b/src/tasks/tasks/delete.rs new file mode 100644 index 0000000..3c6c626 --- /dev/null +++ b/src/tasks/tasks/delete.rs @@ -0,0 +1,37 @@ +use imap_types::{ + command::CommandBody, + mailbox::Mailbox, + response::{StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct DeleteTask { + mailbox: Mailbox<'static>, +} + +impl DeleteTask { + pub fn new(mailbox: Mailbox<'static>) -> Self { + Self { mailbox } + } +} + +impl Task for DeleteTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Delete { + mailbox: self.mailbox.clone(), + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/expunge.rs b/src/tasks/tasks/expunge.rs new file mode 100644 index 0000000..174f637 --- /dev/null +++ b/src/tasks/tasks/expunge.rs @@ -0,0 +1,57 @@ +use std::num::NonZeroU32; + +use imap_types::{ + command::CommandBody, + response::{Data, StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +/// Permanently removes messages containing the Deleted flag in their +/// envelope. +/// +/// Be aware that the returned vector can contain multiple time the +/// same sequence number, depending on the server implementation +/// style: +/// +/// > For example, if the last 5 messages in a 9-message mailbox are +/// expunged, a "lower to higher" server will send five untagged +/// EXPUNGE responses for message sequence number 5, whereas a "higher +/// to lower server" will send successive untagged EXPUNGE responses +/// for message sequence numbers 9, 8, 7, 6, and 5 +#[derive(Clone, Debug, Default)] +pub struct ExpungeTask { + output: Vec, +} + +impl ExpungeTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for ExpungeTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Expunge + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Expunge(seq) = data { + self.output.push(seq); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/fetch.rs b/src/tasks/tasks/fetch.rs new file mode 100644 index 0000000..e4a75c2 --- /dev/null +++ b/src/tasks/tasks/fetch.rs @@ -0,0 +1,137 @@ +use std::{collections::HashMap, num::NonZeroU32}; + +use imap_types::{ + command::CommandBody, + core::Vec1, + fetch::{MacroOrMessageDataItemNames, MessageDataItem}, + response::{Data, StatusBody, StatusKind}, + sequence::{SeqOrUid, SequenceSet}, +}; +use tracing::warn; + +use super::TaskError; +use crate::Task; + +/// Fetch message data items matching the given sequence set and the +/// given item names (or macro). +#[derive(Clone, Debug)] +pub struct FetchTask { + sequence_set: SequenceSet, + macro_or_item_names: MacroOrMessageDataItemNames<'static>, + uid: bool, + output: HashMap>>, +} + +impl FetchTask { + pub fn new(sequence_set: SequenceSet, items: MacroOrMessageDataItemNames<'static>) -> Self { + Self { + sequence_set, + macro_or_item_names: items, + uid: true, + output: Default::default(), + } + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Task for FetchTask { + type Output = Result>>, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Fetch { + sequence_set: self.sequence_set.clone(), + macro_or_item_names: self.macro_or_item_names.clone(), + uid: self.uid, + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Fetch { items, seq } = data { + if let Some(items) = self.output.insert(seq, items) { + warn!(seq, ?items, "received duplicate items"); + } + + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} + +/// Same as [`FetchTask`], except that it only collects message data +/// items for the first message matching the given id. +#[derive(Clone, Debug)] +pub struct FetchFirstTask { + id: NonZeroU32, + macro_or_item_names: MacroOrMessageDataItemNames<'static>, + uid: bool, + output: Option>>, +} + +impl FetchFirstTask { + pub fn new(id: NonZeroU32, items: MacroOrMessageDataItemNames<'static>) -> Self { + Self { + id, + macro_or_item_names: items, + uid: true, + output: None, + } + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Task for FetchFirstTask { + type Output = Result>, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Fetch { + sequence_set: SequenceSet::from(SeqOrUid::from(self.id)), + macro_or_item_names: self.macro_or_item_names.clone(), + uid: self.uid, + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Fetch { items, .. } = data { + self.output = Some(items); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => match self.output { + Some(items) => Ok(items), + None => Err(TaskError::MissingData("FETCH: items".into())), + }, + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/id.rs b/src/tasks/tasks/id.rs new file mode 100644 index 0000000..68fcc91 --- /dev/null +++ b/src/tasks/tasks/id.rs @@ -0,0 +1,50 @@ +use imap_types::{ + command::CommandBody, + core::{IString, NString}, + response::{Data, StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct IdTask { + client: Option, NString<'static>)>>, + server: Option, NString<'static>)>>, +} + +impl IdTask { + pub fn new(parameters: Option, NString<'static>)>>) -> Self { + Self { + client: parameters, + server: None, + } + } +} + +impl Task for IdTask { + type Output = Result, NString<'static>)>>, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Id { + parameters: self.client.clone(), + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Id { parameters } = data { + self.server = parameters; + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.server), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/list.rs b/src/tasks/tasks/list.rs new file mode 100644 index 0000000..f07922a --- /dev/null +++ b/src/tasks/tasks/list.rs @@ -0,0 +1,71 @@ +use imap_types::{ + command::CommandBody, + core::QuotedChar, + flag::FlagNameAttribute, + mailbox::{ListMailbox, Mailbox}, + response::{Data, StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct ListTask { + mailbox: Mailbox<'static>, + mailbox_wildcard: ListMailbox<'static>, + output: Vec<( + Mailbox<'static>, + Option, + Vec>, + )>, +} + +impl ListTask { + pub fn new(mailbox: Mailbox<'static>, mailbox_wildcard: ListMailbox<'static>) -> Self { + Self { + mailbox, + mailbox_wildcard, + output: Vec::new(), + } + } +} + +impl Task for ListTask { + type Output = Result< + Vec<( + Mailbox<'static>, + Option, + Vec>, + )>, + TaskError, + >; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::List { + reference: self.mailbox.clone(), + mailbox_wildcard: self.mailbox_wildcard.clone(), + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::List { + items, + delimiter, + mailbox, + } = data + { + self.output.push((mailbox, delimiter, items)); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/logout.rs b/src/tasks/tasks/logout.rs new file mode 100644 index 0000000..8f5d0a2 --- /dev/null +++ b/src/tasks/tasks/logout.rs @@ -0,0 +1,45 @@ +use imap_types::{ + command::CommandBody, + response::{Bye, StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct LogoutTask { + got_bye: bool, +} + +impl LogoutTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for LogoutTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Logout + } + + fn process_bye(&mut self, _: Bye<'static>) -> Option> { + self.got_bye = true; + None + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => { + if self.got_bye { + Ok(()) + } else { + Err(TaskError::MissingData("LOGOUT: BYE".into())) + } + } + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/mod.rs b/src/tasks/tasks/mod.rs new file mode 100644 index 0000000..cece5fc --- /dev/null +++ b/src/tasks/tasks/mod.rs @@ -0,0 +1,36 @@ +use imap_types::response::StatusBody; +use thiserror::Error; + +pub mod append; +pub mod appenduid; +pub mod authenticate; +pub mod capability; +pub mod check; +pub mod copy; +pub mod create; +pub mod delete; +pub mod expunge; +pub mod fetch; +pub mod id; +pub mod list; +pub mod logout; +pub mod r#move; +pub mod noop; +pub mod search; +pub mod select; +pub mod sort; +pub mod starttls; +pub mod store; +pub mod thread; + +#[derive(Debug, Error)] +pub enum TaskError { + #[error("unexpected BAD response: {}", .0.text)] + UnexpectedBadResponse(StatusBody<'static>), + + #[error("unexpected NO response: {}", .0.text)] + UnexpectedNoResponse(StatusBody<'static>), + + #[error("missing required data for command {0}")] + MissingData(String), +} diff --git a/src/tasks/tasks/move.rs b/src/tasks/tasks/move.rs new file mode 100644 index 0000000..d6a4a7b --- /dev/null +++ b/src/tasks/tasks/move.rs @@ -0,0 +1,54 @@ +use imap_types::{ + command::CommandBody, + mailbox::Mailbox, + response::{StatusBody, StatusKind}, + sequence::SequenceSet, +}; + +use super::TaskError; +use crate::Task; + +pub struct MoveTask { + sequence_set: SequenceSet, + mailbox: Mailbox<'static>, + uid: bool, +} + +impl MoveTask { + pub fn new(sequence_set: SequenceSet, mailbox: Mailbox<'static>) -> Self { + Self { + sequence_set, + mailbox, + uid: true, + } + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Task for MoveTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Move { + sequence_set: self.sequence_set.clone(), + mailbox: self.mailbox.clone(), + uid: self.uid, + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/noop.rs b/src/tasks/tasks/noop.rs new file mode 100644 index 0000000..fda031d --- /dev/null +++ b/src/tasks/tasks/noop.rs @@ -0,0 +1,32 @@ +use imap_types::{ + command::CommandBody, + response::{StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct NoOpTask; + +impl NoOpTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for NoOpTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Noop + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/search.rs b/src/tasks/tasks/search.rs new file mode 100644 index 0000000..1968465 --- /dev/null +++ b/src/tasks/tasks/search.rs @@ -0,0 +1,75 @@ +use std::num::NonZeroU32; + +use imap_types::{ + command::CommandBody, + core::Vec1, + response::{Data, StatusBody, StatusKind}, + search::SearchKey, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct SearchTask { + criteria: Vec1>, + uid: bool, + output: Vec, +} + +impl SearchTask { + pub fn new(criteria: Vec1>) -> Self { + Self { + criteria, + ..Default::default() + } + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Default for SearchTask { + fn default() -> Self { + Self { + criteria: Vec1::from(SearchKey::All), + uid: true, + output: Default::default(), + } + } +} + +impl Task for SearchTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Search { + charset: None, + criteria: self.criteria.clone(), + uid: self.uid, + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Search(ids) = data { + self.output = ids; + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/select.rs b/src/tasks/tasks/select.rs new file mode 100644 index 0000000..784577c --- /dev/null +++ b/src/tasks/tasks/select.rs @@ -0,0 +1,154 @@ +use std::num::NonZeroU32; + +use imap_types::{ + command::CommandBody, + flag::{Flag, FlagPerm}, + mailbox::Mailbox, + response::{Code, Data, StatusBody, StatusKind}, +}; +use tracing::warn; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct SelectDataUnvalidated { + // required untagged responses + pub flags: Option>>, + pub exists: Option, + pub recent: Option, + + // required OK untagged responses + pub unseen: Option, + pub permanent_flags: Option>>, + pub uid_next: Option, + pub uid_validity: Option, +} + +impl SelectDataUnvalidated { + pub fn validate(self) -> Result { + if self.flags.is_none() { + warn!("missing required FLAGS untagged response"); + } + + if self.exists.is_none() { + warn!("missing required EXISTS untagged response"); + } + + if self.recent.is_none() { + warn!("missing required RECENT untagged response"); + } + + if self.unseen.is_none() { + warn!("missing required UNSEEN OK untagged response"); + } + + if self.permanent_flags.is_none() { + warn!("missing required PERMANENTFLAGS OK untagged response"); + } + + if self.uid_next.is_none() { + warn!("missing required UIDNEXT OK untagged response"); + } + + if self.uid_validity.is_none() { + warn!("missing required UIDVALIDITY OK untagged response"); + } + + Ok(self) + } +} + +#[derive(Clone, Debug)] +pub struct SelectTask { + mailbox: Mailbox<'static>, + read_only: bool, + output: SelectDataUnvalidated, +} + +impl SelectTask { + pub fn new(mailbox: Mailbox<'static>) -> Self { + Self { + mailbox, + read_only: false, + output: Default::default(), + } + } + + pub fn read_only(mailbox: Mailbox<'static>) -> Self { + Self { + mailbox, + read_only: true, + output: Default::default(), + } + } +} + +impl Task for SelectTask { + type Output = Result; + + fn command_body(&self) -> CommandBody<'static> { + let mailbox = self.mailbox.clone(); + + if self.read_only { + CommandBody::Examine { mailbox } + } else { + CommandBody::Select { mailbox } + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + match data { + Data::Flags(flags) => { + self.output.flags = Some(flags); + None + } + Data::Exists(count) => { + self.output.exists = Some(count); + None + } + Data::Recent(count) => { + self.output.recent = Some(count); + None + } + data => Some(data), + } + } + + fn process_untagged( + &mut self, + status_body: StatusBody<'static>, + ) -> Option> { + if let StatusKind::Ok = status_body.kind { + match status_body.code { + Some(Code::Unseen(seq)) => { + self.output.unseen = Some(seq); + None + } + Some(Code::PermanentFlags(flags)) => { + self.output.permanent_flags = Some(flags); + None + } + Some(Code::UidNext(uid)) => { + self.output.uid_next = Some(uid); + None + } + Some(Code::UidValidity(uid)) => { + self.output.uid_validity = Some(uid); + None + } + _ => Some(status_body), + } + } else { + Some(status_body) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => self.output.validate(), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/sort.rs b/src/tasks/tasks/sort.rs new file mode 100644 index 0000000..32ba630 --- /dev/null +++ b/src/tasks/tasks/sort.rs @@ -0,0 +1,91 @@ +use std::num::NonZeroU32; + +use imap_types::{ + command::CommandBody, + core::{Charset, Vec1}, + extensions::sort::SortCriterion, + response::{Data, StatusBody, StatusKind}, + search::SearchKey, +}; +use tracing::warn; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct SortTask { + sort_criteria: Vec1, + charset: Charset<'static>, + search_criteria: Vec1>, + uid: bool, + output: Option>, +} + +impl SortTask { + pub fn new( + sort_criteria: Vec1, + search_criteria: Vec1>, + ) -> Self { + Self { + sort_criteria, + charset: Charset::try_from("UTF-8").unwrap(), + search_criteria, + uid: true, + output: Default::default(), + } + } + + pub fn set_charset(&mut self, charset: Charset<'static>) { + self.charset = charset; + } + + pub fn with_charset(mut self, charset: Charset<'static>) -> Self { + self.set_charset(charset); + self + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Task for SortTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Sort { + sort_criteria: self.sort_criteria.clone(), + charset: self.charset.clone(), + search_criteria: self.search_criteria.clone(), + uid: self.uid, + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Sort(ids) = data { + if self.output.is_some() { + warn!("received duplicate sort data"); + } + self.output = Some(ids); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => match self.output { + Some(output) => Ok(output), + None => Err(TaskError::MissingData("SORT".into())), + }, + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/starttls.rs b/src/tasks/tasks/starttls.rs new file mode 100644 index 0000000..9a7dd04 --- /dev/null +++ b/src/tasks/tasks/starttls.rs @@ -0,0 +1,32 @@ +use imap_types::{ + command::CommandBody, + response::{StatusBody, StatusKind}, +}; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug, Default)] +pub struct StartTlsTask; + +impl StartTlsTask { + pub fn new() -> Self { + Default::default() + } +} + +impl Task for StartTlsTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::StartTLS + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/store.rs b/src/tasks/tasks/store.rs new file mode 100644 index 0000000..0777170 --- /dev/null +++ b/src/tasks/tasks/store.rs @@ -0,0 +1,117 @@ +use std::{collections::HashMap, num::NonZeroU32}; + +use imap_types::{ + command::CommandBody, + core::Vec1, + fetch::MessageDataItem, + flag::{Flag, StoreResponse, StoreType}, + response::{Data, StatusBody, StatusKind}, + sequence::SequenceSet, +}; +use tracing::warn; + +use super::TaskError; +use crate::Task; + +/// Alter message data. +#[derive(Clone, Debug)] +pub struct StoreTask { + sequence_set: SequenceSet, + kind: StoreType, + flags: Vec>, + uid: bool, + output: HashMap>>, +} + +impl StoreTask { + pub fn new(sequence_set: SequenceSet, kind: StoreType, flags: Vec>) -> Self { + Self { + sequence_set, + kind, + flags, + uid: true, + output: Default::default(), + } + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } + + pub fn silent(self) -> SilentStoreTask { + SilentStoreTask::new(self) + } +} + +impl Task for StoreTask { + type Output = Result>>, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Store { + sequence_set: self.sequence_set.clone(), + kind: self.kind, + response: StoreResponse::Answer, + flags: self.flags.clone(), + uid: self.uid, + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Fetch { items, seq } = data { + if let Some(items) = self.output.insert(seq, items) { + warn!(seq, ?items, "received duplicate items"); + } + + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(self.output), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} + +/// Alter message data instructing the server to not send the updated values. +/// +/// Note: Same as [`StoreTask`], except that it does not return any output. +#[derive(Clone, Debug)] +pub struct SilentStoreTask(StoreTask); + +impl SilentStoreTask { + pub fn new(store: StoreTask) -> Self { + Self(store) + } +} + +impl Task for SilentStoreTask { + type Output = Result<(), TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Store { + sequence_set: self.0.sequence_set.clone(), + kind: self.0.kind, + response: StoreResponse::Silent, + flags: self.0.flags.clone(), + uid: self.0.uid, + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => Ok(()), + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +} diff --git a/src/tasks/tasks/thread.rs b/src/tasks/tasks/thread.rs new file mode 100644 index 0000000..1d97e82 --- /dev/null +++ b/src/tasks/tasks/thread.rs @@ -0,0 +1,89 @@ +use imap_types::{ + command::CommandBody, + core::{Charset, Vec1}, + extensions::thread::{Thread, ThreadingAlgorithm}, + response::{Data, StatusBody, StatusKind}, + search::SearchKey, +}; +use tracing::warn; + +use super::TaskError; +use crate::Task; + +#[derive(Clone, Debug)] +pub struct ThreadTask { + algorithm: ThreadingAlgorithm<'static>, + charset: Charset<'static>, + search_criteria: Vec1>, + uid: bool, + output: Option>, +} + +impl ThreadTask { + pub fn new( + algorithm: ThreadingAlgorithm<'static>, + search_criteria: Vec1>, + ) -> Self { + Self { + algorithm, + charset: Charset::try_from("UTF-8").unwrap(), + search_criteria, + uid: true, + output: Default::default(), + } + } + + pub fn set_charset(&mut self, charset: Charset<'static>) { + self.charset = charset; + } + + pub fn with_charset(mut self, charset: Charset<'static>) -> Self { + self.set_charset(charset); + self + } + + pub fn set_uid(&mut self, uid: bool) { + self.uid = uid; + } + + pub fn with_uid(mut self, uid: bool) -> Self { + self.set_uid(uid); + self + } +} + +impl Task for ThreadTask { + type Output = Result, TaskError>; + + fn command_body(&self) -> CommandBody<'static> { + CommandBody::Thread { + algorithm: self.algorithm.clone(), + charset: self.charset.clone(), + search_criteria: self.search_criteria.clone(), + uid: self.uid, + } + } + + fn process_data(&mut self, data: Data<'static>) -> Option> { + if let Data::Thread(threads) = data { + if self.output.is_some() { + warn!("received duplicate thread data"); + } + self.output = Some(threads); + None + } else { + Some(data) + } + } + + fn process_tagged(self, status_body: StatusBody<'static>) -> Self::Output { + match status_body.kind { + StatusKind::Ok => match self.output { + Some(output) => Ok(output), + None => Err(TaskError::MissingData("SORT".into())), + }, + StatusKind::No => Err(TaskError::UnexpectedNoResponse(status_body)), + StatusKind::Bad => Err(TaskError::UnexpectedBadResponse(status_body)), + } + } +}