use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::Result; use futures::{ future::{self, FutureExt}, stream::{Stream, StreamExt}, }; use tokio::{net::TcpStream, sync::mpsc}; use tokio_rustls::{ client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, }; use crate::proto::{ command::{ Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems, SearchCriteria, }, response::{ Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute, Response, ResponseCode, Status, }, }; use super::inner::Inner; use super::response_stream::ResponseStream; /// An IMAP client that hasn't been connected yet. #[derive(Builder, Clone, Debug)] #[builder(build_fn(private))] pub struct Config { /// The hostname of the IMAP server. If using TLS, must be an address pub hostname: String, /// The port of the IMAP server. pub port: u16, /// Whether or not the client is using an encrypted stream. /// /// To upgrade the connection later, use the upgrade method. pub tls: bool, } impl ConfigBuilder { pub async fn open(self) -> Result { let config = self.build()?; let hostname = config.hostname.as_ref(); let port = config.port; let conn = TcpStream::connect((hostname, port)).await?; if config.tls { let mut tls_config = RustlsConfig::new(); tls_config .root_store .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); let tls_config = TlsConnector::from(Arc::new(tls_config)); let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap(); let conn = tls_config.connect(dnsname, conn).await?; let mut inner = Inner::new(conn, config).await?; inner.wait_for_greeting().await?; return Ok(ClientUnauthenticated::Encrypted(inner)); } else { let mut inner = Inner::new(conn, config).await?; inner.wait_for_greeting().await?; return Ok(ClientUnauthenticated::Unencrypted(inner)); } } } pub enum ClientUnauthenticated { Encrypted(Inner>), Unencrypted(Inner), } impl ClientUnauthenticated { pub async fn upgrade(self) -> Result { match self { // this is a no-op, we don't need to upgrade ClientUnauthenticated::Encrypted(_) => Ok(self), ClientUnauthenticated::Unencrypted(e) => { Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?)) } } } client_expose!(async execute<'a>(cmd: Command<'a>) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); } pub enum ClientAuthenticated { Encrypted(Inner>), Unencrypted(Inner), } impl ClientAuthenticated { client_expose!(async execute<'a>(cmd: Command<'a>) -> Result); client_expose!(async has_capability(cap: impl AsRef) -> Result); fn sender(&self) -> mpsc::UnboundedSender { match self { ClientAuthenticated::Encrypted(e) => e.write_tx.clone(), ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(), } } /// Runs the LIST command pub async fn list(&mut self) -> Result> { let cmd = Command::List(CommandList { reference: "", mailbox: "*", }); let res = self.execute(cmd).await?; let (_, data) = res.wait().await?; let mut folders = Vec::new(); for resp in data { if let Response::MailboxData(MailboxData::List(MailboxList { mailbox, .. })) = resp { folders.push(mailbox); } } Ok(folders) } /// Runs the SELECT command pub async fn select(&mut self, mailbox: impl AsRef) -> Result { let cmd = Command::Select(CommandSelect { mailbox: mailbox.as_ref(), }); let stream = self.execute(cmd).await?; let (_, data) = stream.wait().await?; let mut select = SelectResponse::default(); for resp in data { match resp { Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags, Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists), Response::MailboxData(MailboxData::Recent(recent)) => select.recent = Some(recent), Response::Tagged( _, Condition { status: Status::Ok, code: Some(code), .. }, ) => match code { ResponseCode::Unseen(value) => select.unseen = Some(value), ResponseCode::UidNext(value) => select.uid_next = Some(value), ResponseCode::UidValidity(value) => select.uid_validity = Some(value), _ => {} }, _ => {} } } Ok(select) } /// Runs the SEARCH command pub async fn uid_search(&mut self) -> Result> { let cmd = Command::UidSearch(CommandSearch { criteria: SearchCriteria::All, }); let stream = self.execute(cmd).await?; let (_, data) = stream.wait().await?; for resp in data { if let Response::MailboxData(MailboxData::Search(uids)) = resp { return Ok(uids); } } bail!("could not find the SEARCH response") } /// Runs the FETCH command pub async fn fetch( &mut self, uids: &[u32], items: FetchItems, ) -> Result)>> { let cmd = Command::Fetch(CommandFetch { ids: uids.to_vec(), items, }); debug!("fetch: {:?}", cmd); let stream = self.execute(cmd).await?; // let (done, data) = stream.wait().await?; Ok(stream.filter_map(|resp| match resp { Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), Response::Done(_) => future::ready(None).boxed(), _ => future::pending().boxed(), })) } /// Runs the UID FETCH command pub async fn uid_fetch( &mut self, uids: &[u32], items: FetchItems, ) -> Result)>> { let cmd = Command::UidFetch(CommandFetch { ids: uids.to_vec(), items, }); debug!("uid fetch: {:?}", cmd); let stream = self.execute(cmd).await?; // let (done, data) = stream.wait().await?; Ok(stream.filter_map(|resp| match resp { Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), Response::Done(_) => future::ready(None).boxed(), _ => future::pending().boxed(), })) } /// Runs the IDLE command #[cfg(feature = "rfc2177-idle")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] pub async fn idle(&mut self) -> Result { let cmd = Command::Idle; let stream = self.execute(cmd).await?; let sender = self.sender(); Ok(IdleToken { stream, sender }) } } #[derive(Debug, Default)] pub struct SelectResponse { pub flags: Vec, pub exists: Option, pub recent: Option, pub uid_next: Option, pub uid_validity: Option, pub unseen: Option, } /// A token that represents an idling connection. /// /// Dropping this token indicates that the idling should be completed, and the /// DONE command will be sent to the server as a result. #[cfg(feature = "rfc2177-idle")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] pub struct IdleToken { pub stream: ResponseStream, sender: mpsc::UnboundedSender, } #[cfg(feature = "rfc2177-idle")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] impl Drop for IdleToken { fn drop(&mut self) { // TODO: should ignore this? self.sender.send(format!("DONE\r\n")).unwrap(); } } #[cfg(feature = "rfc2177-idle")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] impl Stream for IdleToken { type Item = Response; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { let stream = Pin::new(&mut self.stream); Stream::poll_next(stream, cx) } }