From aa796e533e4546a62355d14e893cbb049d53ecb1 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Fri, 26 Feb 2021 00:03:23 -0600 Subject: [PATCH] Modify the inner client to allow streaming intermediate content instead of just returning the last element --- Cargo.lock | 1 + imap/Cargo.toml | 1 + imap/src/client/auth.rs | 2 + imap/src/client/inner.rs | 177 +++++++++++++++++++++++++++------------ imap/src/client/mod.rs | 30 +++---- imap/src/command/mod.rs | 2 +- src/config.rs | 2 +- src/mail/mod.rs | 9 +- src/ui/drawable.rs | 4 +- src/ui/mod.rs | 2 +- src/ui/tabs.rs | 4 +- 11 files changed, 149 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 219e1b9..67683ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1192,6 +1192,7 @@ dependencies = [ "pest_derive", "tokio", "tokio-rustls", + "tokio-stream", "webpki-roots", ] diff --git a/imap/Cargo.toml b/imap/Cargo.toml index 69558ee..1324371 100644 --- a/imap/Cargo.toml +++ b/imap/Cargo.toml @@ -25,6 +25,7 @@ pest = "2.1.3" pest_derive = "2.1.0" tokio = { version = "1.1.1", features = ["full"] } tokio-rustls = "0.22.0" +tokio-stream = "0.1.3" webpki-roots = "0.21.0" [dev-dependencies] diff --git a/imap/src/client/auth.rs b/imap/src/client/auth.rs index eb3d519..a89d812 100644 --- a/imap/src/client/auth.rs +++ b/imap/src/client/auth.rs @@ -31,7 +31,9 @@ impl Auth for Plain { username: self.username, password: self.password, }; + let (result, _) = client.execute(command).await?; + let result = result.await?; if !matches!( result, diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index e423844..946751b 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -4,39 +4,60 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll, Waker}; -use anyhow::{Context as AnyhowContext, Result}; -use futures::future::{self, Either, Future, FutureExt}; +use anyhow::{Context as AnyhowContext, Error, Result}; +use futures::{ + future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt}, + stream::{BoxStream, Stream, StreamExt, TryStream}, +}; +use genawaiter::{ + sync::{gen, Gen}, + yield_, +}; use parking_lot::{RwLock, RwLockWriteGuard}; use tokio::{ io::{ self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, }, - sync::mpsc, + sync::{mpsc, oneshot}, task::JoinHandle, }; use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, }; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::command::Command; use crate::parser::{parse_capability, parse_response}; use crate::response::{Capability, Response, ResponseCode, Status}; -// use crate::types::{Capability as Capability_, Status}; use super::ClientConfig; pub type CapsLock = Arc>>>; -pub type ResultMap = Arc, Vec, Option)>>>; +pub type ResponseFuture = Box> + Send + Unpin>; +pub type ResponseSender = mpsc::UnboundedSender; +pub type ResponseStream = mpsc::UnboundedReceiver; +type ResultQueue = Arc>>; pub type GreetingState = Arc, Option)>>; pub const TAG_PREFIX: &str = "panorama"; +struct HandlerResult { + id: usize, + end: Option>, + sender: ResponseSender, + waker: Option, +} + /// The lower-level Client struct, that is shared by all of the exported structs in the state machine. pub struct Client { config: ClientConfig, + + /// write half of the connection conn: WriteHalf, + /// counter for monotonically incrementing unique ids id: usize, - results: ResultMap, + + results: ResultQueue, /// cached set of capabilities caps: CapsLock, @@ -90,13 +111,26 @@ where } /// Sends a command to the server and returns a handle to retrieve the result - pub async fn execute(&mut self, cmd: Command) -> Result<(Response, Vec)> { - debug!("executing command {:?}", cmd); + pub async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> { + // debug!("executing command {:?}", cmd); let id = self.id; self.id += 1; + + // create a channel for sending the final response + let (end_tx, end_rx) = oneshot::channel(); + + // create a channel for sending responses for this particular client call + // this should queue up responses + let (tx, rx) = mpsc::unbounded_channel(); + { let mut handlers = self.results.write(); - handlers.push_back((id, None, vec![], None)); + handlers.push_back(HandlerResult { + id, + end: Some(end_tx), + sender: tx, + waker: None, + }); } let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd); @@ -104,13 +138,24 @@ where self.conn.write_all(cmd_str.as_bytes()).await?; self.conn.flush().await?; // debug!("[{}] written.", id); - - let resp = ExecWaiter(self, id, false).await; + // let resp = ExecWaiter(self, id, false).await; // let resp = { // let mut handlers = self.results.write(); // handlers.remove(&id).unwrap().0.unwrap() // }; - Ok(resp) + + // let resp = end_rx.await?; + + let q = self.results.clone(); + // let end = Box::new(end_rx.map_err(|err| Error::from).map(move |resp| resp)); + let end = Box::new(end_rx.map_err(Error::from).map(move | resp | { + // pop the first entry from the list + let mut results = q.write(); + results.pop_front(); + resp + })); + + Ok((end, rx)) } /// Executes the CAPABILITY command @@ -123,16 +168,18 @@ where } let cmd = Command::Capability; - debug!("sending: {:?} {:?}", cmd, cmd.to_string()); + // debug!("sending: {:?} {:?}", cmd, cmd.to_string()); let (result, intermediate) = self .execute(cmd) .await .context("error executing CAPABILITY command")?; + let result = result.await?; debug!("cap resp: {:?}", result); - if let Some(Response::Capabilities(new_caps)) = intermediate - .iter() - .find(|resp| matches!(resp, Response::Capabilities(_))) + if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate) + .filter(|resp| future::ready(matches!(resp, Response::Capabilities(_)))) + .next() + .await { let mut caps = self.caps.write(); *caps = Some(new_caps.iter().cloned().collect()); @@ -149,7 +196,8 @@ where } // first, send the STARTTLS command - let resp = self.execute(Command::Starttls).await?; + let (resp, _) = self.execute(Command::Starttls).await?; + let resp = resp.await?; debug!("server response to starttls: {:?}", resp); debug!("sending exit for upgrade"); @@ -208,44 +256,55 @@ impl Future for GreetingWaiter { } } -pub struct ExecWaiter<'a, C>(&'a Client, usize, bool); - -impl<'a, C> Future for ExecWaiter<'a, C> { - type Output = (Response, Vec); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { - // add the waker - let mut results = self.0.results.write(); - if !self.2 { - if let Some((_, _, _, waker_ref)) = - results.iter_mut().find(|(id, _, _, _)| *id == self.1) - { - let waker = cx.waker().clone(); - *waker_ref = Some(waker); - self.2 = true; - } - } - - // if this struct exists then there's definitely at least one entry - let (id, last_response, _, _) = &results[0]; - if *id != self.1 || last_response.is_none() { - return Poll::Pending; - } - - let (_, last_response, intermediate_responses, _) = results.pop_front().unwrap(); - mem::drop(results); - - Poll::Ready(( - last_response.expect("already checked"), - intermediate_responses, - )) - } -} +// pub struct ExecWaiter<'a, C>(&'a Client, usize, bool); +// +// impl<'a, C> Future for ExecWaiter<'a, C> { +// type Output = (Response, ResponseStream); +// fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { +// // add the waker +// let mut results = self.0.results.write(); +// if !self.2 { +// if let Some(HandlerResult { +// waker: waker_ref, .. +// }) = results +// .iter_mut() +// .find(|HandlerResult { id, .. }| *id == self.1) +// { +// let waker = cx.waker().clone(); +// *waker_ref = Some(waker); +// self.2 = true; +// } +// } +// +// // if this struct exists then there's definitely at least one entry +// let HandlerResult { +// id, +// end: last_response, +// .. +// } = &results[0]; +// if *id != self.1 || last_response.is_none() { +// return Poll::Pending; +// } +// +// let HandlerResult { +// end: last_response, +// stream: intermediate_responses, +// .. +// } = results.pop_front().unwrap(); +// mem::drop(results); +// +// Poll::Ready(( +// last_response.expect("already checked"), +// intermediate_responses, +// )) +// } +// } /// Main listen loop for the application async fn listen( conn: C, caps: CapsLock, - results: ResultMap, + results: ResultQueue, mut exit: mpsc::Receiver<()>, greeting: GreetingState, ) -> Result @@ -302,9 +361,9 @@ where status: Status::Ok, .. } => { let mut results = results.write(); - if let Some((_, _, intermediate, _)) = results.iter_mut().next() { - debug!("pushed to intermediate: {:?}", resp); - intermediate.push(resp); + if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() { + debug!("pushed to intermediate for id {}: {:?}", id, resp); + sender.send(resp)?; } } @@ -320,8 +379,16 @@ where if tag.starts_with(TAG_PREFIX) { // let id = tag.trim_start_matches(TAG_PREFIX).parse::()?; let mut results = results.write(); - if let Some((_, opt, _, waker)) = results.iter_mut().next() { - *opt = Some(resp); + if let Some(HandlerResult { + end: ref mut opt, + waker, + .. + }) = results.iter_mut().next() + { + if let Some(opt) = opt.take() { + opt.send(resp).unwrap(); + } + // *opt = Some(resp); if let Some(waker) = waker.take() { waker.wake(); } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index 6932b86..4ecf9e3 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -39,17 +39,16 @@ mod inner; use std::sync::Arc; use anyhow::Result; -use genawaiter::{sync::gen, yield_}; use tokio::net::TcpStream; -use futures::stream::Stream; use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, }; +use tokio_stream::wrappers::UnboundedReceiverStream; use crate::command::Command; use crate::response::Response; -pub use self::inner::Client; +pub use self::inner::{Client, ResponseFuture, ResponseStream}; /// Struct used to start building the config for a client. /// @@ -118,7 +117,7 @@ impl ClientUnauthenticated { } /// Exposing low-level execute - async fn execute(&mut self, cmd: Command) -> Result<(Response, Vec)> { + async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> { match self { ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await, ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await, @@ -141,7 +140,7 @@ pub enum ClientAuthenticated { impl ClientAuthenticated { /// Exposing low-level execute - async fn execute(&mut self, cmd: Command) -> Result<(Response, Vec)> { + async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> { match self { ClientAuthenticated::Encrypted(e) => e.execute(cmd).await, ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await, @@ -154,7 +153,8 @@ impl ClientAuthenticated { reference: "".to_owned(), mailbox: "*".to_owned(), }; - let resp = self.execute(cmd).await?; + let (resp, stream) = self.execute(cmd).await?; + let resp = resp.await?; debug!("list response: {:?}", resp); Ok(()) } @@ -164,23 +164,17 @@ impl ClientAuthenticated { let cmd = Command::Select { mailbox: mailbox.as_ref().to_owned(), }; - let resp = self.execute(cmd).await?; + let (resp, stream) = self.execute(cmd).await?; + let resp = resp.await?; debug!("select response: {:?}", resp); Ok(()) } /// Runs the SELECT command #[cfg(feature = "rfc2177-idle")] - pub fn idle(&mut self) -> impl Stream { - gen!({ - loop { - tokio::time::sleep(std::time::Duration::from_secs(10)).await; - yield_!(()); - } - }) - // let cmd = Command::Idle; - // let resp = self.execute(cmd).await?; - // debug!("idle response: {:?}", resp); - // Ok(()) + pub async fn idle(&mut self) -> Result> { + let cmd = Command::Idle; + let (_, stream) = self.execute(cmd).await?; + Ok(UnboundedReceiverStream::new(stream)) } } diff --git a/imap/src/command/mod.rs b/imap/src/command/mod.rs index cc99bcc..c9c7b4b 100644 --- a/imap/src/command/mod.rs +++ b/imap/src/command/mod.rs @@ -1,7 +1,7 @@ use std::fmt; /// Commands, without the tag part. -#[derive(Clone, Debug)] +#[derive(Clone)] pub enum Command { Capability, Starttls, diff --git a/src/config.rs b/src/config.rs index 2900059..c24a8ce 100644 --- a/src/config.rs +++ b/src/config.rs @@ -130,7 +130,7 @@ async fn start_inotify_stream( debug!("reading config from {:?}", path_c); let config = read_config(path_c).await.context("read")?; - debug!("sending config {:?}", config); + // debug!("sending config {:?}", config); config_tx.send(config)?; } } diff --git a/src/mail/mod.rs b/src/mail/mod.rs index e5d5f41..e3db850 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,7 +1,10 @@ //! Mail use anyhow::Result; -use futures::{future::FutureExt, stream::StreamExt}; +use futures::{ + future::FutureExt, + stream::{Stream, StreamExt}, +}; use panorama_imap::{ client::{ auth::{self, Auth}, @@ -49,7 +52,7 @@ pub async fn run_mail( for acct in config.mail_accounts.into_iter() { let handle = tokio::spawn(async move { - debug!("opening imap connection for {:?}", acct); + // debug!("opening imap connection for {:?}", acct); loop { match imap_main(acct.clone()).await { Ok(_) => {} @@ -110,7 +113,7 @@ async fn imap_main(acct: MailAccountConfig) -> Result<()> { debug!("listing all emails..."); let folder_tree = authed.list().await?; - let mut idle_stream = authed.idle(); + let mut idle_stream = authed.idle().await?; loop { idle_stream.next().await; diff --git a/src/ui/drawable.rs b/src/ui/drawable.rs index d8c41f1..94f3bf1 100644 --- a/src/ui/drawable.rs +++ b/src/ui/drawable.rs @@ -1,3 +1 @@ -pub trait Drawable { - -} +pub trait Drawable {} diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 248dc4f..87531a9 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -1,8 +1,8 @@ //! UI mod drawable; -mod tabs; mod table; +mod tabs; use std::fmt::Debug; use std::io::Write; diff --git a/src/ui/tabs.rs b/src/ui/tabs.rs index 0cf9414..aaf1bee 100644 --- a/src/ui/tabs.rs +++ b/src/ui/tabs.rs @@ -1,3 +1 @@ -pub struct Tabs { - -} +pub struct Tabs {}