From 417c008190d446ec79abaa64725dac82626ad445 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Sun, 21 Feb 2021 07:42:40 -0600 Subject: [PATCH] fuck this is definitely wrong --- Cargo.lock | 16 +++ Cargo.toml | 2 - imap/Cargo.toml | 1 + imap/src/client/inner.rs | 188 +++++++++++++++++++++++++++++------- imap/src/client/mod.rs | 37 ++++--- imap/src/command/mod.rs | 2 + imap/src/lib.rs | 4 + imap/src/response/mod.rs | 28 +++++- imap/src/response/parser.rs | 0 src/mail/mod.rs | 35 +++++-- 10 files changed, 256 insertions(+), 57 deletions(-) create mode 100644 imap/src/response/parser.rs diff --git a/Cargo.lock b/Cargo.lock index 42bb40e..675b0ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -730,6 +730,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "owning_ref" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "panorama" version = "0.0.1" @@ -770,6 +779,7 @@ dependencies = [ "derive_builder", "futures", "nom 6.1.2", + "owning_ref", "panorama-strings", "parking_lot", "tokio", @@ -1190,6 +1200,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "strsim" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index e57011e..90d077b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,6 @@ license = "GPL-3.0-or-later" members = ["imap", "strings"] [dependencies] -# log = "0.4.14" -# fern = "0.6.0" anyhow = "1.0.38" async-trait = "0.1.42" cfg-if = "1.0.0" diff --git a/imap/Cargo.toml b/imap/Cargo.toml index d78bea1..105f2fc 100644 --- a/imap/Cargo.toml +++ b/imap/Cargo.toml @@ -22,6 +22,7 @@ webpki-roots = "0.21.0" panorama-strings = { path = "../strings", version = "0" } parking_lot = "0.11.1" tracing = "0.1.23" +owning_ref = "0.4.1" [dev-dependencies] assert_matches = "1.3" diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index a44b3fa..345ad8b 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -1,36 +1,52 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; -use std::task::{Context, Poll}; +use std::task::{Context, Poll, Waker}; -use anyhow::Result; -use futures::future::{Future, FutureExt}; +use anyhow::{Context as AnyhowContext, Result}; +use futures::future::{self, Either, Future, FutureExt}; use panorama_strings::{StringEntry, StringStore}; use parking_lot::{Mutex, RwLock}; use tokio::{ io::{ self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, - WriteHalf, + ReadHalf, WriteHalf, }, + sync::{mpsc, oneshot}, task::JoinHandle, }; +use tokio_rustls::{client::TlsStream, rustls::ClientConfig, webpki::DNSNameRef, TlsConnector}; use crate::command::Command; -use crate::response::Response; +use crate::types::Response; + +use super::ClientNotConnected; pub type BoxedFunc = Box; +pub type ResultMap = Arc, Option)>>>; +pub type GreetingRx = Arc)>>; pub const TAG_PREFIX: &str = "panorama"; -/// The private Client struct, that is shared by all of the exported structs in the state machine. +/// The lower-level Client struct, that is shared by all of the exported structs in the state machine. pub struct Client { + config: ClientNotConnected, conn: WriteHalf, symbols: StringStore, id: usize, results: ResultMap, + /// cached set of capabilities caps: Vec, - handle: JoinHandle>, + + /// join handle for the listener thread + listener_handle: JoinHandle>>, + + /// used for telling the listener thread to stop and return the read half + exit_tx: mpsc::Sender<()>, + + /// used for receiving the greeting + greeting: GreetingRx, } impl Client @@ -38,23 +54,38 @@ where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { /// Creates a new client that wraps a connection - pub fn new(conn: C) -> Self { + pub fn new(conn: C, config: ClientNotConnected) -> Self { let (read_half, write_half) = io::split(conn); let results = Arc::new(RwLock::new(HashMap::new())); - let listen_fut = tokio::spawn(listen(read_half, results.clone())); + let (exit_tx, exit_rx) = mpsc::channel(1); + let greeting = Arc::new(RwLock::new((false, None))); + let listen_fut = tokio::spawn(listen( + read_half, + results.clone(), + exit_rx, + greeting.clone(), + )); Client { + config, conn: write_half, symbols: StringStore::new(256), id: 0, results, caps: Vec::new(), - handle: listen_fut, + listener_handle: listen_fut, + exit_tx, + greeting, } } + pub fn wait_for_greeting(&self) -> GreetingHandler { + debug!("waiting for greeting"); + GreetingHandler(self.greeting.clone()) + } + /// Sends a command to the server and returns a handle to retrieve the result - pub async fn execute(&mut self, cmd: Command) -> Result { + pub async fn execute(&mut self, cmd: Command) -> Result { debug!("executing command {:?}", cmd); let id = self.id; self.id += 1; @@ -73,17 +104,70 @@ where let mut handlers = self.results.write(); handlers.remove(&id).unwrap().0.unwrap() }; - Ok(Response(resp)) + Ok(resp) } /// Executes the CAPABILITY command - pub async fn supports(&mut self) -> Result<()> { + pub async fn capabilities(&mut self) -> Result<()> { let cmd = Command::Capability; debug!("sending: {:?} {:?}", cmd, cmd.to_string()); - let result = self.execute(cmd).await?; - debug!("result from supports: {:?}", result); + let result = self + .execute(cmd) + .await + .context("error executing CAPABILITY command")?; + let (_, resp) = Response::from_bytes(result.as_bytes()) + .map_err(|err| anyhow!("")) + .context("error parsing response from CAPABILITY")?; + debug!("cap resp: {:?}", resp); + if let Response::Capabilities(caps) = resp { + debug!("capabilities: {:?}", caps); + } Ok(()) } + + /// Attempts to upgrade this connection using STARTTLS + pub async fn upgrade(mut self) -> Result>> { + // TODO: make sure STARTTLS is in the capability list + // first, send the STARTTLS command + let resp = self.execute(Command::Starttls).await?; + debug!("server response to starttls: {:?}", resp); + + debug!("sending exit ()"); + self.exit_tx.send(()).await?; + let reader = self.listener_handle.await??; + let writer = self.conn; + + let conn = reader.unsplit(writer); + + let server_name = &self.config.hostname; + + let mut tls_config = ClientConfig::new(); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let tls_config = TlsConnector::from(Arc::new(tls_config)); + let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap(); + let stream = tls_config.connect(dnsname, conn).await?; + + Ok(Client::new(stream, self.config)) + } +} + +pub struct GreetingHandler(GreetingRx); + +impl Future for GreetingHandler { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let (state, waker) = &mut *self.0.write(); + if waker.is_none() { + *waker = Some(cx.waker().clone()); + } + + match state { + true => Poll::Ready(()), + false => Poll::Pending, + } + } } pub struct ExecHandle<'a, C>(&'a Client, usize); @@ -108,33 +192,67 @@ impl<'a, C> Future for ExecHandle<'a, C> { } } -use std::task::Waker; -pub type ResultMap = Arc, Option)>>>; - -async fn listen(conn: impl AsyncRead + Unpin, results: ResultMap) -> Result<()> { +/// Main listen loop for the application +async fn listen( + conn: C, + results: ResultMap, + mut exit: mpsc::Receiver<()>, + greeting: GreetingRx, +) -> Result +where + C: AsyncRead + Unpin, +{ debug!("amogus"); let mut reader = BufReader::new(conn); + let mut greeting = Some(greeting); + loop { let mut next_line = String::new(); - reader.read_line(&mut next_line).await?; - let next_line = next_line.trim_end_matches('\r'); + let fut = reader.read_line(&mut next_line).fuse(); + pin_mut!(fut); + let fut2 = exit.recv().fuse(); + pin_mut!(fut2); - // debug!("line: {:?}", next_line); - let mut parts = next_line.split(" "); - let tag = parts.next().unwrap(); - let rest = parts.collect::>().join(" "); + match future::select(fut, fut2).await { + Either::Left((_, _)) => { + let next_line = next_line.trim_end_matches('\n').trim_end_matches('\r'); - if tag == "*" { - debug!("UNTAGGED {:?}", rest); - } else if tag.starts_with(TAG_PREFIX) { - let id = tag.trim_start_matches(TAG_PREFIX).parse::()?; - debug!("set {} to {:?}", id, rest); - let mut results = results.write(); - if let Some((c, w)) = results.get_mut(&id) { - *c = Some(rest.to_string()); - let w = w.take().unwrap(); - w.wake(); + let mut parts = next_line.split(" "); + let tag = parts.next().unwrap(); + let rest = parts.collect::>().join(" "); + + if tag == "*" { + debug!("UNTAGGED {:?}", rest); + if let Some(greeting) = greeting.take() { + let (greeting, waker) = &mut *greeting.write(); + debug!("got greeting"); + *greeting = true; + if let Some(waker) = waker.take() { + waker.wake(); + } + } + } else if tag.starts_with(TAG_PREFIX) { + let id = tag.trim_start_matches(TAG_PREFIX).parse::()?; + debug!("set {} to {:?}", id, rest); + let mut results = results.write(); + if let Some((c, w)) = results.get_mut(&id) { + // *c = Some(rest.to_string()); + *c = Some(next_line.to_owned()); + if let Some(waker) = w.take() { + waker.wake(); + } + } + } + } + Either::Right((_, _)) => { + debug!("exiting read loop"); + break; } } + + reader.read_line(&mut next_line).await?; } + + let conn = reader.into_inner(); + Ok(conn) } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index dfced4d..c5aaac1 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -21,7 +21,7 @@ use tokio::{ }; use tokio_rustls::{client::TlsStream, rustls::ClientConfig, webpki::DNSNameRef, TlsConnector}; -use self::inner::Client; +pub use self::inner::Client; /// Struct used to start building the config for a client. /// @@ -48,7 +48,7 @@ pub struct ClientNotConnected { } impl ClientNotConnected { - pub async fn connect(self) -> Result { + pub async fn open(self) -> Result { let hostname = self.hostname.as_ref(); let port = self.port; let conn = TcpStream::connect((hostname, port)).await?; @@ -62,16 +62,18 @@ impl ClientNotConnected { let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap(); let conn = tls_config.connect(dnsname, conn).await?; - let inner = Client::new(conn); + let inner = Client::new(conn, self); + inner.wait_for_greeting().await; return Ok(ClientUnauthenticated::Encrypted( ClientUnauthenticatedEncrypted { inner }, )); + } else { + let inner = Client::new(conn, self); + inner.wait_for_greeting().await; + return Ok(ClientUnauthenticated::Unencrypted( + ClientUnauthenticatedUnencrypted { inner }, + )); } - - let inner = Client::new(conn); - return Ok(ClientUnauthenticated::Unencrypted( - ClientUnauthenticatedUnencrypted { inner }, - )); } } @@ -81,10 +83,23 @@ pub enum ClientUnauthenticated { } impl ClientUnauthenticated { - pub async fn supports(&mut self) -> Result<()> { + pub async fn upgrade(self) -> Result { match self { - ClientUnauthenticated::Encrypted(e) => e.inner.supports().await?, - ClientUnauthenticated::Unencrypted(e) => e.inner.supports().await?, + // this is a no-op, we don't need to upgrade + ClientUnauthenticated::Encrypted(_) => Ok(self), + ClientUnauthenticated::Unencrypted(e) => { + let client = ClientUnauthenticatedEncrypted { + inner: e.inner.upgrade().await?, + }; + Ok(ClientUnauthenticated::Encrypted(client)) + } + } + } + + pub async fn capabilities(&mut self) -> Result<()> { + match self { + ClientUnauthenticated::Encrypted(e) => e.inner.capabilities().await?, + ClientUnauthenticated::Unencrypted(e) => e.inner.capabilities().await?, } Ok(()) } diff --git a/imap/src/command/mod.rs b/imap/src/command/mod.rs index 1018026..5bbcb54 100644 --- a/imap/src/command/mod.rs +++ b/imap/src/command/mod.rs @@ -4,12 +4,14 @@ use std::fmt; #[derive(Clone, Debug)] pub enum Command { Capability, + Starttls, } impl fmt::Display for Command { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Command::Capability => write!(f, "CAPABILITY"), + Command::Starttls => write!(f, "STARTTLS"), } } } diff --git a/imap/src/lib.rs b/imap/src/lib.rs index e8b873e..cd844cf 100644 --- a/imap/src/lib.rs +++ b/imap/src/lib.rs @@ -1,6 +1,10 @@ #[macro_use] +extern crate anyhow; +#[macro_use] extern crate derive_builder; #[macro_use] +extern crate futures; +#[macro_use] extern crate tracing; pub mod builders; diff --git a/imap/src/response/mod.rs b/imap/src/response/mod.rs index 2a6bc74..86d6c1a 100644 --- a/imap/src/response/mod.rs +++ b/imap/src/response/mod.rs @@ -1,2 +1,26 @@ -#[derive(Clone, Debug)] -pub struct Response(pub String); +use std::str::FromStr; + +pub enum Response { + Capabilities(Vec), + Done { + tag: RequestId, + status: Status, + code: Option, + information: Option, + }, +} + +pub enum Capability { + Imap4rev1, + Auth(String), + Atom(String), +} + +pub struct RequestId(pub String); + +pub enum Status { + Ok, + No, +} + +pub enum ResponseCode {} diff --git a/imap/src/response/parser.rs b/imap/src/response/parser.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/mail/mod.rs b/src/mail/mod.rs index d523429..259f347 100644 --- a/src/mail/mod.rs +++ b/src/mail/mod.rs @@ -5,7 +5,10 @@ mod imap2; use anyhow::Result; use futures::stream::StreamExt; -use panorama_imap::{client::ClientBuilder, command::Command as ImapCommand}; +use panorama_imap::{ + client::{ClientBuilder, ClientNotConnected}, + command::Command as ImapCommand, +}; use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle}; use tokio_stream::wrappers::WatchStream; @@ -48,7 +51,12 @@ pub async fn run_mail( let handle = tokio::spawn(async { for acct in config.mail_accounts.into_iter() { debug!("opening imap connection for {:?}", acct); - osu(acct).await.unwrap(); + match imap_main(acct).await { + Ok(_) => {} + Err(err) => { + error!("IMAP Error: {}", err); + } + } // open_imap_connection(acct.imap).await.unwrap(); } }); @@ -59,8 +67,9 @@ pub async fn run_mail( Ok(()) } -async fn osu(acct: MailAccountConfig) -> Result<()> { - let builder = ClientBuilder::default() +/// The main sequence of steps for the IMAP thread to follow +async fn imap_main(acct: MailAccountConfig) -> Result<()> { + let builder: ClientNotConnected = ClientBuilder::default() .hostname(acct.imap.server.clone()) .port(acct.imap.port) .tls(matches!(acct.imap.tls, TlsMethod::On)) @@ -68,10 +77,22 @@ async fn osu(acct: MailAccountConfig) -> Result<()> { .map_err(|err| anyhow!("err: {}", err))?; debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port); - let mut unauth = builder.connect().await?; + let unauth = builder.open().await?; - debug!("sending CAPABILITY"); - unauth.supports().await?; + let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { + debug!("attempting to upgrade"); + let client = unauth.upgrade().await?; + debug!("upgrade successful"); + client + } else { + unauth + }; + + debug!("preparing to auth"); + // check if the authentication method is supported + + // debug!("sending CAPABILITY"); + // let result = unauth.capabilities().await?; Ok(()) }