From 290cefc3a25ba132e63b92effe9ab54ac2a25187 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Tue, 9 Mar 2021 08:05:31 -0600 Subject: [PATCH] yay new emails are automatically sent to the UI and send notifications to notifyd --- imap/src/client/inner.rs | 110 +++++++++++++++++++++++++++++++-------- imap/src/client/mod.rs | 64 +++++++++++++++++++++-- imap/src/command.rs | 20 +++++++ src/mail/mod.rs | 42 +++++++++++++-- src/main.rs | 5 +- src/ui/mod.rs | 5 ++ 6 files changed, 215 insertions(+), 31 deletions(-) diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index 633d12f..84b2fe2 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -30,15 +30,18 @@ use crate::response::{Response, ResponseDone}; use super::ClientConfig; pub const TAG_PREFIX: &str = "ptag"; -type Command2 = (Command, mpsc::UnboundedSender); +type Command2 = (String, Command, mpsc::UnboundedSender); pub struct Client { ctr: usize, config: ClientConfig, - conn: WriteHalf, + // conn: WriteHalf, + pub(crate) write_tx: mpsc::UnboundedSender, cmd_tx: mpsc::UnboundedSender, greeting_rx: Option>, - exit_tx: oneshot::Sender<()>, + writer_exit_tx: oneshot::Sender<()>, + writer_handle: JoinHandle>>, + listener_exit_tx: oneshot::Sender<()>, listener_handle: JoinHandle>>, } @@ -47,25 +50,36 @@ where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { pub fn new(conn: C, config: ClientConfig) -> Self { - let (read_half, write_half) = io::split(conn); + let (read_half, mut write_half) = io::split(conn); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let (greeting_tx, greeting_rx) = oneshot::channel(); + + let (writer_exit_tx, exit_rx) = oneshot::channel(); + let (write_tx, mut write_rx) = mpsc::unbounded_channel::(); + let writer_handle = tokio::spawn(write(write_half, write_rx, exit_rx).map_err(|err| { + error!("Help, the writer loop died: {}", err); + err + })); + let (exit_tx, exit_rx) = oneshot::channel(); - let handle = tokio::spawn( - listen(read_half, cmd_rx, greeting_tx, exit_rx).map_err(|err| { - error!("Help, the listener loop died: {}", err); + let listener_handle = tokio::spawn( + listen(read_half, cmd_rx, write_tx.clone(), greeting_tx, exit_rx).map_err(|err| { + error!("Help, the listener loop died: {:?} {}", err, err); err }), ); Client { ctr: 0, - conn: write_half, + // conn: write_half, config, cmd_tx, + write_tx, greeting_rx: Some(greeting_rx), - exit_tx, - listener_handle: handle, + writer_exit_tx, + listener_exit_tx: exit_tx, + writer_handle, + listener_handle, } } @@ -80,14 +94,17 @@ where let id = self.ctr; self.ctr += 1; - let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd); - self.conn.write_all(cmd_str.as_bytes()).await?; - self.conn.flush().await?; + let tag = format!("{}{}", TAG_PREFIX, id); + // let cmd_str = format!("{} {}\r\n", tag, cmd); + // self.write_tx.send(cmd_str); + // self.conn.write_all(cmd_str.as_bytes()).await?; + // self.conn.flush().await?; let (tx, rx) = mpsc::unbounded_channel(); - self.cmd_tx.send((cmd, tx))?; + self.cmd_tx.send((tag, cmd, tx))?; - Ok(ResponseStream { inner: rx }) + let stream = ResponseStream { inner: rx }; + Ok(stream) } pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { @@ -121,9 +138,13 @@ where debug!("sending exit for upgrade"); // TODO: check that the channel is still open? - self.exit_tx.send(()).unwrap(); - let reader = self.listener_handle.await??; - let writer = self.conn; + self.listener_exit_tx.send(()).unwrap(); + self.writer_exit_tx.send(()).unwrap(); + let (reader, writer) = future::join(self.listener_handle, self.writer_handle).await; + let reader = reader??; + let writer = writer??; + // let reader = self.listener_handle.await??; + // let writer = self.conn; let conn = reader.unsplit(writer); let server_name = &self.config.hostname; @@ -142,7 +163,7 @@ where } pub struct ResponseStream { - inner: mpsc::UnboundedReceiver, + pub(crate) inner: mpsc::UnboundedReceiver, } impl ResponseStream { @@ -179,10 +200,44 @@ impl Stream for ResponseStream { } } +#[allow(unreachable_code)] +async fn write( + mut conn: WriteHalf, + mut write_rx: mpsc::UnboundedReceiver, + exit_rx: oneshot::Receiver<()>, +) -> Result> +where + C: AsyncWrite + Unpin, +{ + let mut exit_rx = exit_rx.map_err(|_| ()).shared(); + loop { + let write_fut = write_rx.recv().fuse(); + pin_mut!(write_fut); + + select! { + _ = exit_rx => { + break; + } + + line = write_fut => { + if let Some(line) = line { + trace!("got line {:?}", line); + conn.write_all(line.as_bytes()).await?; + conn.flush().await?; + trace!("C>>>S: {:?}", line); + } + } + } + } + + Ok(conn) +} + #[allow(unreachable_code)] async fn listen( conn: ReadHalf, mut cmd_rx: mpsc::UnboundedReceiver, + mut write_tx: mpsc::UnboundedSender, greeting_tx: oneshot::Sender<()>, exit_rx: oneshot::Receiver<()>, ) -> Result> @@ -219,6 +274,10 @@ where // read a command from the command list cmd = cmd_fut => { if curr_cmd.is_none() { + if let Some((ref tag, ref cmd, _)) = cmd { + let cmd_str = format!("{} {}\r\n", tag, cmd); + write_tx.send(cmd_str); + } curr_cmd = cmd; } } @@ -237,11 +296,16 @@ where if let Response::Done(_) = resp { // since this is the DONE message, clear curr_cmd so another one can be sent - if let Some((_, cmd_tx)) = curr_cmd.take() { - cmd_tx.send(resp)?; + if let Some((_, _, cmd_tx)) = curr_cmd.take() { + let res = cmd_tx.send(resp); + debug!("res0: {:?}", res); } - } else if let Some((ref cmd, ref mut cmd_tx)) = curr_cmd { - cmd_tx.send(resp)?; + } else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() { + // we got a response from the server for this command, so send it over the + // channel + debug!("sending {:?} to tag {}", resp, tag); + let res = cmd_tx.send(resp); + debug!("res1: {:?}", res); } } } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index 960cc4a..5717b16 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -36,14 +36,20 @@ pub mod auth; mod inner; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; use anyhow::Result; use futures::{ future::{self, FutureExt}, stream::{Stream, StreamExt}, }; -use tokio::net::TcpStream; +use tokio::{ + net::TcpStream, + sync::{mpsc, oneshot}, + task::JoinHandle, +}; use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, }; @@ -152,6 +158,13 @@ impl ClientAuthenticated { } } + fn sender(&self) -> mpsc::UnboundedSender { + match self { + ClientAuthenticated::Encrypted(e) => e.write_tx.clone(), + ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(), + } + } + /// Checks if the server that the client is talking to has support for the given capability. pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { match self { @@ -212,6 +225,25 @@ impl ClientAuthenticated { bail!("could not find the SEARCH response") } + /// Runs the FETCH command + pub async fn fetch( + &mut self, + uids: &[u32], + ) -> Result)>> { + let cmd = Command::Fetch { + uids: uids.to_vec(), + items: FetchItems::All, + }; + debug!("fetch: {}", cmd); + let stream = self.execute(cmd).await?; + // let (done, data) = stream.wait().await?; + Ok(stream.filter_map(|resp| match resp { + Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), + Response::Done(_) => future::ready(None).boxed(), + _ => future::pending().boxed(), + })) + } + /// Runs the UID FETCH command pub async fn uid_fetch( &mut self, @@ -234,13 +266,39 @@ impl ClientAuthenticated { /// Runs the IDLE command #[cfg(feature = "rfc2177-idle")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] - pub async fn idle(&mut self) -> Result { + pub async fn idle(&mut self) -> Result { let cmd = Command::Idle; let stream = self.execute(cmd).await?; - Ok(stream) + let sender = self.sender(); + Ok(IdleToken { stream, sender }) } fn nuke_capabilities(&mut self) { // TODO: do something here } } + +#[cfg(feature = "rfc2177-idle")] +#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] +pub struct IdleToken { + pub stream: ResponseStream, + sender: mpsc::UnboundedSender, +} + +#[cfg(feature = "rfc2177-idle")] +#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] +impl Drop for IdleToken { + fn drop(&mut self) { + self.sender.send(format!("DONE\r\n")); + } +} + +#[cfg(feature = "rfc2177-idle")] +#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] +impl Stream for IdleToken { + type Item = Response; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let stream = Pin::new(&mut self.stream); + Stream::poll_next(stream, cx) + } +} diff --git a/imap/src/command.rs b/imap/src/command.rs index 4c72431..619645c 100644 --- a/imap/src/command.rs +++ b/imap/src/command.rs @@ -19,6 +19,11 @@ pub enum Command { Search { criteria: SearchCriteria, }, + Fetch { + // TODO: do sequence-set + uids: Vec, + items: FetchItems, + }, UidSearch { criteria: SearchCriteria, }, @@ -31,6 +36,10 @@ pub enum Command { #[cfg(feature = "rfc2177-idle")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] Idle, + + #[cfg(feature = "rfc2177-idle")] + #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] + Done, } impl fmt::Debug for Command { @@ -54,6 +63,15 @@ impl fmt::Display for Command { Search { criteria } => write!(f, "SEARCH {}", criteria), UidSearch { criteria } => write!(f, "UID SEARCH {}", criteria), List { reference, mailbox } => write!(f, "LIST {:?} {:?}", reference, mailbox), + Fetch { uids, items } => write!( + f, + "FETCH {} {}", + uids.iter() + .map(|s| s.to_string()) + .collect::>() + .join(","), + items + ), UidFetch { uids, items } => write!( f, "UID FETCH {} {}", @@ -66,6 +84,8 @@ impl fmt::Display for Command { #[cfg(feature = "rfc2177-idle")] Idle => write!(f, "IDLE"), + #[cfg(feature = "rfc2177-idle")] + Done => write!(f, "DONE"), } } } diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 2edbf36..96b012e 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -5,13 +5,14 @@ use futures::{ future::FutureExt, stream::{Stream, StreamExt}, }; +use notify_rust::{Notification, Timeout}; use panorama_imap::{ client::{ auth::{self, Auth}, ClientBuilder, ClientConfig, }, command::Command as ImapCommand, - response::{AttributeValue, Envelope}, + response::{AttributeValue, Envelope, MailboxData, Response}, }; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender}, @@ -47,6 +48,9 @@ pub enum MailEvent { /// Update the given UID with the given attribute list UpdateUid(u32, Vec), + + /// New message came in with given UID + NewUid(u32), } /// Main entrypoint for the mail listener. @@ -169,11 +173,41 @@ async fn imap_main(acct: MailAccountConfig, mail2ui_tx: UnboundedSender v, + None => break, + }; debug!("got an event: {:?}", evt); - if false { - break; + match evt { + Response::MailboxData(MailboxData::Exists(uid)) => { + debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); + // send DONE to stop the idle + std::mem::drop(idle_stream); + + let handle = Notification::new() + .summary("New Email") + .body("holy Shit,") + .icon("firefox") + .timeout(Timeout::Milliseconds(6000)) + .show()?; + + let message_uids = authed.uid_search().await?; + let message_uids = + message_uids.into_iter().take(20).collect::>(); + let _ = mail2ui_tx.send(MailEvent::MessageUids(message_uids.clone())); + + // TODO: make this happen concurrently with the main loop? + let mut message_list = authed.uid_fetch(&message_uids).await.unwrap(); + while let Some((uid, attrs)) = message_list.next().await { + let evt = MailEvent::UpdateUid(uid, attrs); + debug!("sent {:?}", evt); + mail2ui_tx.send(evt); + } + + idle_stream = authed.idle().await?; + } + _ => {} } } } else { diff --git a/src/main.rs b/src/main.rs index 2b8cb7f..340f1ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -109,7 +109,10 @@ fn setup_logger(log_file: Option>) -> Result<()> { .warn(Color::Yellow) .error(Color::Red); let mut logger = fern::Dispatch::new() - .filter(|meta| meta.target() != "tokio_util::codec::framed_impl") + .filter(|meta| { + meta.target() != "tokio_util::codec::framed_impl" + && !meta.target().starts_with("rustls::client") + }) .format(move |out, message, record| { out.finish(format_args!( "{}[{}][{}] {}", diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 5690c17..d602771 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -104,6 +104,7 @@ pub async fn run_ui( MailEvent::MessageUids(new_uids) => { mail_tab.message_uids = new_uids; } + MailEvent::UpdateUid(_, attrs) => { let mut uid = None; let mut date = None; @@ -136,6 +137,10 @@ pub async fn run_ui( mail_tab.message_map.insert(uid, meta); } } + MailEvent::NewUid(uid) => { + debug!("new msg!"); + mail_tab.message_uids.push(uid); + } _ => {} } }