use std::pin::Pin; use std::task::{Context, Poll}; use anyhow::Result; use futures::{ future::{self, FutureExt}, stream::{Stream, StreamExt}, }; use tokio::{net::TcpStream, sync::mpsc}; use tokio_rustls::client::TlsStream; use crate::proto::{ bytes::Bytes, command::{ Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems, SearchCriteria, }, response::{ Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute, Response, ResponseCode, Status, }, }; use super::inner::Inner; use super::response_stream::ResponseStream; use super::upgrade::upgrade; /// An IMAP client that hasn't been connected yet. #[derive(Builder, Clone, Debug)] #[builder(build_fn(private))] pub struct Config { /// The hostname of the IMAP server. If using TLS, must be an address pub hostname: String, /// The port of the IMAP server. pub port: u16, /// Whether or not the client is using an encrypted stream. /// /// To upgrade the connection later, use the upgrade method. #[builder(default = "true")] pub tls: bool, /// Whether or not to verify hostname #[builder(default = "true")] pub verify_hostname: bool, } impl ConfigBuilder { pub async fn open(&self) -> Result { let config = self.build()?; let hostname = config.hostname.as_ref(); let port = config.port; let conn = TcpStream::connect((hostname, port)).await?; if config.tls { let conn = upgrade(conn, hostname).await?; let mut inner = Inner::new(conn, config).await?; inner.wait_for_greeting().await?; return Ok(ClientUnauthenticated::Encrypted(inner)); } else { let mut inner = Inner::new(conn, config).await?; inner.wait_for_greeting().await?; return Ok(ClientUnauthenticated::Unencrypted(inner)); } } } pub enum ClientUnauthenticated { Encrypted(Inner>), Unencrypted(Inner), } impl ClientUnauthenticated { pub async fn upgrade(self) -> Result { match self { // this is a no-op, we don't need to upgrade ClientUnauthenticated::Encrypted(_) => Ok(self), ClientUnauthenticated::Unencrypted(e) => { Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?)) } } } client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); } pub enum ClientAuthenticated { Encrypted(Inner>), Unencrypted(Inner), } impl ClientAuthenticated { client_expose!(async execute(cmd: Command) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); fn sender(&self) -> mpsc::UnboundedSender { match self { ClientAuthenticated::Encrypted(e) => e.write_tx.clone(), ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(), } } /// Runs the LIST command pub async fn list(&mut self) -> Result> { let cmd = Command::List(CommandList { reference: Bytes::from(""), mailbox: Bytes::from("*"), }); let res = self.execute(cmd).await?; let (_, data) = res.wait().await?; let mut folders = Vec::new(); for resp in data { if let Response::MailboxData(MailboxData::List(MailboxList { mailbox, .. })) = resp { folders.push(mailbox); } } Ok(folders) } /// Runs the SELECT command pub async fn select(&mut self, mailbox: impl AsRef) -> Result { let cmd = Command::Select(CommandSelect { mailbox: Bytes::from(mailbox.as_ref().to_owned()), }); let stream = self.execute(cmd).await?; let (_, data) = stream.wait().await?; let mut select = SelectResponse::default(); for resp in data { match resp { Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags, Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists), Response::MailboxData(MailboxData::Recent(recent)) => select.recent = Some(recent), Response::Tagged( _, Condition { status: Status::Ok, code: Some(code), .. }, ) => match code { ResponseCode::Unseen(value) => select.unseen = Some(value), ResponseCode::UidNext(value) => select.uid_next = Some(value), ResponseCode::UidValidity(value) => select.uid_validity = Some(value), _ => {} }, _ => {} } } Ok(select) } /// Runs the SEARCH command pub async fn uid_search(&mut self) -> Result> { let cmd = Command::UidSearch(CommandSearch { criteria: SearchCriteria::All, }); let stream = self.execute(cmd).await?; let (_, data) = stream.wait().await?; for resp in data { if let Response::MailboxData(MailboxData::Search(uids)) = resp { return Ok(uids); } } bail!("could not find the SEARCH response") } /// Runs the FETCH command pub async fn fetch( &mut self, uids: &[u32], items: FetchItems, ) -> Result)>> { let cmd = Command::Fetch(CommandFetch { ids: uids.to_vec(), items, }); debug!("fetch: {:?}", cmd); let stream = self.execute(cmd).await?; // let (done, data) = stream.wait().await?; Ok(stream.filter_map(|resp| match resp { Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), Response::Done(_) => future::ready(None).boxed(), _ => future::pending().boxed(), })) } /// Runs the UID FETCH command pub async fn uid_fetch( &mut self, uids: &[u32], items: FetchItems, ) -> Result)>> { let cmd = Command::UidFetch(CommandFetch { ids: uids.to_vec(), items, }); debug!("uid fetch: {:?}", cmd); let stream = self.execute(cmd).await?; // let (done, data) = stream.wait().await?; Ok(stream.filter_map(|resp| match resp { Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), Response::Done(_) => future::ready(None).boxed(), _ => future::pending().boxed(), })) } /// Runs the IDLE command #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] pub async fn idle(&mut self) -> Result { let cmd = Command::Idle; let stream = self.execute(cmd).await?; let sender = self.sender(); Ok(IdleToken { stream, sender }) } } #[derive(Debug, Default)] pub struct SelectResponse { pub flags: Vec, pub exists: Option, pub recent: Option, pub uid_next: Option, pub uid_validity: Option, pub unseen: Option, } /// A token that represents an idling connection. /// /// Dropping this token indicates that the idling should be completed, and the /// DONE command will be sent to the server as a result. #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] pub struct IdleToken { pub stream: ResponseStream, sender: mpsc::UnboundedSender, } #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] impl Drop for IdleToken { fn drop(&mut self) { // TODO: should ignore this? self.sender.send(format!("DONE\r\n")).unwrap(); } } #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] impl Stream for IdleToken { type Item = Response; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let stream = Pin::new(&mut self.stream); Stream::poll_next(stream, cx) } }