This commit is contained in:
Michael Zhang 2021-03-02 19:45:45 -06:00
parent 7f7a4a115a
commit 94ead04391
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
6 changed files with 110 additions and 65 deletions

View file

@ -1,7 +1,7 @@
use anyhow::Result; use anyhow::Result;
use crate::command::Command; use crate::command::Command;
use crate::response::{Response, Status}; use crate::response::{Response, ResponseDone, Status};
use super::{ClientAuthenticated, ClientUnauthenticated}; use super::{ClientAuthenticated, ClientUnauthenticated};
@ -37,10 +37,10 @@ impl Auth for Plain {
if !matches!( if !matches!(
result, result,
Response::Done { Response::Done(ResponseDone {
status: Status::Ok, status: Status::Ok,
.. ..
} })
) { ) {
bail!("unable to login: {:?}", result); bail!("unable to login: {:?}", result);
} }

View file

@ -23,7 +23,7 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::Command; use crate::command::Command;
use crate::parser::{parse_capability, parse_response}; use crate::parser::{parse_capability, parse_response};
use crate::response::{Capability, Response, ResponseCode, Status}; use crate::response::{Capability, Response, ResponseCode, ResponseData, ResponseDone, Status};
use super::ClientConfig; use super::ClientConfig;
@ -35,6 +35,7 @@ type ResultQueue = Arc<RwLock<VecDeque<HandlerResult>>>;
pub type GreetingState = Arc<RwLock<(Option<Response>, Option<Waker>)>>; pub type GreetingState = Arc<RwLock<(Option<Response>, Option<Waker>)>>;
pub const TAG_PREFIX: &str = "ptag"; pub const TAG_PREFIX: &str = "ptag";
#[derive(Debug)]
struct HandlerResult { struct HandlerResult {
id: usize, id: usize,
end: Option<oneshot::Sender<Response>>, end: Option<oneshot::Sender<Response>>,
@ -137,17 +138,8 @@ where
debug!("EX[{}]: send the command to the server", id); debug!("EX[{}]: send the command to the server", id);
let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd); let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd);
// debug!("[{}] writing to socket: {:?}", id, cmd_str);
self.conn.write_all(cmd_str.as_bytes()).await?; self.conn.write_all(cmd_str.as_bytes()).await?;
self.conn.flush().await?; self.conn.flush().await?;
// debug!("[{}] written.", id);
// let resp = ExecWaiter(self, id, false).await;
// let resp = {
// let mut handlers = self.results.write();
// handlers.remove(&id).unwrap().0.unwrap()
// };
// let resp = end_rx.await?;
debug!("EX[{}]: hellosu", id); debug!("EX[{}]: hellosu", id);
let q = self.results.clone(); let q = self.results.clone();
@ -178,14 +170,14 @@ where
.execute(cmd) .execute(cmd)
.await .await
.context("error executing CAPABILITY command")?; .context("error executing CAPABILITY command")?;
let result = result.await?; let _ = result.await?;
debug!("cap resp: {:?}", result);
if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate) if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate)
.filter(|resp| future::ready(matches!(resp, Response::Capabilities(_)))) .filter(|resp| future::ready(matches!(resp, Response::Capabilities(_))))
.next() .next()
.await .await
{ {
debug!("FOUND NEW CAPABILITIES: {:?}", new_caps);
let mut caps = self.caps.write(); let mut caps = self.caps.write();
*caps = Some(new_caps.iter().cloned().collect()); *caps = Some(new_caps.iter().cloned().collect());
} }
@ -292,6 +284,7 @@ where
debug!("[LISTEN] got a new line {:?}", next_line); debug!("[LISTEN] got a new line {:?}", next_line);
let resp = parse_response(next_line)?; let resp = parse_response(next_line)?;
debug!("[LISTEN] parsed as {:?}", resp);
// if this is the very first message, treat it as a greeting // if this is the very first message, treat it as a greeting
if let Some(greeting) = greeting.take() { if let Some(greeting) = greeting.take() {
@ -306,11 +299,11 @@ where
// update capabilities list // update capabilities list
// TODO: probably not really necessary here (done somewhere else)? // TODO: probably not really necessary here (done somewhere else)?
if let Response::Capabilities(new_caps) if let Response::Capabilities(new_caps)
| Response::Data { | Response::Data(ResponseData {
status: Status::Ok, status: Status::Ok,
code: Some(ResponseCode::Capabilities(new_caps)), code: Some(ResponseCode::Capabilities(new_caps)),
.. ..
} = &resp }) = &resp
{ {
let caps = &mut *caps.write(); let caps = &mut *caps.write();
*caps = Some(new_caps.iter().cloned().collect()); *caps = Some(new_caps.iter().cloned().collect());
@ -318,43 +311,24 @@ where
} }
match &resp { match &resp {
Response::Data {
status: Status::Ok, ..
} => {
let mut results = results.write();
if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() {
// we don't really care if it fails to send
// this just means that the other side has dropped the channel
//
// which is fine since that just means they don't care about
// intermediate messages
let _ = sender.send(resp);
debug!("[LISTEN] pushed to intermediate for id {}", id);
}
std::mem::drop(results);
debug!("[LISTEN] << unlocked self.results");
}
// bye // bye
Response::Data { Response::Data(ResponseData {
status: Status::Bye, status: Status::Bye,
.. ..
} => { }) => {
bail!("disconnected"); bail!("disconnected");
} }
Response::Done { tag, .. } => { Response::Done(ResponseDone { tag, .. }) => {
if tag.starts_with(TAG_PREFIX) { if tag.starts_with(TAG_PREFIX) {
// let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?; // let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
debug!("[LISTEN] Done: {:?}", tag);
let mut results = results.write(); let mut results = results.write();
if let Some(HandlerResult { if let Some(HandlerResult { end, waker, .. }) =
end: ref mut opt, results.iter_mut().next()
waker,
..
}) = results.iter_mut().next()
{ {
if let Some(opt) = opt.take() { if let Some(end) = end.take() {
opt.send(resp).unwrap(); end.send(resp).unwrap();
} }
// *opt = Some(resp); // *opt = Some(resp);
if let Some(waker) = waker.take() { if let Some(waker) = waker.take() {
@ -364,7 +338,20 @@ where
} }
} }
_ => {} _ => {
debug!("[LISTEN] RESPONSE: {:?}", resp);
let mut results = results.write();
if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() {
// we don't really care if it fails to send
// this just means that the other side has dropped the channel
//
// which is fine since that just means they don't care about
// intermediate messages
let _ = sender.send(resp);
debug!("[LISTEN] pushed to intermediate for id {}", id);
debug!("[LISTEN] res: {:?}", results);
}
} // _ => {}
} }
} }

View file

@ -39,6 +39,10 @@ mod inner;
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use futures::{
future::{self, Either, FutureExt},
stream::StreamExt,
};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_rustls::{ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
@ -46,7 +50,7 @@ use tokio_rustls::{
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::command::Command; use crate::command::Command;
use crate::response::Response; use crate::response::{Response, ResponseData, ResponseDone};
pub use self::inner::{Client, ResponseFuture, ResponseStream}; pub use self::inner::{Client, ResponseFuture, ResponseStream};
@ -56,7 +60,7 @@ pub use self::inner::{Client, ResponseFuture, ResponseStream};
/// the connection to the server. /// the connection to the server.
/// ///
/// [1]: self::ClientConfigBuilder::build /// [1]: self::ClientConfigBuilder::build
/// [2]: self::ClientConfig::open /// [2]: self::ClientConfig::connect
pub type ClientBuilder = ClientConfigBuilder; pub type ClientBuilder = ClientConfigBuilder;
/// An IMAP client that hasn't been connected yet. /// An IMAP client that hasn't been connected yet.
@ -133,6 +137,12 @@ impl ClientUnauthenticated {
} }
} }
#[derive(Debug)]
pub struct ResponseCombined {
pub data: Vec<Response>,
pub done: ResponseDone,
}
pub enum ClientAuthenticated { pub enum ClientAuthenticated {
Encrypted(Client<TlsStream<TcpStream>>), Encrypted(Client<TlsStream<TcpStream>>),
Unencrypted(Client<TcpStream>), Unencrypted(Client<TcpStream>),
@ -147,13 +157,47 @@ impl ClientAuthenticated {
} }
} }
/// A wrapper around `execute` that waits for the response and returns a combined data
/// structure containing the intermediate results as well as the final status
async fn execute_combined(&mut self, cmd: Command) -> Result<ResponseCombined> {
let (resp, mut stream) = self.execute(cmd).await?;
let mut resp = resp.into_stream(); // turn into stream to avoid mess with returning futures from select
let mut data = Vec::new();
debug!("[COMBI] loop");
let done = loop {
let fut1 = resp.next().fuse();
let fut2 = stream.recv().fuse();
pin_mut!(fut1);
pin_mut!(fut2);
match future::select(fut1, fut2).await {
Either::Left((Some(Ok(Response::Done(done))), _)) => {
debug!("[COMBI] left: {:?}", done);
break done;
}
Either::Left(_) => unreachable!("got non-Response::Done from listen!"),
Either::Right((Some(resp), _)) => {
debug!("[COMBI] right: {:?}", resp);
data.push(resp);
}
Either::Right(_) => unreachable!(),
}
};
Ok(ResponseCombined { data, done })
}
/// Runs the LIST command /// Runs the LIST command
pub async fn list(&mut self) -> Result<Vec<String>> { pub async fn list(&mut self) -> Result<Vec<String>> {
let cmd = Command::List { let cmd = Command::List {
reference: "".to_owned(), reference: "".to_owned(),
mailbox: "*".to_owned(), mailbox: "*".to_owned(),
}; };
let (mut resp, mut st) = self.execute(cmd).await?;
let res = self.execute_combined(cmd).await?;
debug!("res: {:?}", res);
todo!() todo!()
// let mut folders = Vec::new(); // let mut folders = Vec::new();
@ -187,14 +231,22 @@ impl ClientAuthenticated {
debug!("ST: {:?}", st.recv().await); debug!("ST: {:?}", st.recv().await);
let resp = resp.await?; let resp = resp.await?;
debug!("select response: {:?}", resp); debug!("select response: {:?}", resp);
// nuke the capabilities cache
self.nuke_capabilities();
Ok(()) Ok(())
} }
/// Runs the SELECT command /// Runs the IDLE command
#[cfg(feature = "rfc2177-idle")] #[cfg(feature = "rfc2177-idle")]
pub async fn idle(&mut self) -> Result<UnboundedReceiverStream<Response>> { pub async fn idle(&mut self) -> Result<UnboundedReceiverStream<Response>> {
let cmd = Command::Idle; let cmd = Command::Idle;
let (_, stream) = self.execute(cmd).await?; let (_, stream) = self.execute(cmd).await?;
Ok(UnboundedReceiverStream::new(stream)) Ok(UnboundedReceiverStream::new(stream))
} }
fn nuke_capabilities(&mut self) {
// TODO: do something here
}
} }

View file

@ -48,12 +48,12 @@ fn build_response(pair: Pair<Rule>) -> Response {
let pair = pairs.next().unwrap(); let pair = pairs.next().unwrap();
let (status, code, information) = build_resp_cond_state(pair); let (status, code, information) = build_resp_cond_state(pair);
Response::Done { Response::Done(ResponseDone {
tag, tag,
status, status,
code, code,
information, information,
} })
} }
_ => unreachable!("{:#?}", pair), _ => unreachable!("{:#?}", pair),
} }
@ -64,11 +64,11 @@ fn build_response(pair: Pair<Rule>) -> Response {
match pair.as_rule() { match pair.as_rule() {
Rule::resp_cond_state => { Rule::resp_cond_state => {
let (status, code, information) = build_resp_cond_state(pair); let (status, code, information) = build_resp_cond_state(pair);
Response::Data { Response::Data(ResponseData {
status, status,
code, code,
information, information,
} })
} }
Rule::mailbox_data => Response::MailboxData(build_mailbox_data(pair)), Rule::mailbox_data => Response::MailboxData(build_mailbox_data(pair)),
Rule::capability_data => Response::Capabilities(build_capabilities(pair)), Rule::capability_data => Response::Capabilities(build_capabilities(pair)),

View file

@ -9,17 +9,8 @@ pub enum Response {
code: Option<ResponseCode>, code: Option<ResponseCode>,
information: Option<String>, information: Option<String>,
}, },
Done { Done(ResponseDone),
tag: String, Data(ResponseData),
status: Status,
code: Option<ResponseCode>,
information: Option<String>,
},
Data {
status: Status,
code: Option<ResponseCode>,
information: Option<String>,
},
Expunge(u32), Expunge(u32),
Vanished { Vanished {
earlier: bool, earlier: bool,
@ -29,6 +20,21 @@ pub enum Response {
MailboxData(MailboxData), MailboxData(MailboxData),
} }
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResponseData {
pub status: Status,
pub code: Option<ResponseCode>,
pub information: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResponseDone {
pub tag: String,
pub status: Status,
pub code: Option<ResponseCode>,
pub information: Option<String>,
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)] #[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub enum Capability { pub enum Capability {
Imap4rev1, Imap4rev1,

View file

@ -136,7 +136,7 @@ async fn imap_main(acct: MailAccountConfig, mail2ui_tx: UnboundedSender<MailEven
debug!("listing all mailboxes..."); debug!("listing all mailboxes...");
let folder_list = authed.list().await?; let folder_list = authed.list().await?;
debug!("mailbox list: {:?}", folder_list); debug!("mailbox list: {:?}", folder_list);
mail2ui_tx.send(MailEvent::FolderList(folder_list)); let _ = mail2ui_tx.send(MailEvent::FolderList(folder_list));
let mut idle_stream = authed.idle().await?; let mut idle_stream = authed.idle().await?;