Nuke the original imap impls

This commit is contained in:
Michael Zhang 2021-02-21 08:23:51 -06:00
parent 18ca5e540d
commit a5dc3eebcb
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
5 changed files with 45 additions and 448 deletions

View file

@ -9,9 +9,13 @@ Goals:
- Never have to actually close the application.
- Handles email, calendar, and address books using open standards.
- Unified "feed" that any app can submit to.
- Hot-reload on-disk config.
Stretch goals:
- Unified "feed" that any app can submit to.
- Submit notifications to gotify-shaped notification servers.
- RSS aggregator.
- IRC client??
Credits
-------

View file

@ -10,6 +10,13 @@ pub enum Response {
},
}
impl FromStr for Response {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
todo!()
}
}
pub enum Capability {
Imap4rev1,
Auth(String),

View file

@ -1,231 +0,0 @@
use std::collections::HashMap;
use std::fmt::Display;
use std::sync::Arc;
use anyhow::Result;
use futures::{
future::{self, Either},
pin_mut,
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use panorama_imap::{
builders::command::Command,
parser::parse_response,
types::{Capability, RequestId, Response, ResponseCode, State, Status},
};
use tokio::{net::TcpStream, sync::mpsc};
use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef, TlsConnector};
use tokio_util::codec::{Decoder, LinesCodec, LinesCodecError};
use crate::config::ImapConfig;
pub async fn open_imap_connection(config: ImapConfig) -> Result<()> {
debug!(
"Opening imap connection to {}:{}",
config.server, config.port
);
let server = config.server.as_str();
let port = config.port;
let client = TcpStream::connect((server, port)).await?;
let codec = LinesCodec::new();
let framed = codec.framed(client);
let mut state = State::NotAuthenticated;
let (sink, stream) = framed.split::<String>();
let result = listen_loop(config.clone(), &mut state, sink, stream, false).await?;
if let LoopExit::NegotiateTls(stream, sink) = result {
debug!("negotiating tls");
let mut tls_config = ClientConfig::new();
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(server).unwrap();
// reconstruct the original stream
let stream = stream.reunite(sink)?.into_inner();
// let stream = TcpStream::connect((server, port)).await?;
let stream = tls_config.connect(dnsname, stream).await?;
let codec = LinesCodec::new();
let framed = codec.framed(stream);
let (sink, stream) = framed.split::<String>();
listen_loop(config.clone(), &mut state, sink, stream, true).await?;
}
Ok(())
}
/// Action that should be taken after the connection loop quits.
enum LoopExit<S, S2> {
/// Used in case the STARTTLS command is issued; perform TLS negotiation on top of the current
/// stream.
///
/// S and S2 are stream and sink types, respectively.
NegotiateTls(S, S2),
Closed,
}
async fn listen_loop<S, S2>(
config: ImapConfig,
st: &mut State,
sink: S2,
mut stream: S,
with_ssl: bool,
) -> Result<LoopExit<S, S2>>
where
S: Stream<Item = Result<String, LinesCodecError>> + Unpin,
S2: Sink<String> + Unpin,
S2::Error: Display,
{
let (tx, mut rx) = mpsc::unbounded_channel::<()>();
let mut cmd_mgr = CommandManager::new(sink);
if with_ssl {
let cmd = Command {
args: b"CAPABILITY".to_vec(),
next_state: Some(State::Authenticated),
};
cmd_mgr.send(cmd, |_| {}).await?;
}
loop {
let fut1 = stream.next();
let fut2 = rx.recv();
pin_mut!(fut1);
pin_mut!(fut2);
debug!("waiting for next select");
match future::select(fut1, fut2).await {
Either::Left((line, _)) => {
let mut line = match line {
Some(v) => v?,
None => break,
};
line += "\r\n";
let (_, resp) = match parse_response(line.as_bytes()) {
Ok(v) => v,
Err(e) => bail!(e.to_string()),
};
debug!("<<< {:?}", resp);
match st {
State::Authenticated => {}
State::NotAuthenticated => match resp {
Response::Data {
status: Status::Ok,
code: Some(ResponseCode::Capabilities(caps)),
..
} => {
if !with_ssl {
// prepare to do TLS negotiation
let mut has_starttls = false;
for cap in caps {
if let Capability::Atom("STARTTLS") = cap {
has_starttls = true;
}
}
if has_starttls {
let cmd = Command {
args: b"STARTTLS".to_vec(),
next_state: None,
};
let tx = tx.clone();
cmd_mgr
.send(cmd, move |_| {
tx.send(()).unwrap();
})
.await?;
}
}
}
Response::Capabilities(_caps) => {
if with_ssl {
// send authentication information
let cmd = Command {
args: format!("LOGIN {} {}", config.username, config.password)
.as_bytes()
.to_vec(),
next_state: Some(State::Authenticated),
};
cmd_mgr.send(cmd, |_| {}).await?;
}
}
Response::Done { tag, code, .. } => {
cmd_mgr.process_done(tag, code)?;
}
_ => {}
},
_ => {}
}
}
Either::Right((_, _)) => {
debug!("ENCOUNTERED EXIT");
let sink = cmd_mgr.decompose();
return Ok(LoopExit::NegotiateTls(stream, sink));
}
}
}
Ok(LoopExit::Closed)
}
type InFlightFunc = Box<dyn Fn(Option<ResponseCode>) + Send>;
/// A struct in charge of managing multiple in-flight commands.
struct CommandManager<S> {
tag_idx: usize,
in_flight: HashMap<String, InFlightFunc>,
sink: S,
}
impl<S> CommandManager<S>
where
S: Sink<String> + Unpin,
{
pub fn new(sink: S) -> Self {
CommandManager {
tag_idx: 0,
in_flight: HashMap::new(),
sink,
}
}
pub fn decompose(self) -> S {
self.sink
}
pub async fn send(
&mut self,
cmd: Command,
cb: impl Fn(Option<ResponseCode>) + Send + 'static,
) -> Result<()> {
let tag_idx = self.tag_idx;
self.tag_idx += 1;
let cb = Box::new(cb);
let tag_str = format!("t{}", tag_idx);
let cmd_str = std::str::from_utf8(&cmd.args)?;
let full_str = format!("{} {}", tag_str, cmd_str);
self.in_flight.insert(tag_str.clone(), cb);
debug!(">>> {:?}", full_str);
self.sink
.send(full_str)
.await
.map_err(|_| anyhow!("failed to send command"))
}
pub fn process_done(&mut self, id: RequestId, code: Option<ResponseCode>) -> Result<()> {
let name = std::str::from_utf8(id.as_bytes())?;
if let Some(cb) = self.in_flight.remove(name) {
cb(code);
}
Ok(())
}
}

View file

@ -1,177 +0,0 @@
// let's try this again
use std::collections::HashMap;
use std::sync::Arc;
use anyhow::Result;
use futures::{
future::{self, BoxFuture, Future, FutureExt, TryFuture},
sink::{Sink, SinkExt},
stream::{Stream, StreamExt},
};
use panorama_imap::builders::command::Command;
use parking_lot::Mutex;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
sync::{oneshot, Notify},
};
use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef, TlsConnector};
use tokio_util::codec::{Decoder, Framed, FramedRead, FramedWrite, LinesCodec, LinesCodecError};
use crate::config::{ImapConfig, TlsMethod};
pub async fn open_imap_connection(config: ImapConfig) -> Result<()> {
let server = config.server.as_str();
let port = config.port;
let stream = TcpStream::connect((server, port)).await?;
debug!("hellosu");
match config.tls {
TlsMethod::Off => begin_authentication(config, stream).await,
TlsMethod::On => {
let stream = perform_tls_negotiation(server.to_owned(), stream).await?;
begin_authentication(config, stream).await
}
TlsMethod::Starttls => {
let (stream, cmd_mgr) = CommandManager::new(stream);
let flights = cmd_mgr.flights();
// listen(stream, flights).await?;
// async move {
// let mut cmd_mgr = cmd_mgr;
// cmd_mgr.capabilities().await;
// }
// .await;
todo!()
}
}
}
/// Performs TLS negotiation, using the webpki_roots and verifying the server name
#[instrument(skip(server_name, stream))]
async fn perform_tls_negotiation(
server_name: impl AsRef<str>,
stream: impl AsyncRead + AsyncWrite + Unpin,
) -> Result<impl AsyncRead + AsyncWrite> {
let server_name = server_name.as_ref();
let mut tls_config = ClientConfig::new();
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap();
let stream = tls_config.connect(dnsname, stream).await?;
Ok(stream)
}
async fn fetch_capabilities(stream: impl AsyncRead + AsyncWrite) -> Result<Vec<String>> {
let codec = LinesCodec::new();
let framed = codec.framed(stream);
todo!()
}
#[instrument(skip(config, stream))]
async fn begin_authentication(
config: ImapConfig,
stream: impl AsyncRead + AsyncWrite,
) -> Result<()> {
Ok(())
}
pub async fn listen(
mut stream: impl Stream<Item = Result<String, LinesCodecError>> + Unpin,
in_flight: InFlight,
) -> Result<()> {
debug!("listening for messages from server");
loop {
let line = match stream.next().await {
Some(v) => v?,
None => break,
};
debug!("line: {:?}", line);
let mut parts = line.split(' ');
let tag = parts.next().unwrap().parse()?; // TODO: handle empty
{
let mut in_flight = in_flight.lock();
if let Some(sender) = in_flight.remove(&tag) {
sender.send(()).unwrap();
}
}
}
Ok(())
}
// trait ImapStream: AsyncRead + AsyncWrite + Send + Unpin {}
// impl<T: AsyncRead + AsyncWrite + Send + Unpin> ImapStream for T {}
trait ImapSink: Sink<String, Error = LinesCodecError> + Unpin {}
impl<T: Sink<String, Error = LinesCodecError> + Unpin> ImapSink for T {}
type InFlightMap = HashMap<usize, oneshot::Sender<()>>;
type InFlight = Arc<Mutex<InFlightMap>>;
struct CommandManager<'a> {
id: usize,
in_flight: Arc<Mutex<HashMap<usize, oneshot::Sender<()>>>>,
sink: Box<dyn ImapSink + 'a>,
}
impl<'a> CommandManager<'a> {
pub fn new(
stream: impl AsyncRead + AsyncWrite + 'a,
) -> (impl Stream<Item = Result<String, LinesCodecError>>, Self) {
let codec = LinesCodec::new();
let framed = codec.framed(stream);
let (framed_sink, framed_stream) = framed.split();
let cmd_mgr = CommandManager {
id: 0,
in_flight: Arc::new(Mutex::new(HashMap::new())),
sink: Box::new(framed_sink),
};
(framed_stream, cmd_mgr)
}
pub fn flights(&self) -> Arc<Mutex<HashMap<usize, oneshot::Sender<()>>>> {
self.in_flight.clone()
}
pub fn decompose(self) -> impl ImapSink + Unpin + 'a {
self.sink
}
pub async fn capabilities(&mut self) -> Result<Vec<String>> {
self.exec(Command {
args: b"CAPABILITY".to_vec(),
next_state: None,
})
.await?;
Ok(vec![])
}
pub async fn exec(&mut self, command: Command) -> Result<()> {
let id = self.id;
self.id += 1;
let cmd_str = String::from_utf8(command.args)?;
self.sink.send(cmd_str).await?;
let (tx, rx) = oneshot::channel();
{
let mut in_flight = self.in_flight.lock();
in_flight.insert(id, tx);
}
rx.await?;
Ok(())
}
}

View file

@ -1,8 +1,5 @@
//! Mail
mod imap;
mod imap2;
use anyhow::Result;
use futures::stream::StreamExt;
use panorama_imap::{
@ -14,8 +11,6 @@ use tokio_stream::wrappers::WatchStream;
use crate::config::{Config, ConfigWatcher, MailAccountConfig, TlsMethod};
use self::imap2::open_imap_connection;
/// Command sent to the mail thread by something else (i.e. UI)
pub enum MailCommand {
/// Refresh the list
@ -30,7 +25,7 @@ pub async fn run_mail(
mut config_watcher: ConfigWatcher,
_cmd_in: UnboundedReceiver<MailCommand>,
) -> Result<()> {
let mut curr_conn: Option<JoinHandle<_>> = None;
let mut curr_conn: Vec<JoinHandle<_>> = Vec::new();
// let mut config_watcher = WatchStream::new(config_watcher);
loop {
@ -43,13 +38,13 @@ pub async fn run_mail(
// TODO: gracefully shut down connection
// just gonna drop the connection for now
if let Some(curr_conn) = curr_conn.take() {
debug!("dropping connection...");
curr_conn.abort();
debug!("dropping all connections...");
for conn in curr_conn.drain(0..) {
conn.abort();
}
let handle = tokio::spawn(async {
for acct in config.mail_accounts.into_iter() {
for acct in config.mail_accounts.into_iter() {
let handle = tokio::spawn(async move {
debug!("opening imap connection for {:?}", acct);
match imap_main(acct).await {
Ok(_) => {}
@ -57,11 +52,9 @@ pub async fn run_mail(
error!("IMAP Error: {}", err);
}
}
// open_imap_connection(acct.imap).await.unwrap();
}
});
curr_conn = Some(handle);
});
curr_conn.push(handle);
}
}
Ok(())
@ -69,31 +62,32 @@ pub async fn run_mail(
/// The main sequence of steps for the IMAP thread to follow
async fn imap_main(acct: MailAccountConfig) -> Result<()> {
let builder: ClientConfig = ClientBuilder::default()
.hostname(acct.imap.server.clone())
.port(acct.imap.port)
.tls(matches!(acct.imap.tls, TlsMethod::On))
.build()
.map_err(|err| anyhow!("err: {}", err))?;
// loop ensures that the connection is retried after it dies
loop {
let builder: ClientConfig = ClientBuilder::default()
.hostname(acct.imap.server.clone())
.port(acct.imap.port)
.tls(matches!(acct.imap.tls, TlsMethod::On))
.build()
.map_err(|err| anyhow!("err: {}", err))?;
debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port);
let unauth = builder.open().await?;
debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port);
let unauth = builder.open().await?;
let mut unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) {
debug!("attempting to upgrade");
let client = unauth.upgrade().await?;
debug!("upgrade successful");
client
} else {
unauth
};
let mut unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) {
debug!("attempting to upgrade");
let client = unauth.upgrade().await?;
debug!("upgrade successful");
client
} else {
unauth
};
debug!("preparing to auth");
// check if the authentication method is supported
unauth.capabilities().await?;
debug!("preparing to auth");
// check if the authentication method is supported
unauth.capabilities().await?;
// debug!("sending CAPABILITY");
// let result = unauth.capabilities().await?;
Ok(())
// debug!("sending CAPABILITY");
// let result = unauth.capabilities().await?;
}
}