From 7cd69bd6a8869f7ef3682f0cdaaa543cd5c47ddd Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Mon, 8 Mar 2021 17:07:44 -0600 Subject: [PATCH] updates --- Cargo.lock | 13 +++-- imap/Cargo.toml | 8 ++- imap/src/client/inner.rs | 40 ++++++++++---- imap/src/client/mod.rs | 17 +++--- imap/src/codec.rs | 30 +++++++++++ imap/src/{command/mod.rs => command.rs} | 0 imap/src/lib.rs | 1 + imap/src/parser/literal.rs | 63 ++++++++++++++++------- imap/src/parser/mod.rs | 10 ++++ imap/src/parser/rfc3501.pest | 6 ++- imap/src/{response/mod.rs => response.rs} | 0 src/mail/mod.rs | 12 ++++- src/main.rs | 3 +- src/ui/mail_tab.rs | 13 ++++- src/ui/mod.rs | 6 ++- 15 files changed, 169 insertions(+), 53 deletions(-) create mode 100644 imap/src/codec.rs rename imap/src/{command/mod.rs => command.rs} (100%) rename imap/src/{response/mod.rs => response.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 5de76e6..3ae7a7f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -212,7 +212,7 @@ dependencies = [ "git2", "git2-curl", "glob", - "hex 0.4.2", + "hex 0.4.3", "home", "humantime", "ignore", @@ -949,9 +949,9 @@ checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" [[package]] name = "hex" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "home" @@ -1425,6 +1425,7 @@ dependencies = [ "anyhow", "assert_matches", "async-trait", + "bytes", "derive_builder", "futures", "log", @@ -1434,6 +1435,7 @@ dependencies = [ "tokio", "tokio-rustls", "tokio-stream", + "tokio-util", "webpki-roots", ] @@ -1481,15 +1483,14 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pest" version = "2.1.3" -source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036" dependencies = [ + "log", "ucd-trie", ] [[package]] name = "pest_derive" version = "2.1.0" -source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036" dependencies = [ "pest", "pest_generator", @@ -1498,7 +1499,6 @@ dependencies = [ [[package]] name = "pest_generator" version = "2.1.3" -source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036" dependencies = [ "pest", "pest_meta", @@ -1510,7 +1510,6 @@ dependencies = [ [[package]] name = "pest_meta" version = "2.1.3" -source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036" dependencies = [ "cargo", "maplit", diff --git a/imap/Cargo.toml b/imap/Cargo.toml index 610189b..6bf5fe5 100644 --- a/imap/Cargo.toml +++ b/imap/Cargo.toml @@ -14,15 +14,19 @@ maintenance = { status = "passively-maintained" } [dependencies] anyhow = "1.0.38" async-trait = "0.1.42" +bytes = { version = "1.0.1" } derive_builder = "0.9.0" futures = "0.3.12" log = "0.4.14" parking_lot = "0.11.1" -pest = { git = "https://github.com/iptq/pest", rev = "838adc941cc182e2c2d807e245a1e4c3e8594036" } -pest_derive = { git = "https://github.com/iptq/pest", rev = "838adc941cc182e2c2d807e245a1e4c3e8594036" } +# pest = { path = "../../pest/pest" } +# pest_derive = { path = "../../pest/derive" } +pest = { git = "https://github.com/iptq/pest", rev = "6a4d3a3d10e42a3ee605ca979d0fcdac97a83a99" } +pest_derive = { git = "https://github.com/iptq/pest", rev = "6a4d3a3d10e42a3ee605ca979d0fcdac97a83a99" } tokio = { version = "1.1.1", features = ["full"] } tokio-rustls = "0.22.0" tokio-stream = "0.1.3" +tokio-util = { version = "0.6.3" } webpki-roots = "0.21.0" [dev-dependencies] diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index add69d5..430ac2a 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -4,10 +4,9 @@ use std::task::{Context, Poll}; use anyhow::Result; use futures::{ - future::{self, Either, FutureExt, TryFutureExt}, - stream::{Peekable, Stream, StreamExt}, + future::{self, FutureExt, TryFutureExt}, + stream::{Stream, StreamExt}, }; -use parking_lot::Mutex; use tokio::{ io::{ self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, @@ -21,8 +20,9 @@ use tokio::{ use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, }; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_util::codec::FramedRead; +use crate::codec::ImapCodec; use crate::command::Command; use crate::parser::{parse_capability, parse_response}; use crate::response::{Response, ResponseDone}; @@ -189,16 +189,20 @@ async fn listen( where C: AsyncRead + Unpin, { - let mut reader = BufReader::new(conn); + let codec = ImapCodec::default(); + let mut framed = FramedRead::new(conn, codec); + // let mut reader = BufReader::new(conn); let mut greeting_tx = Some(greeting_tx); let mut curr_cmd: Option = None; let mut exit_rx = exit_rx.map_err(|_| ()).shared(); // let mut exit_fut = Some(exit_rx.fuse()); // let mut fut1 = None; + let mut cache = String::new(); loop { - let mut next_line = String::new(); - let read_fut = reader.read_line(&mut next_line).fuse(); + // let mut next_line = String::new(); + // let read_fut = reader.read_line(&mut next_line).fuse(); + let read_fut = framed.next().fuse(); pin_mut!(read_fut); // only listen for a new command if there isn't one already @@ -222,10 +226,24 @@ where } } - len = read_fut => { - trace!("read line {:?}", next_line); + resp = read_fut => { + // trace!("read line {:?}", next_line); // res should not be None here - let resp = parse_response(next_line)?; + // cache += &next_line; + // let resp = match parse_response(&cache) { + // Ok(v) => { + // cache.clear(); + // v + // } + // Err(e) => { + // error!("parse error: {}", e); + // continue; + // } + // }; + let resp = match resp { + Some(Ok(v)) => v, + a => { error!("failed: {:?}", a); bail!("fuck"); }, + }; // if this is the very first response, then it's a greeting if let Some(greeting_tx) = greeting_tx.take() { @@ -244,6 +262,6 @@ where } } - let conn = reader.into_inner(); + let conn = framed.into_inner(); Ok(conn) } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index f01f279..8681b17 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -43,10 +43,9 @@ use tokio::net::TcpStream; use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, }; -use tokio_stream::wrappers::UnboundedReceiverStream; use crate::command::{Command, FetchItems, SearchCriteria}; -use crate::response::{MailboxData, Response, ResponseData, ResponseDone}; +use crate::response::{AttributeValue, MailboxData, Response, ResponseData, ResponseDone}; pub use self::inner::{Client, ResponseStream}; @@ -200,7 +199,7 @@ impl ClientAuthenticated { } /// Runs the UID FETCH command - pub async fn uid_fetch(&mut self, uids: &[u32]) -> Result<()> { + pub async fn uid_fetch(&mut self, uids: &[u32]) -> Result)>> { let cmd = Command::UidFetch { uids: uids.to_vec(), items: FetchItems::All, @@ -208,11 +207,13 @@ impl ClientAuthenticated { debug!("uid fetch: {}", cmd); let stream = self.execute(cmd).await?; let (done, data) = stream.wait().await?; - debug!("done: {:?} {:?}", done, data); - for resp in data { - debug!("uid fetch: {:?}", resp); - } - todo!() + Ok(data + .into_iter() + .filter_map(|resp| match resp { + Response::Fetch(n, attrs) => Some((n, attrs)), + _ => None, + }) + .collect()) } /// Runs the IDLE command diff --git a/imap/src/codec.rs b/imap/src/codec.rs new file mode 100644 index 0000000..911c680 --- /dev/null +++ b/imap/src/codec.rs @@ -0,0 +1,30 @@ +use bytes::{Buf, BytesMut}; +use tokio_util::codec::Decoder; + +use crate::parser::parse_streamed_response; +use crate::response::Response; + +#[derive(Default)] +pub struct ImapCodec; + +impl Decoder for ImapCodec { + type Item = Response; + type Error = anyhow::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + let s = std::str::from_utf8(src)?; + debug!("-------------------------"); + debug!("s: {:?}", s); + match parse_streamed_response(s) { + Ok((resp, len)) => { + src.advance(len); + return Ok(Some(resp)); + } + Err(e) => { + warn!("failed to parse: {:?} {}", e, e); + } + }; + + Ok(None) + } +} diff --git a/imap/src/command/mod.rs b/imap/src/command.rs similarity index 100% rename from imap/src/command/mod.rs rename to imap/src/command.rs diff --git a/imap/src/lib.rs b/imap/src/lib.rs index 5ca59c0..edb58d5 100644 --- a/imap/src/lib.rs +++ b/imap/src/lib.rs @@ -20,6 +20,7 @@ extern crate log; extern crate pest_derive; pub mod client; +pub mod codec; pub mod command; pub mod parser; pub mod response; diff --git a/imap/src/parser/literal.rs b/imap/src/parser/literal.rs index dbda4fd..4ced3ea 100644 --- a/imap/src/parser/literal.rs +++ b/imap/src/parser/literal.rs @@ -1,31 +1,27 @@ -use pest::{ParseResult as PestResult, ParserState}; +use pest::{iterators::QueueableToken, ParseResult as PestResult, ParserState}; use super::Rule; type PSR<'a> = Box>; /// This is a hack around the literal syntax to allow us to parse characters statefully. -pub(crate) fn literal_internal(state: PSR) -> PestResult { +pub(crate) fn literal_internal(mut state: PSR) -> PestResult { + // debug!("STATE: {:?}", state); use pest::Atomicity; // yoinked from the generated code #[inline] #[allow(non_snake_case, unused_variables)] pub fn digit(state: PSR) -> PestResult { - state.rule(Rule::digit, |state| { - state.atomic(Atomicity::Atomic, |state| { - state.match_range('\u{30}'..'\u{39}') - }) - }) + state.match_range('\u{30}'..'\u{39}') } #[inline] #[allow(non_snake_case, unused_variables)] pub fn number(state: PSR) -> PestResult { state.rule(Rule::number, |state| { - state.atomic(Atomicity::Atomic, |state| { - state.sequence(|state| { - digit(state).and_then(|state| state.repeat(|state| digit(state))) - }) + state.sequence(|state| { + debug!("number::atomic::sequence"); + digit(state).and_then(|state| state.repeat(digit)) }) }) } @@ -41,18 +37,49 @@ pub(crate) fn literal_internal(state: PSR) -> PestResult { #[inline] #[allow(non_snake_case, unused_variables)] pub fn crlf(state: PSR) -> PestResult { + debug!( + "running rule 'crlf' {:?}", + state.queue().iter().rev().take(10).collect::>() + ); state.sequence(|state| state.match_string("\r")?.match_string("\n")) } - let state = state.match_string("{").and_then(number)?; + let state: PSR = state.match_string("{").and_then(number)?; let num_chars = { - let mut queue = state.queue().iter().rev(); - println!("QUEUE: {:?}", queue); - let end = queue.next().unwrap(); - let start = queue.next().unwrap(); + let queue = state.queue(); + let (start_idx, end_pos) = queue + .iter() + .rev() + .find_map(|p| match p { + QueueableToken::End { + start_token_index: start, + rule: Rule::number, + input_pos: pos, + } => Some((*start, *pos)), + _ => None, + }) + .unwrap(); + let start_pos = match queue[start_idx] { + QueueableToken::Start { input_pos: pos, .. } => pos, + _ => unreachable!(), + }; + debug!("start_pos: {}, end_pos: {}", start_pos, end_pos); + let inp = state.position().get_str(); - let seg = &inp[start.input_pos()..end.input_pos()]; - seg.parse::().unwrap() + let seg = &inp[start_pos..end_pos]; + match seg.parse::() { + Ok(v) => { + debug!("got length: {}", v); + v + } + Err(e) => { + error!( + "failed to parse int from {}..{} {:?}: {}", + start_pos, end_pos, seg, e + ); + return Err(state); + } + } }; state diff --git a/imap/src/parser/mod.rs b/imap/src/parser/mod.rs index 6128022..8919dae 100644 --- a/imap/src/parser/mod.rs +++ b/imap/src/parser/mod.rs @@ -25,6 +25,15 @@ pub fn parse_capability(s: impl AsRef) -> ParseResult { Ok(build_capability(pair)) } +pub fn parse_streamed_response(s: impl AsRef) -> ParseResult<(Response, usize)> { + let mut pairs = Rfc3501::parse(Rule::streamed_response, s.as_ref())?; + let pair = unwrap1(pairs.next().unwrap()); + let span = pair.as_span(); + let range = span.end() - span.start(); + let response = build_response(pair); + Ok((response, range)) +} + pub fn parse_response(s: impl AsRef) -> ParseResult { let mut pairs = Rfc3501::parse(Rule::response, s.as_ref())?; let pair = pairs.next().unwrap(); @@ -386,6 +395,7 @@ fn build_nstring(pair: Pair) -> Option { if matches!(pair.as_rule(), Rule::nil) { return None; } + Some(build_string(pair)) } diff --git a/imap/src/parser/rfc3501.pest b/imap/src/parser/rfc3501.pest index 44b0f23..662a033 100644 --- a/imap/src/parser/rfc3501.pest +++ b/imap/src/parser/rfc3501.pest @@ -1,3 +1,5 @@ +streamed_response = { response ~ ANY* } + // formal syntax from https://tools.ietf.org/html/rfc3501#section-9 addr_adl = { nstring } addr_host = { nstring } @@ -93,7 +95,7 @@ msg_att_static_internaldate = { ^"INTERNALDATE" ~ sp ~ date_time } msg_att_static_rfc822_size = { ^"RFC822.SIZE" ~ sp ~ number } msg_att_static_uid = { ^"UID" ~ sp ~ uniqueid } nil = { ^"NIL" } -nstring = { string | nil } +nstring = { nil | string } number = @{ digit{1,} } nz_number = @{ digit_nz ~ digit* } quoted = @{ dquote ~ quoted_char* ~ dquote } @@ -124,7 +126,7 @@ section_spec = { section_msgtext | (section_part ~ ("." ~ section_text)?) } section_text = { section_msgtext | "MIME" } status_att = { ^"MESSAGES" | ^"RECENT" | ^"UIDNEXT" | ^"UIDVALIDITY" | ^"UNSEEN" } status_att_list = { status_att ~ sp ~ number ~ (sp ~ status_att ~ sp ~ number)* } -string = @{ quoted | literal } +string = ${ quoted | literal } tag = @{ tag_char{1,} } tag_char = @{ !"+" ~ astring_char } text = @{ text_char{1,} } diff --git a/imap/src/response/mod.rs b/imap/src/response.rs similarity index 100% rename from imap/src/response/mod.rs rename to imap/src/response.rs diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 3951540..4f98b67 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -35,6 +35,9 @@ pub enum MailCommand { pub enum MailEvent { /// Got the list of folders FolderList(Vec), + + /// Got the current list of messages + MessageList(Vec), } /// Main entrypoint for the mail listener. @@ -139,8 +142,13 @@ async fn imap_main(acct: MailAccountConfig, mail2ui_tx: UnboundedSender>) -> Result<()> { .warn(Color::Yellow) .error(Color::Red); let mut logger = fern::Dispatch::new() + .filter(|meta| meta.target() != "tokio_util::codec::framed_impl") .format(move |out, message, record| { out.finish(format_args!( "{}[{}][{}] {}", diff --git a/src/ui/mail_tab.rs b/src/ui/mail_tab.rs index e3e1c4f..3eaa8cb 100644 --- a/src/ui/mail_tab.rs +++ b/src/ui/mail_tab.rs @@ -2,12 +2,13 @@ use tui::{ buffer::Buffer, layout::{Constraint, Direction, Layout, Rect}, style::{Color, Modifier, Style}, + text::{Span, Spans}, widgets::*, }; use super::FrameType; -pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String]) { +pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String], messages: &[String]) { let chunks = Layout::default() .direction(Direction::Horizontal) .margin(0) @@ -25,5 +26,15 @@ pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String]) { .highlight_style(Style::default().add_modifier(Modifier::ITALIC)) .highlight_symbol(">>"); + let rows = messages + .iter() + .map(|s| Row::new(vec![s.as_ref()])) + .collect::>(); + let table = Table::new(rows) + .style(Style::default().fg(Color::White)) + .widths(&[Constraint::Max(5000)]) + .highlight_style(Style::default().add_modifier(Modifier::BOLD)); + f.render_widget(dirlist, chunks[0]); + f.render_widget(table, chunks[1]); } diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 09befa8..d928068 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -44,6 +44,7 @@ pub async fn run_ui( let mut term = Terminal::new(backend)?; let mut folders = Vec::::new(); + let mut messages = Vec::::new(); loop { term.draw(|f| { @@ -58,7 +59,7 @@ pub async fn run_ui( let tabs = Tabs::new(titles); f.render_widget(tabs, chunks[0]); - render_mail_tab(f, chunks[1], &folders); + render_mail_tab(f, chunks[1], &folders, &messages); })?; let event = if event::poll(FRAME_DURATION)? { @@ -88,6 +89,9 @@ pub async fn run_ui( MailEvent::FolderList(new_folders) => { folders = new_folders; } + MailEvent::MessageList(new_messages) => { + messages = new_messages; + } } }