use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::Result;
use futures::{
    future::{self, FutureExt},
    stream::{Stream, StreamExt},
};
use panorama_proto_common::Bytes;
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;

use crate::proto::{
    command::{
        Command, CommandFetch, CommandList, CommandSearch, CommandSelect,
        FetchItems, SearchCriteria, Sequence,
    },
    response::{
        Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute,
        Response, ResponseCode, Status,
    },
};

use super::auth::AuthMethod;
use super::inner::Inner;
use super::response_stream::ResponseStream;
use super::tls::wrap_tls;

/// Connection settings for an IMAP client that hasn't been connected yet.
///
/// Instances are produced through [`Config::builder`]; the generated `build`
/// function is private, so the public way to consume the builder is
/// [`ConfigBuilder::open`].
#[derive(Builder, Clone, Debug)]
#[builder(build_fn(private))]
pub struct Config {
    /// The hostname of the IMAP server. When `tls` is enabled, this same
    /// value is handed to the TLS wrapper.
    pub hostname: String,

    /// The port of the IMAP server.
    pub port: u16,

    /// Whether or not the client is using an encrypted stream.
    ///
    /// To upgrade the connection later, use the upgrade method.
    #[builder(default = "true")]
    pub tls: bool,

    /// Whether or not to verify hostname
    #[builder(default = "true")]
    pub verify_hostname: bool,
}

impl Config {
    /// Returns a builder with every field unset (defaults apply on build).
    pub fn builder() -> ConfigBuilder {
        ConfigBuilder::default()
    }
}

impl ConfigBuilder {
    /// Builds the configuration, connects to the server and waits for its
    /// greeting.
    ///
    /// Returns [`ClientUnauthenticated::Encrypted`] when `tls` is set and
    /// [`ClientUnauthenticated::Unencrypted`] otherwise.
    ///
    /// # Errors
    ///
    /// Fails if the configuration is incomplete, the TCP connection or TLS
    /// wrapping fails, or the greeting cannot be read.
    pub async fn open(&self) -> Result<ClientUnauthenticated> {
        let config = self.build()?;

        let hostname = config.hostname.as_str();
        let port = config.port;

        trace!("connecting to {}:{}...", hostname, port);
        let conn = TcpStream::connect((hostname, port)).await?;
        trace!("connected.");

        // NOTE(review): `verify_hostname` is not consulted in this function;
        // whether `wrap_tls` or `Inner` honor it is not visible here - confirm.
        //
        // The two branches cannot share code past this point because `Inner`
        // is instantiated over a different stream type in each.
        if config.tls {
            let conn = wrap_tls(conn, hostname).await?;
            let mut inner = Inner::new(conn, config).await?;

            inner.wait_for_greeting().await?;
            debug!("received greeting");

            Ok(ClientUnauthenticated::Encrypted(inner))
        } else {
            let mut inner = Inner::new(conn, config).await?;

            inner.wait_for_greeting().await?;
            debug!("received greeting");

            Ok(ClientUnauthenticated::Unencrypted(inner))
        }
    }
}

/// A client that hasn't been authenticated.
pub enum ClientUnauthenticated { Encrypted(Inner>), Unencrypted(Inner), } impl ClientUnauthenticated { pub async fn upgrade(self) -> Result { match self { // this is a no-op, we don't need to upgrade ClientUnauthenticated::Encrypted(_) => Ok(self), ClientUnauthenticated::Unencrypted(e) => { Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?)) } } } pub async fn auth( self, auth: impl AuthMethod, ) -> Result { match self { // this is a no-op, we don't need to upgrade ClientUnauthenticated::Encrypted(mut inner) => { auth.perform_auth(&mut inner).await?; Ok(ClientAuthenticated::Encrypted(inner)) } ClientUnauthenticated::Unencrypted(mut inner) => { auth.perform_auth(&mut inner).await?; Ok(ClientAuthenticated::Unencrypted(inner)) } } } client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); } /// A client that has been authenticated. pub enum ClientAuthenticated { Encrypted(Inner>), Unencrypted(Inner), } impl ClientAuthenticated { client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); pub async fn search(&mut self) -> Result<()> { let cmd = Command::Examine; let res = self.execute(cmd).await?; let (done, data) = res.wait().await?; println!("done = {:?}", done); println!("data = {:?}", data); Ok(()) } /// Runs the LIST command pub async fn list(&mut self) -> Result> { let cmd = Command::List(CommandList { reference: Bytes::from(""), mailbox: Bytes::from("*"), }); let res = self.execute(cmd).await?; let (_, data) = res.wait().await?; let mut folders = Vec::new(); for resp in data { if let Response::MailboxData(MailboxData::List(MailboxList { mailbox, .. 
})) = resp { folders.push(mailbox); } } Ok(folders) } /// Runs the SELECT command pub async fn select( &mut self, mailbox: impl AsRef, ) -> Result { let cmd = Command::Select(CommandSelect { mailbox: Bytes::from(mailbox.as_ref().to_owned()), }); let stream = self.execute(cmd).await?; let (_, data) = stream.wait().await?; let mut select = SelectResponse::default(); for resp in data { match resp { Response::MailboxData(MailboxData::Flags(flags)) => { select.flags = flags } Response::MailboxData(MailboxData::Exists(exists)) => { select.exists = Some(exists) } Response::MailboxData(MailboxData::Recent(recent)) => { select.recent = Some(recent) } Response::Tagged( _, Condition { status: Status::Ok, code: Some(code), .. }, ) | Response::Condition(Condition { status: Status::Ok, code: Some(code), .. }) => match code { ResponseCode::Unseen(value) => select.unseen = Some(value), ResponseCode::UidNext(value) => { select.uid_next = Some(value) } ResponseCode::UidValidity(value) => { select.uid_validity = Some(value) } _ => {} }, _ => warn!("unknown response {:?}", resp), } } Ok(select) } /// Runs the SEARCH command pub async fn uid_search(&mut self) -> Result> { let cmd = Command::UidSearch(CommandSearch { criteria: SearchCriteria::All, }); let stream = self.execute(cmd).await?; let (_, data) = stream.wait().await?; for resp in data { if let Response::MailboxData(MailboxData::Search(uids)) = resp { return Ok(uids); } } bail!("could not find the SEARCH response") } /// Runs the FETCH command pub async fn fetch( &mut self, uids: &[u32], uid_seqs: &[Range], items: FetchItems, ) -> Result)>> { let mut ids = Vec::new(); for uid in uids { ids.push(Sequence::Single(*uid)); } for seq in uid_seqs { ids.push(Sequence::Range(seq.start, seq.end)); } let cmd = Command::Fetch(CommandFetch { ids, items }); debug!("fetch: {:?}", cmd); let stream = self.execute(cmd).await?; // let (done, data) = stream.wait().await?; Ok(stream.filter_map(|resp| match resp { Response::Fetch(n, attrs) => { 
future::ready(Some((n, attrs))).boxed() } Response::Done(_) => future::ready(None).boxed(), _ => future::pending().boxed(), })) } /// Runs the UID FETCH command pub async fn uid_fetch( &mut self, uids: &[u32], uid_seqs: &[Range], items: FetchItems, ) -> Result)>> { let mut ids = Vec::new(); for uid in uids { ids.push(Sequence::Single(*uid)); } for seq in uid_seqs { ids.push(Sequence::Range(seq.start, seq.end)); } let cmd = Command::UidFetch(CommandFetch { ids, items }); debug!("uid fetch: {:?}", cmd); let stream = self.execute(cmd).await?; // let (done, data) = stream.wait().await?; Ok(stream.filter_map(|resp| match resp { Response::Fetch(n, attrs) => { future::ready(Some((n, attrs))).boxed() } Response::Done(_) => future::ready(None).boxed(), _ => future::pending().boxed(), })) } /// Runs the IDLE command #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] pub async fn idle(&mut self) -> Result { let cmd = Command::Idle; let stream = self.execute(cmd).await?; Ok(IdleToken { stream }) } } #[derive(Debug, Default)] pub struct SelectResponse { pub flags: Vec, pub exists: Option, pub recent: Option, pub uid_next: Option, pub uid_validity: Option, pub unseen: Option, } /// A token that represents an idling connection. /// /// Dropping this token indicates that the idling should be completed, and the /// DONE command will be sent to the server as a result. 
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
pub struct IdleToken {
    /// Response stream of the IDLE command this token was created for.
    pub stream: ResponseStream,
    // sender: mpsc::UnboundedSender,
}

#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl Drop for IdleToken {
    /// Currently does nothing: sending DONE on drop is not implemented yet.
    fn drop(&mut self) {
        // TODO: put this into a channel instead
        // tokio::spawn(self.client.execute(Command::Done));
    }
}

#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl Stream for IdleToken {
    type Item = Response;

    /// Forwards polling to the wrapped response stream.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // `Pin::new` requires the wrapped stream to be `Unpin`.
        let stream = Pin::new(&mut self.stream);
        Stream::poll_next(stream, cx)
    }
}