actually use the newly parsed responses instead of the original shitty ones

This commit is contained in:
Michael Zhang 2021-02-22 03:06:40 -06:00
parent 68b0b70ffd
commit 534833d616
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
3 changed files with 101 additions and 44 deletions

View file

@ -19,10 +19,12 @@ use tokio_rustls::{
}; };
use crate::command::Command; use crate::command::Command;
use crate::response::Response; use crate::response::{Capability, Response, ResponseCode};
use crate::types::Status;
use super::ClientConfig; use super::ClientConfig;
pub type CapsLock = Arc<RwLock<Option<Vec<Capability>>>>;
pub type ResultMap = Arc<RwLock<HashMap<usize, (Option<Response>, Option<Waker>)>>>; pub type ResultMap = Arc<RwLock<HashMap<usize, (Option<Response>, Option<Waker>)>>>;
pub type GreetingState = Arc<RwLock<(bool, Option<Waker>)>>; pub type GreetingState = Arc<RwLock<(bool, Option<Waker>)>>;
pub const TAG_PREFIX: &str = "panorama"; pub const TAG_PREFIX: &str = "panorama";
@ -36,7 +38,7 @@ pub struct Client<C> {
results: ResultMap, results: ResultMap,
/// cached set of capabilities /// cached set of capabilities
caps: Vec<String>, caps: CapsLock,
/// join handle for the listener thread /// join handle for the listener thread
listener_handle: JoinHandle<Result<ReadHalf<C>>>, listener_handle: JoinHandle<Result<ReadHalf<C>>>,
@ -58,8 +60,11 @@ where
let results = Arc::new(RwLock::new(HashMap::new())); let results = Arc::new(RwLock::new(HashMap::new()));
let (exit_tx, exit_rx) = mpsc::channel(1); let (exit_tx, exit_rx) = mpsc::channel(1);
let greeting = Arc::new(RwLock::new((false, None))); let greeting = Arc::new(RwLock::new((false, None)));
let listen_fut = tokio::spawn(listen( let caps: CapsLock = Arc::new(RwLock::new(None));
let listener_handle = tokio::spawn(listen(
read_half, read_half,
caps.clone(),
results.clone(), results.clone(),
exit_rx, exit_rx,
greeting.clone(), greeting.clone(),
@ -70,8 +75,8 @@ where
conn: write_half, conn: write_half,
id: 0, id: 0,
results, results,
caps: Vec::new(), listener_handle,
listener_handle: listen_fut, caps,
exit_tx, exit_tx,
greeting, greeting,
} }
@ -129,7 +134,7 @@ where
let resp = self.execute(Command::Starttls).await?; let resp = self.execute(Command::Starttls).await?;
debug!("server response to starttls: {:?}", resp); debug!("server response to starttls: {:?}", resp);
debug!("sending exit ()"); debug!("sending exit for upgrade");
self.exit_tx.send(()).await?; self.exit_tx.send(()).await?;
let reader = self.listener_handle.await??; let reader = self.listener_handle.await??;
let writer = self.conn; let writer = self.conn;
@ -145,6 +150,7 @@ where
let tls_config = TlsConnector::from(Arc::new(tls_config)); let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap(); let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap();
let stream = tls_config.connect(dnsname, conn).await?; let stream = tls_config.connect(dnsname, conn).await?;
debug!("upgraded, stream is using TLS now");
Ok(Client::new(stream, self.config)) Ok(Client::new(stream, self.config))
} }
@ -156,6 +162,7 @@ impl Future for GreetingWaiter {
type Output = (); type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let (state, waker) = &mut *self.0.write(); let (state, waker) = &mut *self.0.write();
debug!("g {:?}", state);
if waker.is_none() { if waker.is_none() {
*waker = Some(cx.waker().clone()); *waker = Some(cx.waker().clone());
} }
@ -192,6 +199,7 @@ impl<'a, C> Future for ExecWaiter<'a, C> {
/// Main listen loop for the application /// Main listen loop for the application
async fn listen<C>( async fn listen<C>(
conn: C, conn: C,
caps: CapsLock,
results: ResultMap, results: ResultMap,
mut exit: mpsc::Receiver<()>, mut exit: mpsc::Receiver<()>,
greeting: GreetingState, greeting: GreetingState,
@ -212,7 +220,11 @@ where
match future::select(fut, fut2).await { match future::select(fut, fut2).await {
Either::Left((res, _)) => { Either::Left((res, _)) => {
res.context("read failed")?; let bytes = res.context("read failed")?;
if bytes == 0 {
bail!("connection probably died");
}
debug!("got a new line {:?}", next_line); debug!("got a new line {:?}", next_line);
let (_, resp) = match crate::parser::parse_response(next_line.as_bytes()) { let (_, resp) = match crate::parser::parse_response(next_line.as_bytes()) {
Ok(v) => v, Ok(v) => v,
@ -221,39 +233,80 @@ where
continue; continue;
} }
}; };
let resp = Response::from(resp);
debug!("parsed as: {:?}", resp);
let next_line = next_line.trim_end_matches('\n').trim_end_matches('\r');
let mut parts = next_line.split(" "); if let Some(greeting) = greeting.take() {
let tag = parts.next().unwrap(); let (greeting, waker) = &mut *greeting.write();
let rest = parts.collect::<Vec<_>>().join(" "); debug!("received greeting!");
*greeting = true;
if tag == "*" { if let Some(waker) = waker.take() {
debug!("UNTAGGED {:?}", rest); waker.wake();
// TODO: verify that the greeting is actually an OK
if let Some(greeting) = greeting.take() {
let (greeting, waker) = &mut *greeting.write();
debug!("got greeting");
*greeting = true;
if let Some(waker) = waker.take() {
waker.wake();
}
}
} else if tag.starts_with(TAG_PREFIX) {
let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
debug!("set {} to {:?}", id, rest);
let mut results = results.write();
if let Some((c, w)) = results.get_mut(&id) {
// *c = Some(rest.to_string());
*c = Some(resp);
if let Some(waker) = w.take() {
waker.wake();
}
} }
} }
let resp = Response::from(resp);
debug!("resp: {:?}", resp);
match &resp {
Response::Capabilities(new_caps)
| Response::Data {
status: Status::Ok,
code: Some(ResponseCode::Capabilities(new_caps)),
..
} => {
let caps = &mut *caps.write();
*caps = Some(new_caps.clone());
debug!("new caps: {:?}", caps);
}
Response::Done { tag, .. } => {
let tag_str = &tag.0;
if tag_str.starts_with(TAG_PREFIX) {
let id = tag_str.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
let mut results = results.write();
if let Some((c, waker)) = results.get_mut(&id) {
*c = Some(resp);
if let Some(waker) = waker.take() {
waker.wake();
}
}
}
}
_ => todo!("unhandled response: {:?}", resp),
}
// debug!("parsed as: {:?}", resp);
// let next_line = next_line.trim_end_matches('\n').trim_end_matches('\r');
// let mut parts = next_line.split(" ");
// let tag = parts.next().unwrap();
// let rest = parts.collect::<Vec<_>>().join(" ");
// if tag == "*" {
// debug!("UNTAGGED {:?}", rest);
// // TODO: verify that the greeting is actually an OK
// if let Some(greeting) = greeting.take() {
// let (greeting, waker) = &mut *greeting.write();
// debug!("got greeting");
// *greeting = true;
// if let Some(waker) = waker.take() {
// waker.wake();
// }
// }
// } else if tag.starts_with(TAG_PREFIX) {
// let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
// debug!("set {} to {:?}", id, rest);
// let mut results = results.write();
// if let Some((c, w)) = results.get_mut(&id) {
// // *c = Some(rest.to_string());
// *c = Some(resp);
// if let Some(waker) = w.take() {
// waker.wake();
// }
// }
// }
} }
Either::Right((_, _)) => { Either::Right((_, _)) => {
debug!("exiting read loop"); debug!("exiting read loop");
break; break;

View file

@ -1,7 +1,7 @@
//! Mail //! Mail
use anyhow::Result; use anyhow::Result;
use futures::stream::StreamExt; use futures::{future::FutureExt, stream::StreamExt};
use panorama_imap::{ use panorama_imap::{
client::{ClientBuilder, ClientConfig}, client::{ClientBuilder, ClientConfig},
command::Command as ImapCommand, command::Command as ImapCommand,
@ -46,11 +46,15 @@ pub async fn run_mail(
for acct in config.mail_accounts.into_iter() { for acct in config.mail_accounts.into_iter() {
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
debug!("opening imap connection for {:?}", acct); debug!("opening imap connection for {:?}", acct);
match imap_main(acct).await { loop {
Ok(_) => {} match imap_main(acct.clone()).await {
Err(err) => { Ok(_) => {}
error!("IMAP Error: {}", err); Err(err) => {
error!("IMAP Error: {}", err);
}
} }
warn!("connection dropped, retrying");
} }
}); });
curr_conn.push(handle); curr_conn.push(handle);
@ -91,7 +95,7 @@ async fn imap_main(acct: MailAccountConfig) -> Result<()> {
// let result = unauth.capabilities().await?; // let result = unauth.capabilities().await?;
loop { loop {
tokio::time::sleep(std::time::Duration::from_secs(10)).await; tokio::time::sleep(std::time::Duration::from_secs(60)).await;
debug!("heartbeat"); debug!("heartbeat");
} }
} }

View file

@ -33,8 +33,8 @@ async fn main() -> Result<()> {
let mut logger = fern::Dispatch::new() let mut logger = fern::Dispatch::new()
.format(move |out, message, record| { .format(move |out, message, record| {
out.finish(format_args!( out.finish(format_args!(
"{}[{}][{}] {}", "[{}][{}] {}",
chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"), // chrono::Local::now().format("[%Y-%m-%d][%H:%M:%S]"),
record.target(), record.target(),
colors.color(record.level()), colors.color(record.level()),
message message