From 4be9e6e1e815bfd54e085e19f15462984b7a3241 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Thu, 25 Mar 2021 06:34:11 -0500 Subject: [PATCH] update read loop and parser to read body --- imap/src/client/inner.rs | 4 ++-- imap/src/client/mod.rs | 19 ++++++++-------- imap/src/command.rs | 8 ++++++- imap/src/parser/mod.rs | 29 +++++++++++++++++++----- imap/src/parser/rfc3501.pest | 5 +++-- src/mail/client.rs | 43 ++++++++++++++++++++++++++++++------ src/mail/store.rs | 7 +++--- 7 files changed, 86 insertions(+), 29 deletions(-) diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index 96c58ff..c19afee 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -221,7 +221,6 @@ where line = write_fut => { if let Some(line) = line { - trace!("got line {:?}", line); conn.write_all(line.as_bytes()).await?; conn.flush().await?; trace!("C>>>S: {:?}", line); @@ -288,6 +287,7 @@ where Some(Ok(v)) => v, a => { error!("failed: {:?}", a); bail!("fuck"); }, }; + trace!("S>>>C: {:?}", resp); // if this is the very first response, then it's a greeting if let Some(greeting_tx) = greeting_tx.take() { @@ -298,7 +298,7 @@ where // since this is the DONE message, clear curr_cmd so another one can be sent if let Some((_, _, cmd_tx)) = curr_cmd.take() { let res = cmd_tx.send(resp); - debug!("res0: {:?}", res); + // debug!("res0: {:?}", res); } } else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() { // we got a response from the server for this command, so send it over the diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index 3825e81..f01085e 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -205,7 +205,6 @@ impl ClientAuthenticated { let mut select = SelectResponse::default(); for resp in data { - debug!("execute called returned: {:?}", resp); match resp { Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags, Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists), @@ -246,10 +245,11 @@ impl ClientAuthenticated { pub async fn fetch( &mut self, uids: &[u32], + items: FetchItems, ) -> Result)>> { let cmd = Command::Fetch { uids: uids.to_vec(), - items: FetchItems::All, + items, }; debug!("fetch: {}", cmd); let stream = self.execute(cmd).await?; @@ -265,10 +265,11 @@ impl ClientAuthenticated { pub async fn uid_fetch( &mut self, uids: &[u32], + items: FetchItems, ) -> Result)>> { let cmd = Command::UidFetch { uids: uids.to_vec(), - items: FetchItems::All, + items, }; debug!("uid fetch: {}", cmd); let stream = self.execute(cmd).await?; @@ -293,12 +294,12 @@ impl ClientAuthenticated { #[derive(Debug, Default)] pub struct SelectResponse { - flags: Vec, - exists: Option, - recent: Option, - uid_next: Option, - uid_validity: Option, - unseen: Option, + pub flags: Vec, + pub exists: Option, + pub recent: Option, + pub uid_next: Option, + pub uid_validity: Option, + pub unseen: Option, } /// A token that represents an idling connection. diff --git a/imap/src/command.rs b/imap/src/command.rs index aa86ee5..335b9cc 100644 --- a/imap/src/command.rs +++ b/imap/src/command.rs @@ -109,7 +109,11 @@ pub enum FetchItems { All, Fast, Full, + BodyPeek, Items(Vec), + + /// item set that panorama uses, TODO: remove when FetchItems has a builder + PanoramaAll, } #[derive(Clone, Debug)] @@ -122,7 +126,9 @@ impl fmt::Display for FetchItems { All => write!(f, "ALL"), Fast => write!(f, "FAST"), Full => write!(f, "FULL"), - FetchAttr => write!(f, ""), + BodyPeek => write!(f, "(BODY.PEEK[])"), + PanoramaAll => write!(f, "(FLAGS INTERNALDATE RFC822.SIZE ENVELOPE BODY.PEEK[])"), + Items(attrs) => write!(f, ""), } } } diff --git a/imap/src/parser/mod.rs b/imap/src/parser/mod.rs index 96e941f..2da4540 100644 --- a/imap/src/parser/mod.rs +++ b/imap/src/parser/mod.rs @@ -22,6 +22,12 @@ struct Rfc3501; pub type ParseResult> = Result; +macro_rules! parse_fail { + ($($tt:tt)*) => { + { error!($($tt)*); panic!(); } + }; +} + pub fn parse_capability(s: impl AsRef) -> ParseResult { let mut pairs = Rfc3501::parse(Rule::capability, s.as_ref())?; let pair = pairs.next().unwrap(); @@ -29,6 +35,7 @@ pub fn parse_capability(s: impl AsRef) -> ParseResult { } pub fn parse_streamed_response(s: impl AsRef) -> ParseResult<(Response, usize)> { + // trace!("parsing streamed reponse: {:?}", s.as_ref()); let mut pairs = Rfc3501::parse(Rule::streamed_response, s.as_ref())?; let pair = unwrap1(pairs.next().unwrap()); let span = pair.as_span(); @@ -152,9 +159,7 @@ fn build_msg_att(pair: Pair) -> AttributeValue { } fn build_msg_att_static(pair: Pair) -> AttributeValue { - if !matches!(pair.as_rule(), Rule::msg_att_static) { - unreachable!("{:#?}", pair); - } + assert!(matches!(pair.as_rule(), Rule::msg_att_static)); let mut pairs = pair.into_inner(); let pair = pairs.next().unwrap(); @@ -166,16 +171,30 @@ fn build_msg_att_static(pair: Pair) -> AttributeValue { Rule::msg_att_static_rfc822_size => AttributeValue::Rfc822Size(build_number(unwrap1(pair))), Rule::msg_att_static_envelope => AttributeValue::Envelope(build_envelope(unwrap1(pair))), // TODO: do this - Rule::msg_att_static_body => AttributeValue::BodySection { + Rule::msg_att_static_body_structure => AttributeValue::BodySection { section: None, index: None, data: None, }, + Rule::msg_att_static_body_section => { + let section = None; + let index = None; + let data = None; + AttributeValue::BodySection { + section, + index, + data, + } + } Rule::msg_att_static_uid => AttributeValue::Uid(build_number(unwrap1(unwrap1(pair)))), - _ => unreachable!("{:#?}", pair), + _ => parse_fail!("{:#?}", pair), } } +fn build_section(pair: Pair) -> () { + assert!(matches!(pair.as_rule(), Rule::section)); +} + fn build_envelope(pair: Pair) -> Envelope { // TODO: do this let mut pairs = pair.into_inner(); diff --git a/imap/src/parser/rfc3501.pest b/imap/src/parser/rfc3501.pest index 7390104..0cbb46d 100644 --- a/imap/src/parser/rfc3501.pest +++ b/imap/src/parser/rfc3501.pest @@ -91,8 +91,9 @@ message_data_fetch = { ^"FETCH" ~ sp ~ msg_att } msg_att = { "(" ~ msg_att_dyn_or_stat ~ (sp ~ msg_att_dyn_or_stat)* ~ ")" } msg_att_dyn_or_stat = { msg_att_dynamic | msg_att_static } msg_att_dynamic = { ^"FLAGS" ~ sp ~ "(" ~ (flag_fetch ~ (sp ~ flag_fetch)*)? ~ ")" } -msg_att_static = { msg_att_static_envelope | msg_att_static_internaldate | (^"RFC822" ~ (^".HEADER" | ^".TEXT") ~ sp ~ nstring) | msg_att_static_rfc822_size | msg_att_static_body | (^"BODY" ~ section ~ ("<" ~ number ~ ">")? ~ sp ~ nstring) | msg_att_static_uid } -msg_att_static_body = { ^"BODY" ~ ^"STRUCTURE"? ~ sp ~ body } +msg_att_static = { msg_att_static_envelope | msg_att_static_internaldate | (^"RFC822" ~ (^".HEADER" | ^".TEXT") ~ sp ~ nstring) | msg_att_static_rfc822_size | msg_att_static_body_structure | msg_att_static_body_section | msg_att_static_uid } +msg_att_static_body_structure = { ^"BODY" ~ ^"STRUCTURE"? ~ sp ~ body } +msg_att_static_body_section = { ^"BODY" ~ section ~ ("<" ~ number ~ ">")? ~ sp ~ nstring } msg_att_static_envelope = { ^"ENVELOPE" ~ sp ~ envelope } msg_att_static_internaldate = { ^"INTERNALDATE" ~ sp ~ date_time } msg_att_static_rfc822_size = { ^"RFC822.SIZE" ~ sp ~ number } diff --git a/src/mail/client.rs b/src/mail/client.rs index dd3491f..4718935 100644 --- a/src/mail/client.rs +++ b/src/mail/client.rs @@ -9,7 +9,7 @@ use panorama_imap::{ auth::{self, Auth}, ClientBuilder, ClientConfig, }, - command::Command as ImapCommand, + command::{Command as ImapCommand, FetchItems}, response::{AttributeValue, Envelope, MailboxData, Response}, }; use tokio::{ @@ -65,16 +65,39 @@ pub async fn sync_main( debug!("authentication successful!"); + let folder_list = authed.list().await?; + debug!("mailbox list: {:?}", folder_list); + + for folder in folder_list.iter() { + debug!("folder: {}", folder); + let select = authed.select(folder).await?; + debug!("select response: {:?}", select); + + if let Some(exists) = select.exists { + if exists < 10 { + let mut fetched = authed + .uid_fetch(&(1..=exists).collect::>(), FetchItems::PanoramaAll) + .await?; + while let Some((uid, attrs)) = fetched.next().await { + debug!("- {} : {:?}", uid, attrs); + mail_store.store_email(); + } + } + } + } + + let _ = mail2ui_tx.send(MailEvent::FolderList(acct_name.clone(), folder_list)); + tokio::time::sleep(std::time::Duration::from_secs(50)).await; + + // TODO: remove this later + continue; + // let's just select INBOX for now, maybe have a config for default mailbox later? debug!("selecting the INBOX mailbox"); let select = authed.select("INBOX").await?; debug!("select result: {:?}", select); loop { - let folder_list = authed.list().await?; - debug!("mailbox list: {:?}", folder_list); - let _ = mail2ui_tx.send(MailEvent::FolderList(acct_name.clone(), folder_list)); - let message_uids = authed.uid_search().await?; let message_uids = message_uids.into_iter().take(30).collect::>(); let _ = mail2ui_tx.send(MailEvent::MessageUids( @@ -83,7 +106,10 @@ pub async fn sync_main( )); // TODO: make this happen concurrently with the main loop? - let mut message_list = authed.uid_fetch(&message_uids).await.unwrap(); + let mut message_list = authed + .uid_fetch(&message_uids, FetchItems::All) + .await + .unwrap(); while let Some((uid, attrs)) = message_list.next().await { let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs); mail2ui_tx.send(evt); @@ -123,7 +149,10 @@ pub async fn sync_main( )); // TODO: make this happen concurrently with the main loop? - let mut message_list = authed.uid_fetch(&message_uids).await.unwrap(); + let mut message_list = authed + .uid_fetch(&message_uids, FetchItems::All) + .await + .unwrap(); while let Some((uid, attrs)) = message_list.next().await { let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs); // debug!("sent {:?}", evt); diff --git a/src/mail/store.rs b/src/mail/store.rs index ec74489..d4d471c 100644 --- a/src/mail/store.rs +++ b/src/mail/store.rs @@ -11,7 +11,7 @@ use tokio::fs; static MIGRATOR: Migrator = sqlx::migrate!(); -/// SQLite email manager +/// Manages email storage on disk, for both database and caches /// /// This struct is clone-safe: cloning it will just return a reference to the same data structure #[derive(Clone)] @@ -43,7 +43,8 @@ impl MailStore { } /// Gets the list of all the UIDs in the given folder that need to be updated - pub fn get_new_uids(&self, exists: u32) { + pub fn get_new_uids(&self, exists: u32) {} - } + /// Stores the given email + pub fn store_email(&self) {} }