This commit is contained in:
Michael Zhang 2021-03-08 17:07:44 -06:00
parent 7153b30a43
commit 7cd69bd6a8
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
15 changed files with 169 additions and 53 deletions

13
Cargo.lock generated
View file

@ -212,7 +212,7 @@ dependencies = [
"git2", "git2",
"git2-curl", "git2-curl",
"glob", "glob",
"hex 0.4.2", "hex 0.4.3",
"home", "home",
"humantime", "humantime",
"ignore", "ignore",
@ -949,9 +949,9 @@ checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
[[package]] [[package]]
name = "hex" name = "hex"
version = "0.4.2" version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "home" name = "home"
@ -1425,6 +1425,7 @@ dependencies = [
"anyhow", "anyhow",
"assert_matches", "assert_matches",
"async-trait", "async-trait",
"bytes",
"derive_builder", "derive_builder",
"futures", "futures",
"log", "log",
@ -1434,6 +1435,7 @@ dependencies = [
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-stream", "tokio-stream",
"tokio-util",
"webpki-roots", "webpki-roots",
] ]
@ -1481,15 +1483,14 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]] [[package]]
name = "pest" name = "pest"
version = "2.1.3" version = "2.1.3"
source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036"
dependencies = [ dependencies = [
"log",
"ucd-trie", "ucd-trie",
] ]
[[package]] [[package]]
name = "pest_derive" name = "pest_derive"
version = "2.1.0" version = "2.1.0"
source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036"
dependencies = [ dependencies = [
"pest", "pest",
"pest_generator", "pest_generator",
@ -1498,7 +1499,6 @@ dependencies = [
[[package]] [[package]]
name = "pest_generator" name = "pest_generator"
version = "2.1.3" version = "2.1.3"
source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036"
dependencies = [ dependencies = [
"pest", "pest",
"pest_meta", "pest_meta",
@ -1510,7 +1510,6 @@ dependencies = [
[[package]] [[package]]
name = "pest_meta" name = "pest_meta"
version = "2.1.3" version = "2.1.3"
source = "git+https://github.com/iptq/pest?rev=838adc941cc182e2c2d807e245a1e4c3e8594036#838adc941cc182e2c2d807e245a1e4c3e8594036"
dependencies = [ dependencies = [
"cargo", "cargo",
"maplit", "maplit",

View file

@ -14,15 +14,19 @@ maintenance = { status = "passively-maintained" }
[dependencies] [dependencies]
anyhow = "1.0.38" anyhow = "1.0.38"
async-trait = "0.1.42" async-trait = "0.1.42"
bytes = { version = "1.0.1" }
derive_builder = "0.9.0" derive_builder = "0.9.0"
futures = "0.3.12" futures = "0.3.12"
log = "0.4.14" log = "0.4.14"
parking_lot = "0.11.1" parking_lot = "0.11.1"
pest = { git = "https://github.com/iptq/pest", rev = "838adc941cc182e2c2d807e245a1e4c3e8594036" } # pest = { path = "../../pest/pest" }
pest_derive = { git = "https://github.com/iptq/pest", rev = "838adc941cc182e2c2d807e245a1e4c3e8594036" } # pest_derive = { path = "../../pest/derive" }
pest = { git = "https://github.com/iptq/pest", rev = "6a4d3a3d10e42a3ee605ca979d0fcdac97a83a99" }
pest_derive = { git = "https://github.com/iptq/pest", rev = "6a4d3a3d10e42a3ee605ca979d0fcdac97a83a99" }
tokio = { version = "1.1.1", features = ["full"] } tokio = { version = "1.1.1", features = ["full"] }
tokio-rustls = "0.22.0" tokio-rustls = "0.22.0"
tokio-stream = "0.1.3" tokio-stream = "0.1.3"
tokio-util = { version = "0.6.3" }
webpki-roots = "0.21.0" webpki-roots = "0.21.0"
[dev-dependencies] [dev-dependencies]

View file

@ -4,10 +4,9 @@ use std::task::{Context, Poll};
use anyhow::Result; use anyhow::Result;
use futures::{ use futures::{
future::{self, Either, FutureExt, TryFutureExt}, future::{self, FutureExt, TryFutureExt},
stream::{Peekable, Stream, StreamExt}, stream::{Stream, StreamExt},
}; };
use parking_lot::Mutex;
use tokio::{ use tokio::{
io::{ io::{
self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf,
@ -21,8 +20,9 @@ use tokio::{
use tokio_rustls::{ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_util::codec::FramedRead;
use crate::codec::ImapCodec;
use crate::command::Command; use crate::command::Command;
use crate::parser::{parse_capability, parse_response}; use crate::parser::{parse_capability, parse_response};
use crate::response::{Response, ResponseDone}; use crate::response::{Response, ResponseDone};
@ -189,16 +189,20 @@ async fn listen<C>(
where where
C: AsyncRead + Unpin, C: AsyncRead + Unpin,
{ {
let mut reader = BufReader::new(conn); let codec = ImapCodec::default();
let mut framed = FramedRead::new(conn, codec);
// let mut reader = BufReader::new(conn);
let mut greeting_tx = Some(greeting_tx); let mut greeting_tx = Some(greeting_tx);
let mut curr_cmd: Option<Command2> = None; let mut curr_cmd: Option<Command2> = None;
let mut exit_rx = exit_rx.map_err(|_| ()).shared(); let mut exit_rx = exit_rx.map_err(|_| ()).shared();
// let mut exit_fut = Some(exit_rx.fuse()); // let mut exit_fut = Some(exit_rx.fuse());
// let mut fut1 = None; // let mut fut1 = None;
let mut cache = String::new();
loop { loop {
let mut next_line = String::new(); // let mut next_line = String::new();
let read_fut = reader.read_line(&mut next_line).fuse(); // let read_fut = reader.read_line(&mut next_line).fuse();
let read_fut = framed.next().fuse();
pin_mut!(read_fut); pin_mut!(read_fut);
// only listen for a new command if there isn't one already // only listen for a new command if there isn't one already
@ -222,10 +226,24 @@ where
} }
} }
len = read_fut => { resp = read_fut => {
trace!("read line {:?}", next_line); // trace!("read line {:?}", next_line);
// res should not be None here // res should not be None here
let resp = parse_response(next_line)?; // cache += &next_line;
// let resp = match parse_response(&cache) {
// Ok(v) => {
// cache.clear();
// v
// }
// Err(e) => {
// error!("parse error: {}", e);
// continue;
// }
// };
let resp = match resp {
Some(Ok(v)) => v,
a => { error!("failed: {:?}", a); bail!("fuck"); },
};
// if this is the very first response, then it's a greeting // if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() { if let Some(greeting_tx) = greeting_tx.take() {
@ -244,6 +262,6 @@ where
} }
} }
let conn = reader.into_inner(); let conn = framed.into_inner();
Ok(conn) Ok(conn)
} }

View file

@ -43,10 +43,9 @@ use tokio::net::TcpStream;
use tokio_rustls::{ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::{Command, FetchItems, SearchCriteria}; use crate::command::{Command, FetchItems, SearchCriteria};
use crate::response::{MailboxData, Response, ResponseData, ResponseDone}; use crate::response::{AttributeValue, MailboxData, Response, ResponseData, ResponseDone};
pub use self::inner::{Client, ResponseStream}; pub use self::inner::{Client, ResponseStream};
@ -200,7 +199,7 @@ impl ClientAuthenticated {
} }
/// Runs the UID FETCH command /// Runs the UID FETCH command
pub async fn uid_fetch(&mut self, uids: &[u32]) -> Result<()> { pub async fn uid_fetch(&mut self, uids: &[u32]) -> Result<Vec<(u32, Vec<AttributeValue>)>> {
let cmd = Command::UidFetch { let cmd = Command::UidFetch {
uids: uids.to_vec(), uids: uids.to_vec(),
items: FetchItems::All, items: FetchItems::All,
@ -208,11 +207,13 @@ impl ClientAuthenticated {
debug!("uid fetch: {}", cmd); debug!("uid fetch: {}", cmd);
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
let (done, data) = stream.wait().await?; let (done, data) = stream.wait().await?;
debug!("done: {:?} {:?}", done, data); Ok(data
for resp in data { .into_iter()
debug!("uid fetch: {:?}", resp); .filter_map(|resp| match resp {
} Response::Fetch(n, attrs) => Some((n, attrs)),
todo!() _ => None,
})
.collect())
} }
/// Runs the IDLE command /// Runs the IDLE command

30
imap/src/codec.rs Normal file
View file

@ -0,0 +1,30 @@
use bytes::{Buf, BytesMut};
use tokio_util::codec::Decoder;
use crate::parser::parse_streamed_response;
use crate::response::Response;
#[derive(Default)]
pub struct ImapCodec;
impl Decoder for ImapCodec {
type Item = Response;
type Error = anyhow::Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let s = std::str::from_utf8(src)?;
debug!("-------------------------");
debug!("s: {:?}", s);
match parse_streamed_response(s) {
Ok((resp, len)) => {
src.advance(len);
return Ok(Some(resp));
}
Err(e) => {
warn!("failed to parse: {:?} {}", e, e);
}
};
Ok(None)
}
}

View file

@ -20,6 +20,7 @@ extern crate log;
extern crate pest_derive; extern crate pest_derive;
pub mod client; pub mod client;
pub mod codec;
pub mod command; pub mod command;
pub mod parser; pub mod parser;
pub mod response; pub mod response;

View file

@ -1,31 +1,27 @@
use pest::{ParseResult as PestResult, ParserState}; use pest::{iterators::QueueableToken, ParseResult as PestResult, ParserState};
use super::Rule; use super::Rule;
type PSR<'a> = Box<ParserState<'a, Rule>>; type PSR<'a> = Box<ParserState<'a, Rule>>;
/// This is a hack around the literal syntax to allow us to parse characters statefully. /// This is a hack around the literal syntax to allow us to parse characters statefully.
pub(crate) fn literal_internal(state: PSR) -> PestResult<PSR> { pub(crate) fn literal_internal(mut state: PSR) -> PestResult<PSR> {
// debug!("STATE: {:?}", state);
use pest::Atomicity; use pest::Atomicity;
// yoinked from the generated code // yoinked from the generated code
#[inline] #[inline]
#[allow(non_snake_case, unused_variables)] #[allow(non_snake_case, unused_variables)]
pub fn digit(state: PSR) -> PestResult<PSR> { pub fn digit(state: PSR) -> PestResult<PSR> {
state.rule(Rule::digit, |state| {
state.atomic(Atomicity::Atomic, |state| {
state.match_range('\u{30}'..'\u{39}') state.match_range('\u{30}'..'\u{39}')
})
})
} }
#[inline] #[inline]
#[allow(non_snake_case, unused_variables)] #[allow(non_snake_case, unused_variables)]
pub fn number(state: PSR) -> PestResult<PSR> { pub fn number(state: PSR) -> PestResult<PSR> {
state.rule(Rule::number, |state| { state.rule(Rule::number, |state| {
state.atomic(Atomicity::Atomic, |state| {
state.sequence(|state| { state.sequence(|state| {
digit(state).and_then(|state| state.repeat(|state| digit(state))) debug!("number::atomic::sequence");
}) digit(state).and_then(|state| state.repeat(digit))
}) })
}) })
} }
@ -41,18 +37,49 @@ pub(crate) fn literal_internal(state: PSR) -> PestResult<PSR> {
#[inline] #[inline]
#[allow(non_snake_case, unused_variables)] #[allow(non_snake_case, unused_variables)]
pub fn crlf(state: PSR) -> PestResult<PSR> { pub fn crlf(state: PSR) -> PestResult<PSR> {
debug!(
"running rule 'crlf' {:?}",
state.queue().iter().rev().take(10).collect::<Vec<_>>()
);
state.sequence(|state| state.match_string("\r")?.match_string("\n")) state.sequence(|state| state.match_string("\r")?.match_string("\n"))
} }
let state = state.match_string("{").and_then(number)?; let state: PSR = state.match_string("{").and_then(number)?;
let num_chars = { let num_chars = {
let mut queue = state.queue().iter().rev(); let queue = state.queue();
println!("QUEUE: {:?}", queue); let (start_idx, end_pos) = queue
let end = queue.next().unwrap(); .iter()
let start = queue.next().unwrap(); .rev()
.find_map(|p| match p {
QueueableToken::End {
start_token_index: start,
rule: Rule::number,
input_pos: pos,
} => Some((*start, *pos)),
_ => None,
})
.unwrap();
let start_pos = match queue[start_idx] {
QueueableToken::Start { input_pos: pos, .. } => pos,
_ => unreachable!(),
};
debug!("start_pos: {}, end_pos: {}", start_pos, end_pos);
let inp = state.position().get_str(); let inp = state.position().get_str();
let seg = &inp[start.input_pos()..end.input_pos()]; let seg = &inp[start_pos..end_pos];
seg.parse::<usize>().unwrap() match seg.parse::<usize>() {
Ok(v) => {
debug!("got length: {}", v);
v
}
Err(e) => {
error!(
"failed to parse int from {}..{} {:?}: {}",
start_pos, end_pos, seg, e
);
return Err(state);
}
}
}; };
state state

View file

@ -25,6 +25,15 @@ pub fn parse_capability(s: impl AsRef<str>) -> ParseResult<Capability> {
Ok(build_capability(pair)) Ok(build_capability(pair))
} }
pub fn parse_streamed_response(s: impl AsRef<str>) -> ParseResult<(Response, usize)> {
let mut pairs = Rfc3501::parse(Rule::streamed_response, s.as_ref())?;
let pair = unwrap1(pairs.next().unwrap());
let span = pair.as_span();
let range = span.end() - span.start();
let response = build_response(pair);
Ok((response, range))
}
pub fn parse_response(s: impl AsRef<str>) -> ParseResult<Response> { pub fn parse_response(s: impl AsRef<str>) -> ParseResult<Response> {
let mut pairs = Rfc3501::parse(Rule::response, s.as_ref())?; let mut pairs = Rfc3501::parse(Rule::response, s.as_ref())?;
let pair = pairs.next().unwrap(); let pair = pairs.next().unwrap();
@ -386,6 +395,7 @@ fn build_nstring(pair: Pair<Rule>) -> Option<String> {
if matches!(pair.as_rule(), Rule::nil) { if matches!(pair.as_rule(), Rule::nil) {
return None; return None;
} }
Some(build_string(pair)) Some(build_string(pair))
} }

View file

@ -1,3 +1,5 @@
streamed_response = { response ~ ANY* }
// formal syntax from https://tools.ietf.org/html/rfc3501#section-9 // formal syntax from https://tools.ietf.org/html/rfc3501#section-9
addr_adl = { nstring } addr_adl = { nstring }
addr_host = { nstring } addr_host = { nstring }
@ -93,7 +95,7 @@ msg_att_static_internaldate = { ^"INTERNALDATE" ~ sp ~ date_time }
msg_att_static_rfc822_size = { ^"RFC822.SIZE" ~ sp ~ number } msg_att_static_rfc822_size = { ^"RFC822.SIZE" ~ sp ~ number }
msg_att_static_uid = { ^"UID" ~ sp ~ uniqueid } msg_att_static_uid = { ^"UID" ~ sp ~ uniqueid }
nil = { ^"NIL" } nil = { ^"NIL" }
nstring = { string | nil } nstring = { nil | string }
number = @{ digit{1,} } number = @{ digit{1,} }
nz_number = @{ digit_nz ~ digit* } nz_number = @{ digit_nz ~ digit* }
quoted = @{ dquote ~ quoted_char* ~ dquote } quoted = @{ dquote ~ quoted_char* ~ dquote }
@ -124,7 +126,7 @@ section_spec = { section_msgtext | (section_part ~ ("." ~ section_text)?) }
section_text = { section_msgtext | "MIME" } section_text = { section_msgtext | "MIME" }
status_att = { ^"MESSAGES" | ^"RECENT" | ^"UIDNEXT" | ^"UIDVALIDITY" | ^"UNSEEN" } status_att = { ^"MESSAGES" | ^"RECENT" | ^"UIDNEXT" | ^"UIDVALIDITY" | ^"UNSEEN" }
status_att_list = { status_att ~ sp ~ number ~ (sp ~ status_att ~ sp ~ number)* } status_att_list = { status_att ~ sp ~ number ~ (sp ~ status_att ~ sp ~ number)* }
string = @{ quoted | literal } string = ${ quoted | literal }
tag = @{ tag_char{1,} } tag = @{ tag_char{1,} }
tag_char = @{ !"+" ~ astring_char } tag_char = @{ !"+" ~ astring_char }
text = @{ text_char{1,} } text = @{ text_char{1,} }

View file

@ -35,6 +35,9 @@ pub enum MailCommand {
pub enum MailEvent { pub enum MailEvent {
/// Got the list of folders /// Got the list of folders
FolderList(Vec<String>), FolderList(Vec<String>),
/// Got the current list of messages
MessageList(Vec<String>),
} }
/// Main entrypoint for the mail listener. /// Main entrypoint for the mail listener.
@ -139,8 +142,13 @@ async fn imap_main(acct: MailAccountConfig, mail2ui_tx: UnboundedSender<MailEven
debug!("mailbox list: {:?}", folder_list); debug!("mailbox list: {:?}", folder_list);
let _ = mail2ui_tx.send(MailEvent::FolderList(folder_list)); let _ = mail2ui_tx.send(MailEvent::FolderList(folder_list));
let message_list = authed.uid_search().await?; let message_uids = authed.uid_search().await?;
authed.uid_fetch(&message_list).await?; let message_list = authed.uid_fetch(&message_uids).await?;
let mut messages = Vec::new();
for (_, attrs) in message_list {
messages.push(format!("{:?}", attrs));
}
let _ = mail2ui_tx.send(MailEvent::MessageList(messages));
let mut idle_stream = authed.idle().await?; let mut idle_stream = authed.idle().await?;

View file

@ -3,7 +3,7 @@ use std::thread;
use anyhow::Result; use anyhow::Result;
use fern::colors::{Color, ColoredLevelConfig}; use fern::colors::{Color, ColoredLevelConfig};
use futures::future::TryFutureExt; use futures::future::{FutureExt, TryFutureExt};
use panorama::{ use panorama::{
config::spawn_config_watcher_system, config::spawn_config_watcher_system,
mail::{self, MailEvent}, mail::{self, MailEvent},
@ -109,6 +109,7 @@ fn setup_logger(log_file: Option<impl AsRef<Path>>) -> Result<()> {
.warn(Color::Yellow) .warn(Color::Yellow)
.error(Color::Red); .error(Color::Red);
let mut logger = fern::Dispatch::new() let mut logger = fern::Dispatch::new()
.filter(|meta| meta.target() != "tokio_util::codec::framed_impl")
.format(move |out, message, record| { .format(move |out, message, record| {
out.finish(format_args!( out.finish(format_args!(
"{}[{}][{}] {}", "{}[{}][{}] {}",

View file

@ -2,12 +2,13 @@ use tui::{
buffer::Buffer, buffer::Buffer,
layout::{Constraint, Direction, Layout, Rect}, layout::{Constraint, Direction, Layout, Rect},
style::{Color, Modifier, Style}, style::{Color, Modifier, Style},
text::{Span, Spans},
widgets::*, widgets::*,
}; };
use super::FrameType; use super::FrameType;
pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String]) { pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String], messages: &[String]) {
let chunks = Layout::default() let chunks = Layout::default()
.direction(Direction::Horizontal) .direction(Direction::Horizontal)
.margin(0) .margin(0)
@ -25,5 +26,15 @@ pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String]) {
.highlight_style(Style::default().add_modifier(Modifier::ITALIC)) .highlight_style(Style::default().add_modifier(Modifier::ITALIC))
.highlight_symbol(">>"); .highlight_symbol(">>");
let rows = messages
.iter()
.map(|s| Row::new(vec![s.as_ref()]))
.collect::<Vec<_>>();
let table = Table::new(rows)
.style(Style::default().fg(Color::White))
.widths(&[Constraint::Max(5000)])
.highlight_style(Style::default().add_modifier(Modifier::BOLD));
f.render_widget(dirlist, chunks[0]); f.render_widget(dirlist, chunks[0]);
f.render_widget(table, chunks[1]);
} }

View file

@ -44,6 +44,7 @@ pub async fn run_ui(
let mut term = Terminal::new(backend)?; let mut term = Terminal::new(backend)?;
let mut folders = Vec::<String>::new(); let mut folders = Vec::<String>::new();
let mut messages = Vec::<String>::new();
loop { loop {
term.draw(|f| { term.draw(|f| {
@ -58,7 +59,7 @@ pub async fn run_ui(
let tabs = Tabs::new(titles); let tabs = Tabs::new(titles);
f.render_widget(tabs, chunks[0]); f.render_widget(tabs, chunks[0]);
render_mail_tab(f, chunks[1], &folders); render_mail_tab(f, chunks[1], &folders, &messages);
})?; })?;
let event = if event::poll(FRAME_DURATION)? { let event = if event::poll(FRAME_DURATION)? {
@ -88,6 +89,9 @@ pub async fn run_ui(
MailEvent::FolderList(new_folders) => { MailEvent::FolderList(new_folders) => {
folders = new_folders; folders = new_folders;
} }
MailEvent::MessageList(new_messages) => {
messages = new_messages;
}
} }
} }