yay new emails are automatically sent to the UI and send notifications to notifyd

This commit is contained in:
Michael Zhang 2021-03-09 08:05:31 -06:00
parent 61a6a45b6c
commit 290cefc3a2
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
6 changed files with 215 additions and 31 deletions

View file

@ -30,15 +30,18 @@ use crate::response::{Response, ResponseDone};
use super::ClientConfig;
pub const TAG_PREFIX: &str = "ptag";
type Command2 = (Command, mpsc::UnboundedSender<Response>);
type Command2 = (String, Command, mpsc::UnboundedSender<Response>);
pub struct Client<C> {
ctr: usize,
config: ClientConfig,
conn: WriteHalf<C>,
// conn: WriteHalf<C>,
pub(crate) write_tx: mpsc::UnboundedSender<String>,
cmd_tx: mpsc::UnboundedSender<Command2>,
greeting_rx: Option<oneshot::Receiver<()>>,
exit_tx: oneshot::Sender<()>,
writer_exit_tx: oneshot::Sender<()>,
writer_handle: JoinHandle<Result<WriteHalf<C>>>,
listener_exit_tx: oneshot::Sender<()>,
listener_handle: JoinHandle<Result<ReadHalf<C>>>,
}
@ -47,25 +50,36 @@ where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
pub fn new(conn: C, config: ClientConfig) -> Self {
let (read_half, write_half) = io::split(conn);
let (read_half, mut write_half) = io::split(conn);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (greeting_tx, greeting_rx) = oneshot::channel();
let (writer_exit_tx, exit_rx) = oneshot::channel();
let (write_tx, mut write_rx) = mpsc::unbounded_channel::<String>();
let writer_handle = tokio::spawn(write(write_half, write_rx, exit_rx).map_err(|err| {
error!("Help, the writer loop died: {}", err);
err
}));
let (exit_tx, exit_rx) = oneshot::channel();
let handle = tokio::spawn(
listen(read_half, cmd_rx, greeting_tx, exit_rx).map_err(|err| {
error!("Help, the listener loop died: {}", err);
let listener_handle = tokio::spawn(
listen(read_half, cmd_rx, write_tx.clone(), greeting_tx, exit_rx).map_err(|err| {
error!("Help, the listener loop died: {:?} {}", err, err);
err
}),
);
Client {
ctr: 0,
conn: write_half,
// conn: write_half,
config,
cmd_tx,
write_tx,
greeting_rx: Some(greeting_rx),
exit_tx,
listener_handle: handle,
writer_exit_tx,
listener_exit_tx: exit_tx,
writer_handle,
listener_handle,
}
}
@ -80,14 +94,17 @@ where
let id = self.ctr;
self.ctr += 1;
let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd);
self.conn.write_all(cmd_str.as_bytes()).await?;
self.conn.flush().await?;
let tag = format!("{}{}", TAG_PREFIX, id);
// let cmd_str = format!("{} {}\r\n", tag, cmd);
// self.write_tx.send(cmd_str);
// self.conn.write_all(cmd_str.as_bytes()).await?;
// self.conn.flush().await?;
let (tx, rx) = mpsc::unbounded_channel();
self.cmd_tx.send((cmd, tx))?;
self.cmd_tx.send((tag, cmd, tx))?;
Ok(ResponseStream { inner: rx })
let stream = ResponseStream { inner: rx };
Ok(stream)
}
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
@ -121,9 +138,13 @@ where
debug!("sending exit for upgrade");
// TODO: check that the channel is still open?
self.exit_tx.send(()).unwrap();
let reader = self.listener_handle.await??;
let writer = self.conn;
self.listener_exit_tx.send(()).unwrap();
self.writer_exit_tx.send(()).unwrap();
let (reader, writer) = future::join(self.listener_handle, self.writer_handle).await;
let reader = reader??;
let writer = writer??;
// let reader = self.listener_handle.await??;
// let writer = self.conn;
let conn = reader.unsplit(writer);
let server_name = &self.config.hostname;
@ -142,7 +163,7 @@ where
}
pub struct ResponseStream {
inner: mpsc::UnboundedReceiver<Response>,
pub(crate) inner: mpsc::UnboundedReceiver<Response>,
}
impl ResponseStream {
@ -179,10 +200,44 @@ impl Stream for ResponseStream {
}
}
#[allow(unreachable_code)]
async fn write<C>(
mut conn: WriteHalf<C>,
mut write_rx: mpsc::UnboundedReceiver<String>,
exit_rx: oneshot::Receiver<()>,
) -> Result<WriteHalf<C>>
where
C: AsyncWrite + Unpin,
{
let mut exit_rx = exit_rx.map_err(|_| ()).shared();
loop {
let write_fut = write_rx.recv().fuse();
pin_mut!(write_fut);
select! {
_ = exit_rx => {
break;
}
line = write_fut => {
if let Some(line) = line {
trace!("got line {:?}", line);
conn.write_all(line.as_bytes()).await?;
conn.flush().await?;
trace!("C>>>S: {:?}", line);
}
}
}
}
Ok(conn)
}
#[allow(unreachable_code)]
async fn listen<C>(
conn: ReadHalf<C>,
mut cmd_rx: mpsc::UnboundedReceiver<Command2>,
mut write_tx: mpsc::UnboundedSender<String>,
greeting_tx: oneshot::Sender<()>,
exit_rx: oneshot::Receiver<()>,
) -> Result<ReadHalf<C>>
@ -219,6 +274,10 @@ where
// read a command from the command list
cmd = cmd_fut => {
if curr_cmd.is_none() {
if let Some((ref tag, ref cmd, _)) = cmd {
let cmd_str = format!("{} {}\r\n", tag, cmd);
write_tx.send(cmd_str);
}
curr_cmd = cmd;
}
}
@ -237,11 +296,16 @@ where
if let Response::Done(_) = resp {
// since this is the DONE message, clear curr_cmd so another one can be sent
if let Some((_, cmd_tx)) = curr_cmd.take() {
cmd_tx.send(resp)?;
if let Some((_, _, cmd_tx)) = curr_cmd.take() {
let res = cmd_tx.send(resp);
debug!("res0: {:?}", res);
}
} else if let Some((ref cmd, ref mut cmd_tx)) = curr_cmd {
cmd_tx.send(resp)?;
} else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() {
// we got a response from the server for this command, so send it over the
// channel
debug!("sending {:?} to tag {}", resp, tag);
let res = cmd_tx.send(resp);
debug!("res1: {:?}", res);
}
}
}

View file

@ -36,14 +36,20 @@
pub mod auth;
mod inner;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Result;
use futures::{
future::{self, FutureExt},
stream::{Stream, StreamExt},
};
use tokio::net::TcpStream;
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
task::JoinHandle,
};
use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
};
@ -152,6 +158,13 @@ impl ClientAuthenticated {
}
}
fn sender(&self) -> mpsc::UnboundedSender<String> {
match self {
ClientAuthenticated::Encrypted(e) => e.write_tx.clone(),
ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(),
}
}
/// Checks if the server that the client is talking to has support for the given capability.
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
match self {
@ -212,6 +225,25 @@ impl ClientAuthenticated {
bail!("could not find the SEARCH response")
}
/// Runs the FETCH command
pub async fn fetch(
&mut self,
uids: &[u32],
) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> {
let cmd = Command::Fetch {
uids: uids.to_vec(),
items: FetchItems::All,
};
debug!("fetch: {}", cmd);
let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(),
}))
}
/// Runs the UID FETCH command
pub async fn uid_fetch(
&mut self,
@ -234,13 +266,39 @@ impl ClientAuthenticated {
/// Runs the IDLE command
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
pub async fn idle(&mut self) -> Result<ResponseStream> {
pub async fn idle(&mut self) -> Result<IdleToken> {
let cmd = Command::Idle;
let stream = self.execute(cmd).await?;
Ok(stream)
let sender = self.sender();
Ok(IdleToken { stream, sender })
}
fn nuke_capabilities(&mut self) {
// TODO: do something here
}
}
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
pub struct IdleToken {
pub stream: ResponseStream,
sender: mpsc::UnboundedSender<String>,
}
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
impl Drop for IdleToken {
fn drop(&mut self) {
self.sender.send(format!("DONE\r\n"));
}
}
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
impl Stream for IdleToken {
type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let stream = Pin::new(&mut self.stream);
Stream::poll_next(stream, cx)
}
}

View file

@ -19,6 +19,11 @@ pub enum Command {
Search {
criteria: SearchCriteria,
},
Fetch {
// TODO: do sequence-set
uids: Vec<u32>,
items: FetchItems,
},
UidSearch {
criteria: SearchCriteria,
},
@ -31,6 +36,10 @@ pub enum Command {
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
Idle,
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
Done,
}
impl fmt::Debug for Command {
@ -54,6 +63,15 @@ impl fmt::Display for Command {
Search { criteria } => write!(f, "SEARCH {}", criteria),
UidSearch { criteria } => write!(f, "UID SEARCH {}", criteria),
List { reference, mailbox } => write!(f, "LIST {:?} {:?}", reference, mailbox),
Fetch { uids, items } => write!(
f,
"FETCH {} {}",
uids.iter()
.map(|s| s.to_string())
.collect::<Vec<_>>()
.join(","),
items
),
UidFetch { uids, items } => write!(
f,
"UID FETCH {} {}",
@ -66,6 +84,8 @@ impl fmt::Display for Command {
#[cfg(feature = "rfc2177-idle")]
Idle => write!(f, "IDLE"),
#[cfg(feature = "rfc2177-idle")]
Done => write!(f, "DONE"),
}
}
}

View file

@ -5,13 +5,14 @@ use futures::{
future::FutureExt,
stream::{Stream, StreamExt},
};
use notify_rust::{Notification, Timeout};
use panorama_imap::{
client::{
auth::{self, Auth},
ClientBuilder, ClientConfig,
},
command::Command as ImapCommand,
response::{AttributeValue, Envelope},
response::{AttributeValue, Envelope, MailboxData, Response},
};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
@ -47,6 +48,9 @@ pub enum MailEvent {
/// Update the given UID with the given attribute list
UpdateUid(u32, Vec<AttributeValue>),
/// New message came in with given UID
NewUid(u32),
}
/// Main entrypoint for the mail listener.
@ -169,11 +173,41 @@ async fn imap_main(acct: MailAccountConfig, mail2ui_tx: UnboundedSender<MailEven
let mut idle_stream = authed.idle().await?;
loop {
let evt = idle_stream.next().await;
let evt = match idle_stream.next().await {
Some(v) => v,
None => break,
};
debug!("got an event: {:?}", evt);
if false {
break;
match evt {
Response::MailboxData(MailboxData::Exists(uid)) => {
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
// send DONE to stop the idle
std::mem::drop(idle_stream);
let handle = Notification::new()
.summary("New Email")
.body("holy Shit,")
.icon("firefox")
.timeout(Timeout::Milliseconds(6000))
.show()?;
let message_uids = authed.uid_search().await?;
let message_uids =
message_uids.into_iter().take(20).collect::<Vec<_>>();
let _ = mail2ui_tx.send(MailEvent::MessageUids(message_uids.clone()));
// TODO: make this happen concurrently with the main loop?
let mut message_list = authed.uid_fetch(&message_uids).await.unwrap();
while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(uid, attrs);
debug!("sent {:?}", evt);
mail2ui_tx.send(evt);
}
idle_stream = authed.idle().await?;
}
_ => {}
}
}
} else {

View file

@ -109,7 +109,10 @@ fn setup_logger(log_file: Option<impl AsRef<Path>>) -> Result<()> {
.warn(Color::Yellow)
.error(Color::Red);
let mut logger = fern::Dispatch::new()
.filter(|meta| meta.target() != "tokio_util::codec::framed_impl")
.filter(|meta| {
meta.target() != "tokio_util::codec::framed_impl"
&& !meta.target().starts_with("rustls::client")
})
.format(move |out, message, record| {
out.finish(format_args!(
"{}[{}][{}] {}",

View file

@ -104,6 +104,7 @@ pub async fn run_ui(
MailEvent::MessageUids(new_uids) => {
mail_tab.message_uids = new_uids;
}
MailEvent::UpdateUid(_, attrs) => {
let mut uid = None;
let mut date = None;
@ -136,6 +137,10 @@ pub async fn run_ui(
mail_tab.message_map.insert(uid, meta);
}
}
MailEvent::NewUid(uid) => {
debug!("new msg!");
mail_tab.message_uids.push(uid);
}
_ => {}
}
}