diff --git a/imap/src/client/auth.rs b/imap/src/client/auth.rs index a89d812..7390902 100644 --- a/imap/src/client/auth.rs +++ b/imap/src/client/auth.rs @@ -1,7 +1,7 @@ use anyhow::Result; use crate::command::Command; -use crate::response::{Response, Status}; +use crate::response::{Response, ResponseDone, Status}; use super::{ClientAuthenticated, ClientUnauthenticated}; @@ -37,10 +37,10 @@ impl Auth for Plain { if !matches!( result, - Response::Done { + Response::Done(ResponseDone { status: Status::Ok, .. - } + }) ) { bail!("unable to login: {:?}", result); } diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index 4555214..3d44831 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -23,7 +23,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use crate::command::Command; use crate::parser::{parse_capability, parse_response}; -use crate::response::{Capability, Response, ResponseCode, Status}; +use crate::response::{Capability, Response, ResponseCode, ResponseData, ResponseDone, Status}; use super::ClientConfig; @@ -35,6 +35,7 @@ type ResultQueue = Arc>>; pub type GreetingState = Arc, Option)>>; pub const TAG_PREFIX: &str = "ptag"; +#[derive(Debug)] struct HandlerResult { id: usize, end: Option>, @@ -137,17 +138,8 @@ where debug!("EX[{}]: send the command to the server", id); let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd); - // debug!("[{}] writing to socket: {:?}", id, cmd_str); self.conn.write_all(cmd_str.as_bytes()).await?; self.conn.flush().await?; - // debug!("[{}] written.", id); - // let resp = ExecWaiter(self, id, false).await; - // let resp = { - // let mut handlers = self.results.write(); - // handlers.remove(&id).unwrap().0.unwrap() - // }; - - // let resp = end_rx.await?; debug!("EX[{}]: hellosu", id); let q = self.results.clone(); @@ -178,14 +170,14 @@ where .execute(cmd) .await .context("error executing CAPABILITY command")?; - let result = result.await?; - debug!("cap resp: {:?}", result); + let _ = result.await?; if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate) .filter(|resp| future::ready(matches!(resp, Response::Capabilities(_)))) .next() .await { + debug!("FOUND NEW CAPABILITIES: {:?}", new_caps); let mut caps = self.caps.write(); *caps = Some(new_caps.iter().cloned().collect()); } @@ -292,6 +284,7 @@ where debug!("[LISTEN] got a new line {:?}", next_line); let resp = parse_response(next_line)?; + debug!("[LISTEN] parsed as {:?}", resp); // if this is the very first message, treat it as a greeting if let Some(greeting) = greeting.take() { @@ -306,11 +299,11 @@ where // update capabilities list // TODO: probably not really necessary here (done somewhere else)? if let Response::Capabilities(new_caps) - | Response::Data { + | Response::Data(ResponseData { status: Status::Ok, code: Some(ResponseCode::Capabilities(new_caps)), .. - } = &resp + }) = &resp { let caps = &mut *caps.write(); *caps = Some(new_caps.iter().cloned().collect()); @@ -318,43 +311,24 @@ where } match &resp { - Response::Data { - status: Status::Ok, .. - } => { - let mut results = results.write(); - if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() { - // we don't really care if it fails to send - // this just means that the other side has dropped the channel - // - // which is fine since that just means they don't care about - // intermediate messages - let _ = sender.send(resp); - debug!("[LISTEN] pushed to intermediate for id {}", id); - } - std::mem::drop(results); - debug!("[LISTEN] << unlocked self.results"); - } - // bye - Response::Data { + Response::Data(ResponseData { status: Status::Bye, .. - } => { + }) => { bail!("disconnected"); } - Response::Done { tag, .. } => { + Response::Done(ResponseDone { tag, .. }) => { if tag.starts_with(TAG_PREFIX) { // let id = tag.trim_start_matches(TAG_PREFIX).parse::()?; + debug!("[LISTEN] Done: {:?}", tag); let mut results = results.write(); - if let Some(HandlerResult { - end: ref mut opt, - waker, - .. - }) = results.iter_mut().next() + if let Some(HandlerResult { end, waker, .. }) = + results.iter_mut().next() { - if let Some(opt) = opt.take() { - opt.send(resp).unwrap(); + if let Some(end) = end.take() { + end.send(resp).unwrap(); } // *opt = Some(resp); if let Some(waker) = waker.take() { @@ -364,7 +338,20 @@ where } } - _ => {} + _ => { + debug!("[LISTEN] RESPONSE: {:?}", resp); + let mut results = results.write(); + if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() { + // we don't really care if it fails to send + // this just means that the other side has dropped the channel + // + // which is fine since that just means they don't care about + // intermediate messages + let _ = sender.send(resp); + debug!("[LISTEN] pushed to intermediate for id {}", id); + debug!("[LISTEN] res: {:?}", results); + } + } // _ => {} } } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index d353d52..0998c54 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -39,6 +39,10 @@ mod inner; use std::sync::Arc; use anyhow::Result; +use futures::{ + future::{self, Either, FutureExt}, + stream::StreamExt, +}; use tokio::net::TcpStream; use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, @@ -46,7 +50,7 @@ use tokio_rustls::{ use tokio_stream::wrappers::UnboundedReceiverStream; use crate::command::Command; -use crate::response::Response; +use crate::response::{Response, ResponseData, ResponseDone}; pub use self::inner::{Client, ResponseFuture, ResponseStream}; @@ -56,7 +60,7 @@ pub use self::inner::{Client, ResponseFuture, ResponseStream}; /// the connection to the server. /// /// [1]: self::ClientConfigBuilder::build -/// [2]: self::ClientConfig::open +/// [2]: self::ClientConfig::connect pub type ClientBuilder = ClientConfigBuilder; /// An IMAP client that hasn't been connected yet. @@ -133,6 +137,12 @@ impl ClientUnauthenticated { } } +#[derive(Debug)] +pub struct ResponseCombined { + pub data: Vec, + pub done: ResponseDone, +} + pub enum ClientAuthenticated { Encrypted(Client>), Unencrypted(Client), @@ -147,13 +157,47 @@ impl ClientAuthenticated { } } + /// A wrapper around `execute` that waits for the response and returns a combined data + /// structure containing the intermediate results as well as the final status + async fn execute_combined(&mut self, cmd: Command) -> Result { + let (resp, mut stream) = self.execute(cmd).await?; + let mut resp = resp.into_stream(); // turn into stream to avoid mess with returning futures from select + + let mut data = Vec::new(); + debug!("[COMBI] loop"); + let done = loop { + let fut1 = resp.next().fuse(); + let fut2 = stream.recv().fuse(); + pin_mut!(fut1); + pin_mut!(fut2); + + match future::select(fut1, fut2).await { + Either::Left((Some(Ok(Response::Done(done))), _)) => { + debug!("[COMBI] left: {:?}", done); + break done; + } + Either::Left(_) => unreachable!("got non-Response::Done from listen!"), + + Either::Right((Some(resp), _)) => { + debug!("[COMBI] right: {:?}", resp); + data.push(resp); + } + Either::Right(_) => unreachable!(), + } + }; + + Ok(ResponseCombined { data, done }) + } + /// Runs the LIST command pub async fn list(&mut self) -> Result> { let cmd = Command::List { reference: "".to_owned(), mailbox: "*".to_owned(), }; - let (mut resp, mut st) = self.execute(cmd).await?; + + let res = self.execute_combined(cmd).await?; + debug!("res: {:?}", res); todo!() // let mut folders = Vec::new(); @@ -187,14 +231,22 @@ impl ClientAuthenticated { debug!("ST: {:?}", st.recv().await); let resp = resp.await?; debug!("select response: {:?}", resp); + + // nuke the capabilities cache + self.nuke_capabilities(); + Ok(()) } - /// Runs the SELECT command + /// Runs the IDLE command #[cfg(feature = "rfc2177-idle")] pub async fn idle(&mut self) -> Result> { let cmd = Command::Idle; let (_, stream) = self.execute(cmd).await?; Ok(UnboundedReceiverStream::new(stream)) } + + fn nuke_capabilities(&mut self) { + // TODO: do something here + } } diff --git a/imap/src/parser/mod.rs b/imap/src/parser/mod.rs index 682b5f6..b661547 100644 --- a/imap/src/parser/mod.rs +++ b/imap/src/parser/mod.rs @@ -48,12 +48,12 @@ fn build_response(pair: Pair) -> Response { let pair = pairs.next().unwrap(); let (status, code, information) = build_resp_cond_state(pair); - Response::Done { + Response::Done(ResponseDone { tag, status, code, information, - } + }) } _ => unreachable!("{:#?}", pair), } @@ -64,11 +64,11 @@ fn build_response(pair: Pair) -> Response { match pair.as_rule() { Rule::resp_cond_state => { let (status, code, information) = build_resp_cond_state(pair); - Response::Data { + Response::Data(ResponseData { status, code, information, - } + }) } Rule::mailbox_data => Response::MailboxData(build_mailbox_data(pair)), Rule::capability_data => Response::Capabilities(build_capabilities(pair)), diff --git a/imap/src/response/mod.rs b/imap/src/response/mod.rs index c54127f..0de1eac 100644 --- a/imap/src/response/mod.rs +++ b/imap/src/response/mod.rs @@ -9,17 +9,8 @@ pub enum Response { code: Option, information: Option, }, - Done { - tag: String, - status: Status, - code: Option, - information: Option, - }, - Data { - status: Status, - code: Option, - information: Option, - }, + Done(ResponseDone), + Data(ResponseData), Expunge(u32), Vanished { earlier: bool, @@ -29,6 +20,21 @@ pub enum Response { MailboxData(MailboxData), } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ResponseData { + pub status: Status, + pub code: Option, + pub information: Option, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ResponseDone { + pub tag: String, + pub status: Status, + pub code: Option, + pub information: Option, +} + #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub enum Capability { Imap4rev1, diff --git a/src/mail/mod.rs b/src/mail/mod.rs index a79548f..33ebf6b 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -136,7 +136,7 @@ async fn imap_main(acct: MailAccountConfig, mail2ui_tx: UnboundedSender