panorama/imap/src/client/client.rs

313 lines
9.9 KiB
Rust
Raw Normal View History

2021-08-23 06:52:20 +00:00
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};
2021-08-08 03:33:37 +00:00
use anyhow::Result;
use futures::{
future::{self, FutureExt},
stream::{Stream, StreamExt},
};
2021-08-09 22:06:49 +00:00
use panorama_proto_common::Bytes;
2021-08-09 06:28:54 +00:00
use tokio::net::TcpStream;
2021-08-09 05:36:10 +00:00
use tokio_rustls::client::TlsStream;
2021-08-08 03:33:37 +00:00
use crate::proto::{
command::{
Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems,
2021-08-23 06:52:20 +00:00
SearchCriteria, Sequence,
},
response::{
Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute, Response,
ResponseCode, Status,
},
};
2021-08-08 03:33:37 +00:00
2021-08-09 11:29:48 +00:00
use super::auth::AuthMethod;
use super::inner::Inner;
use super::response_stream::ResponseStream;
use super::tls::wrap_tls;
2021-08-08 03:33:37 +00:00
/// An IMAP client that hasn't been connected yet.
#[derive(Builder, Clone, Debug)]
#[builder(build_fn(private))]
pub struct Config {
2021-08-08 03:33:37 +00:00
/// The hostname of the IMAP server. If using TLS, must be an address
pub hostname: String,
2021-08-08 03:33:37 +00:00
/// The port of the IMAP server.
pub port: u16,
2021-08-08 03:33:37 +00:00
/// Whether or not the client is using an encrypted stream.
///
/// To upgrade the connection later, use the upgrade method.
2021-08-09 05:36:10 +00:00
#[builder(default = "true")]
pub tls: bool,
2021-08-09 05:36:10 +00:00
/// Whether or not to verify hostname
#[builder(default = "true")]
pub verify_hostname: bool,
2021-08-08 03:33:37 +00:00
}
2021-08-09 06:28:54 +00:00
impl Config {
pub fn builder() -> ConfigBuilder { ConfigBuilder::default() }
}
impl ConfigBuilder {
2021-08-09 05:36:10 +00:00
pub async fn open(&self) -> Result<ClientUnauthenticated> {
let config = self.build()?;
let hostname = config.hostname.as_ref();
let port = config.port;
trace!("connecting to {}:{}...", hostname, port);
2021-08-08 03:33:37 +00:00
let conn = TcpStream::connect((hostname, port)).await?;
trace!("connected.");
2021-08-08 03:33:37 +00:00
if config.tls {
let conn = wrap_tls(conn, hostname).await?;
let mut inner = Inner::new(conn, config).await?;
2021-08-09 11:29:48 +00:00
2021-08-08 03:33:37 +00:00
inner.wait_for_greeting().await?;
2021-08-09 11:29:48 +00:00
debug!("received greeting");
2021-08-08 03:33:37 +00:00
return Ok(ClientUnauthenticated::Encrypted(inner));
} else {
let mut inner = Inner::new(conn, config).await?;
2021-08-09 11:29:48 +00:00
2021-08-08 03:33:37 +00:00
inner.wait_for_greeting().await?;
2021-08-09 11:29:48 +00:00
debug!("received greeting");
2021-08-08 03:33:37 +00:00
return Ok(ClientUnauthenticated::Unencrypted(inner));
}
}
}
2021-08-09 06:28:54 +00:00
/// A client that hasn't been authenticated.
2021-08-08 03:33:37 +00:00
pub enum ClientUnauthenticated {
Encrypted(Inner<TlsStream<TcpStream>>),
Unencrypted(Inner<TcpStream>),
2021-08-08 03:33:37 +00:00
}
impl ClientUnauthenticated {
pub async fn upgrade(self) -> Result<ClientUnauthenticated> {
match self {
// this is a no-op, we don't need to upgrade
ClientUnauthenticated::Encrypted(_) => Ok(self),
ClientUnauthenticated::Unencrypted(e) => {
Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?))
}
}
}
2021-08-09 11:29:48 +00:00
pub async fn auth(self, auth: impl AuthMethod) -> Result<ClientAuthenticated> {
match self {
// this is a no-op, we don't need to upgrade
ClientUnauthenticated::Encrypted(mut inner) => {
auth.perform_auth(&mut inner).await?;
Ok(ClientAuthenticated::Encrypted(inner))
}
ClientUnauthenticated::Unencrypted(mut inner) => {
auth.perform_auth(&mut inner).await?;
Ok(ClientAuthenticated::Unencrypted(inner))
}
}
}
2021-08-09 05:36:10 +00:00
client_expose!(async execute(cmd: Command) -> Result<ResponseStream>);
client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>);
2021-08-08 03:33:37 +00:00
}
2021-08-09 06:28:54 +00:00
/// A client that has been authenticated.
2021-08-08 03:33:37 +00:00
pub enum ClientAuthenticated {
Encrypted(Inner<TlsStream<TcpStream>>),
Unencrypted(Inner<TcpStream>),
2021-08-08 03:33:37 +00:00
}
impl ClientAuthenticated {
2021-08-09 05:36:10 +00:00
client_expose!(async execute(cmd: Command) -> Result<ResponseStream>);
client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>);
2021-08-08 03:33:37 +00:00
2021-08-23 07:50:13 +00:00
pub async fn search(&mut self) -> Result<()> {
let cmd = Command::Examine;
let res = self.execute(cmd).await?;
let (done, data) = res.wait().await?;
println!("done = {:?}", done);
println!("data = {:?}", data);
Ok(())
}
2021-08-08 03:33:37 +00:00
/// Runs the LIST command
pub async fn list(&mut self) -> Result<Vec<Mailbox>> {
let cmd = Command::List(CommandList {
2021-08-09 05:36:10 +00:00
reference: Bytes::from(""),
mailbox: Bytes::from("*"),
});
2021-08-08 03:33:37 +00:00
let res = self.execute(cmd).await?;
let (_, data) = res.wait().await?;
let mut folders = Vec::new();
for resp in data {
if let Response::MailboxData(MailboxData::List(MailboxList { mailbox, .. })) = resp {
folders.push(mailbox);
2021-08-08 03:33:37 +00:00
}
}
Ok(folders)
}
/// Runs the SELECT command
pub async fn select(&mut self, mailbox: impl AsRef<str>) -> Result<SelectResponse> {
let cmd = Command::Select(CommandSelect {
2021-08-09 05:36:10 +00:00
mailbox: Bytes::from(mailbox.as_ref().to_owned()),
});
2021-08-08 03:33:37 +00:00
let stream = self.execute(cmd).await?;
let (_, data) = stream.wait().await?;
let mut select = SelectResponse::default();
for resp in data {
match resp {
Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags,
Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists),
Response::MailboxData(MailboxData::Recent(recent)) => select.recent = Some(recent),
Response::Tagged(
_,
Condition {
status: Status::Ok,
code: Some(code),
..
},
2021-08-23 07:50:13 +00:00
)
| Response::Condition(Condition {
status: Status::Ok,
code: Some(code),
..
}) => match code {
2021-08-08 03:33:37 +00:00
ResponseCode::Unseen(value) => select.unseen = Some(value),
ResponseCode::UidNext(value) => select.uid_next = Some(value),
ResponseCode::UidValidity(value) => select.uid_validity = Some(value),
_ => {}
},
2021-08-23 07:50:13 +00:00
_ => warn!("unknown response {:?}", resp),
2021-08-08 03:33:37 +00:00
}
}
Ok(select)
}
/// Runs the SEARCH command
pub async fn uid_search(&mut self) -> Result<Vec<u32>> {
let cmd = Command::UidSearch(CommandSearch {
2021-08-08 03:33:37 +00:00
criteria: SearchCriteria::All,
});
2021-08-08 03:33:37 +00:00
let stream = self.execute(cmd).await?;
let (_, data) = stream.wait().await?;
for resp in data {
if let Response::MailboxData(MailboxData::Search(uids)) = resp {
return Ok(uids);
}
}
bail!("could not find the SEARCH response")
}
/// Runs the FETCH command
pub async fn fetch(
&mut self,
uids: &[u32],
2021-08-23 06:52:20 +00:00
uid_seqs: &[Range<u32>],
2021-08-08 03:33:37 +00:00
items: FetchItems,
) -> Result<impl Stream<Item = (u32, Vec<MessageAttribute>)>> {
2021-08-23 06:52:20 +00:00
let mut ids = Vec::new();
for uid in uids {
ids.push(Sequence::Single(*uid));
}
for seq in uid_seqs {
ids.push(Sequence::Range(seq.start, seq.end));
}
let cmd = Command::Fetch(CommandFetch { ids, items });
debug!("fetch: {:?}", cmd);
2021-08-08 03:33:37 +00:00
let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(),
}))
}
/// Runs the UID FETCH command
pub async fn uid_fetch(
&mut self,
uids: &[u32],
2021-08-23 06:52:20 +00:00
uid_seqs: &[Range<u32>],
2021-08-08 03:33:37 +00:00
items: FetchItems,
) -> Result<impl Stream<Item = (u32, Vec<MessageAttribute>)>> {
2021-08-23 06:52:20 +00:00
let mut ids = Vec::new();
for uid in uids {
ids.push(Sequence::Single(*uid));
}
for seq in uid_seqs {
ids.push(Sequence::Range(seq.start, seq.end));
}
let cmd = Command::UidFetch(CommandFetch { ids, items });
debug!("uid fetch: {:?}", cmd);
2021-08-08 03:33:37 +00:00
let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(),
}))
}
/// Runs the IDLE command
2021-08-09 05:42:41 +00:00
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
2021-08-09 11:50:19 +00:00
pub async fn idle(&mut self) -> Result<IdleToken> {
2021-08-08 03:33:37 +00:00
let cmd = Command::Idle;
let stream = self.execute(cmd).await?;
2021-08-09 11:50:32 +00:00
Ok(IdleToken { stream })
2021-08-08 03:33:37 +00:00
}
}
#[derive(Debug, Default)]
pub struct SelectResponse {
pub flags: Vec<Flag>,
2021-08-08 03:33:37 +00:00
pub exists: Option<u32>,
pub recent: Option<u32>,
pub uid_next: Option<u32>,
pub uid_validity: Option<u32>,
pub unseen: Option<u32>,
}
/// A token that represents an idling connection.
///
/// Dropping this token indicates that the idling should be completed, and the
/// DONE command will be sent to the server as a result.
2021-08-09 05:42:41 +00:00
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
2021-08-09 11:50:19 +00:00
pub struct IdleToken {
2021-08-08 03:33:37 +00:00
pub stream: ResponseStream,
2021-08-09 06:28:54 +00:00
// sender: mpsc::UnboundedSender<TaggedCommand>,
2021-08-08 03:33:37 +00:00
}
2021-08-09 05:42:41 +00:00
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
2021-08-09 11:50:19 +00:00
impl Drop for IdleToken {
2021-08-08 03:33:37 +00:00
fn drop(&mut self) {
2021-08-09 06:28:54 +00:00
// TODO: put this into a channel instead
// tokio::spawn(self.client.execute(Command::Done));
2021-08-08 03:33:37 +00:00
}
}
2021-08-09 05:42:41 +00:00
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
2021-08-09 11:50:19 +00:00
impl Stream for IdleToken {
2021-08-08 03:33:37 +00:00
type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let stream = Pin::new(&mut self.stream);
Stream::poll_next(stream, cx)
}
}