update read loop and parser to read body

This commit is contained in:
Michael Zhang 2021-03-25 06:34:11 -05:00
parent 192ea349b5
commit 4be9e6e1e8
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
7 changed files with 86 additions and 29 deletions

View file

@ -221,7 +221,6 @@ where
line = write_fut => { line = write_fut => {
if let Some(line) = line { if let Some(line) = line {
trace!("got line {:?}", line);
conn.write_all(line.as_bytes()).await?; conn.write_all(line.as_bytes()).await?;
conn.flush().await?; conn.flush().await?;
trace!("C>>>S: {:?}", line); trace!("C>>>S: {:?}", line);
@ -288,6 +287,7 @@ where
Some(Ok(v)) => v, Some(Ok(v)) => v,
a => { error!("failed: {:?}", a); bail!("fuck"); }, a => { error!("failed: {:?}", a); bail!("fuck"); },
}; };
trace!("S>>>C: {:?}", resp);
// if this is the very first response, then it's a greeting // if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() { if let Some(greeting_tx) = greeting_tx.take() {
@ -298,7 +298,7 @@ where
// since this is the DONE message, clear curr_cmd so another one can be sent // since this is the DONE message, clear curr_cmd so another one can be sent
if let Some((_, _, cmd_tx)) = curr_cmd.take() { if let Some((_, _, cmd_tx)) = curr_cmd.take() {
let res = cmd_tx.send(resp); let res = cmd_tx.send(resp);
debug!("res0: {:?}", res); // debug!("res0: {:?}", res);
} }
} else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() { } else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() {
// we got a response from the server for this command, so send it over the // we got a response from the server for this command, so send it over the

View file

@ -205,7 +205,6 @@ impl ClientAuthenticated {
let mut select = SelectResponse::default(); let mut select = SelectResponse::default();
for resp in data { for resp in data {
debug!("execute called returned: {:?}", resp);
match resp { match resp {
Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags, Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags,
Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists), Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists),
@ -246,10 +245,11 @@ impl ClientAuthenticated {
pub async fn fetch( pub async fn fetch(
&mut self, &mut self,
uids: &[u32], uids: &[u32],
items: FetchItems,
) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> { ) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> {
let cmd = Command::Fetch { let cmd = Command::Fetch {
uids: uids.to_vec(), uids: uids.to_vec(),
items: FetchItems::All, items,
}; };
debug!("fetch: {}", cmd); debug!("fetch: {}", cmd);
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
@ -265,10 +265,11 @@ impl ClientAuthenticated {
pub async fn uid_fetch( pub async fn uid_fetch(
&mut self, &mut self,
uids: &[u32], uids: &[u32],
items: FetchItems,
) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> { ) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> {
let cmd = Command::UidFetch { let cmd = Command::UidFetch {
uids: uids.to_vec(), uids: uids.to_vec(),
items: FetchItems::All, items,
}; };
debug!("uid fetch: {}", cmd); debug!("uid fetch: {}", cmd);
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
@ -293,12 +294,12 @@ impl ClientAuthenticated {
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct SelectResponse { pub struct SelectResponse {
flags: Vec<MailboxFlag>, pub flags: Vec<MailboxFlag>,
exists: Option<u32>, pub exists: Option<u32>,
recent: Option<u32>, pub recent: Option<u32>,
uid_next: Option<u32>, pub uid_next: Option<u32>,
uid_validity: Option<u32>, pub uid_validity: Option<u32>,
unseen: Option<u32>, pub unseen: Option<u32>,
} }
/// A token that represents an idling connection. /// A token that represents an idling connection.

View file

@ -109,7 +109,11 @@ pub enum FetchItems {
All, All,
Fast, Fast,
Full, Full,
BodyPeek,
Items(Vec<FetchAttr>), Items(Vec<FetchAttr>),
/// item set that panorama uses, TODO: remove when FetchItems has a builder
PanoramaAll,
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -122,7 +126,9 @@ impl fmt::Display for FetchItems {
All => write!(f, "ALL"), All => write!(f, "ALL"),
Fast => write!(f, "FAST"), Fast => write!(f, "FAST"),
Full => write!(f, "FULL"), Full => write!(f, "FULL"),
FetchAttr => write!(f, ""), BodyPeek => write!(f, "(BODY.PEEK[])"),
PanoramaAll => write!(f, "(FLAGS INTERNALDATE RFC822.SIZE ENVELOPE BODY.PEEK[])"),
Items(attrs) => write!(f, ""),
} }
} }
} }

View file

@ -22,6 +22,12 @@ struct Rfc3501;
pub type ParseResult<T, E = Error<Rule>> = Result<T, E>; pub type ParseResult<T, E = Error<Rule>> = Result<T, E>;
macro_rules! parse_fail {
($($tt:tt)*) => {
{ error!($($tt)*); panic!(); }
};
}
pub fn parse_capability(s: impl AsRef<str>) -> ParseResult<Capability> { pub fn parse_capability(s: impl AsRef<str>) -> ParseResult<Capability> {
let mut pairs = Rfc3501::parse(Rule::capability, s.as_ref())?; let mut pairs = Rfc3501::parse(Rule::capability, s.as_ref())?;
let pair = pairs.next().unwrap(); let pair = pairs.next().unwrap();
@ -29,6 +35,7 @@ pub fn parse_capability(s: impl AsRef<str>) -> ParseResult<Capability> {
} }
pub fn parse_streamed_response(s: impl AsRef<str>) -> ParseResult<(Response, usize)> { pub fn parse_streamed_response(s: impl AsRef<str>) -> ParseResult<(Response, usize)> {
// trace!("parsing streamed reponse: {:?}", s.as_ref());
let mut pairs = Rfc3501::parse(Rule::streamed_response, s.as_ref())?; let mut pairs = Rfc3501::parse(Rule::streamed_response, s.as_ref())?;
let pair = unwrap1(pairs.next().unwrap()); let pair = unwrap1(pairs.next().unwrap());
let span = pair.as_span(); let span = pair.as_span();
@ -152,9 +159,7 @@ fn build_msg_att(pair: Pair<Rule>) -> AttributeValue {
} }
fn build_msg_att_static(pair: Pair<Rule>) -> AttributeValue { fn build_msg_att_static(pair: Pair<Rule>) -> AttributeValue {
if !matches!(pair.as_rule(), Rule::msg_att_static) { assert!(matches!(pair.as_rule(), Rule::msg_att_static));
unreachable!("{:#?}", pair);
}
let mut pairs = pair.into_inner(); let mut pairs = pair.into_inner();
let pair = pairs.next().unwrap(); let pair = pairs.next().unwrap();
@ -166,16 +171,30 @@ fn build_msg_att_static(pair: Pair<Rule>) -> AttributeValue {
Rule::msg_att_static_rfc822_size => AttributeValue::Rfc822Size(build_number(unwrap1(pair))), Rule::msg_att_static_rfc822_size => AttributeValue::Rfc822Size(build_number(unwrap1(pair))),
Rule::msg_att_static_envelope => AttributeValue::Envelope(build_envelope(unwrap1(pair))), Rule::msg_att_static_envelope => AttributeValue::Envelope(build_envelope(unwrap1(pair))),
// TODO: do this // TODO: do this
Rule::msg_att_static_body => AttributeValue::BodySection { Rule::msg_att_static_body_structure => AttributeValue::BodySection {
section: None, section: None,
index: None, index: None,
data: None, data: None,
}, },
Rule::msg_att_static_body_section => {
let section = None;
let index = None;
let data = None;
AttributeValue::BodySection {
section,
index,
data,
}
}
Rule::msg_att_static_uid => AttributeValue::Uid(build_number(unwrap1(unwrap1(pair)))), Rule::msg_att_static_uid => AttributeValue::Uid(build_number(unwrap1(unwrap1(pair)))),
_ => unreachable!("{:#?}", pair), _ => parse_fail!("{:#?}", pair),
} }
} }
fn build_section(pair: Pair<Rule>) -> () {
assert!(matches!(pair.as_rule(), Rule::section));
}
fn build_envelope(pair: Pair<Rule>) -> Envelope { fn build_envelope(pair: Pair<Rule>) -> Envelope {
// TODO: do this // TODO: do this
let mut pairs = pair.into_inner(); let mut pairs = pair.into_inner();

View file

@ -91,8 +91,9 @@ message_data_fetch = { ^"FETCH" ~ sp ~ msg_att }
msg_att = { "(" ~ msg_att_dyn_or_stat ~ (sp ~ msg_att_dyn_or_stat)* ~ ")" } msg_att = { "(" ~ msg_att_dyn_or_stat ~ (sp ~ msg_att_dyn_or_stat)* ~ ")" }
msg_att_dyn_or_stat = { msg_att_dynamic | msg_att_static } msg_att_dyn_or_stat = { msg_att_dynamic | msg_att_static }
msg_att_dynamic = { ^"FLAGS" ~ sp ~ "(" ~ (flag_fetch ~ (sp ~ flag_fetch)*)? ~ ")" } msg_att_dynamic = { ^"FLAGS" ~ sp ~ "(" ~ (flag_fetch ~ (sp ~ flag_fetch)*)? ~ ")" }
msg_att_static = { msg_att_static_envelope | msg_att_static_internaldate | (^"RFC822" ~ (^".HEADER" | ^".TEXT") ~ sp ~ nstring) | msg_att_static_rfc822_size | msg_att_static_body | (^"BODY" ~ section ~ ("<" ~ number ~ ">")? ~ sp ~ nstring) | msg_att_static_uid } msg_att_static = { msg_att_static_envelope | msg_att_static_internaldate | (^"RFC822" ~ (^".HEADER" | ^".TEXT") ~ sp ~ nstring) | msg_att_static_rfc822_size | msg_att_static_body_structure | msg_att_static_body_section | msg_att_static_uid }
msg_att_static_body = { ^"BODY" ~ ^"STRUCTURE"? ~ sp ~ body } msg_att_static_body_structure = { ^"BODY" ~ ^"STRUCTURE"? ~ sp ~ body }
msg_att_static_body_section = { ^"BODY" ~ section ~ ("<" ~ number ~ ">")? ~ sp ~ nstring }
msg_att_static_envelope = { ^"ENVELOPE" ~ sp ~ envelope } msg_att_static_envelope = { ^"ENVELOPE" ~ sp ~ envelope }
msg_att_static_internaldate = { ^"INTERNALDATE" ~ sp ~ date_time } msg_att_static_internaldate = { ^"INTERNALDATE" ~ sp ~ date_time }
msg_att_static_rfc822_size = { ^"RFC822.SIZE" ~ sp ~ number } msg_att_static_rfc822_size = { ^"RFC822.SIZE" ~ sp ~ number }

View file

@ -9,7 +9,7 @@ use panorama_imap::{
auth::{self, Auth}, auth::{self, Auth},
ClientBuilder, ClientConfig, ClientBuilder, ClientConfig,
}, },
command::Command as ImapCommand, command::{Command as ImapCommand, FetchItems},
response::{AttributeValue, Envelope, MailboxData, Response}, response::{AttributeValue, Envelope, MailboxData, Response},
}; };
use tokio::{ use tokio::{
@ -65,16 +65,39 @@ pub async fn sync_main(
debug!("authentication successful!"); debug!("authentication successful!");
let folder_list = authed.list().await?;
debug!("mailbox list: {:?}", folder_list);
for folder in folder_list.iter() {
debug!("folder: {}", folder);
let select = authed.select(folder).await?;
debug!("select response: {:?}", select);
if let Some(exists) = select.exists {
if exists < 10 {
let mut fetched = authed
.uid_fetch(&(1..=exists).collect::<Vec<_>>(), FetchItems::PanoramaAll)
.await?;
while let Some((uid, attrs)) = fetched.next().await {
debug!("- {} : {:?}", uid, attrs);
mail_store.store_email();
}
}
}
}
let _ = mail2ui_tx.send(MailEvent::FolderList(acct_name.clone(), folder_list));
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// TODO: remove this later
continue;
// let's just select INBOX for now, maybe have a config for default mailbox later? // let's just select INBOX for now, maybe have a config for default mailbox later?
debug!("selecting the INBOX mailbox"); debug!("selecting the INBOX mailbox");
let select = authed.select("INBOX").await?; let select = authed.select("INBOX").await?;
debug!("select result: {:?}", select); debug!("select result: {:?}", select);
loop { loop {
let folder_list = authed.list().await?;
debug!("mailbox list: {:?}", folder_list);
let _ = mail2ui_tx.send(MailEvent::FolderList(acct_name.clone(), folder_list));
let message_uids = authed.uid_search().await?; let message_uids = authed.uid_search().await?;
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>(); let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
let _ = mail2ui_tx.send(MailEvent::MessageUids( let _ = mail2ui_tx.send(MailEvent::MessageUids(
@ -83,7 +106,10 @@ pub async fn sync_main(
)); ));
// TODO: make this happen concurrently with the main loop? // TODO: make this happen concurrently with the main loop?
let mut message_list = authed.uid_fetch(&message_uids).await.unwrap(); let mut message_list = authed
.uid_fetch(&message_uids, FetchItems::All)
.await
.unwrap();
while let Some((uid, attrs)) = message_list.next().await { while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs); let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs);
mail2ui_tx.send(evt); mail2ui_tx.send(evt);
@ -123,7 +149,10 @@ pub async fn sync_main(
)); ));
// TODO: make this happen concurrently with the main loop? // TODO: make this happen concurrently with the main loop?
let mut message_list = authed.uid_fetch(&message_uids).await.unwrap(); let mut message_list = authed
.uid_fetch(&message_uids, FetchItems::All)
.await
.unwrap();
while let Some((uid, attrs)) = message_list.next().await { while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs); let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs);
// debug!("sent {:?}", evt); // debug!("sent {:?}", evt);

View file

@ -11,7 +11,7 @@ use tokio::fs;
static MIGRATOR: Migrator = sqlx::migrate!(); static MIGRATOR: Migrator = sqlx::migrate!();
/// SQLite email manager /// Manages email storage on disk, for both database and caches
/// ///
/// This struct is clone-safe: cloning it will just return a reference to the same data structure /// This struct is clone-safe: cloning it will just return a reference to the same data structure
#[derive(Clone)] #[derive(Clone)]
@ -43,7 +43,8 @@ impl MailStore {
} }
/// Gets the list of all the UIDs in the given folder that need to be updated /// Gets the list of all the UIDs in the given folder that need to be updated
pub fn get_new_uids(&self, exists: u32) { pub fn get_new_uids(&self, exists: u32) {}
} /// Stores the given email
pub fn store_email(&self) {}
} }