This commit is contained in:
Michael Zhang 2021-08-09 00:36:10 -05:00
parent 9d31a13882
commit b75df0b8d6
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
15 changed files with 266 additions and 404 deletions

52
Cargo.lock generated
View file

@ -25,6 +25,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.1" version = "1.0.1"
@ -438,6 +449,7 @@ dependencies = [
"futures", "futures",
"log", "log",
"nom", "nom",
"stderrlog",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-util", "tokio-util",
@ -609,6 +621,19 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stderrlog"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45a53e2eff3e94a019afa6265e8ee04cb05b9d33fe9f5078b14e4e391d155a38"
dependencies = [
"atty",
"chrono",
"log",
"termcolor",
"thread_local",
]
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.10.0" version = "0.10.0"
@ -632,6 +657,24 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "termcolor"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4"
dependencies = [
"winapi-util",
]
[[package]]
name = "thread_local"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
dependencies = [
"lazy_static",
]
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.44" version = "0.1.44"
@ -822,6 +865,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View file

@ -3,3 +3,6 @@ members = [
# "daemon", # "daemon",
"imap", "imap",
] ]
[profile.release]
lto = true

View file

@ -1,5 +1,5 @@
fmt: fmt:
cargo +nightly fmt --all cargo +nightly fmt --all
greenmail-test: mzhang-test:
cargo run -p imap --bin greenmail-test cargo run -p panorama-imap --bin mzhang-test --features=stderrlog

View file

@ -13,16 +13,14 @@ readme = "README.md"
workspace = ".." workspace = ".."
[[bin]] [[bin]]
name = "greenmail-test" name = "mzhang-test"
path = "bin/greenmail_test.rs" path = "bin/mzhang_test.rs"
required-features = ["stderrlog"]
[features] [features]
default = ["rfc2177-idle"] default = ["rfc2177-idle"]
rfc2177-idle = [] rfc2177-idle = []
[profile.release]
lto = true
[dependencies] [dependencies]
anyhow = "1.0.42" anyhow = "1.0.42"
async-trait = "0.1.51" async-trait = "0.1.51"
@ -34,6 +32,7 @@ futures = "0.3.16"
log = "0.4.14" log = "0.4.14"
nom = "6.2.1" nom = "6.2.1"
tokio = { version = "1.9.0", features = ["full"] } tokio = { version = "1.9.0", features = ["full"] }
tokio-rustls = "0.22.0" tokio-rustls = { version = "0.22.0", features = ["dangerous_configuration"] }
tokio-util = { version = "0.6.7", features = ["codec"] } tokio-util = { version = "0.6.7", features = ["codec"] }
webpki-roots = "0.21.1" webpki-roots = "0.21.1"
stderrlog = { version = "0.5.1", optional = true }

1
imap/bin/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/mzhang_test.rs

View file

@ -1,14 +0,0 @@
use anyhow::Result;
// use panorama_imap::client::ConfigBuilder;
#[tokio::main]
async fn main() -> Result<()> {
// let _client = ConfigBuilder::default()
// .hostname(String::from("localhost"))
// .port(3993)
// .tls(true)
// .connect()
// .await?;
Ok(())
}

View file

@ -1,7 +1,10 @@
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use crate::client::inner::Inner; use crate::client::inner::Inner;
use crate::proto::command::{Command, CommandLogin}; use crate::proto::{
bytes::Bytes,
command::{Command, CommandLogin},
};
pub trait Client: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static {} pub trait Client: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static {}
@ -24,8 +27,8 @@ impl AuthMethod for Login {
C: Client, C: Client,
{ {
let command = Command::Login(CommandLogin { let command = Command::Login(CommandLogin {
username: &self.username, username: Bytes::from(self.username.clone()),
password: &self.password, password: Bytes::from(self.password.clone()),
}); });
let _result = inner.execute(command).await; let _result = inner.execute(command).await;

View file

@ -1,5 +1,4 @@
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::Result; use anyhow::Result;
@ -8,11 +7,10 @@ use futures::{
stream::{Stream, StreamExt}, stream::{Stream, StreamExt},
}; };
use tokio::{net::TcpStream, sync::mpsc}; use tokio::{net::TcpStream, sync::mpsc};
use tokio_rustls::{ use tokio_rustls::client::TlsStream;
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
};
use crate::proto::{ use crate::proto::{
bytes::Bytes,
command::{ command::{
Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems, Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems,
SearchCriteria, SearchCriteria,
@ -25,6 +23,7 @@ use crate::proto::{
use super::inner::Inner; use super::inner::Inner;
use super::response_stream::ResponseStream; use super::response_stream::ResponseStream;
use super::upgrade::upgrade;
/// An IMAP client that hasn't been connected yet. /// An IMAP client that hasn't been connected yet.
#[derive(Builder, Clone, Debug)] #[derive(Builder, Clone, Debug)]
@ -39,11 +38,16 @@ pub struct Config {
/// Whether or not the client is using an encrypted stream. /// Whether or not the client is using an encrypted stream.
/// ///
/// To upgrade the connection later, use the upgrade method. /// To upgrade the connection later, use the upgrade method.
#[builder(default = "true")]
pub tls: bool, pub tls: bool,
/// Whether or not to verify hostname
#[builder(default = "true")]
pub verify_hostname: bool,
} }
impl ConfigBuilder { impl ConfigBuilder {
pub async fn open(self) -> Result<ClientUnauthenticated> { pub async fn open(&self) -> Result<ClientUnauthenticated> {
let config = self.build()?; let config = self.build()?;
let hostname = config.hostname.as_ref(); let hostname = config.hostname.as_ref();
@ -51,14 +55,7 @@ impl ConfigBuilder {
let conn = TcpStream::connect((hostname, port)).await?; let conn = TcpStream::connect((hostname, port)).await?;
if config.tls { if config.tls {
let mut tls_config = RustlsConfig::new(); let conn = upgrade(conn, hostname).await?;
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap();
let conn = tls_config.connect(dnsname, conn).await?;
let mut inner = Inner::new(conn, config).await?; let mut inner = Inner::new(conn, config).await?;
inner.wait_for_greeting().await?; inner.wait_for_greeting().await?;
return Ok(ClientUnauthenticated::Encrypted(inner)); return Ok(ClientUnauthenticated::Encrypted(inner));
@ -86,7 +83,7 @@ impl ClientUnauthenticated {
} }
} }
client_expose!(async execute<'a>(cmd: Command<'a>) -> Result<ResponseStream>); client_expose!(async execute(cmd: Command) -> Result<ResponseStream>);
client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>); client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>);
} }
@ -96,7 +93,7 @@ pub enum ClientAuthenticated {
} }
impl ClientAuthenticated { impl ClientAuthenticated {
client_expose!(async execute<'a>(cmd: Command<'a>) -> Result<ResponseStream>); client_expose!(async execute(cmd: Command) -> Result<ResponseStream>);
client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>); client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>);
fn sender(&self) -> mpsc::UnboundedSender<String> { fn sender(&self) -> mpsc::UnboundedSender<String> {
@ -109,8 +106,8 @@ impl ClientAuthenticated {
/// Runs the LIST command /// Runs the LIST command
pub async fn list(&mut self) -> Result<Vec<Mailbox>> { pub async fn list(&mut self) -> Result<Vec<Mailbox>> {
let cmd = Command::List(CommandList { let cmd = Command::List(CommandList {
reference: "", reference: Bytes::from(""),
mailbox: "*", mailbox: Bytes::from("*"),
}); });
let res = self.execute(cmd).await?; let res = self.execute(cmd).await?;
@ -129,7 +126,7 @@ impl ClientAuthenticated {
/// Runs the SELECT command /// Runs the SELECT command
pub async fn select(&mut self, mailbox: impl AsRef<str>) -> Result<SelectResponse> { pub async fn select(&mut self, mailbox: impl AsRef<str>) -> Result<SelectResponse> {
let cmd = Command::Select(CommandSelect { let cmd = Command::Select(CommandSelect {
mailbox: mailbox.as_ref(), mailbox: Bytes::from(mailbox.as_ref().to_owned()),
}); });
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;

View file

@ -0,0 +1,104 @@
//! Configurable cert verifier for rustls, can disable hostname verification,
//! etc.
//!
//! Based closely on https://github.com/rustls/rustls/blob/v/0.19.0/rustls/src/verify.rs#L253
use tokio_rustls::{
rustls::{
Certificate, OwnedTrustAnchor, RootCertStore, ServerCertVerified, ServerCertVerifier,
TLSError,
},
webpki::{
self, DNSNameRef, EndEntityCert, SignatureAlgorithm, TLSServerTrustAnchors, TrustAnchor,
},
};
type SignatureAlgorithms = &'static [&'static SignatureAlgorithm];
static SUPPORTED_SIG_ALGS: SignatureAlgorithms = &[
&webpki::ECDSA_P256_SHA256,
&webpki::ECDSA_P256_SHA384,
&webpki::ECDSA_P384_SHA256,
&webpki::ECDSA_P384_SHA384,
&webpki::ED25519,
&webpki::RSA_PSS_2048_8192_SHA256_LEGACY_KEY,
&webpki::RSA_PSS_2048_8192_SHA384_LEGACY_KEY,
&webpki::RSA_PSS_2048_8192_SHA512_LEGACY_KEY,
&webpki::RSA_PKCS1_2048_8192_SHA256,
&webpki::RSA_PKCS1_2048_8192_SHA384,
&webpki::RSA_PKCS1_2048_8192_SHA512,
&webpki::RSA_PKCS1_3072_8192_SHA384,
];
fn try_now() -> Result<webpki::Time, TLSError> {
webpki::Time::try_from(std::time::SystemTime::now())
.map_err(|_| TLSError::FailedToGetCurrentTime)
}
#[derive(Builder)]
pub struct ConfigurableCertVerifier {
#[builder(default = "true")]
pub verify_hostname: bool,
#[builder(default = "try_now")]
pub time: fn() -> Result<webpki::Time, TLSError>,
// TODO: add custom signaturue algorithms?
}
impl ServerCertVerifier for ConfigurableCertVerifier {
fn verify_server_cert(
&self,
roots: &RootCertStore,
presented_certs: &[Certificate],
dns_name: DNSNameRef<'_>,
ocsp_response: &[u8],
) -> Result<ServerCertVerified, TLSError> {
let (cert, chain, trustroots) = prepare(roots, presented_certs)?;
let now = (self.time)()?;
let cert = cert
.verify_is_valid_tls_server_cert(
SUPPORTED_SIG_ALGS,
&TLSServerTrustAnchors(&trustroots),
&chain,
now,
)
.map_err(TLSError::WebPKIError)
.map(|_| cert)?;
if !ocsp_response.is_empty() {
trace!("Unvalidated OCSP response: {:?}", ocsp_response.to_vec());
}
cert.verify_is_valid_for_dns_name(dns_name)
.map_err(TLSError::WebPKIError)
.map(|_| ServerCertVerified::assertion())
}
}
type CertChainAndRoots<'a, 'b> = (EndEntityCert<'a>, Vec<&'a [u8]>, Vec<TrustAnchor<'b>>);
fn prepare<'a, 'b>(
roots: &'b RootCertStore,
presented_certs: &'a [Certificate],
) -> Result<CertChainAndRoots<'a, 'b>, TLSError> {
if presented_certs.is_empty() {
return Err(TLSError::NoCertificatesPresented);
}
// EE cert must appear first.
let cert = EndEntityCert::from(&presented_certs[0].0).map_err(TLSError::WebPKIError)?;
let chain: Vec<&'a [u8]> = presented_certs
.iter()
.skip(1)
.map(|cert| cert.0.as_ref())
.collect();
let trustroots: Vec<TrustAnchor> = roots
.roots
.iter()
.map(OwnedTrustAnchor::to_trust_anchor)
.collect();
Ok((cert, chain, trustroots))
}

View file

@ -27,6 +27,8 @@ type ExitListener = oneshot::Receiver<()>;
type GreetingSender = oneshot::Sender<()>; type GreetingSender = oneshot::Sender<()>;
type GreetingWaiter = oneshot::Receiver<()>; type GreetingWaiter = oneshot::Receiver<()>;
/// Low-level client, can directly read from and write to the stream
/// without the additional type-safety of the higher-level state machine.
pub struct Inner<C> { pub struct Inner<C> {
config: Config, config: Config,
tag_number: AtomicU32, tag_number: AtomicU32,
@ -41,6 +43,10 @@ pub struct Inner<C> {
greeting_rx: Option<GreetingWaiter>, greeting_rx: Option<GreetingWaiter>,
} }
struct CommandContainer {
command: Command,
}
impl<C> Inner<C> impl<C> Inner<C>
where where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
@ -57,7 +63,7 @@ where
// spawn the server->client loop // spawn the server->client loop
let (read_exit, exit_rx) = oneshot::channel(); let (read_exit, exit_rx) = oneshot::channel();
let read_handle = tokio::spawn(read_loop(read_half, exit_rx)); let read_handle = tokio::spawn(read_loop(read_half, exit_rx, greeting_tx));
// spawn the client->server loop // spawn the client->server loop
let (write_exit, exit_rx) = oneshot::channel(); let (write_exit, exit_rx) = oneshot::channel();
@ -78,7 +84,7 @@ where
}) })
} }
pub async fn execute<'a>(&mut self, _command: Command<'a>) -> Result<ResponseStream> { todo!() } pub async fn execute(&mut self, _command: Command) -> Result<ResponseStream> { todo!() }
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> { pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
// TODO: cache capabilities if needed? // TODO: cache capabilities if needed?
@ -129,10 +135,19 @@ where
// even requires this loop to stop (for example, TLS upgrade). // even requires this loop to stop (for example, TLS upgrade).
// //
// when the loop exits, the read half of the stream will be returned // when the loop exits, the read half of the stream will be returned
async fn read_loop<C>(stream: ReadHalf<C>, exit: ExitListener) -> ReadHalf<C> async fn read_loop<C>(
stream: ReadHalf<C>,
exit: ExitListener,
greeting_tx: GreetingSender,
) -> ReadHalf<C>
where where
C: AsyncRead, C: AsyncRead,
{ {
// this lets us "use up" the greeting sender
let mut greeting_tx = Some(greeting_tx);
let mut curr_cmd: Option<CommandContainer> = None;
// set up framed communication // set up framed communication
let codec = ImapCodec::default(); let codec = ImapCodec::default();
let mut framed = FramedRead::new(stream, codec); let mut framed = FramedRead::new(stream, codec);
@ -146,6 +161,11 @@ where
select! { select! {
msg = next => { msg = next => {
println!("hellosu {:?}", msg); println!("hellosu {:?}", msg);
// if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() {
greeting_tx.send(()).unwrap();
}
} }
_ = exit => break, _ = exit => break,
} }
@ -171,8 +191,8 @@ where
line = write_fut => { line = write_fut => {
if let Some(line) = line { if let Some(line) = line {
// TODO: handle errors here // TODO: handle errors here
stream.write_all(line.as_bytes()).await; let _ = stream.write_all(line.as_bytes()).await;
stream.flush().await; let _ = stream.flush().await;
trace!("C>>>S: {:?}", line); trace!("C>>>S: {:?}", line);
} }
} }

View file

@ -3,6 +3,11 @@ mod macros;
pub mod auth; pub mod auth;
pub mod client; pub mod client;
pub mod inner; pub mod configurable_cert_verifier;
pub mod response_stream; pub mod response_stream;
pub mod upgrade; pub mod upgrade;
mod inner;
pub use self::client::ConfigBuilder;
pub use self::inner::Inner;

View file

@ -1,9 +1,10 @@
use std::io; use std::io;
use bytes::BytesMut; use bytes::{Buf, BytesMut};
use nom::Needed;
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use crate::proto::{command::Command, response::Response}; use crate::proto::{command::Command, response::Response, rfc3501::response as parse_response};
#[derive(Default)] #[derive(Default)]
pub struct ImapCodec { pub struct ImapCodec {
@ -18,38 +19,35 @@ impl<'a> Decoder for ImapCodec {
return Ok(None); return Ok(None);
} }
todo!() let buf2 = buf.split();
// let (response, rsp_len) = match Response::from_bytes(buf) { let buf3 = buf2.clone().freeze();
// Ok((remaining, response)) => { let (response, len) = match parse_response(buf3.clone().into()) {
// // This SHOULD be acceptable/safe: BytesMut storage memory is Ok((remaining, response)) => (response, buf.len() - remaining.len()),
// // allocated on the heap and should not move. It will not be Err(nom::Err::Incomplete(Needed::Size(min))) => {
// // freed as long as we keep a reference alive, which we do self.decode_need_message_bytes = min.get();
// // by retaining a reference to the split buffer, below. return Ok(None);
// let response = unsafe { mem::transmute(response) }; }
// (response, buf.len() - remaining.len()) Err(nom::Err::Incomplete(_)) => {
// } return Ok(None);
// Err(nom::Err::Incomplete(Needed::Size(min))) => { }
// self.decode_need_message_bytes = min.get(); Err(nom::Err::Error(nom::error::Error { code, .. }))
// return Ok(None); | Err(nom::Err::Failure(nom::error::Error { code, .. })) => {
// } return Err(io::Error::new(
// Err(nom::Err::Incomplete(_)) => { io::ErrorKind::Other,
// return Ok(None); format!("{:?} during parsing of {:?}", code, buf),
// } ));
// Err(nom::Err::Error(nom::error::Error { code, .. })) }
// | Err(nom::Err::Failure(nom::error::Error { code, .. })) => { };
// return Err(io::Error::new(
// io::ErrorKind::Other, buf.unsplit(buf2);
// format!("{:?} during parsing of {:?}", code, buf), buf.advance(len);
// ));
// } self.decode_need_message_bytes = 0;
// }; Ok(Some(response))
// let raw = buf.split_to(rsp_len).freeze();
// self.decode_need_message_bytes = 0;
// Ok(Some(ResponseData { raw, response }))
} }
} }
impl<'a> Encoder<&'a Command<'a>> for ImapCodec { impl<'a> Encoder<&'a Command> for ImapCodec {
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, _msg: &Command, _dst: &mut BytesMut) -> Result<(), io::Error> { fn encode(&mut self, _msg: &Command, _dst: &mut BytesMut) -> Result<(), io::Error> {
todo!() todo!()

View file

@ -1,312 +0,0 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Result;
use futures::{
future::{self, FutureExt, TryFutureExt},
stream::{Stream, StreamExt},
};
use imap_proto::{builders::command::Command, types::Response};
use tokio::{
io::{
self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf,
},
sync::{
mpsc,
oneshot::{self, error::TryRecvError},
},
task::JoinHandle,
};
use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
};
use tokio_util::codec::FramedRead;
use crate::codec::{ImapCodec, ResponseData};
pub const TAG_PREFIX: &str = "ptag";
type Command2 = (String, Command, mpsc::UnboundedSender<ResponseData>);
pub struct Client<C> {
ctr: usize,
config: ClientConfig,
// conn: WriteHalf<C>,
pub(crate) write_tx: mpsc::UnboundedSender<String>,
cmd_tx: mpsc::UnboundedSender<Command2>,
greeting_rx: Option<oneshot::Receiver<()>>,
writer_exit_tx: oneshot::Sender<()>,
writer_handle: JoinHandle<Result<WriteHalf<C>>>,
listener_exit_tx: oneshot::Sender<()>,
listener_handle: JoinHandle<Result<ReadHalf<C>>>,
}
impl<C> Client<C>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
pub fn new(conn: C, config: ClientConfig) -> Self {
let (read_half, mut write_half) = io::split(conn);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (greeting_tx, greeting_rx) = oneshot::channel();
let (writer_exit_tx, exit_rx) = oneshot::channel();
let (write_tx, mut write_rx) = mpsc::unbounded_channel::<String>();
let writer_handle = tokio::spawn(write(write_half, write_rx, exit_rx).map_err(|err| {
error!("Help, the writer loop died: {}", err);
err
}));
let (exit_tx, exit_rx) = oneshot::channel();
let listener_handle = tokio::spawn(
listen(read_half, cmd_rx, write_tx.clone(), greeting_tx, exit_rx).map_err(|err| {
error!("Help, the listener loop died: {:?} {}", err, err);
err
}),
);
Client {
ctr: 0,
// conn: write_half,
config,
cmd_tx,
write_tx,
greeting_rx: Some(greeting_rx),
writer_exit_tx,
listener_exit_tx: exit_tx,
writer_handle,
listener_handle,
}
}
pub async fn wait_for_greeting(&mut self) -> Result<()> {
if let Some(greeting_rx) = self.greeting_rx.take() {
greeting_rx.await?;
}
Ok(())
}
pub async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
let id = self.ctr;
self.ctr += 1;
let tag = format!("{}{}", TAG_PREFIX, id);
// let cmd_str = format!("{} {}\r\n", tag, cmd);
// self.write_tx.send(cmd_str);
// self.conn.write_all(cmd_str.as_bytes()).await?;
// self.conn.flush().await?;
let (tx, rx) = mpsc::unbounded_channel();
self.cmd_tx.send((tag, cmd, tx))?;
let stream = ResponseStream { inner: rx };
Ok(stream)
}
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
// TODO: cache capabilities if needed?
let cap = cap.as_ref();
let cap = parse_capability(cap)?;
let resp = self.execute(Command::Capability).await?;
let (_, data) = resp.wait().await?;
for resp in data {
if let Response::Capabilities(caps) = resp {
return Ok(caps.contains(&cap));
}
// debug!("cap: {:?}", resp);
}
Ok(false)
}
pub async fn upgrade(mut self) -> Result<Client<TlsStream<C>>> {
// TODO: make sure STARTTLS is in the capability list
if !self.has_capability("STARTTLS").await? {
bail!("server doesn't support this capability");
}
// first, send the STARTTLS command
let mut resp = self.execute(Command::Starttls).await?;
let resp = resp.next().await.unwrap();
debug!("server response to starttls: {:?}", resp);
debug!("sending exit for upgrade");
// TODO: check that the channel is still open?
self.listener_exit_tx.send(()).unwrap();
self.writer_exit_tx.send(()).unwrap();
let (reader, writer) = future::join(self.listener_handle, self.writer_handle).await;
let reader = reader??;
let writer = writer??;
// let reader = self.listener_handle.await??;
// let writer = self.conn;
let conn = reader.unsplit(writer);
let server_name = &self.config.hostname;
let mut tls_config = RustlsConfig::new();
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap();
let stream = tls_config.connect(dnsname, conn).await?;
debug!("upgraded, stream is using TLS now");
Ok(Client::new(stream, self.config))
}
}
pub struct ResponseStream {
pub(crate) inner: mpsc::UnboundedReceiver<ResponseData>,
}
impl ResponseStream {
/// Retrieves just the DONE item in the stream, discarding the rest
pub async fn done(mut self) -> Result<Option<ResponseDone>> {
while let Some(resp) = self.inner.recv().await {
if let Response::Done(done) = resp {
return Ok(Some(done));
}
}
Ok(None)
}
/// Waits for the entire stream to finish, returning the DONE status and the stream
pub async fn wait(mut self) -> Result<(Option<ResponseDone>, Vec<ResponseData>)> {
let mut done = None;
let mut vec = Vec::new();
while let Some(resp) = self.inner.recv().await {
if let Response::Done(d) = resp {
done = Some(d);
break;
} else {
vec.push(resp);
}
}
Ok((done, vec))
}
}
impl Stream for ResponseStream {
type Item = ResponseData;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}
#[allow(unreachable_code)]
async fn write<C>(
mut conn: WriteHalf<C>,
mut write_rx: mpsc::UnboundedReceiver<String>,
exit_rx: oneshot::Receiver<()>,
) -> Result<WriteHalf<C>>
where
C: AsyncWrite + Unpin,
{
let mut exit_rx = exit_rx.map_err(|_| ()).shared();
loop {
let write_fut = write_rx.recv().fuse();
pin_mut!(write_fut);
select! {
_ = exit_rx => {
break;
}
line = write_fut => {
if let Some(line) = line {
conn.write_all(line.as_bytes()).await?;
conn.flush().await?;
trace!("C>>>S: {:?}", line);
}
}
}
}
Ok(conn)
}
#[allow(unreachable_code)]
async fn listen<C>(
conn: ReadHalf<C>,
mut cmd_rx: mpsc::UnboundedReceiver<Command2>,
mut write_tx: mpsc::UnboundedSender<String>,
greeting_tx: oneshot::Sender<()>,
exit_rx: oneshot::Receiver<()>,
) -> Result<ReadHalf<C>>
where
C: AsyncRead + Unpin,
{
let codec = ImapCodec::default();
let mut framed = FramedRead::new(conn, codec);
let mut greeting_tx = Some(greeting_tx);
let mut curr_cmd: Option<Command2> = None;
let mut exit_rx = exit_rx.map_err(|_| ()).shared();
loop {
// let mut next_line = String::new();
// let read_fut = reader.read_line(&mut next_line).fuse();
let read_fut = framed.next().fuse();
pin_mut!(read_fut);
// only listen for a new command if there isn't one already
let mut cmd_fut = if let Some(_) = curr_cmd {
// if there is one, just make a future that never resolves so it'll always pick the
// other options in the select.
future::pending().boxed().fuse()
} else {
cmd_rx.recv().boxed().fuse()
};
select! {
_ = exit_rx => {
debug!("exiting the loop");
break;
}
// read a command from the command list
cmd = cmd_fut => {
if curr_cmd.is_none() {
if let Some((ref tag, ref cmd, _)) = cmd {
let cmd_str = format!("{} {}\r\n", tag, cmd);
write_tx.send(cmd_str);
}
curr_cmd = cmd;
}
}
// got a response from the server connection
resp = read_fut => {
let resp = match resp {
Some(Ok(v)) => v,
a => { error!("failed: {:?}", a); bail!("fuck"); },
};
trace!("S>>>C: {:?}", resp);
// if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() {
greeting_tx.send(()).unwrap();
}
if let Response::Done(_) = resp {
// since this is the DONE message, clear curr_cmd so another one can be sent
if let Some((_, _, cmd_tx)) = curr_cmd.take() {
let res = cmd_tx.send(resp);
// debug!("res0: {:?}", res);
}
} else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() {
// we got a response from the server for this command, so send it over the
// channel
// debug!("sending {:?} to tag {}", resp, tag);
let res = cmd_tx.send(resp);
// debug!("res1: {:?}", res);
}
}
}
}
let conn = framed.into_inner();
Ok(conn)
}

View file

@ -21,6 +21,10 @@ impl From<&'static [u8]> for Bytes {
fn from(slice: &'static [u8]) -> Self { Bytes(bytes::Bytes::from(slice)) } fn from(slice: &'static [u8]) -> Self { Bytes(bytes::Bytes::from(slice)) }
} }
impl From<&'static str> for Bytes {
fn from(s: &'static str) -> Self { Bytes(bytes::Bytes::from(s.as_bytes())) }
}
impl From<String> for Bytes { impl From<String> for Bytes {
fn from(slice: String) -> Self { Bytes(bytes::Bytes::from(slice)) } fn from(slice: String) -> Self { Bytes(bytes::Bytes::from(slice)) }
} }

View file

@ -1,24 +1,26 @@
use crate::proto::bytes::Bytes;
#[derive(Debug)] #[derive(Debug)]
pub enum Command<'a> { pub enum Command {
// Any state // Any state
Capability, Capability,
Noop, Noop,
Logout, Logout,
// Not authenticated // Not authenticated
Login(CommandLogin<'a>), Login(CommandLogin),
Starttls, Starttls,
Authenticate, Authenticate,
// Authenticated // Authenticated
Select(CommandSelect<'a>), Select(CommandSelect),
Examine, Examine,
Create, Create,
Delete, Delete,
Rename, Rename,
Subscribe, Subscribe,
Unsubscribe, Unsubscribe,
List(CommandList<'a>), List(CommandList),
Lsub, Lsub,
Status, Status,
Append, Append,
@ -48,15 +50,15 @@ pub struct CommandFetch {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct CommandList<'a> { pub struct CommandList {
pub reference: &'a str, pub reference: Bytes,
pub mailbox: &'a str, pub mailbox: Bytes,
} }
#[derive(Debug)] #[derive(Debug)]
pub struct CommandLogin<'a> { pub struct CommandLogin {
pub username: &'a str, pub username: Bytes,
pub password: &'a str, pub password: Bytes,
} }
#[derive(Debug)] #[derive(Debug)]
@ -65,8 +67,8 @@ pub struct CommandSearch {
} }
#[derive(Debug)] #[derive(Debug)]
pub struct CommandSelect<'a> { pub struct CommandSelect {
pub mailbox: &'a str, pub mailbox: Bytes,
} }
// //