From b75df0b8d6bd11494ab9df209042749054ee26a3 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Mon, 9 Aug 2021 00:36:10 -0500 Subject: [PATCH] update --- Cargo.lock | 52 +++ Cargo.toml | 3 + Justfile | 4 +- imap/Cargo.toml | 11 +- imap/bin/.gitignore | 1 + imap/bin/greenmail_test.rs | 14 - imap/src/client/auth.rs | 9 +- imap/src/client/client.rs | 33 +- imap/src/client/configurable_cert_verifier.rs | 104 ++++++ imap/src/client/inner.rs | 30 +- imap/src/client/mod.rs | 7 +- imap/src/codec.rs | 60 ++-- imap/src/inner.rs | 312 ------------------ imap/src/proto/bytes.rs | 4 + imap/src/proto/command.rs | 26 +- 15 files changed, 266 insertions(+), 404 deletions(-) create mode 100644 imap/bin/.gitignore delete mode 100644 imap/bin/greenmail_test.rs create mode 100644 imap/src/client/configurable_cert_verifier.rs delete mode 100644 imap/src/inner.rs diff --git a/Cargo.lock b/Cargo.lock index 535e7fc..ec00bf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -25,6 +25,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -438,6 +449,7 @@ dependencies = [ "futures", "log", "nom", + "stderrlog", "tokio", "tokio-rustls", "tokio-util", @@ -609,6 +621,19 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stderrlog" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45a53e2eff3e94a019afa6265e8ee04cb05b9d33fe9f5078b14e4e391d155a38" +dependencies = [ + "atty", + "chrono", + "log", + "termcolor", + "thread_local", +] + [[package]] name = "strsim" version = "0.10.0" @@ -632,6 +657,24 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + [[package]] name = "time" version = "0.1.44" @@ -822,6 +865,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 58e8519..f3fcbb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,3 +3,6 @@ members = [ # "daemon", "imap", ] + +[profile.release] +lto = true \ No newline at end of file diff --git a/Justfile b/Justfile index 1d5672d..0563576 100644 --- a/Justfile +++ b/Justfile @@ -1,5 +1,5 @@ fmt: cargo +nightly fmt --all -greenmail-test: - cargo run -p imap --bin greenmail-test +mzhang-test: + cargo run -p panorama-imap --bin mzhang-test --features=stderrlog diff --git a/imap/Cargo.toml b/imap/Cargo.toml index be5abc7..85c185e 100644 --- a/imap/Cargo.toml +++ b/imap/Cargo.toml @@ -13,16 +13,14 @@ readme = "README.md" workspace = ".." [[bin]] -name = "greenmail-test" -path = "bin/greenmail_test.rs" +name = "mzhang-test" +path = "bin/mzhang_test.rs" +required-features = ["stderrlog"] [features] default = ["rfc2177-idle"] rfc2177-idle = [] -[profile.release] -lto = true - [dependencies] anyhow = "1.0.42" async-trait = "0.1.51" @@ -34,6 +32,7 @@ futures = "0.3.16" log = "0.4.14" nom = "6.2.1" tokio = { version = "1.9.0", features = ["full"] } -tokio-rustls = "0.22.0" +tokio-rustls = { version = "0.22.0", features = ["dangerous_configuration"] } tokio-util = { version = "0.6.7", features = ["codec"] } webpki-roots = "0.21.1" +stderrlog = { version = "0.5.1", optional = true } diff --git a/imap/bin/.gitignore b/imap/bin/.gitignore new file mode 100644 index 0000000..62f96c7 --- /dev/null +++ b/imap/bin/.gitignore @@ -0,0 +1 @@ +/mzhang_test.rs \ No newline at end of file diff --git a/imap/bin/greenmail_test.rs b/imap/bin/greenmail_test.rs deleted file mode 100644 index bbfffd1..0000000 --- a/imap/bin/greenmail_test.rs +++ /dev/null @@ -1,14 +0,0 @@ -use anyhow::Result; -// use panorama_imap::client::ConfigBuilder; - -#[tokio::main] -async fn main() -> Result<()> { - // let _client = ConfigBuilder::default() - // .hostname(String::from("localhost")) - // .port(3993) - // .tls(true) - // .connect() - // .await?; - - Ok(()) -} diff --git a/imap/src/client/auth.rs b/imap/src/client/auth.rs index 775da45..904f933 100644 --- a/imap/src/client/auth.rs +++ b/imap/src/client/auth.rs @@ -1,7 +1,10 @@ use tokio::io::{AsyncRead, AsyncWrite}; use crate::client::inner::Inner; -use crate::proto::command::{Command, CommandLogin}; +use crate::proto::{ + bytes::Bytes, + command::{Command, CommandLogin}, +}; pub trait Client: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static {} @@ -24,8 +27,8 @@ impl AuthMethod for Login { C: Client, { let command = Command::Login(CommandLogin { - username: &self.username, - password: &self.password, + username: Bytes::from(self.username.clone()), + password: Bytes::from(self.password.clone()), }); let _result = inner.execute(command).await; diff --git a/imap/src/client/client.rs b/imap/src/client/client.rs index 9881408..9d77b6b 100644 --- a/imap/src/client/client.rs +++ b/imap/src/client/client.rs @@ -1,5 +1,4 @@ use std::pin::Pin; -use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::Result; @@ -8,11 +7,10 @@ use futures::{ stream::{Stream, StreamExt}, }; use tokio::{net::TcpStream, sync::mpsc}; -use tokio_rustls::{ - client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, -}; +use tokio_rustls::client::TlsStream; use crate::proto::{ + bytes::Bytes, command::{ Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems, SearchCriteria, @@ -25,6 +23,7 @@ use crate::proto::{ use super::inner::Inner; use super::response_stream::ResponseStream; +use super::upgrade::upgrade; /// An IMAP client that hasn't been connected yet. #[derive(Builder, Clone, Debug)] @@ -39,11 +38,16 @@ pub struct Config { /// Whether or not the client is using an encrypted stream. /// /// To upgrade the connection later, use the upgrade method. + #[builder(default = "true")] pub tls: bool, + + /// Whether or not to verify hostname + #[builder(default = "true")] + pub verify_hostname: bool, } impl ConfigBuilder { - pub async fn open(self) -> Result { + pub async fn open(&self) -> Result { let config = self.build()?; let hostname = config.hostname.as_ref(); @@ -51,14 +55,7 @@ impl ConfigBuilder { let conn = TcpStream::connect((hostname, port)).await?; if config.tls { - let mut tls_config = RustlsConfig::new(); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - let tls_config = TlsConnector::from(Arc::new(tls_config)); - let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap(); - let conn = tls_config.connect(dnsname, conn).await?; - + let conn = upgrade(conn, hostname).await?; let mut inner = Inner::new(conn, config).await?; inner.wait_for_greeting().await?; return Ok(ClientUnauthenticated::Encrypted(inner)); @@ -86,7 +83,7 @@ impl ClientUnauthenticated { } } - client_expose!(async execute<'a>(cmd: Command<'a>) -> Result); + client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); } @@ -96,7 +93,7 @@ pub enum ClientAuthenticated { } impl ClientAuthenticated { - client_expose!(async execute<'a>(cmd: Command<'a>) -> Result); + client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); fn sender(&self) -> mpsc::UnboundedSender { @@ -109,8 +106,8 @@ impl ClientAuthenticated { /// Runs the LIST command pub async fn list(&mut self) -> Result> { let cmd = Command::List(CommandList { - reference: "", - mailbox: "*", + reference: Bytes::from(""), + mailbox: Bytes::from("*"), }); let res = self.execute(cmd).await?; @@ -129,7 +126,7 @@ impl ClientAuthenticated { /// Runs the SELECT command pub async fn select(&mut self, mailbox: impl AsRef) -> Result { let cmd = Command::Select(CommandSelect { - mailbox: mailbox.as_ref(), + mailbox: Bytes::from(mailbox.as_ref().to_owned()), }); let stream = self.execute(cmd).await?; diff --git a/imap/src/client/configurable_cert_verifier.rs b/imap/src/client/configurable_cert_verifier.rs new file mode 100644 index 0000000..4e90170 --- /dev/null +++ b/imap/src/client/configurable_cert_verifier.rs @@ -0,0 +1,104 @@ +//! Configurable cert verifier for rustls, can disable hostname verification, +//! etc. +//! +//! Based closely on https://github.com/rustls/rustls/blob/v/0.19.0/rustls/src/verify.rs#L253 + +use tokio_rustls::{ + rustls::{ + Certificate, OwnedTrustAnchor, RootCertStore, ServerCertVerified, ServerCertVerifier, + TLSError, + }, + webpki::{ + self, DNSNameRef, EndEntityCert, SignatureAlgorithm, TLSServerTrustAnchors, TrustAnchor, + }, +}; + +type SignatureAlgorithms = &'static [&'static SignatureAlgorithm]; + +static SUPPORTED_SIG_ALGS: SignatureAlgorithms = &[ + &webpki::ECDSA_P256_SHA256, + &webpki::ECDSA_P256_SHA384, + &webpki::ECDSA_P384_SHA256, + &webpki::ECDSA_P384_SHA384, + &webpki::ED25519, + &webpki::RSA_PSS_2048_8192_SHA256_LEGACY_KEY, + &webpki::RSA_PSS_2048_8192_SHA384_LEGACY_KEY, + &webpki::RSA_PSS_2048_8192_SHA512_LEGACY_KEY, + &webpki::RSA_PKCS1_2048_8192_SHA256, + &webpki::RSA_PKCS1_2048_8192_SHA384, + &webpki::RSA_PKCS1_2048_8192_SHA512, + &webpki::RSA_PKCS1_3072_8192_SHA384, +]; + +fn try_now() -> Result { + webpki::Time::try_from(std::time::SystemTime::now()) + .map_err(|_| TLSError::FailedToGetCurrentTime) +} + +#[derive(Builder)] +pub struct ConfigurableCertVerifier { + #[builder(default = "true")] + pub verify_hostname: bool, + + #[builder(default = "try_now")] + pub time: fn() -> Result, + // TODO: add custom signaturue algorithms? +} + +impl ServerCertVerifier for ConfigurableCertVerifier { + fn verify_server_cert( + &self, + roots: &RootCertStore, + presented_certs: &[Certificate], + dns_name: DNSNameRef<'_>, + ocsp_response: &[u8], + ) -> Result { + let (cert, chain, trustroots) = prepare(roots, presented_certs)?; + let now = (self.time)()?; + let cert = cert + .verify_is_valid_tls_server_cert( + SUPPORTED_SIG_ALGS, + &TLSServerTrustAnchors(&trustroots), + &chain, + now, + ) + .map_err(TLSError::WebPKIError) + .map(|_| cert)?; + + if !ocsp_response.is_empty() { + trace!("Unvalidated OCSP response: {:?}", ocsp_response.to_vec()); + } + + cert.verify_is_valid_for_dns_name(dns_name) + .map_err(TLSError::WebPKIError) + .map(|_| ServerCertVerified::assertion()) + } +} + +type CertChainAndRoots<'a, 'b> = (EndEntityCert<'a>, Vec<&'a [u8]>, Vec>); + +fn prepare<'a, 'b>( + roots: &'b RootCertStore, + presented_certs: &'a [Certificate], +) -> Result, TLSError> { + if presented_certs.is_empty() { + return Err(TLSError::NoCertificatesPresented); + } + + // EE cert must appear first. + let cert = EndEntityCert::from(&presented_certs[0].0).map_err(TLSError::WebPKIError)?; + + let chain: Vec<&'a [u8]> = presented_certs + .iter() + .skip(1) + .map(|cert| cert.0.as_ref()) + .collect(); + + let trustroots: Vec = roots + .roots + .iter() + .map(OwnedTrustAnchor::to_trust_anchor) + .collect(); + + Ok((cert, chain, trustroots)) +} diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index c34f6b1..c78be57 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -27,6 +27,8 @@ type ExitListener = oneshot::Receiver<()>; type GreetingSender = oneshot::Sender<()>; type GreetingWaiter = oneshot::Receiver<()>; +/// Low-level client, can directly read from and write to the stream +/// without the additional type-safety of the higher-level state machine. pub struct Inner { config: Config, tag_number: AtomicU32, @@ -41,6 +43,10 @@ pub struct Inner { greeting_rx: Option, } +struct CommandContainer { + command: Command, +} + impl Inner where C: AsyncRead + AsyncWrite + Unpin + Send + 'static, @@ -57,7 +63,7 @@ where // spawn the server->client loop let (read_exit, exit_rx) = oneshot::channel(); - let read_handle = tokio::spawn(read_loop(read_half, exit_rx)); + let read_handle = tokio::spawn(read_loop(read_half, exit_rx, greeting_tx)); // spawn the client->server loop let (write_exit, exit_rx) = oneshot::channel(); @@ -78,7 +84,7 @@ where }) } - pub async fn execute<'a>(&mut self, _command: Command<'a>) -> Result { todo!() } + pub async fn execute(&mut self, _command: Command) -> Result { todo!() } pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { // TODO: cache capabilities if needed? @@ -129,10 +135,19 @@ where // even requires this loop to stop (for example, TLS upgrade). // // when the loop exits, the read half of the stream will be returned -async fn read_loop(stream: ReadHalf, exit: ExitListener) -> ReadHalf +async fn read_loop( + stream: ReadHalf, + exit: ExitListener, + greeting_tx: GreetingSender, +) -> ReadHalf where C: AsyncRead, { + // this lets us "use up" the greeting sender + let mut greeting_tx = Some(greeting_tx); + + let mut curr_cmd: Option = None; + // set up framed communication let codec = ImapCodec::default(); let mut framed = FramedRead::new(stream, codec); @@ -146,6 +161,11 @@ where select! { msg = next => { println!("hellosu {:?}", msg); + + // if this is the very first response, then it's a greeting + if let Some(greeting_tx) = greeting_tx.take() { + greeting_tx.send(()).unwrap(); + } } _ = exit => break, } @@ -171,8 +191,8 @@ where line = write_fut => { if let Some(line) = line { // TODO: handle errors here - stream.write_all(line.as_bytes()).await; - stream.flush().await; + let _ = stream.write_all(line.as_bytes()).await; + let _ = stream.flush().await; trace!("C>>>S: {:?}", line); } } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index baeb9fc..2ef71c9 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -3,6 +3,11 @@ mod macros; pub mod auth; pub mod client; -pub mod inner; +pub mod configurable_cert_verifier; pub mod response_stream; pub mod upgrade; + +mod inner; + +pub use self::client::ConfigBuilder; +pub use self::inner::Inner; diff --git a/imap/src/codec.rs b/imap/src/codec.rs index 4d62f8a..ecc39da 100644 --- a/imap/src/codec.rs +++ b/imap/src/codec.rs @@ -1,9 +1,10 @@ use std::io; -use bytes::BytesMut; +use bytes::{Buf, BytesMut}; +use nom::Needed; use tokio_util::codec::{Decoder, Encoder}; -use crate::proto::{command::Command, response::Response}; +use crate::proto::{command::Command, response::Response, rfc3501::response as parse_response}; #[derive(Default)] pub struct ImapCodec { @@ -18,38 +19,35 @@ impl<'a> Decoder for ImapCodec { return Ok(None); } - todo!() - // let (response, rsp_len) = match Response::from_bytes(buf) { - // Ok((remaining, response)) => { - // // This SHOULD be acceptable/safe: BytesMut storage memory is - // // allocated on the heap and should not move. It will not be - // // freed as long as we keep a reference alive, which we do - // // by retaining a reference to the split buffer, below. - // let response = unsafe { mem::transmute(response) }; - // (response, buf.len() - remaining.len()) - // } - // Err(nom::Err::Incomplete(Needed::Size(min))) => { - // self.decode_need_message_bytes = min.get(); - // return Ok(None); - // } - // Err(nom::Err::Incomplete(_)) => { - // return Ok(None); - // } - // Err(nom::Err::Error(nom::error::Error { code, .. })) - // | Err(nom::Err::Failure(nom::error::Error { code, .. })) => { - // return Err(io::Error::new( - // io::ErrorKind::Other, - // format!("{:?} during parsing of {:?}", code, buf), - // )); - // } - // }; - // let raw = buf.split_to(rsp_len).freeze(); - // self.decode_need_message_bytes = 0; - // Ok(Some(ResponseData { raw, response })) + let buf2 = buf.split(); + let buf3 = buf2.clone().freeze(); + let (response, len) = match parse_response(buf3.clone().into()) { + Ok((remaining, response)) => (response, buf.len() - remaining.len()), + Err(nom::Err::Incomplete(Needed::Size(min))) => { + self.decode_need_message_bytes = min.get(); + return Ok(None); + } + Err(nom::Err::Incomplete(_)) => { + return Ok(None); + } + Err(nom::Err::Error(nom::error::Error { code, .. })) + | Err(nom::Err::Failure(nom::error::Error { code, .. })) => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("{:?} during parsing of {:?}", code, buf), + )); + } + }; + + buf.unsplit(buf2); + buf.advance(len); + + self.decode_need_message_bytes = 0; + Ok(Some(response)) } } -impl<'a> Encoder<&'a Command<'a>> for ImapCodec { +impl<'a> Encoder<&'a Command> for ImapCodec { type Error = io::Error; fn encode(&mut self, _msg: &Command, _dst: &mut BytesMut) -> Result<(), io::Error> { todo!() diff --git a/imap/src/inner.rs b/imap/src/inner.rs deleted file mode 100644 index 880a61e..0000000 --- a/imap/src/inner.rs +++ /dev/null @@ -1,312 +0,0 @@ -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll}; - -use anyhow::Result; -use futures::{ - future::{self, FutureExt, TryFutureExt}, - stream::{Stream, StreamExt}, -}; -use imap_proto::{builders::command::Command, types::Response}; -use tokio::{ - io::{ - self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, - }, - sync::{ - mpsc, - oneshot::{self, error::TryRecvError}, - }, - task::JoinHandle, -}; -use tokio_rustls::{ - client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, -}; -use tokio_util::codec::FramedRead; - -use crate::codec::{ImapCodec, ResponseData}; - -pub const TAG_PREFIX: &str = "ptag"; -type Command2 = (String, Command, mpsc::UnboundedSender); - -pub struct Client { - ctr: usize, - config: ClientConfig, - // conn: WriteHalf, - pub(crate) write_tx: mpsc::UnboundedSender, - cmd_tx: mpsc::UnboundedSender, - greeting_rx: Option>, - writer_exit_tx: oneshot::Sender<()>, - writer_handle: JoinHandle>>, - listener_exit_tx: oneshot::Sender<()>, - listener_handle: JoinHandle>>, -} - -impl Client -where - C: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - pub fn new(conn: C, config: ClientConfig) -> Self { - let (read_half, mut write_half) = io::split(conn); - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); - let (greeting_tx, greeting_rx) = oneshot::channel(); - - let (writer_exit_tx, exit_rx) = oneshot::channel(); - let (write_tx, mut write_rx) = mpsc::unbounded_channel::(); - let writer_handle = tokio::spawn(write(write_half, write_rx, exit_rx).map_err(|err| { - error!("Help, the writer loop died: {}", err); - err - })); - - let (exit_tx, exit_rx) = oneshot::channel(); - let listener_handle = tokio::spawn( - listen(read_half, cmd_rx, write_tx.clone(), greeting_tx, exit_rx).map_err(|err| { - error!("Help, the listener loop died: {:?} {}", err, err); - err - }), - ); - - Client { - ctr: 0, - // conn: write_half, - config, - cmd_tx, - write_tx, - greeting_rx: Some(greeting_rx), - writer_exit_tx, - listener_exit_tx: exit_tx, - writer_handle, - listener_handle, - } - } - - pub async fn wait_for_greeting(&mut self) -> Result<()> { - if let Some(greeting_rx) = self.greeting_rx.take() { - greeting_rx.await?; - } - Ok(()) - } - - pub async fn execute(&mut self, cmd: Command) -> Result { - let id = self.ctr; - self.ctr += 1; - - let tag = format!("{}{}", TAG_PREFIX, id); - // let cmd_str = format!("{} {}\r\n", tag, cmd); - // self.write_tx.send(cmd_str); - // self.conn.write_all(cmd_str.as_bytes()).await?; - // self.conn.flush().await?; - - let (tx, rx) = mpsc::unbounded_channel(); - self.cmd_tx.send((tag, cmd, tx))?; - - let stream = ResponseStream { inner: rx }; - Ok(stream) - } - - pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { - // TODO: cache capabilities if needed? - let cap = cap.as_ref(); - let cap = parse_capability(cap)?; - - let resp = self.execute(Command::Capability).await?; - let (_, data) = resp.wait().await?; - - for resp in data { - if let Response::Capabilities(caps) = resp { - return Ok(caps.contains(&cap)); - } - // debug!("cap: {:?}", resp); - } - - Ok(false) - } - - pub async fn upgrade(mut self) -> Result>> { - // TODO: make sure STARTTLS is in the capability list - if !self.has_capability("STARTTLS").await? { - bail!("server doesn't support this capability"); - } - - // first, send the STARTTLS command - let mut resp = self.execute(Command::Starttls).await?; - let resp = resp.next().await.unwrap(); - debug!("server response to starttls: {:?}", resp); - - debug!("sending exit for upgrade"); - // TODO: check that the channel is still open? - self.listener_exit_tx.send(()).unwrap(); - self.writer_exit_tx.send(()).unwrap(); - let (reader, writer) = future::join(self.listener_handle, self.writer_handle).await; - let reader = reader??; - let writer = writer??; - // let reader = self.listener_handle.await??; - // let writer = self.conn; - - let conn = reader.unsplit(writer); - let server_name = &self.config.hostname; - - let mut tls_config = RustlsConfig::new(); - tls_config - .root_store - .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); - let tls_config = TlsConnector::from(Arc::new(tls_config)); - let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap(); - let stream = tls_config.connect(dnsname, conn).await?; - debug!("upgraded, stream is using TLS now"); - - Ok(Client::new(stream, self.config)) - } -} - -pub struct ResponseStream { - pub(crate) inner: mpsc::UnboundedReceiver, -} - -impl ResponseStream { - /// Retrieves just the DONE item in the stream, discarding the rest - pub async fn done(mut self) -> Result> { - while let Some(resp) = self.inner.recv().await { - if let Response::Done(done) = resp { - return Ok(Some(done)); - } - } - Ok(None) - } - - /// Waits for the entire stream to finish, returning the DONE status and the stream - pub async fn wait(mut self) -> Result<(Option, Vec)> { - let mut done = None; - let mut vec = Vec::new(); - while let Some(resp) = self.inner.recv().await { - if let Response::Done(d) = resp { - done = Some(d); - break; - } else { - vec.push(resp); - } - } - Ok((done, vec)) - } -} - -impl Stream for ResponseStream { - type Item = ResponseData; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.inner.poll_recv(cx) - } -} - -#[allow(unreachable_code)] -async fn write( - mut conn: WriteHalf, - mut write_rx: mpsc::UnboundedReceiver, - exit_rx: oneshot::Receiver<()>, -) -> Result> -where - C: AsyncWrite + Unpin, -{ - let mut exit_rx = exit_rx.map_err(|_| ()).shared(); - loop { - let write_fut = write_rx.recv().fuse(); - pin_mut!(write_fut); - - select! { - _ = exit_rx => { - break; - } - - line = write_fut => { - if let Some(line) = line { - conn.write_all(line.as_bytes()).await?; - conn.flush().await?; - trace!("C>>>S: {:?}", line); - } - } - } - } - - Ok(conn) -} - -#[allow(unreachable_code)] -async fn listen( - conn: ReadHalf, - mut cmd_rx: mpsc::UnboundedReceiver, - mut write_tx: mpsc::UnboundedSender, - greeting_tx: oneshot::Sender<()>, - exit_rx: oneshot::Receiver<()>, -) -> Result> -where - C: AsyncRead + Unpin, -{ - let codec = ImapCodec::default(); - let mut framed = FramedRead::new(conn, codec); - let mut greeting_tx = Some(greeting_tx); - let mut curr_cmd: Option = None; - let mut exit_rx = exit_rx.map_err(|_| ()).shared(); - - loop { - // let mut next_line = String::new(); - // let read_fut = reader.read_line(&mut next_line).fuse(); - let read_fut = framed.next().fuse(); - pin_mut!(read_fut); - - // only listen for a new command if there isn't one already - let mut cmd_fut = if let Some(_) = curr_cmd { - // if there is one, just make a future that never resolves so it'll always pick the - // other options in the select. - future::pending().boxed().fuse() - } else { - cmd_rx.recv().boxed().fuse() - }; - - select! { - _ = exit_rx => { - debug!("exiting the loop"); - break; - } - - // read a command from the command list - cmd = cmd_fut => { - if curr_cmd.is_none() { - if let Some((ref tag, ref cmd, _)) = cmd { - let cmd_str = format!("{} {}\r\n", tag, cmd); - write_tx.send(cmd_str); - } - curr_cmd = cmd; - } - } - - // got a response from the server connection - resp = read_fut => { - let resp = match resp { - Some(Ok(v)) => v, - a => { error!("failed: {:?}", a); bail!("fuck"); }, - }; - trace!("S>>>C: {:?}", resp); - - // if this is the very first response, then it's a greeting - if let Some(greeting_tx) = greeting_tx.take() { - greeting_tx.send(()).unwrap(); - } - - if let Response::Done(_) = resp { - // since this is the DONE message, clear curr_cmd so another one can be sent - if let Some((_, _, cmd_tx)) = curr_cmd.take() { - let res = cmd_tx.send(resp); - // debug!("res0: {:?}", res); - } - } else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() { - // we got a response from the server for this command, so send it over the - // channel - // debug!("sending {:?} to tag {}", resp, tag); - let res = cmd_tx.send(resp); - // debug!("res1: {:?}", res); - } - } - } - } - - let conn = framed.into_inner(); - Ok(conn) -} diff --git a/imap/src/proto/bytes.rs b/imap/src/proto/bytes.rs index 132f676..babc5a2 100644 --- a/imap/src/proto/bytes.rs +++ b/imap/src/proto/bytes.rs @@ -21,6 +21,10 @@ impl From<&'static [u8]> for Bytes { fn from(slice: &'static [u8]) -> Self { Bytes(bytes::Bytes::from(slice)) } } +impl From<&'static str> for Bytes { + fn from(s: &'static str) -> Self { Bytes(bytes::Bytes::from(s.as_bytes())) } +} + impl From for Bytes { fn from(slice: String) -> Self { Bytes(bytes::Bytes::from(slice)) } } diff --git a/imap/src/proto/command.rs b/imap/src/proto/command.rs index 0718701..30f5e91 100644 --- a/imap/src/proto/command.rs +++ b/imap/src/proto/command.rs @@ -1,24 +1,26 @@ +use crate::proto::bytes::Bytes; + #[derive(Debug)] -pub enum Command<'a> { +pub enum Command { // Any state Capability, Noop, Logout, // Not authenticated - Login(CommandLogin<'a>), + Login(CommandLogin), Starttls, Authenticate, // Authenticated - Select(CommandSelect<'a>), + Select(CommandSelect), Examine, Create, Delete, Rename, Subscribe, Unsubscribe, - List(CommandList<'a>), + List(CommandList), Lsub, Status, Append, @@ -48,15 +50,15 @@ pub struct CommandFetch { } #[derive(Debug)] -pub struct CommandList<'a> { - pub reference: &'a str, - pub mailbox: &'a str, +pub struct CommandList { + pub reference: Bytes, + pub mailbox: Bytes, } #[derive(Debug)] -pub struct CommandLogin<'a> { - pub username: &'a str, - pub password: &'a str, +pub struct CommandLogin { + pub username: Bytes, + pub password: Bytes, } #[derive(Debug)] @@ -65,8 +67,8 @@ pub struct CommandSearch { } #[derive(Debug)] -pub struct CommandSelect<'a> { - pub mailbox: &'a str, +pub struct CommandSelect { + pub mailbox: Bytes, } //