diff --git a/README.md b/README.md index 6ef4ef5..f85f709 100644 --- a/README.md +++ b/README.md @@ -9,9 +9,13 @@ Goals: - Never have to actually close the application. - Handles email, calendar, and address books using open standards. -- Unified "feed" that any app can submit to. - Hot-reload on-disk config. + +Stretch goals: +- Unified "feed" that any app can submit to. - Submit notifications to gotify-shaped notification servers. +- RSS aggregator. +- IRC client?? Credits ------- diff --git a/imap/src/response/mod.rs b/imap/src/response/mod.rs index 86d6c1a..375d6e7 100644 --- a/imap/src/response/mod.rs +++ b/imap/src/response/mod.rs @@ -10,6 +10,13 @@ pub enum Response { }, } +impl FromStr for Response { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result { + todo!() + } +} + pub enum Capability { Imap4rev1, Auth(String), diff --git a/src/mail/imap.rs b/src/mail/imap.rs deleted file mode 100644 index 20ed62f..0000000 --- a/src/mail/imap.rs +++ /dev/null @@ -1,231 +0,0 @@ -use std::collections::HashMap; -use std::fmt::Display; -use std::sync::Arc; - -use anyhow::Result; -use futures::{ - future::{self, Either}, - pin_mut, - sink::{Sink, SinkExt}, - stream::{Stream, StreamExt}, -}; -use panorama_imap::{ - builders::command::Command, - parser::parse_response, - types::{Capability, RequestId, Response, ResponseCode, State, Status}, -}; -use tokio::{net::TcpStream, sync::mpsc}; -use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef, TlsConnector}; -use tokio_util::codec::{Decoder, LinesCodec, LinesCodecError}; - -use crate::config::ImapConfig; - -pub async fn open_imap_connection(config: ImapConfig) -> Result<()> { - debug!( - "Opening imap connection to {}:{}", - config.server, config.port - ); - - let server = config.server.as_str(); - let port = config.port; - - let client = TcpStream::connect((server, port)).await?; - let codec = LinesCodec::new(); - let framed = codec.framed(client); - let mut state = State::NotAuthenticated; - let (sink, stream) = framed.split::(); - - let result = listen_loop(config.clone(), &mut state, sink, stream, false).await?; - - if let LoopExit::NegotiateTls(stream, sink) = result { - debug!("negotiating tls"); - let mut tls_config = ClientConfig::new(); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - let tls_config = TlsConnector::from(Arc::new(tls_config)); - let dnsname = DNSNameRef::try_from_ascii_str(server).unwrap(); - - // reconstruct the original stream - let stream = stream.reunite(sink)?.into_inner(); - // let stream = TcpStream::connect((server, port)).await?; - let stream = tls_config.connect(dnsname, stream).await?; - - let codec = LinesCodec::new(); - let framed = codec.framed(stream); - let (sink, stream) = framed.split::(); - listen_loop(config.clone(), &mut state, sink, stream, true).await?; - } - - Ok(()) -} - -/// Action that should be taken after the connection loop quits. -enum LoopExit { - /// Used in case the STARTTLS command is issued; perform TLS negotiation on top of the current - /// stream. - /// - /// S and S2 are stream and sink types, respectively. - NegotiateTls(S, S2), - Closed, -} - -async fn listen_loop( - config: ImapConfig, - st: &mut State, - sink: S2, - mut stream: S, - with_ssl: bool, -) -> Result> -where - S: Stream> + Unpin, - S2: Sink + Unpin, - S2::Error: Display, -{ - let (tx, mut rx) = mpsc::unbounded_channel::<()>(); - let mut cmd_mgr = CommandManager::new(sink); - - if with_ssl { - let cmd = Command { - args: b"CAPABILITY".to_vec(), - next_state: Some(State::Authenticated), - }; - cmd_mgr.send(cmd, |_| {}).await?; - } - - loop { - let fut1 = stream.next(); - let fut2 = rx.recv(); - pin_mut!(fut1); - pin_mut!(fut2); - - debug!("waiting for next select"); - match future::select(fut1, fut2).await { - Either::Left((line, _)) => { - let mut line = match line { - Some(v) => v?, - None => break, - }; - line += "\r\n"; - let (_, resp) = match parse_response(line.as_bytes()) { - Ok(v) => v, - Err(e) => bail!(e.to_string()), - }; - debug!("<<< {:?}", resp); - - match st { - State::Authenticated => {} - State::NotAuthenticated => match resp { - Response::Data { - status: Status::Ok, - code: Some(ResponseCode::Capabilities(caps)), - .. - } => { - if !with_ssl { - // prepare to do TLS negotiation - let mut has_starttls = false; - for cap in caps { - if let Capability::Atom("STARTTLS") = cap { - has_starttls = true; - } - } - if has_starttls { - let cmd = Command { - args: b"STARTTLS".to_vec(), - next_state: None, - }; - let tx = tx.clone(); - cmd_mgr - .send(cmd, move |_| { - tx.send(()).unwrap(); - }) - .await?; - } - } - } - - Response::Capabilities(_caps) => { - if with_ssl { - // send authentication information - let cmd = Command { - args: format!("LOGIN {} {}", config.username, config.password) - .as_bytes() - .to_vec(), - next_state: Some(State::Authenticated), - }; - cmd_mgr.send(cmd, |_| {}).await?; - } - } - - Response::Done { tag, code, .. } => { - cmd_mgr.process_done(tag, code)?; - } - _ => {} - }, - _ => {} - } - } - Either::Right((_, _)) => { - debug!("ENCOUNTERED EXIT"); - let sink = cmd_mgr.decompose(); - return Ok(LoopExit::NegotiateTls(stream, sink)); - } - } - } - - Ok(LoopExit::Closed) -} - -type InFlightFunc = Box) + Send>; - -/// A struct in charge of managing multiple in-flight commands. -struct CommandManager { - tag_idx: usize, - in_flight: HashMap, - sink: S, -} - -impl CommandManager -where - S: Sink + Unpin, -{ - pub fn new(sink: S) -> Self { - CommandManager { - tag_idx: 0, - in_flight: HashMap::new(), - sink, - } - } - - pub fn decompose(self) -> S { - self.sink - } - - pub async fn send( - &mut self, - cmd: Command, - cb: impl Fn(Option) + Send + 'static, - ) -> Result<()> { - let tag_idx = self.tag_idx; - self.tag_idx += 1; - let cb = Box::new(cb); - let tag_str = format!("t{}", tag_idx); - let cmd_str = std::str::from_utf8(&cmd.args)?; - let full_str = format!("{} {}", tag_str, cmd_str); - self.in_flight.insert(tag_str.clone(), cb); - - debug!(">>> {:?}", full_str); - self.sink - .send(full_str) - .await - .map_err(|_| anyhow!("failed to send command")) - } - - pub fn process_done(&mut self, id: RequestId, code: Option) -> Result<()> { - let name = std::str::from_utf8(id.as_bytes())?; - if let Some(cb) = self.in_flight.remove(name) { - cb(code); - } - Ok(()) - } -} diff --git a/src/mail/imap2.rs b/src/mail/imap2.rs deleted file mode 100644 index 4b7a15b..0000000 --- a/src/mail/imap2.rs +++ /dev/null @@ -1,177 +0,0 @@ -// let's try this again - -use std::collections::HashMap; -use std::sync::Arc; - -use anyhow::Result; -use futures::{ - future::{self, BoxFuture, Future, FutureExt, TryFuture}, - sink::{Sink, SinkExt}, - stream::{Stream, StreamExt}, -}; -use panorama_imap::builders::command::Command; -use parking_lot::Mutex; -use tokio::{ - io::{AsyncRead, AsyncWrite}, - net::TcpStream, - sync::{oneshot, Notify}, -}; -use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef, TlsConnector}; -use tokio_util::codec::{Decoder, Framed, FramedRead, FramedWrite, LinesCodec, LinesCodecError}; - -use crate::config::{ImapConfig, TlsMethod}; - -pub async fn open_imap_connection(config: ImapConfig) -> Result<()> { - let server = config.server.as_str(); - let port = config.port; - - let stream = TcpStream::connect((server, port)).await?; - - debug!("hellosu"); - match config.tls { - TlsMethod::Off => begin_authentication(config, stream).await, - TlsMethod::On => { - let stream = perform_tls_negotiation(server.to_owned(), stream).await?; - begin_authentication(config, stream).await - } - TlsMethod::Starttls => { - let (stream, cmd_mgr) = CommandManager::new(stream); - let flights = cmd_mgr.flights(); - - // listen(stream, flights).await?; - // async move { - // let mut cmd_mgr = cmd_mgr; - // cmd_mgr.capabilities().await; - // } - // .await; - - todo!() - } - } -} - -/// Performs TLS negotiation, using the webpki_roots and verifying the server name -#[instrument(skip(server_name, stream))] -async fn perform_tls_negotiation( - server_name: impl AsRef, - stream: impl AsyncRead + AsyncWrite + Unpin, -) -> Result { - let server_name = server_name.as_ref(); - - let mut tls_config = ClientConfig::new(); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - let tls_config = TlsConnector::from(Arc::new(tls_config)); - let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap(); - let stream = tls_config.connect(dnsname, stream).await?; - - Ok(stream) -} - -async fn fetch_capabilities(stream: impl AsyncRead + AsyncWrite) -> Result> { - let codec = LinesCodec::new(); - let framed = codec.framed(stream); - - todo!() -} - -#[instrument(skip(config, stream))] -async fn begin_authentication( - config: ImapConfig, - stream: impl AsyncRead + AsyncWrite, -) -> Result<()> { - Ok(()) -} - -pub async fn listen( - mut stream: impl Stream> + Unpin, - in_flight: InFlight, -) -> Result<()> { - debug!("listening for messages from server"); - loop { - let line = match stream.next().await { - Some(v) => v?, - None => break, - }; - debug!("line: {:?}", line); - - let mut parts = line.split(' '); - let tag = parts.next().unwrap().parse()?; // TODO: handle empty - - { - let mut in_flight = in_flight.lock(); - if let Some(sender) = in_flight.remove(&tag) { - sender.send(()).unwrap(); - } - } - } - - Ok(()) -} - -// trait ImapStream: AsyncRead + AsyncWrite + Send + Unpin {} -// impl ImapStream for T {} - -trait ImapSink: Sink + Unpin {} -impl + Unpin> ImapSink for T {} - -type InFlightMap = HashMap>; -type InFlight = Arc>; - -struct CommandManager<'a> { - id: usize, - in_flight: Arc>>>, - sink: Box, -} - -impl<'a> CommandManager<'a> { - pub fn new( - stream: impl AsyncRead + AsyncWrite + 'a, - ) -> (impl Stream>, Self) { - let codec = LinesCodec::new(); - let framed = codec.framed(stream); - let (framed_sink, framed_stream) = framed.split(); - - let cmd_mgr = CommandManager { - id: 0, - in_flight: Arc::new(Mutex::new(HashMap::new())), - sink: Box::new(framed_sink), - }; - (framed_stream, cmd_mgr) - } - - pub fn flights(&self) -> Arc>>> { - self.in_flight.clone() - } - - pub fn decompose(self) -> impl ImapSink + Unpin + 'a { - self.sink - } - - pub async fn capabilities(&mut self) -> Result> { - self.exec(Command { - args: b"CAPABILITY".to_vec(), - next_state: None, - }) - .await?; - Ok(vec![]) - } - - pub async fn exec(&mut self, command: Command) -> Result<()> { - let id = self.id; - self.id += 1; - - let cmd_str = String::from_utf8(command.args)?; - self.sink.send(cmd_str).await?; - - let (tx, rx) = oneshot::channel(); - { - let mut in_flight = self.in_flight.lock(); - in_flight.insert(id, tx); - } - - rx.await?; - Ok(()) - } -} diff --git a/src/mail/mod.rs b/src/mail/mod.rs index 7bb1d49..7a33c09 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -1,8 +1,5 @@ //! Mail -mod imap; -mod imap2; - use anyhow::Result; use futures::stream::StreamExt; use panorama_imap::{ @@ -14,8 +11,6 @@ use tokio_stream::wrappers::WatchStream; use crate::config::{Config, ConfigWatcher, MailAccountConfig, TlsMethod}; -use self::imap2::open_imap_connection; - /// Command sent to the mail thread by something else (i.e. UI) pub enum MailCommand { /// Refresh the list @@ -30,7 +25,7 @@ pub async fn run_mail( mut config_watcher: ConfigWatcher, _cmd_in: UnboundedReceiver, ) -> Result<()> { - let mut curr_conn: Option> = None; + let mut curr_conn: Vec> = Vec::new(); // let mut config_watcher = WatchStream::new(config_watcher); loop { @@ -43,13 +38,13 @@ pub async fn run_mail( // TODO: gracefully shut down connection // just gonna drop the connection for now - if let Some(curr_conn) = curr_conn.take() { - debug!("dropping connection..."); - curr_conn.abort(); + debug!("dropping all connections..."); + for conn in curr_conn.drain(0..) { + conn.abort(); } - let handle = tokio::spawn(async { - for acct in config.mail_accounts.into_iter() { + for acct in config.mail_accounts.into_iter() { + let handle = tokio::spawn(async move { debug!("opening imap connection for {:?}", acct); match imap_main(acct).await { Ok(_) => {} @@ -57,11 +52,9 @@ pub async fn run_mail( error!("IMAP Error: {}", err); } } - // open_imap_connection(acct.imap).await.unwrap(); - } - }); - - curr_conn = Some(handle); + }); + curr_conn.push(handle); + } } Ok(()) @@ -69,31 +62,32 @@ pub async fn run_mail( /// The main sequence of steps for the IMAP thread to follow async fn imap_main(acct: MailAccountConfig) -> Result<()> { - let builder: ClientConfig = ClientBuilder::default() - .hostname(acct.imap.server.clone()) - .port(acct.imap.port) - .tls(matches!(acct.imap.tls, TlsMethod::On)) - .build() - .map_err(|err| anyhow!("err: {}", err))?; + // loop ensures that the connection is retried after it dies + loop { + let builder: ClientConfig = ClientBuilder::default() + .hostname(acct.imap.server.clone()) + .port(acct.imap.port) + .tls(matches!(acct.imap.tls, TlsMethod::On)) + .build() + .map_err(|err| anyhow!("err: {}", err))?; - debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port); - let unauth = builder.open().await?; + debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port); + let unauth = builder.open().await?; - let mut unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { - debug!("attempting to upgrade"); - let client = unauth.upgrade().await?; - debug!("upgrade successful"); - client - } else { - unauth - }; + let mut unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { + debug!("attempting to upgrade"); + let client = unauth.upgrade().await?; + debug!("upgrade successful"); + client + } else { + unauth + }; - debug!("preparing to auth"); - // check if the authentication method is supported - unauth.capabilities().await?; + debug!("preparing to auth"); + // check if the authentication method is supported + unauth.capabilities().await?; - // debug!("sending CAPABILITY"); - // let result = unauth.capabilities().await?; - - Ok(()) + // debug!("sending CAPABILITY"); + // let result = unauth.capabilities().await?; + } }