Modify the inner client to allow streaming intermediate content instead of just returning the last element

This commit is contained in:
Michael Zhang 2021-02-26 00:03:23 -06:00
parent d310bbe10e
commit aa796e533e
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
11 changed files with 149 additions and 85 deletions

1
Cargo.lock generated
View file

@ -1192,6 +1192,7 @@ dependencies = [
"pest_derive", "pest_derive",
"tokio", "tokio",
"tokio-rustls", "tokio-rustls",
"tokio-stream",
"webpki-roots", "webpki-roots",
] ]

View file

@ -25,6 +25,7 @@ pest = "2.1.3"
pest_derive = "2.1.0" pest_derive = "2.1.0"
tokio = { version = "1.1.1", features = ["full"] } tokio = { version = "1.1.1", features = ["full"] }
tokio-rustls = "0.22.0" tokio-rustls = "0.22.0"
tokio-stream = "0.1.3"
webpki-roots = "0.21.0" webpki-roots = "0.21.0"
[dev-dependencies] [dev-dependencies]

View file

@ -31,7 +31,9 @@ impl Auth for Plain {
username: self.username, username: self.username,
password: self.password, password: self.password,
}; };
let (result, _) = client.execute(command).await?; let (result, _) = client.execute(command).await?;
let result = result.await?;
if !matches!( if !matches!(
result, result,

View file

@ -4,39 +4,60 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll, Waker};
use anyhow::{Context as AnyhowContext, Result}; use anyhow::{Context as AnyhowContext, Error, Result};
use futures::future::{self, Either, Future, FutureExt}; use futures::{
future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt},
stream::{BoxStream, Stream, StreamExt, TryStream},
};
use genawaiter::{
sync::{gen, Gen},
yield_,
};
use parking_lot::{RwLock, RwLockWriteGuard}; use parking_lot::{RwLock, RwLockWriteGuard};
use tokio::{ use tokio::{
io::{ io::{
self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf,
}, },
sync::mpsc, sync::{mpsc, oneshot},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_rustls::{ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::Command; use crate::command::Command;
use crate::parser::{parse_capability, parse_response}; use crate::parser::{parse_capability, parse_response};
use crate::response::{Capability, Response, ResponseCode, Status}; use crate::response::{Capability, Response, ResponseCode, Status};
// use crate::types::{Capability as Capability_, Status};
use super::ClientConfig; use super::ClientConfig;
pub type CapsLock = Arc<RwLock<Option<HashSet<Capability>>>>; pub type CapsLock = Arc<RwLock<Option<HashSet<Capability>>>>;
pub type ResultMap = Arc<RwLock<VecDeque<(usize, Option<Response>, Vec<Response>, Option<Waker>)>>>; pub type ResponseFuture = Box<dyn Future<Output = Result<Response>> + Send + Unpin>;
pub type ResponseSender = mpsc::UnboundedSender<Response>;
pub type ResponseStream = mpsc::UnboundedReceiver<Response>;
type ResultQueue = Arc<RwLock<VecDeque<HandlerResult>>>;
pub type GreetingState = Arc<RwLock<(Option<Response>, Option<Waker>)>>; pub type GreetingState = Arc<RwLock<(Option<Response>, Option<Waker>)>>;
pub const TAG_PREFIX: &str = "panorama"; pub const TAG_PREFIX: &str = "panorama";
struct HandlerResult {
id: usize,
end: Option<oneshot::Sender<Response>>,
sender: ResponseSender,
waker: Option<Waker>,
}
/// The lower-level Client struct, that is shared by all of the exported structs in the state machine. /// The lower-level Client struct, that is shared by all of the exported structs in the state machine.
pub struct Client<C> { pub struct Client<C> {
config: ClientConfig, config: ClientConfig,
/// write half of the connection
conn: WriteHalf<C>, conn: WriteHalf<C>,
/// counter for monotonically incrementing unique ids
id: usize, id: usize,
results: ResultMap,
results: ResultQueue,
/// cached set of capabilities /// cached set of capabilities
caps: CapsLock, caps: CapsLock,
@ -90,13 +111,26 @@ where
} }
/// Sends a command to the server and returns a handle to retrieve the result /// Sends a command to the server and returns a handle to retrieve the result
pub async fn execute(&mut self, cmd: Command) -> Result<(Response, Vec<Response>)> { pub async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> {
debug!("executing command {:?}", cmd); // debug!("executing command {:?}", cmd);
let id = self.id; let id = self.id;
self.id += 1; self.id += 1;
// create a channel for sending the final response
let (end_tx, end_rx) = oneshot::channel();
// create a channel for sending responses for this particular client call
// this should queue up responses
let (tx, rx) = mpsc::unbounded_channel();
{ {
let mut handlers = self.results.write(); let mut handlers = self.results.write();
handlers.push_back((id, None, vec![], None)); handlers.push_back(HandlerResult {
id,
end: Some(end_tx),
sender: tx,
waker: None,
});
} }
let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd); let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd);
@ -104,13 +138,24 @@ where
self.conn.write_all(cmd_str.as_bytes()).await?; self.conn.write_all(cmd_str.as_bytes()).await?;
self.conn.flush().await?; self.conn.flush().await?;
// debug!("[{}] written.", id); // debug!("[{}] written.", id);
// let resp = ExecWaiter(self, id, false).await;
let resp = ExecWaiter(self, id, false).await;
// let resp = { // let resp = {
// let mut handlers = self.results.write(); // let mut handlers = self.results.write();
// handlers.remove(&id).unwrap().0.unwrap() // handlers.remove(&id).unwrap().0.unwrap()
// }; // };
Ok(resp)
// let resp = end_rx.await?;
let q = self.results.clone();
// let end = Box::new(end_rx.map_err(|err| Error::from).map(move |resp| resp));
let end = Box::new(end_rx.map_err(Error::from).map(move | resp | {
// pop the first entry from the list
let mut results = q.write();
results.pop_front();
resp
}));
Ok((end, rx))
} }
/// Executes the CAPABILITY command /// Executes the CAPABILITY command
@ -123,16 +168,18 @@ where
} }
let cmd = Command::Capability; let cmd = Command::Capability;
debug!("sending: {:?} {:?}", cmd, cmd.to_string()); // debug!("sending: {:?} {:?}", cmd, cmd.to_string());
let (result, intermediate) = self let (result, intermediate) = self
.execute(cmd) .execute(cmd)
.await .await
.context("error executing CAPABILITY command")?; .context("error executing CAPABILITY command")?;
let result = result.await?;
debug!("cap resp: {:?}", result); debug!("cap resp: {:?}", result);
if let Some(Response::Capabilities(new_caps)) = intermediate if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate)
.iter() .filter(|resp| future::ready(matches!(resp, Response::Capabilities(_))))
.find(|resp| matches!(resp, Response::Capabilities(_))) .next()
.await
{ {
let mut caps = self.caps.write(); let mut caps = self.caps.write();
*caps = Some(new_caps.iter().cloned().collect()); *caps = Some(new_caps.iter().cloned().collect());
@ -149,7 +196,8 @@ where
} }
// first, send the STARTTLS command // first, send the STARTTLS command
let resp = self.execute(Command::Starttls).await?; let (resp, _) = self.execute(Command::Starttls).await?;
let resp = resp.await?;
debug!("server response to starttls: {:?}", resp); debug!("server response to starttls: {:?}", resp);
debug!("sending exit for upgrade"); debug!("sending exit for upgrade");
@ -208,44 +256,55 @@ impl Future for GreetingWaiter {
} }
} }
pub struct ExecWaiter<'a, C>(&'a Client<C>, usize, bool); // pub struct ExecWaiter<'a, C>(&'a Client<C>, usize, bool);
//
impl<'a, C> Future for ExecWaiter<'a, C> { // impl<'a, C> Future for ExecWaiter<'a, C> {
type Output = (Response, Vec<Response>); // type Output = (Response, ResponseStream);
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> { // fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
// add the waker // // add the waker
let mut results = self.0.results.write(); // let mut results = self.0.results.write();
if !self.2 { // if !self.2 {
if let Some((_, _, _, waker_ref)) = // if let Some(HandlerResult {
results.iter_mut().find(|(id, _, _, _)| *id == self.1) // waker: waker_ref, ..
{ // }) = results
let waker = cx.waker().clone(); // .iter_mut()
*waker_ref = Some(waker); // .find(|HandlerResult { id, .. }| *id == self.1)
self.2 = true; // {
} // let waker = cx.waker().clone();
} // *waker_ref = Some(waker);
// self.2 = true;
// if this struct exists then there's definitely at least one entry // }
let (id, last_response, _, _) = &results[0]; // }
if *id != self.1 || last_response.is_none() { //
return Poll::Pending; // // if this struct exists then there's definitely at least one entry
} // let HandlerResult {
// id,
let (_, last_response, intermediate_responses, _) = results.pop_front().unwrap(); // end: last_response,
mem::drop(results); // ..
// } = &results[0];
Poll::Ready(( // if *id != self.1 || last_response.is_none() {
last_response.expect("already checked"), // return Poll::Pending;
intermediate_responses, // }
)) //
} // let HandlerResult {
} // end: last_response,
// stream: intermediate_responses,
// ..
// } = results.pop_front().unwrap();
// mem::drop(results);
//
// Poll::Ready((
// last_response.expect("already checked"),
// intermediate_responses,
// ))
// }
// }
/// Main listen loop for the application /// Main listen loop for the application
async fn listen<C>( async fn listen<C>(
conn: C, conn: C,
caps: CapsLock, caps: CapsLock,
results: ResultMap, results: ResultQueue,
mut exit: mpsc::Receiver<()>, mut exit: mpsc::Receiver<()>,
greeting: GreetingState, greeting: GreetingState,
) -> Result<C> ) -> Result<C>
@ -302,9 +361,9 @@ where
status: Status::Ok, .. status: Status::Ok, ..
} => { } => {
let mut results = results.write(); let mut results = results.write();
if let Some((_, _, intermediate, _)) = results.iter_mut().next() { if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() {
debug!("pushed to intermediate: {:?}", resp); debug!("pushed to intermediate for id {}: {:?}", id, resp);
intermediate.push(resp); sender.send(resp)?;
} }
} }
@ -320,8 +379,16 @@ where
if tag.starts_with(TAG_PREFIX) { if tag.starts_with(TAG_PREFIX) {
// let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?; // let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
let mut results = results.write(); let mut results = results.write();
if let Some((_, opt, _, waker)) = results.iter_mut().next() { if let Some(HandlerResult {
*opt = Some(resp); end: ref mut opt,
waker,
..
}) = results.iter_mut().next()
{
if let Some(opt) = opt.take() {
opt.send(resp).unwrap();
}
// *opt = Some(resp);
if let Some(waker) = waker.take() { if let Some(waker) = waker.take() {
waker.wake(); waker.wake();
} }

View file

@ -39,17 +39,16 @@ mod inner;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use genawaiter::{sync::gen, yield_};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use futures::stream::Stream;
use tokio_rustls::{ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::Command; use crate::command::Command;
use crate::response::Response; use crate::response::Response;
pub use self::inner::Client; pub use self::inner::{Client, ResponseFuture, ResponseStream};
/// Struct used to start building the config for a client. /// Struct used to start building the config for a client.
/// ///
@ -118,7 +117,7 @@ impl ClientUnauthenticated {
} }
/// Exposing low-level execute /// Exposing low-level execute
async fn execute(&mut self, cmd: Command) -> Result<(Response, Vec<Response>)> { async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> {
match self { match self {
ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await, ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await,
ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await, ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await,
@ -141,7 +140,7 @@ pub enum ClientAuthenticated {
impl ClientAuthenticated { impl ClientAuthenticated {
/// Exposing low-level execute /// Exposing low-level execute
async fn execute(&mut self, cmd: Command) -> Result<(Response, Vec<Response>)> { async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> {
match self { match self {
ClientAuthenticated::Encrypted(e) => e.execute(cmd).await, ClientAuthenticated::Encrypted(e) => e.execute(cmd).await,
ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await, ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await,
@ -154,7 +153,8 @@ impl ClientAuthenticated {
reference: "".to_owned(), reference: "".to_owned(),
mailbox: "*".to_owned(), mailbox: "*".to_owned(),
}; };
let resp = self.execute(cmd).await?; let (resp, stream) = self.execute(cmd).await?;
let resp = resp.await?;
debug!("list response: {:?}", resp); debug!("list response: {:?}", resp);
Ok(()) Ok(())
} }
@ -164,23 +164,17 @@ impl ClientAuthenticated {
let cmd = Command::Select { let cmd = Command::Select {
mailbox: mailbox.as_ref().to_owned(), mailbox: mailbox.as_ref().to_owned(),
}; };
let resp = self.execute(cmd).await?; let (resp, stream) = self.execute(cmd).await?;
let resp = resp.await?;
debug!("select response: {:?}", resp); debug!("select response: {:?}", resp);
Ok(()) Ok(())
} }
/// Runs the SELECT command /// Runs the SELECT command
#[cfg(feature = "rfc2177-idle")] #[cfg(feature = "rfc2177-idle")]
pub fn idle(&mut self) -> impl Stream<Item = ()> { pub async fn idle(&mut self) -> Result<UnboundedReceiverStream<Response>> {
gen!({ let cmd = Command::Idle;
loop { let (_, stream) = self.execute(cmd).await?;
tokio::time::sleep(std::time::Duration::from_secs(10)).await; Ok(UnboundedReceiverStream::new(stream))
yield_!(());
}
})
// let cmd = Command::Idle;
// let resp = self.execute(cmd).await?;
// debug!("idle response: {:?}", resp);
// Ok(())
} }
} }

View file

@ -1,7 +1,7 @@
use std::fmt; use std::fmt;
/// Commands, without the tag part. /// Commands, without the tag part.
#[derive(Clone, Debug)] #[derive(Clone)]
pub enum Command { pub enum Command {
Capability, Capability,
Starttls, Starttls,

View file

@ -130,7 +130,7 @@ async fn start_inotify_stream(
debug!("reading config from {:?}", path_c); debug!("reading config from {:?}", path_c);
let config = read_config(path_c).await.context("read")?; let config = read_config(path_c).await.context("read")?;
debug!("sending config {:?}", config); // debug!("sending config {:?}", config);
config_tx.send(config)?; config_tx.send(config)?;
} }
} }

View file

@ -1,7 +1,10 @@
//! Mail //! Mail
use anyhow::Result; use anyhow::Result;
use futures::{future::FutureExt, stream::StreamExt}; use futures::{
future::FutureExt,
stream::{Stream, StreamExt},
};
use panorama_imap::{ use panorama_imap::{
client::{ client::{
auth::{self, Auth}, auth::{self, Auth},
@ -49,7 +52,7 @@ pub async fn run_mail(
for acct in config.mail_accounts.into_iter() { for acct in config.mail_accounts.into_iter() {
let handle = tokio::spawn(async move { let handle = tokio::spawn(async move {
debug!("opening imap connection for {:?}", acct); // debug!("opening imap connection for {:?}", acct);
loop { loop {
match imap_main(acct.clone()).await { match imap_main(acct.clone()).await {
Ok(_) => {} Ok(_) => {}
@ -110,7 +113,7 @@ async fn imap_main(acct: MailAccountConfig) -> Result<()> {
debug!("listing all emails..."); debug!("listing all emails...");
let folder_tree = authed.list().await?; let folder_tree = authed.list().await?;
let mut idle_stream = authed.idle(); let mut idle_stream = authed.idle().await?;
loop { loop {
idle_stream.next().await; idle_stream.next().await;

View file

@ -1,3 +1 @@
pub trait Drawable { pub trait Drawable {}
}

View file

@ -1,8 +1,8 @@
//! UI //! UI
mod drawable; mod drawable;
mod tabs;
mod table; mod table;
mod tabs;
use std::fmt::Debug; use std::fmt::Debug;
use std::io::Write; use std::io::Write;

View file

@ -1,3 +1 @@
pub struct Tabs { pub struct Tabs {}
}