diff --git a/imap/Cargo.toml b/imap/Cargo.toml index 99bd30a..d025b96 100644 --- a/imap/Cargo.toml +++ b/imap/Cargo.toml @@ -19,6 +19,7 @@ required-features = ["stderrlog"] [features] default = ["rfc2177", "rfc6154"] +low-level = [] rfc2087 = [] # quota rfc2177 = [] # idle rfc6154 = [] # list diff --git a/imap/src/client/client.rs b/imap/src/client/client.rs index 1d297d9..05db555 100644 --- a/imap/src/client/client.rs +++ b/imap/src/client/client.rs @@ -6,7 +6,7 @@ use futures::{ future::{self, FutureExt}, stream::{Stream, StreamExt}, }; -use tokio::{net::TcpStream, sync::mpsc}; +use tokio::net::TcpStream; use tokio_rustls::client::TlsStream; use crate::proto::{ @@ -46,6 +46,10 @@ pub struct Config { pub verify_hostname: bool, } +impl Config { + pub fn builder() -> ConfigBuilder { ConfigBuilder::default() } +} + impl ConfigBuilder { pub async fn open(&self) -> Result { let config = self.build()?; @@ -67,6 +71,7 @@ impl ConfigBuilder { } } +/// A client that hasn't been authenticated. pub enum ClientUnauthenticated { Encrypted(Inner>), Unencrypted(Inner), @@ -87,6 +92,7 @@ impl ClientUnauthenticated { client_expose!(async has_capability(cap: impl AsRef) -> Result); } +/// A client that has been authenticated. pub enum ClientAuthenticated { Encrypted(Inner>), Unencrypted(Inner), @@ -96,13 +102,6 @@ impl ClientAuthenticated { client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); - fn sender(&self) -> mpsc::UnboundedSender { - match self { - ClientAuthenticated::Encrypted(e) => e.write_tx.clone(), - ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(), - } - } - /// Runs the LIST command pub async fn list(&mut self) -> Result> { let cmd = Command::List(CommandList { @@ -216,11 +215,13 @@ impl ClientAuthenticated { /// Runs the IDLE command #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] - pub async fn idle(&mut self) -> Result { + pub async fn idle<'a>(&'a mut self) -> Result> { let cmd = Command::Idle; let stream = self.execute(cmd).await?; - let sender = self.sender(); - Ok(IdleToken { stream, sender }) + Ok(IdleToken { + stream, + _client: self, + }) } } @@ -240,23 +241,24 @@ pub struct SelectResponse { /// DONE command will be sent to the server as a result. #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] -pub struct IdleToken { +pub struct IdleToken<'a> { pub stream: ResponseStream, - sender: mpsc::UnboundedSender, + // sender: mpsc::UnboundedSender, + _client: &'a mut ClientAuthenticated, } #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] -impl Drop for IdleToken { +impl<'a> Drop for IdleToken<'a> { fn drop(&mut self) { - // TODO: should ignore this? - self.sender.send(format!("DONE\r\n")).unwrap(); + // TODO: put this into a channel instead + // tokio::spawn(self.client.execute(Command::Done)); } } #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] -impl Stream for IdleToken { +impl<'a> Stream for IdleToken<'a> { type Item = Response; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let stream = Pin::new(&mut self.stream); diff --git a/imap/src/codec.rs b/imap/src/client/codec.rs similarity index 67% rename from imap/src/codec.rs rename to imap/src/client/codec.rs index ecc39da..b1144e3 100644 --- a/imap/src/codec.rs +++ b/imap/src/client/codec.rs @@ -1,11 +1,16 @@ use std::io; -use bytes::{Buf, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use nom::Needed; use tokio_util::codec::{Decoder, Encoder}; -use crate::proto::{command::Command, response::Response, rfc3501::response as parse_response}; +use crate::proto::{ + command::Command, + response::{Response, Tag}, + rfc3501::response as parse_response, +}; +/// A codec that can be used for decoding `Response`s and encoding `Command`s. #[derive(Default)] pub struct ImapCodec { decode_need_message_bytes: usize, @@ -47,14 +52,19 @@ impl<'a> Decoder for ImapCodec { } } -impl<'a> Encoder<&'a Command> for ImapCodec { +/// A command with its accompanying tag. +pub struct TaggedCommand(pub Tag, pub Command); + +impl<'a> Encoder<&'a TaggedCommand> for ImapCodec { type Error = io::Error; - fn encode(&mut self, _msg: &Command, _dst: &mut BytesMut) -> Result<(), io::Error> { - todo!() - // dst.put(&*msg.0); - // dst.put_u8(b' '); - // dst.put_slice(&*msg.1); - // dst.put_slice(b"\r\n"); - // Ok(()) + fn encode(&mut self, tagged_cmd: &TaggedCommand, dst: &mut BytesMut) -> Result<(), io::Error> { + let tag = &tagged_cmd.0; + let _command = &tagged_cmd.1; + + dst.put(&*tag.0); + dst.put_u8(b' '); + // TODO: write command + dst.put_slice(b"\r\n"); + Ok(()) } } diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index c78be57..5d14dd4 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -1,27 +1,33 @@ -use std::sync::atomic::AtomicU32; +use std::sync::atomic::{AtomicU32, Ordering}; use anyhow::Result; use futures::{ - future::{FutureExt, TryFutureExt}, + future::{self, FutureExt, TryFutureExt}, + sink::SinkExt, stream::StreamExt, }; use tokio::{ - io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}, + io::{split, AsyncRead, AsyncWrite, ReadHalf, WriteHalf}, sync::{mpsc, oneshot}, task::JoinHandle, }; use tokio_rustls::client::TlsStream; -use tokio_util::codec::FramedRead; +use tokio_util::codec::{FramedRead, FramedWrite}; -use crate::codec::ImapCodec; use crate::proto::{ - bytes::Bytes, command::Command, response::Response, rfc3501::capability as parse_capability, + bytes::Bytes, + command::Command, + response::{Response, Tag}, + rfc3501::capability as parse_capability, }; use super::client::Config; +use super::codec::{ImapCodec, TaggedCommand}; use super::response_stream::ResponseStream; use super::upgrade::upgrade; +const TAG_PREFIX: &str = "panotag"; + type ExitSender = oneshot::Sender<()>; type ExitListener = oneshot::Receiver<()>; type GreetingSender = oneshot::Sender<()>; @@ -32,19 +38,23 @@ type GreetingWaiter = oneshot::Receiver<()>; pub struct Inner { config: Config, tag_number: AtomicU32, + command_tx: mpsc::UnboundedSender, read_exit: ExitSender, read_handle: JoinHandle>, write_exit: ExitSender, - pub(crate) write_tx: mpsc::UnboundedSender, write_handle: JoinHandle>, + _write_tx: mpsc::UnboundedSender, greeting_rx: Option, } +#[derive(Debug)] struct CommandContainer { + tag: Tag, command: Command, + channel: mpsc::UnboundedSender, } impl Inner @@ -52,6 +62,8 @@ where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { pub async fn new(c: C, config: Config) -> Result { + let (command_tx, command_rx) = mpsc::unbounded_channel(); + // break the stream of bytes into a reader and a writer // the read_half represents the server->client connection // the write_half represents the client->server connection @@ -63,28 +75,47 @@ where // spawn the server->client loop let (read_exit, exit_rx) = oneshot::channel(); - let read_handle = tokio::spawn(read_loop(read_half, exit_rx, greeting_tx)); + let (write_tx, write_rx) = mpsc::unbounded_channel(); // TODO: maybe an arbitrary/configurable limit here would be better? + let read_handle = tokio::spawn(read_loop( + read_half, + exit_rx, + greeting_tx, + write_tx.clone(), + command_rx, + )); // spawn the client->server loop let (write_exit, exit_rx) = oneshot::channel(); - // TODO: maybe an arbitrary/configurable limit here would be better? - let (write_tx, write_rx) = mpsc::unbounded_channel(); let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx)); let tag_number = AtomicU32::new(0); Ok(Inner { config, tag_number, + command_tx, read_exit, read_handle, write_exit, - write_tx, write_handle, + _write_tx: write_tx, greeting_rx: Some(greeting_rx), }) } - pub async fn execute(&mut self, _command: Command) -> Result { todo!() } + pub async fn execute(&mut self, command: Command) -> Result { + let id = self.tag_number.fetch_add(1, Ordering::SeqCst); + let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id))); + + let (channel, rx) = mpsc::unbounded_channel(); + self.command_tx.send(CommandContainer { + tag, + command, + channel, + })?; + + let stream = ResponseStream { inner: rx }; + Ok(stream) + } pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { // TODO: cache capabilities if needed? @@ -139,6 +170,8 @@ async fn read_loop( stream: ReadHalf, exit: ExitListener, greeting_tx: GreetingSender, + write_tx: mpsc::UnboundedSender, + mut command_rx: mpsc::UnboundedReceiver, ) -> ReadHalf where C: AsyncRead, @@ -158,14 +191,55 @@ where let next = framed.next().fuse(); pin_mut!(next); + // only listen for a new command if there isn't one already + let mut cmd_fut = if let Some(_) = curr_cmd { + // if there is one, just make a future that never resolves so it'll always pick + // the other options in the select. + future::pending().boxed().fuse() + } else { + command_rx.recv().boxed().fuse() + }; + select! { - msg = next => { - println!("hellosu {:?}", msg); + // read a command from the command list + mut command = cmd_fut => { + if curr_cmd.is_none() { + if let Some(CommandContainer { tag, command, .. }) = command.take() { + let _ = write_tx.send(TaggedCommand(tag, command)); + // let cmd_str = format!("{} {:?}\r\n", tag, cmd); + // write_tx.send(cmd_str); + } + curr_cmd = command; + } + } + + // new message from the server + resp = next => { + let resp = match resp { + Some(Ok(v)) => v, + a => { error!("failed: {:?}", a); todo!("fuck"); }, + }; + trace!("S>>>C: {:?}", resp); // if this is the very first response, then it's a greeting if let Some(greeting_tx) = greeting_tx.take() { greeting_tx.send(()).unwrap(); } + + if let Response::Done(_) = resp { + // since this is the DONE message, clear curr_cmd so another one can be sent + if let Some(CommandContainer { channel, .. }) = curr_cmd.take() { + let _ = channel.send(resp); + // debug!("res0: {:?}", res); + } + } else if let Some(CommandContainer { channel, .. }) = curr_cmd.as_mut() { + // we got a response from the server for this command, so send it over the + // channel + + // debug!("sending {:?} to tag {}", resp, tag); + let _res = channel.send(resp); + // debug!("res1: {:?}", res); + } } _ = exit => break, } @@ -175,30 +249,35 @@ where } async fn write_loop( - mut stream: WriteHalf, + stream: WriteHalf, exit_rx: ExitListener, - mut write_rx: mpsc::UnboundedReceiver, + mut command_rx: mpsc::UnboundedReceiver, ) -> WriteHalf where C: AsyncWrite, { + // set up framed communication + let codec = ImapCodec::default(); + let mut framed = FramedWrite::new(stream, codec); + let mut exit_rx = exit_rx.map_err(|_| ()).shared(); loop { - let write_fut = write_rx.recv().fuse(); - pin_mut!(write_fut); + let command_fut = command_rx.recv().fuse(); + pin_mut!(command_fut); select! { - line = write_fut => { - if let Some(line) = line { - // TODO: handle errors here - let _ = stream.write_all(line.as_bytes()).await; - let _ = stream.flush().await; - trace!("C>>>S: {:?}", line); + command = command_fut => { + // TODO: handle errors here + if let Some(command) = command { + let _ = framed.send(&command).await; } + // let _ = stream.write_all(line.as_bytes()).await; + // let _ = stream.flush().await; + // trace!("C>>>S: {:?}", line); } _ = exit_rx => break, } } - stream + framed.into_inner() } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index 2ef71c9..7edd38a 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -1,13 +1,27 @@ +//! High-level IMAP Client +//! --- +//! +//! ```no_run +//! let client = Config::builder() +//! .hostname("your.domain") +//! .port(993) +//! .open().await?; +//! ``` + #[macro_use] mod macros; pub mod auth; -pub mod client; pub mod configurable_cert_verifier; pub mod response_stream; -pub mod upgrade; +mod client; +mod codec; mod inner; +mod upgrade; -pub use self::client::ConfigBuilder; +pub use self::client::{ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder}; +pub use self::codec::{ImapCodec, TaggedCommand}; + +#[cfg(feature = "low-level")] pub use self::inner::Inner; diff --git a/imap/src/lib.rs b/imap/src/lib.rs index 4245a1c..0d7e9c1 100644 --- a/imap/src/lib.rs +++ b/imap/src/lib.rs @@ -8,10 +8,9 @@ extern crate log; extern crate futures; #[macro_use] extern crate derive_builder; -#[macro_use] -extern crate bitflags; +// #[macro_use] +// extern crate bitflags; pub mod client; -pub mod codec; -pub mod events; +// pub mod events; pub mod proto; diff --git a/imap/src/proto/command.rs b/imap/src/proto/command.rs index c61a50b..26192f4 100644 --- a/imap/src/proto/command.rs +++ b/imap/src/proto/command.rs @@ -1,6 +1,6 @@ use crate::proto::bytes::Bytes; -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum Command { // Any state Capability, @@ -41,32 +41,34 @@ pub enum Command { // Extensions #[cfg(feature = "rfc2177")] Idle, + #[cfg(feature = "rfc2177")] + Done, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CommandFetch { pub ids: Vec, pub items: FetchItems, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CommandList { pub reference: Bytes, pub mailbox: Bytes, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CommandLogin { pub username: Bytes, pub password: Bytes, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CommandSearch { pub criteria: SearchCriteria, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CommandSelect { pub mailbox: Bytes, } diff --git a/imap/src/proto/mod.rs b/imap/src/proto/mod.rs index 17fd2b4..617c5dd 100644 --- a/imap/src/proto/mod.rs +++ b/imap/src/proto/mod.rs @@ -1,3 +1,5 @@ +//! Helper functions for manipulating the wire protocol. + #![allow(non_snake_case, dead_code)] // utils