From 534833d6163fa1fc904e97b4e51e6f54da2b6925 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Mon, 22 Feb 2021 03:06:40 -0600 Subject: [PATCH] actually use the newly parsed responses instead of the original shitty ones --- imap/src/client/inner.rs | 125 ++++++++++++++++++++++++++++----------- src/mail/mod.rs | 16 +++-- src/main.rs | 4 +- 3 files changed, 101 insertions(+), 44 deletions(-) diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index 95aef0d..ed79375 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -19,10 +19,12 @@ use tokio_rustls::{ }; use crate::command::Command; -use crate::response::Response; +use crate::response::{Capability, Response, ResponseCode}; +use crate::types::Status; use super::ClientConfig; +pub type CapsLock = Arc>>>; pub type ResultMap = Arc, Option)>>>; pub type GreetingState = Arc)>>; pub const TAG_PREFIX: &str = "panorama"; @@ -36,7 +38,7 @@ pub struct Client { results: ResultMap, /// cached set of capabilities - caps: Vec, + caps: CapsLock, /// join handle for the listener thread listener_handle: JoinHandle>>, @@ -58,8 +60,11 @@ where let results = Arc::new(RwLock::new(HashMap::new())); let (exit_tx, exit_rx) = mpsc::channel(1); let greeting = Arc::new(RwLock::new((false, None))); - let listen_fut = tokio::spawn(listen( + let caps: CapsLock = Arc::new(RwLock::new(None)); + + let listener_handle = tokio::spawn(listen( read_half, + caps.clone(), results.clone(), exit_rx, greeting.clone(), @@ -70,8 +75,8 @@ where conn: write_half, id: 0, results, - caps: Vec::new(), - listener_handle: listen_fut, + listener_handle, + caps, exit_tx, greeting, } @@ -129,7 +134,7 @@ where let resp = self.execute(Command::Starttls).await?; debug!("server response to starttls: {:?}", resp); - debug!("sending exit ()"); + debug!("sending exit for upgrade"); self.exit_tx.send(()).await?; let reader = self.listener_handle.await??; let writer = self.conn; @@ -145,6 +150,7 @@ where let tls_config = TlsConnector::from(Arc::new(tls_config)); let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap(); let stream = tls_config.connect(dnsname, conn).await?; + debug!("upgraded, stream is using TLS now"); Ok(Client::new(stream, self.config)) } @@ -156,6 +162,7 @@ impl Future for GreetingWaiter { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let (state, waker) = &mut *self.0.write(); + debug!("g {:?}", state); if waker.is_none() { *waker = Some(cx.waker().clone()); } @@ -192,6 +199,7 @@ impl<'a, C> Future for ExecWaiter<'a, C> { /// Main listen loop for the application async fn listen( conn: C, + caps: CapsLock, results: ResultMap, mut exit: mpsc::Receiver<()>, greeting: GreetingState, @@ -212,7 +220,11 @@ where match future::select(fut, fut2).await { Either::Left((res, _)) => { - res.context("read failed")?; + let bytes = res.context("read failed")?; + if bytes == 0 { + bail!("connection probably died"); + } + debug!("got a new line {:?}", next_line); let (_, resp) = match crate::parser::parse_response(next_line.as_bytes()) { Ok(v) => v, @@ -221,39 +233,80 @@ where continue; } }; - let resp = Response::from(resp); - debug!("parsed as: {:?}", resp); - let next_line = next_line.trim_end_matches('\n').trim_end_matches('\r'); - let mut parts = next_line.split(" "); - let tag = parts.next().unwrap(); - let rest = parts.collect::>().join(" "); - - if tag == "*" { - debug!("UNTAGGED {:?}", rest); - - // TODO: verify that the greeting is actually an OK - if let Some(greeting) = greeting.take() { - let (greeting, waker) = &mut *greeting.write(); - debug!("got greeting"); - *greeting = true; - if let Some(waker) = waker.take() { - waker.wake(); - } - } - } else if tag.starts_with(TAG_PREFIX) { - let id = tag.trim_start_matches(TAG_PREFIX).parse::()?; - debug!("set {} to {:?}", id, rest); - let mut results = results.write(); - if let Some((c, w)) = results.get_mut(&id) { - // *c = Some(rest.to_string()); - *c = Some(resp); - if let Some(waker) = w.take() { - waker.wake(); - } + if let Some(greeting) = greeting.take() { + let (greeting, waker) = &mut *greeting.write(); + debug!("received greeting!"); + *greeting = true; + if let Some(waker) = waker.take() { + waker.wake(); } } + + let resp = Response::from(resp); + debug!("resp: {:?}", resp); + match &resp { + Response::Capabilities(new_caps) + | Response::Data { + status: Status::Ok, + code: Some(ResponseCode::Capabilities(new_caps)), + .. + } => { + let caps = &mut *caps.write(); + *caps = Some(new_caps.clone()); + debug!("new caps: {:?}", caps); + } + + Response::Done { tag, .. } => { + let tag_str = &tag.0; + if tag_str.starts_with(TAG_PREFIX) { + let id = tag_str.trim_start_matches(TAG_PREFIX).parse::()?; + let mut results = results.write(); + if let Some((c, waker)) = results.get_mut(&id) { + *c = Some(resp); + if let Some(waker) = waker.take() { + waker.wake(); + } + } + } + } + + _ => todo!("unhandled response: {:?}", resp), + } + + // debug!("parsed as: {:?}", resp); + // let next_line = next_line.trim_end_matches('\n').trim_end_matches('\r'); + + // let mut parts = next_line.split(" "); + // let tag = parts.next().unwrap(); + // let rest = parts.collect::>().join(" "); + + // if tag == "*" { + // debug!("UNTAGGED {:?}", rest); + + // // TODO: verify that the greeting is actually an OK + // if let Some(greeting) = greeting.take() { + // let (greeting, waker) = &mut *greeting.write(); + // debug!("got greeting"); + // *greeting = true; + // if let Some(waker) = waker.take() { + // waker.wake(); + // } + // } + // } else if tag.starts_with(TAG_PREFIX) { + // let id = tag.trim_start_matches(TAG_PREFIX).parse::()?; + // debug!("set {} to {:?}", id, rest); + // let mut results = results.write(); + // if let Some((c, w)) = results.get_mut(&id) { + // // *c = Some(rest.to_string()); + // *c = Some(resp); + // if let Some(waker) = w.take() { + // waker.wake(); + // } + // } + // } } + Either::Right((_, _)) => { debug!("exiting read loop"); break; diff --git a/src/mail/mod.rs b/src/mail/mod.rs index e97c013..b589f72 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,7 +1,7 @@ //! Mail use anyhow::Result; -use futures::stream::StreamExt; +use futures::{future::FutureExt, stream::StreamExt}; use panorama_imap::{ client::{ClientBuilder, ClientConfig}, command::Command as ImapCommand, @@ -46,11 +46,15 @@ pub async fn run_mail( for acct in config.mail_accounts.into_iter() { let handle = tokio::spawn(async move { debug!("opening imap connection for {:?}", acct); - match imap_main(acct).await { - Ok(_) => {} - Err(err) => { - error!("IMAP Error: {}", err); + loop { + match imap_main(acct.clone()).await { + Ok(_) => {} + Err(err) => { + error!("IMAP Error: {}", err); + } } + + warn!("connection dropped, retrying"); } }); curr_conn.push(handle); @@ -91,7 +95,7 @@ async fn imap_main(acct: MailAccountConfig) -> Result<()> { // let result = unauth.capabilities().await?; loop { - tokio::time::sleep(std::time::Duration::from_secs(10)).await; + tokio::time::sleep(std::time::Duration::from_secs(60)).await; debug!("heartbeat"); } } diff --git a/src/main.rs b/src/main.rs index dbe48b8..f6402c1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,8 +33,8 @@ async fn main() -> Result<()> { let mut logger = fern::Dispatch::new() .format(move |out, message, record| { out.finish(format_args!( - "{}[{}][{}] {}", - chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"), + "[{}][{}] {}", + // chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"), record.target(), colors.color(record.level()), message