fuck this is definitely wrong

This commit is contained in:
Michael Zhang 2021-02-21 07:42:40 -06:00
parent f6de40251b
commit 417c008190
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
10 changed files with 256 additions and 57 deletions

16
Cargo.lock generated
View file

@ -730,6 +730,15 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "owning_ref"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ff55baddef9e4ad00f88b6c743a2a8062d4c6ade126c2a528644b8e444d52ce"
dependencies = [
"stable_deref_trait",
]
[[package]] [[package]]
name = "panorama" name = "panorama"
version = "0.0.1" version = "0.0.1"
@ -770,6 +779,7 @@ dependencies = [
"derive_builder", "derive_builder",
"futures", "futures",
"nom 6.1.2", "nom 6.1.2",
"owning_ref",
"panorama-strings", "panorama-strings",
"parking_lot", "parking_lot",
"tokio", "tokio",
@ -1190,6 +1200,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"
[[package]] [[package]]
name = "strsim" name = "strsim"
version = "0.8.0" version = "0.8.0"

View file

@ -12,8 +12,6 @@ license = "GPL-3.0-or-later"
members = ["imap", "strings"] members = ["imap", "strings"]
[dependencies] [dependencies]
# log = "0.4.14"
# fern = "0.6.0"
anyhow = "1.0.38" anyhow = "1.0.38"
async-trait = "0.1.42" async-trait = "0.1.42"
cfg-if = "1.0.0" cfg-if = "1.0.0"

View file

@ -22,6 +22,7 @@ webpki-roots = "0.21.0"
panorama-strings = { path = "../strings", version = "0" } panorama-strings = { path = "../strings", version = "0" }
parking_lot = "0.11.1" parking_lot = "0.11.1"
tracing = "0.1.23" tracing = "0.1.23"
owning_ref = "0.4.1"
[dev-dependencies] [dev-dependencies]
assert_matches = "1.3" assert_matches = "1.3"

View file

@ -1,36 +1,52 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll, Waker};
use anyhow::Result; use anyhow::{Context as AnyhowContext, Result};
use futures::future::{Future, FutureExt}; use futures::future::{self, Either, Future, FutureExt};
use panorama_strings::{StringEntry, StringStore}; use panorama_strings::{StringEntry, StringStore};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use tokio::{ use tokio::{
io::{ io::{
self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader,
WriteHalf, ReadHalf, WriteHalf,
}, },
sync::{mpsc, oneshot},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_rustls::{client::TlsStream, rustls::ClientConfig, webpki::DNSNameRef, TlsConnector};
use crate::command::Command; use crate::command::Command;
use crate::response::Response; use crate::types::Response;
use super::ClientNotConnected;
pub type BoxedFunc = Box<dyn Fn()>; pub type BoxedFunc = Box<dyn Fn()>;
pub type ResultMap = Arc<RwLock<HashMap<usize, (Option<String>, Option<Waker>)>>>;
pub type GreetingRx = Arc<RwLock<(bool, Option<Waker>)>>;
pub const TAG_PREFIX: &str = "panorama"; pub const TAG_PREFIX: &str = "panorama";
/// The private Client struct, that is shared by all of the exported structs in the state machine. /// The lower-level Client struct, that is shared by all of the exported structs in the state machine.
pub struct Client<C> { pub struct Client<C> {
config: ClientNotConnected,
conn: WriteHalf<C>, conn: WriteHalf<C>,
symbols: StringStore, symbols: StringStore,
id: usize, id: usize,
results: ResultMap, results: ResultMap,
/// cached set of capabilities
caps: Vec<StringEntry>, caps: Vec<StringEntry>,
handle: JoinHandle<Result<()>>,
/// join handle for the listener thread
listener_handle: JoinHandle<Result<ReadHalf<C>>>,
/// used for telling the listener thread to stop and return the read half
exit_tx: mpsc::Sender<()>,
/// used for receiving the greeting
greeting: GreetingRx,
} }
impl<C> Client<C> impl<C> Client<C>
@ -38,23 +54,38 @@ where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
/// Creates a new client that wraps a connection /// Creates a new client that wraps a connection
pub fn new(conn: C) -> Self { pub fn new(conn: C, config: ClientNotConnected) -> Self {
let (read_half, write_half) = io::split(conn); let (read_half, write_half) = io::split(conn);
let results = Arc::new(RwLock::new(HashMap::new())); let results = Arc::new(RwLock::new(HashMap::new()));
let listen_fut = tokio::spawn(listen(read_half, results.clone())); let (exit_tx, exit_rx) = mpsc::channel(1);
let greeting = Arc::new(RwLock::new((false, None)));
let listen_fut = tokio::spawn(listen(
read_half,
results.clone(),
exit_rx,
greeting.clone(),
));
Client { Client {
config,
conn: write_half, conn: write_half,
symbols: StringStore::new(256), symbols: StringStore::new(256),
id: 0, id: 0,
results, results,
caps: Vec::new(), caps: Vec::new(),
handle: listen_fut, listener_handle: listen_fut,
exit_tx,
greeting,
} }
} }
pub fn wait_for_greeting(&self) -> GreetingHandler {
debug!("waiting for greeting");
GreetingHandler(self.greeting.clone())
}
/// Sends a command to the server and returns a handle to retrieve the result /// Sends a command to the server and returns a handle to retrieve the result
pub async fn execute(&mut self, cmd: Command) -> Result<Response> { pub async fn execute(&mut self, cmd: Command) -> Result<String> {
debug!("executing command {:?}", cmd); debug!("executing command {:?}", cmd);
let id = self.id; let id = self.id;
self.id += 1; self.id += 1;
@ -73,17 +104,70 @@ where
let mut handlers = self.results.write(); let mut handlers = self.results.write();
handlers.remove(&id).unwrap().0.unwrap() handlers.remove(&id).unwrap().0.unwrap()
}; };
Ok(Response(resp)) Ok(resp)
} }
/// Executes the CAPABILITY command /// Executes the CAPABILITY command
pub async fn supports(&mut self) -> Result<()> { pub async fn capabilities(&mut self) -> Result<()> {
let cmd = Command::Capability; let cmd = Command::Capability;
debug!("sending: {:?} {:?}", cmd, cmd.to_string()); debug!("sending: {:?} {:?}", cmd, cmd.to_string());
let result = self.execute(cmd).await?; let result = self
debug!("result from supports: {:?}", result); .execute(cmd)
.await
.context("error executing CAPABILITY command")?;
let (_, resp) = Response::from_bytes(result.as_bytes())
.map_err(|err| anyhow!(""))
.context("error parsing response from CAPABILITY")?;
debug!("cap resp: {:?}", resp);
if let Response::Capabilities(caps) = resp {
debug!("capabilities: {:?}", caps);
}
Ok(()) Ok(())
} }
/// Attempts to upgrade this connection using STARTTLS
pub async fn upgrade(mut self) -> Result<Client<TlsStream<C>>> {
// TODO: make sure STARTTLS is in the capability list
// first, send the STARTTLS command
let resp = self.execute(Command::Starttls).await?;
debug!("server response to starttls: {:?}", resp);
debug!("sending exit ()");
self.exit_tx.send(()).await?;
let reader = self.listener_handle.await??;
let writer = self.conn;
let conn = reader.unsplit(writer);
let server_name = &self.config.hostname;
let mut tls_config = ClientConfig::new();
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap();
let stream = tls_config.connect(dnsname, conn).await?;
Ok(Client::new(stream, self.config))
}
}
pub struct GreetingHandler(GreetingRx);
impl Future for GreetingHandler {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let (state, waker) = &mut *self.0.write();
if waker.is_none() {
*waker = Some(cx.waker().clone());
}
match state {
true => Poll::Ready(()),
false => Poll::Pending,
}
}
} }
pub struct ExecHandle<'a, C>(&'a Client<C>, usize); pub struct ExecHandle<'a, C>(&'a Client<C>, usize);
@ -108,33 +192,67 @@ impl<'a, C> Future for ExecHandle<'a, C> {
} }
} }
use std::task::Waker; /// Main listen loop for the application
pub type ResultMap = Arc<RwLock<HashMap<usize, (Option<String>, Option<Waker>)>>>; async fn listen<C>(
conn: C,
async fn listen(conn: impl AsyncRead + Unpin, results: ResultMap) -> Result<()> { results: ResultMap,
mut exit: mpsc::Receiver<()>,
greeting: GreetingRx,
) -> Result<C>
where
C: AsyncRead + Unpin,
{
debug!("amogus"); debug!("amogus");
let mut reader = BufReader::new(conn); let mut reader = BufReader::new(conn);
let mut greeting = Some(greeting);
loop { loop {
let mut next_line = String::new(); let mut next_line = String::new();
reader.read_line(&mut next_line).await?; let fut = reader.read_line(&mut next_line).fuse();
let next_line = next_line.trim_end_matches('\r'); pin_mut!(fut);
let fut2 = exit.recv().fuse();
pin_mut!(fut2);
// debug!("line: {:?}", next_line); match future::select(fut, fut2).await {
let mut parts = next_line.split(" "); Either::Left((_, _)) => {
let tag = parts.next().unwrap(); let next_line = next_line.trim_end_matches('\n').trim_end_matches('\r');
let rest = parts.collect::<Vec<_>>().join(" ");
if tag == "*" { let mut parts = next_line.split(" ");
debug!("UNTAGGED {:?}", rest); let tag = parts.next().unwrap();
} else if tag.starts_with(TAG_PREFIX) { let rest = parts.collect::<Vec<_>>().join(" ");
let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
debug!("set {} to {:?}", id, rest); if tag == "*" {
let mut results = results.write(); debug!("UNTAGGED {:?}", rest);
if let Some((c, w)) = results.get_mut(&id) { if let Some(greeting) = greeting.take() {
*c = Some(rest.to_string()); let (greeting, waker) = &mut *greeting.write();
let w = w.take().unwrap(); debug!("got greeting");
w.wake(); *greeting = true;
if let Some(waker) = waker.take() {
waker.wake();
}
}
} else if tag.starts_with(TAG_PREFIX) {
let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
debug!("set {} to {:?}", id, rest);
let mut results = results.write();
if let Some((c, w)) = results.get_mut(&id) {
// *c = Some(rest.to_string());
*c = Some(next_line.to_owned());
if let Some(waker) = w.take() {
waker.wake();
}
}
}
}
Either::Right((_, _)) => {
debug!("exiting read loop");
break;
} }
} }
reader.read_line(&mut next_line).await?;
} }
let conn = reader.into_inner();
Ok(conn)
} }

View file

@ -21,7 +21,7 @@ use tokio::{
}; };
use tokio_rustls::{client::TlsStream, rustls::ClientConfig, webpki::DNSNameRef, TlsConnector}; use tokio_rustls::{client::TlsStream, rustls::ClientConfig, webpki::DNSNameRef, TlsConnector};
use self::inner::Client; pub use self::inner::Client;
/// Struct used to start building the config for a client. /// Struct used to start building the config for a client.
/// ///
@ -48,7 +48,7 @@ pub struct ClientNotConnected {
} }
impl ClientNotConnected { impl ClientNotConnected {
pub async fn connect(self) -> Result<ClientUnauthenticated> { pub async fn open(self) -> Result<ClientUnauthenticated> {
let hostname = self.hostname.as_ref(); let hostname = self.hostname.as_ref();
let port = self.port; let port = self.port;
let conn = TcpStream::connect((hostname, port)).await?; let conn = TcpStream::connect((hostname, port)).await?;
@ -62,16 +62,18 @@ impl ClientNotConnected {
let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap(); let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap();
let conn = tls_config.connect(dnsname, conn).await?; let conn = tls_config.connect(dnsname, conn).await?;
let inner = Client::new(conn); let inner = Client::new(conn, self);
inner.wait_for_greeting().await;
return Ok(ClientUnauthenticated::Encrypted( return Ok(ClientUnauthenticated::Encrypted(
ClientUnauthenticatedEncrypted { inner }, ClientUnauthenticatedEncrypted { inner },
)); ));
} else {
let inner = Client::new(conn, self);
inner.wait_for_greeting().await;
return Ok(ClientUnauthenticated::Unencrypted(
ClientUnauthenticatedUnencrypted { inner },
));
} }
let inner = Client::new(conn);
return Ok(ClientUnauthenticated::Unencrypted(
ClientUnauthenticatedUnencrypted { inner },
));
} }
} }
@ -81,10 +83,23 @@ pub enum ClientUnauthenticated {
} }
impl ClientUnauthenticated { impl ClientUnauthenticated {
pub async fn supports(&mut self) -> Result<()> { pub async fn upgrade(self) -> Result<ClientUnauthenticated> {
match self { match self {
ClientUnauthenticated::Encrypted(e) => e.inner.supports().await?, // this is a no-op, we don't need to upgrade
ClientUnauthenticated::Unencrypted(e) => e.inner.supports().await?, ClientUnauthenticated::Encrypted(_) => Ok(self),
ClientUnauthenticated::Unencrypted(e) => {
let client = ClientUnauthenticatedEncrypted {
inner: e.inner.upgrade().await?,
};
Ok(ClientUnauthenticated::Encrypted(client))
}
}
}
pub async fn capabilities(&mut self) -> Result<()> {
match self {
ClientUnauthenticated::Encrypted(e) => e.inner.capabilities().await?,
ClientUnauthenticated::Unencrypted(e) => e.inner.capabilities().await?,
} }
Ok(()) Ok(())
} }

View file

@ -4,12 +4,14 @@ use std::fmt;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum Command { pub enum Command {
Capability, Capability,
Starttls,
} }
impl fmt::Display for Command { impl fmt::Display for Command {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
Command::Capability => write!(f, "CAPABILITY"), Command::Capability => write!(f, "CAPABILITY"),
Command::Starttls => write!(f, "STARTTLS"),
} }
} }
} }

View file

@ -1,6 +1,10 @@
#[macro_use] #[macro_use]
extern crate anyhow;
#[macro_use]
extern crate derive_builder; extern crate derive_builder;
#[macro_use] #[macro_use]
extern crate futures;
#[macro_use]
extern crate tracing; extern crate tracing;
pub mod builders; pub mod builders;

View file

@ -1,2 +1,26 @@
#[derive(Clone, Debug)] use std::str::FromStr;
pub struct Response(pub String);
pub enum Response {
Capabilities(Vec<Capability>),
Done {
tag: RequestId,
status: Status,
code: Option<ResponseCode>,
information: Option<String>,
},
}
pub enum Capability {
Imap4rev1,
Auth(String),
Atom(String),
}
pub struct RequestId(pub String);
pub enum Status {
Ok,
No,
}
pub enum ResponseCode {}

View file

View file

@ -5,7 +5,10 @@ mod imap2;
use anyhow::Result; use anyhow::Result;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use panorama_imap::{client::ClientBuilder, command::Command as ImapCommand}; use panorama_imap::{
client::{ClientBuilder, ClientNotConnected},
command::Command as ImapCommand,
};
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle}; use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle};
use tokio_stream::wrappers::WatchStream; use tokio_stream::wrappers::WatchStream;
@ -48,7 +51,12 @@ pub async fn run_mail(
let handle = tokio::spawn(async { let handle = tokio::spawn(async {
for acct in config.mail_accounts.into_iter() { for acct in config.mail_accounts.into_iter() {
debug!("opening imap connection for {:?}", acct); debug!("opening imap connection for {:?}", acct);
osu(acct).await.unwrap(); match imap_main(acct).await {
Ok(_) => {}
Err(err) => {
error!("IMAP Error: {}", err);
}
}
// open_imap_connection(acct.imap).await.unwrap(); // open_imap_connection(acct.imap).await.unwrap();
} }
}); });
@ -59,8 +67,9 @@ pub async fn run_mail(
Ok(()) Ok(())
} }
async fn osu(acct: MailAccountConfig) -> Result<()> { /// The main sequence of steps for the IMAP thread to follow
let builder = ClientBuilder::default() async fn imap_main(acct: MailAccountConfig) -> Result<()> {
let builder: ClientNotConnected = ClientBuilder::default()
.hostname(acct.imap.server.clone()) .hostname(acct.imap.server.clone())
.port(acct.imap.port) .port(acct.imap.port)
.tls(matches!(acct.imap.tls, TlsMethod::On)) .tls(matches!(acct.imap.tls, TlsMethod::On))
@ -68,10 +77,22 @@ async fn osu(acct: MailAccountConfig) -> Result<()> {
.map_err(|err| anyhow!("err: {}", err))?; .map_err(|err| anyhow!("err: {}", err))?;
debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port); debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port);
let mut unauth = builder.connect().await?; let unauth = builder.open().await?;
debug!("sending CAPABILITY"); let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) {
unauth.supports().await?; debug!("attempting to upgrade");
let client = unauth.upgrade().await?;
debug!("upgrade successful");
client
} else {
unauth
};
debug!("preparing to auth");
// check if the authentication method is supported
// debug!("sending CAPABILITY");
// let result = unauth.capabilities().await?;
Ok(()) Ok(())
} }