Revamp inner client loop and connect folder list to UI

This commit is contained in:
Michael Zhang 2021-03-06 18:56:35 -06:00
parent 94ead04391
commit 4c989e0991
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
9 changed files with 261 additions and 403 deletions

View file

@ -6,3 +6,9 @@ doc-open:
watch: watch:
cargo watch -x 'clippy --all --all-features' cargo watch -x 'clippy --all --all-features'
run:
cargo run -- --log-file output.log
tail:
tail -f output.log

View file

@ -1,4 +1,5 @@
use anyhow::Result; use anyhow::Result;
use futures::stream::StreamExt;
use crate::command::Command; use crate::command::Command;
use crate::response::{Response, ResponseDone, Status}; use crate::response::{Response, ResponseDone, Status};
@ -11,6 +12,8 @@ pub trait Auth {
// TODO: return the unauthed client if failed? // TODO: return the unauthed client if failed?
async fn perform_auth(self, client: ClientUnauthenticated) -> Result<ClientAuthenticated>; async fn perform_auth(self, client: ClientUnauthenticated) -> Result<ClientAuthenticated>;
/// Converts the wrappers around the client once the authentication has happened. Should only
/// be called by the `perform_auth` function.
fn convert_client(client: ClientUnauthenticated) -> ClientAuthenticated { fn convert_client(client: ClientUnauthenticated) -> ClientAuthenticated {
match client { match client {
ClientUnauthenticated::Encrypted(e) => ClientAuthenticated::Encrypted(e), ClientUnauthenticated::Encrypted(e) => ClientAuthenticated::Encrypted(e),
@ -32,19 +35,26 @@ impl Auth for Plain {
password: self.password, password: self.password,
}; };
let (result, _) = client.execute(command).await?; let result = client.execute(command).await?;
let result = result.await?; let done = result.done().await?;
if !matches!( assert!(done.is_some());
result, let done = done.unwrap();
Response::Done(ResponseDone {
status: Status::Ok, if done.status != Status::Ok {
.. bail!("unable to login: {:?}", done);
})
) {
bail!("unable to login: {:?}", result);
} }
// if !matches!(
// result,
// Response::Done(ResponseDone {
// status: Status::Ok,
// ..
// })
// ) {
// bail!("unable to login: {:?}", result);
// }
Ok(<Self as Auth>::convert_client(client)) Ok(<Self as Auth>::convert_client(client))
} }
} }

View file

@ -1,19 +1,21 @@
use std::collections::{HashSet, VecDeque};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll};
use anyhow::{Context as AnyhowContext, Error, Result}; use anyhow::Result;
use futures::{ use futures::{
future::{self, Either, Future, FutureExt, TryFutureExt}, future::{self, Either, FutureExt, TryFutureExt},
stream::StreamExt, stream::{Peekable, Stream, StreamExt},
}; };
use parking_lot::RwLock; use parking_lot::Mutex;
use tokio::{ use tokio::{
io::{ io::{
self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf,
}, },
sync::{mpsc, oneshot}, sync::{
mpsc,
oneshot::{self, error::TryRecvError},
},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_rustls::{ use tokio_rustls::{
@ -23,169 +25,89 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::Command; use crate::command::Command;
use crate::parser::{parse_capability, parse_response}; use crate::parser::{parse_capability, parse_response};
use crate::response::{Capability, Response, ResponseCode, ResponseData, ResponseDone, Status}; use crate::response::{Response, ResponseDone};
use super::ClientConfig; use super::ClientConfig;
pub type CapsLock = Arc<RwLock<Option<HashSet<Capability>>>>;
pub type ResponseFuture = Box<dyn Future<Output = Result<Response>> + Send + Unpin>;
pub type ResponseSender = mpsc::UnboundedSender<Response>;
pub type ResponseStream = mpsc::UnboundedReceiver<Response>;
type ResultQueue = Arc<RwLock<VecDeque<HandlerResult>>>;
pub type GreetingState = Arc<RwLock<(Option<Response>, Option<Waker>)>>;
pub const TAG_PREFIX: &str = "ptag"; pub const TAG_PREFIX: &str = "ptag";
type Command2 = (Command, mpsc::UnboundedSender<Response>);
#[derive(Debug)]
struct HandlerResult {
id: usize,
end: Option<oneshot::Sender<Response>>,
sender: ResponseSender,
waker: Option<Waker>,
}
/// The lower-level Client struct, that is shared by all of the exported structs in the state machine.
pub struct Client<C> { pub struct Client<C> {
ctr: usize,
config: ClientConfig, config: ClientConfig,
/// write half of the connection
conn: WriteHalf<C>, conn: WriteHalf<C>,
cmd_tx: mpsc::UnboundedSender<Command2>,
/// counter for monotonically incrementing unique ids greeting_rx: Option<oneshot::Receiver<()>>,
id: usize, exit_tx: oneshot::Sender<()>,
results: ResultQueue,
/// cached set of capabilities
caps: CapsLock,
/// join handle for the listener thread
listener_handle: JoinHandle<Result<ReadHalf<C>>>, listener_handle: JoinHandle<Result<ReadHalf<C>>>,
/// used for telling the listener thread to stop and return the read half
exit_tx: mpsc::Sender<()>,
/// used for receiving the greeting
greeting: GreetingState,
} }
impl<C> Client<C> impl<C> Client<C>
where where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
/// Creates a new client that wraps a connection
pub fn new(conn: C, config: ClientConfig) -> Self { pub fn new(conn: C, config: ClientConfig) -> Self {
let (read_half, write_half) = io::split(conn); let (read_half, write_half) = io::split(conn);
let results = Arc::new(RwLock::new(VecDeque::new())); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (exit_tx, exit_rx) = mpsc::channel(1); let (greeting_tx, greeting_rx) = oneshot::channel();
let greeting = Arc::new(RwLock::new((None, None))); let (exit_tx, exit_rx) = oneshot::channel();
let caps: CapsLock = Arc::new(RwLock::new(None)); let handle = tokio::spawn(
listen(read_half, cmd_rx, greeting_tx, exit_rx).map_err(|err| {
let listener_handle = tokio::spawn(
listen(
read_half,
caps.clone(),
results.clone(),
exit_rx,
greeting.clone(),
)
.map_err(|err| {
error!("Help, the listener loop died: {}", err); error!("Help, the listener loop died: {}", err);
err err
}), }),
); );
Client { Client {
config, ctr: 0,
conn: write_half, conn: write_half,
id: 0, config,
results, cmd_tx,
listener_handle, greeting_rx: Some(greeting_rx),
caps,
exit_tx, exit_tx,
greeting, listener_handle: handle,
} }
} }
/// Returns a future that doesn't resolve until we receive a greeting from the server. pub async fn wait_for_greeting(&mut self) -> Result<()> {
pub fn wait_for_greeting(&self) -> GreetingWaiter { if let Some(greeting_rx) = self.greeting_rx.take() {
debug!("waiting for greeting"); greeting_rx.await?;
GreetingWaiter(self.greeting.clone()) }
Ok(())
} }
/// Sends a command to the server and returns a handle to retrieve the result pub async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
pub async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> { let id = self.ctr;
// debug!("executing command {:?}", cmd); self.ctr += 1;
let id = self.id;
self.id += 1;
// create a channel for sending the final response
let (end_tx, end_rx) = oneshot::channel();
// create a channel for sending responses for this particular client call
// this should queue up responses
let (tx, rx) = mpsc::unbounded_channel();
debug!("EX[{}]: adding handler result to the handlers queue", id);
{
let mut handlers = self.results.write();
handlers.push_back(HandlerResult {
id,
end: Some(end_tx),
sender: tx,
waker: None,
});
}
debug!("EX[{}]: send the command to the server", id);
let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd); let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd);
self.conn.write_all(cmd_str.as_bytes()).await?; self.conn.write_all(cmd_str.as_bytes()).await?;
self.conn.flush().await?; self.conn.flush().await?;
debug!("EX[{}]: hellosu", id); let (tx, rx) = mpsc::unbounded_channel();
let q = self.results.clone(); self.cmd_tx.send((cmd, tx))?;
// let end = Box::new(end_rx.map_err(|err| Error::from).map(move |resp| resp));
let end = Box::new(end_rx.map_err(Error::from).map(move |resp| {
debug!("EX[{}]: -end result- {:?}", id, resp);
// pop the first entry from the list
let mut results = q.write();
results.pop_front();
resp
}));
Ok((end, rx)) Ok(ResponseStream { inner: rx })
} }
/// Executes the CAPABILITY command pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
pub async fn capabilities(&mut self, force: bool) -> Result<()> { // TODO: cache capabilities if needed?
{ let cap = cap.as_ref();
let caps = self.caps.read(); let cap = parse_capability(cap)?;
if caps.is_some() && !force {
return Ok(()); let resp = self.execute(Command::Capability).await?;
let (_, data) = resp.wait().await?;
for resp in data {
if let Response::Capabilities(caps) = resp {
return Ok(caps.contains(&cap));
} }
// debug!("cap: {:?}", resp);
} }
let cmd = Command::Capability; Ok(false)
// debug!("sending: {:?} {:?}", cmd, cmd.to_string());
let (result, intermediate) = self
.execute(cmd)
.await
.context("error executing CAPABILITY command")?;
let _ = result.await?;
if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate)
.filter(|resp| future::ready(matches!(resp, Response::Capabilities(_))))
.next()
.await
{
debug!("FOUND NEW CAPABILITIES: {:?}", new_caps);
let mut caps = self.caps.write();
*caps = Some(new_caps.iter().cloned().collect());
} }
Ok(())
}
/// Attempts to upgrade this connection using STARTTLS
pub async fn upgrade(mut self) -> Result<Client<TlsStream<C>>> { pub async fn upgrade(mut self) -> Result<Client<TlsStream<C>>> {
// TODO: make sure STARTTLS is in the capability list // TODO: make sure STARTTLS is in the capability list
if !self.has_capability("STARTTLS").await? { if !self.has_capability("STARTTLS").await? {
@ -193,17 +115,17 @@ where
} }
// first, send the STARTTLS command // first, send the STARTTLS command
let (resp, _) = self.execute(Command::Starttls).await?; let mut resp = self.execute(Command::Starttls).await?;
let resp = resp.await?; let resp = resp.next().await.unwrap();
debug!("server response to starttls: {:?}", resp); debug!("server response to starttls: {:?}", resp);
debug!("sending exit for upgrade"); debug!("sending exit for upgrade");
self.exit_tx.send(()).await?; // TODO: check that the channel is still open?
self.exit_tx.send(()).unwrap();
let reader = self.listener_handle.await??; let reader = self.listener_handle.await??;
let writer = self.conn; let writer = self.conn;
let conn = reader.unsplit(writer); let conn = reader.unsplit(writer);
let server_name = &self.config.hostname; let server_name = &self.config.hostname;
let mut tls_config = RustlsConfig::new(); let mut tls_config = RustlsConfig::new();
@ -217,148 +139,107 @@ where
Ok(Client::new(stream, self.config)) Ok(Client::new(stream, self.config))
} }
}
/// Check if this client has a particular capability pub struct ResponseStream {
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> { inner: mpsc::UnboundedReceiver<Response>,
let cap = cap.as_ref().to_owned(); }
debug!("checking for the capability: {:?}", cap);
let cap = parse_capability(cap)?;
self.capabilities(false).await?; impl ResponseStream {
let caps = self.caps.read(); /// Retrieves just the DONE item in the stream, discarding the rest
// TODO: refresh caps pub async fn done(mut self) -> Result<Option<ResponseDone>> {
while let Some(resp) = self.inner.recv().await {
if let Response::Done(done) = resp {
return Ok(Some(done));
}
}
Ok(None)
}
let caps = caps.as_ref().unwrap(); /// Waits for the entire stream to finish, returning the DONE status and the stream
let result = caps.contains(&cap); pub async fn wait(mut self) -> Result<(Option<ResponseDone>, Vec<Response>)> {
debug!("cap result: {:?}", result); let mut done = None;
Ok(result) let mut vec = Vec::new();
while let Some(resp) = self.inner.recv().await {
if let Response::Done(d) = resp {
done = Some(d);
break;
} else {
vec.push(resp);
}
}
Ok((done, vec))
} }
} }
pub struct GreetingWaiter(GreetingState); impl Stream for ResponseStream {
type Item = Response;
impl Future for GreetingWaiter { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
type Output = Response; self.inner.poll_recv(cx)
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let (state, waker) = &mut *self.0.write();
debug!("g {:?}", state);
if waker.is_none() {
*waker = Some(cx.waker().clone());
}
match state.take() {
Some(v) => Poll::Ready(v),
None => Poll::Pending,
}
} }
} }
/// Main listen loop for the application #[allow(unreachable_code)]
async fn listen<C>( async fn listen<C>(
conn: C, conn: ReadHalf<C>,
caps: CapsLock, mut cmd_rx: mpsc::UnboundedReceiver<Command2>,
results: ResultQueue, greeting_tx: oneshot::Sender<()>,
mut exit: mpsc::Receiver<()>, mut exit_rx: oneshot::Receiver<()>,
greeting: GreetingState, ) -> Result<ReadHalf<C>>
) -> Result<C>
where where
C: AsyncRead + Unpin, C: AsyncRead + Unpin,
{ {
// debug!("amogus");
let mut reader = BufReader::new(conn); let mut reader = BufReader::new(conn);
let mut greeting = Some(greeting); let mut greeting_tx = Some(greeting_tx);
let mut curr_cmd: Option<Command2> = None;
let mut exit_rx = exit_rx.map_err(|_| ()).shared();
// let mut exit_fut = Some(exit_rx.fuse());
// let mut fut1 = None;
loop { loop {
let mut next_line = String::new(); let mut next_line = String::new();
let fut = reader.read_line(&mut next_line).fuse(); let read_fut = reader.read_line(&mut next_line).fuse();
pin_mut!(fut); pin_mut!(read_fut);
let fut2 = exit.recv().fuse();
pin_mut!(fut2);
match future::select(fut, fut2).await { // only listen for a new command if there isn't one already
Either::Left((res, _)) => { let mut cmd_fut = if let Some(_) = curr_cmd {
let bytes = res.context("read failed")?; // if there is one, just make a future that never resolves so it'll always pick the
if bytes == 0 { // other options in the select.
bail!("connection probably died"); future::pending().boxed().fuse()
} } else {
cmd_rx.recv().boxed().fuse()
};
debug!("[LISTEN] got a new line {:?}", next_line); select! {
let resp = parse_response(next_line)?; _ = exit_rx => {
debug!("[LISTEN] parsed as {:?}", resp); debug!("exiting the loop");
// if this is the very first message, treat it as a greeting
if let Some(greeting) = greeting.take() {
let (greeting, waker) = &mut *greeting.write();
debug!("[LISTEN] received greeting!");
*greeting = Some(resp.clone());
if let Some(waker) = waker.take() {
waker.wake();
}
}
// update capabilities list
// TODO: probably not really necessary here (done somewhere else)?
if let Response::Capabilities(new_caps)
| Response::Data(ResponseData {
status: Status::Ok,
code: Some(ResponseCode::Capabilities(new_caps)),
..
}) = &resp
{
let caps = &mut *caps.write();
*caps = Some(new_caps.iter().cloned().collect());
debug!("new caps: {:?}", caps);
}
match &resp {
// bye
Response::Data(ResponseData {
status: Status::Bye,
..
}) => {
bail!("disconnected");
}
Response::Done(ResponseDone { tag, .. }) => {
if tag.starts_with(TAG_PREFIX) {
// let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
debug!("[LISTEN] Done: {:?}", tag);
let mut results = results.write();
if let Some(HandlerResult { end, waker, .. }) =
results.iter_mut().next()
{
if let Some(end) = end.take() {
end.send(resp).unwrap();
}
// *opt = Some(resp);
if let Some(waker) = waker.take() {
waker.wake();
}
}
}
}
_ => {
debug!("[LISTEN] RESPONSE: {:?}", resp);
let mut results = results.write();
if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() {
// we don't really care if it fails to send
// this just means that the other side has dropped the channel
//
// which is fine since that just means they don't care about
// intermediate messages
let _ = sender.send(resp);
debug!("[LISTEN] pushed to intermediate for id {}", id);
debug!("[LISTEN] res: {:?}", results);
}
} // _ => {}
}
}
Either::Right((_, _)) => {
debug!("exiting read loop");
break; break;
} }
cmd = cmd_fut => {
if curr_cmd.is_none() {
curr_cmd = cmd;
}
}
len = read_fut => {
// res should not be None here
let resp = parse_response(next_line)?;
// if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() {
greeting_tx.send(());
}
if let Response::Done(_) = resp {
// since this is the DONE message, clear curr_cmd so another one can be sent
if let Some((_, cmd_tx)) = curr_cmd.take() {
cmd_tx.send(resp)?;
}
} else if let Some((ref cmd, ref mut cmd_tx)) = curr_cmd {
cmd_tx.send(resp)?;
}
}
} }
} }

View file

@ -50,9 +50,9 @@ use tokio_rustls::{
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::Command; use crate::command::Command;
use crate::response::{Response, ResponseData, ResponseDone}; use crate::response::{MailboxData, Response, ResponseData, ResponseDone};
pub use self::inner::{Client, ResponseFuture, ResponseStream}; pub use self::inner::{Client, ResponseStream};
/// Struct used to start building the config for a client. /// Struct used to start building the config for a client.
/// ///
@ -93,12 +93,12 @@ impl ClientConfig {
let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap(); let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap();
let conn = tls_config.connect(dnsname, conn).await?; let conn = tls_config.connect(dnsname, conn).await?;
let inner = Client::new(conn, self); let mut inner = Client::new(conn, self);
inner.wait_for_greeting().await; inner.wait_for_greeting().await?;
return Ok(ClientUnauthenticated::Encrypted(inner)); return Ok(ClientUnauthenticated::Encrypted(inner));
} else { } else {
let inner = Client::new(conn, self); let mut inner = Client::new(conn, self);
inner.wait_for_greeting().await; inner.wait_for_greeting().await?;
return Ok(ClientUnauthenticated::Unencrypted(inner)); return Ok(ClientUnauthenticated::Unencrypted(inner));
} }
} }
@ -121,7 +121,7 @@ impl ClientUnauthenticated {
} }
/// Exposing low-level execute /// Exposing low-level execute
async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> { async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
match self { match self {
ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await, ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await,
ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await, ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await,
@ -137,12 +137,6 @@ impl ClientUnauthenticated {
} }
} }
#[derive(Debug)]
pub struct ResponseCombined {
pub data: Vec<Response>,
pub done: ResponseDone,
}
pub enum ClientAuthenticated { pub enum ClientAuthenticated {
Encrypted(Client<TlsStream<TcpStream>>), Encrypted(Client<TlsStream<TcpStream>>),
Unencrypted(Client<TcpStream>), Unencrypted(Client<TcpStream>),
@ -150,45 +144,13 @@ pub enum ClientAuthenticated {
impl ClientAuthenticated { impl ClientAuthenticated {
/// Exposing low-level execute /// Exposing low-level execute
async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> { async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
match self { match self {
ClientAuthenticated::Encrypted(e) => e.execute(cmd).await, ClientAuthenticated::Encrypted(e) => e.execute(cmd).await,
ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await, ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await,
} }
} }
/// A wrapper around `execute` that waits for the response and returns a combined data
/// structure containing the intermediate results as well as the final status
async fn execute_combined(&mut self, cmd: Command) -> Result<ResponseCombined> {
let (resp, mut stream) = self.execute(cmd).await?;
let mut resp = resp.into_stream(); // turn into stream to avoid mess with returning futures from select
let mut data = Vec::new();
debug!("[COMBI] loop");
let done = loop {
let fut1 = resp.next().fuse();
let fut2 = stream.recv().fuse();
pin_mut!(fut1);
pin_mut!(fut2);
match future::select(fut1, fut2).await {
Either::Left((Some(Ok(Response::Done(done))), _)) => {
debug!("[COMBI] left: {:?}", done);
break done;
}
Either::Left(_) => unreachable!("got non-Response::Done from listen!"),
Either::Right((Some(resp), _)) => {
debug!("[COMBI] right: {:?}", resp);
data.push(resp);
}
Either::Right(_) => unreachable!(),
}
};
Ok(ResponseCombined { data, done })
}
/// Runs the LIST command /// Runs the LIST command
pub async fn list(&mut self) -> Result<Vec<String>> { pub async fn list(&mut self) -> Result<Vec<String>> {
let cmd = Command::List { let cmd = Command::List {
@ -196,29 +158,17 @@ impl ClientAuthenticated {
mailbox: "*".to_owned(), mailbox: "*".to_owned(),
}; };
let res = self.execute_combined(cmd).await?; let res = self.execute(cmd).await?;
debug!("res: {:?}", res); let (_, data) = res.wait().await?;
todo!()
// let mut folders = Vec::new(); let mut folders = Vec::new();
// loop { for resp in data {
// let st_next = st.recv(); if let Response::MailboxData(MailboxData::List { name, .. }) = resp {
// pin_mut!(st_next); folders.push(name.to_owned());
}
}
// match future::select(resp, st_next).await { Ok(folders)
// Either::Left((v, _)) => {
// break;
// }
// Either::Right((v, _) ) => {
// debug!("RESP: {:?}", v);
// // folders.push(v);
// }
// }
// }
// let resp = resp.await?;
// debug!("list response: {:?}", resp);
} }
/// Runs the SELECT command /// Runs the SELECT command
@ -226,11 +176,12 @@ impl ClientAuthenticated {
let cmd = Command::Select { let cmd = Command::Select {
mailbox: mailbox.as_ref().to_owned(), mailbox: mailbox.as_ref().to_owned(),
}; };
let (resp, mut st) = self.execute(cmd).await?; let mut stream = self.execute(cmd).await?;
// let (resp, mut st) = self.execute(cmd).await?;
debug!("execute called returned..."); debug!("execute called returned...");
debug!("ST: {:?}", st.recv().await); debug!("ST: {:?}", stream.next().await);
let resp = resp.await?; // let resp = resp.await?;
debug!("select response: {:?}", resp); // debug!("select response: {:?}", resp);
// nuke the capabilities cache // nuke the capabilities cache
self.nuke_capabilities(); self.nuke_capabilities();
@ -240,10 +191,10 @@ impl ClientAuthenticated {
/// Runs the IDLE command /// Runs the IDLE command
#[cfg(feature = "rfc2177-idle")] #[cfg(feature = "rfc2177-idle")]
pub async fn idle(&mut self) -> Result<UnboundedReceiverStream<Response>> { pub async fn idle(&mut self) -> Result<ResponseStream> {
let cmd = Command::Idle; let cmd = Command::Idle;
let (_, stream) = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
Ok(UnboundedReceiverStream::new(stream)) Ok(stream)
} }
fn nuke_capabilities(&mut self) { fn nuke_capabilities(&mut self) {

View file

@ -1,7 +1,7 @@
use std::fmt; use std::fmt;
/// Commands, without the tag part. /// Commands, without the tag part.
#[derive(Clone, Debug)] #[derive(Clone)]
pub enum Command { pub enum Command {
Capability, Capability,
Starttls, Starttls,
@ -21,6 +21,22 @@ pub enum Command {
Idle, Idle,
} }
impl fmt::Debug for Command {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use Command::*;
match self {
Capability => write!(f, "CAPABILITY"),
Starttls => write!(f, "STARTTLS"),
Login { .. } => write!(f, "LOGIN"),
Select { mailbox } => write!(f, "SELECT {}", mailbox),
List { reference, mailbox } => write!(f, "LIST {:?} {:?}", reference, mailbox),
#[cfg(feature = "rfc2177-idle")]
Idle => write!(f, "IDLE"),
}
}
}
impl fmt::Display for Command { impl fmt::Display for Command {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
use Command::*; use Command::*;

View file

@ -21,6 +21,7 @@ use tokio_stream::wrappers::WatchStream;
use crate::config::{Config, ConfigWatcher, ImapAuth, MailAccountConfig, TlsMethod}; use crate::config::{Config, ConfigWatcher, ImapAuth, MailAccountConfig, TlsMethod};
/// Command sent to the mail thread by something else (i.e. UI) /// Command sent to the mail thread by something else (i.e. UI)
#[derive(Debug)]
pub enum MailCommand { pub enum MailCommand {
/// Refresh the list /// Refresh the list
Refresh, Refresh,
@ -30,6 +31,7 @@ pub enum MailCommand {
} }
/// Possible events returned from the server that should be sent to the UI /// Possible events returned from the server that should be sent to the UI
#[derive(Debug)]
pub enum MailEvent { pub enum MailEvent {
/// Got the list of folders /// Got the list of folders
FolderList(Vec<String>), FolderList(Vec<String>),

View file

@ -4,7 +4,11 @@ use std::thread;
use anyhow::Result; use anyhow::Result;
use fern::colors::{Color, ColoredLevelConfig}; use fern::colors::{Color, ColoredLevelConfig};
use futures::future::TryFutureExt; use futures::future::TryFutureExt;
use panorama::{config::spawn_config_watcher_system, mail, report_err, ui}; use panorama::{
config::spawn_config_watcher_system,
mail::{self, MailEvent},
report_err, ui,
};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::{ use tokio::{
runtime::{Builder as RuntimeBuilder, Runtime}, runtime::{Builder as RuntimeBuilder, Runtime},
@ -63,7 +67,7 @@ async fn run(opt: Opt) -> Result<()> {
}); });
if !opt.headless { if !opt.headless {
run_ui(exit_tx); run_ui(exit_tx, mail2ui_rx);
} }
exit_rx.recv().await; exit_rx.recv().await;
@ -76,7 +80,7 @@ async fn run(opt: Opt) -> Result<()> {
} }
// Spawns the entire UI in a different thread, since it must be thread-local // Spawns the entire UI in a different thread, since it must be thread-local
fn run_ui(exit_tx: mpsc::Sender<()>) { fn run_ui(exit_tx: mpsc::Sender<()>, mail2ui_rx: mpsc::UnboundedReceiver<MailEvent>) {
let stdout = std::io::stdout(); let stdout = std::io::stdout();
let rt = RuntimeBuilder::new_current_thread() let rt = RuntimeBuilder::new_current_thread()
@ -88,7 +92,9 @@ fn run_ui(exit_tx: mpsc::Sender<()>) {
let localset = LocalSet::new(); let localset = LocalSet::new();
localset.spawn_local(async { localset.spawn_local(async {
ui::run_ui(stdout, exit_tx).unwrap_or_else(report_err).await; ui::run_ui(stdout, exit_tx, mail2ui_rx)
.unwrap_or_else(report_err)
.await;
}); });
rt.block_on(localset); rt.block_on(localset);

View file

@ -1,21 +1,29 @@
use tui::{ use tui::{
buffer::Buffer, buffer::Buffer,
layout::Rect, layout::{Constraint, Direction, Layout, Rect},
widgets::{StatefulWidget, Widget}, style::{Color, Modifier, Style},
widgets::*,
}; };
pub struct MailTabState {} use super::FrameType;
impl MailTabState { pub fn render_mail_tab(f: &mut FrameType, area: Rect, folders: &[String]) {
pub fn new() -> Self { let chunks = Layout::default()
MailTabState {} .direction(Direction::Horizontal)
} .margin(0)
} .constraints([Constraint::Length(20), Constraint::Max(5000)])
.split(area);
pub struct MailTab;
let items = folders
impl StatefulWidget for MailTab { .iter()
type State = MailTabState; .map(|s| ListItem::new(s.to_owned()))
.collect::<Vec<_>>();
fn render(self, rect: Rect, buffer: &mut Buffer, state: &mut Self::State) {}
let dirlist = List::new(items)
.block(Block::default().borders(Borders::NONE))
.style(Style::default().fg(Color::White))
.highlight_style(Style::default().add_modifier(Modifier::ITALIC))
.highlight_symbol(">>");
f.render_widget(dirlist, chunks[0]);
} }

View file

@ -12,6 +12,7 @@ use crossterm::{
event::{self, Event, KeyCode, KeyEvent}, event::{self, Event, KeyCode, KeyEvent},
style, terminal, style, terminal,
}; };
use futures::{future::FutureExt, select, stream::StreamExt};
use tokio::{sync::mpsc, time}; use tokio::{sync::mpsc, time};
use tui::{ use tui::{
backend::CrosstermBackend, backend::CrosstermBackend,
@ -22,70 +23,42 @@ use tui::{
Frame, Terminal, Frame, Terminal,
}; };
use self::mail_tab::{MailTab, MailTabState}; use crate::mail::MailEvent;
// pub(crate) type FrameType<'a> = Frame<'a, CrosstermBackend<Stdout>>; use self::mail_tab::render_mail_tab;
pub(crate) type FrameType<'a, 'b> = Frame<'a, CrosstermBackend<&'b mut Stdout>>;
const FRAME_DURATION: Duration = Duration::from_millis(17); const FRAME_DURATION: Duration = Duration::from_millis(17);
/// Main entrypoint for the UI /// Main entrypoint for the UI
pub async fn run_ui(mut stdout: Stdout, exit_tx: mpsc::Sender<()>) -> Result<()> { pub async fn run_ui(
mut stdout: Stdout,
exit_tx: mpsc::Sender<()>,
mut mail2ui_rx: mpsc::UnboundedReceiver<MailEvent>,
) -> Result<()> {
execute!(stdout, cursor::Hide, terminal::EnterAlternateScreen)?; execute!(stdout, cursor::Hide, terminal::EnterAlternateScreen)?;
terminal::enable_raw_mode()?; terminal::enable_raw_mode()?;
let backend = CrosstermBackend::new(&mut stdout); let backend = CrosstermBackend::new(&mut stdout);
let mut term = Terminal::new(backend)?; let mut term = Terminal::new(backend)?;
let mut mail_state = MailTabState::new(); let mut folders = Vec::<String>::new();
loop { loop {
term.draw(|f| { term.draw(|f| {
let chunks = Layout::default() let chunks = Layout::default()
.direction(Direction::Vertical) .direction(Direction::Vertical)
.margin(0) .margin(0)
.constraints([ .constraints([Constraint::Length(1), Constraint::Max(5000)])
Constraint::Length(1),
Constraint::Max(5000),
// Constraint::Percentage(10),
// Constraint::Percentage(80),
// Constraint::Percentage(10),
])
.split(f.size()); .split(f.size());
// let chunks2 = Layout::default()
// .direction(Direction::Horizontal)
// .margin(0)
// .constraints([
// Constraint::Length(20),
// Constraint::Max(5000),
// //
// ])
// .split(chunks[1]);
// this is the title bar // this is the title bar
let titles = vec!["hellosu"].into_iter().map(Spans::from).collect(); let titles = vec!["hellosu"].into_iter().map(Spans::from).collect();
let tabs = Tabs::new(titles); let tabs = Tabs::new(titles);
f.render_widget(tabs, chunks[0]); f.render_widget(tabs, chunks[0]);
let mail_tab = MailTab; render_mail_tab(f, chunks[1], &folders);
f.render_stateful_widget(mail_tab, chunks[1], &mut mail_state);
// TODO: check active tab
// let items = [
// ListItem::new("Osu"),
// ListItem::new("Game").style(Style::default().add_modifier(Modifier::BOLD)),
// ];
// let dirlist = List::new(items)
// .block(Block::default().title("List").borders(Borders::ALL))
// .style(Style::default().fg(Color::White))
// .highlight_style(Style::default().add_modifier(Modifier::ITALIC))
// .highlight_symbol(">>");
// f.render_widget(dirlist, chunks2[0]);
// let block = Block::default().title("Block").borders(Borders::ALL);
// f.render_widget(block, chunks2[1]);
// let block = Block::default().title("Block 2").borders(Borders::ALL);
// f.render_widget(block, chunks[1]);
})?; })?;
let event = if event::poll(FRAME_DURATION)? { let event = if event::poll(FRAME_DURATION)? {
@ -105,17 +78,22 @@ pub async fn run_ui(mut stdout: Stdout, exit_tx: mpsc::Sender<()>) -> Result<()>
None None
}; };
// approx 60fps select! {
time::sleep(FRAME_DURATION).await; mail_evt = mail2ui_rx.recv().fuse() => {
debug!("received mail event: {:?}", mail_evt);
// TODO: handle case that channel is closed later
let mail_evt = mail_evt.unwrap();
// if let Event::Input(input) = events.next()? { match mail_evt {
// match input { MailEvent::FolderList(new_folders) => {
// Key::Char('q') => { folders = new_folders;
// break; }
// } }
// _ => {} }
// }
// } // approx 60fps
_ = time::sleep(FRAME_DURATION).fuse() => {}
}
} }
mem::drop(term); mem::drop(term);