diff --git a/daemon/src/config/mod.rs b/daemon/src/config/mod.rs index 65059b1..a9f564b 100644 --- a/daemon/src/config/mod.rs +++ b/daemon/src/config/mod.rs @@ -14,50 +14,50 @@ pub use self::watcher::{spawn_config_watcher_system, ConfigWatcher}; /// Configuration #[derive(Default, Serialize, Deserialize, Clone, Debug)] pub struct Config { - /// Version of the config to use - /// (potentially for migration later?) - pub version: String, + /// Version of the config to use + /// (potentially for migration later?) + pub version: String, - /// Directory to store panorama-related data in - pub data_dir: PathBuf, + /// Directory to store panorama-related data in + pub data_dir: PathBuf, - /// Mail accounts - #[serde(rename = "mail")] - pub mail_accounts: HashMap, + /// Mail accounts + #[serde(rename = "mail")] + pub mail_accounts: HashMap, } impl Config { - pub async fn from_file(path: impl AsRef) -> Result { - let mut file = File::open(path.as_ref())?; - let mut contents = Vec::new(); - file.read_to_end(&mut contents)?; - let config = toml::from_slice(&contents)?; - Ok(config) - } + pub async fn from_file(path: impl AsRef) -> Result { + let mut file = File::open(path.as_ref())?; + let mut contents = Vec::new(); + file.read_to_end(&mut contents)?; + let config = toml::from_slice(&contents)?; + Ok(config) + } } /// Configuration for a single mail account #[derive(Serialize, Deserialize, Clone, Debug)] pub struct MailAccountConfig { - /// Imap - pub imap: ImapConfig, + /// Imap + pub imap: ImapConfig, } /// Configuring an IMAP server #[derive(Serialize, Deserialize, Clone, Debug)] pub struct ImapConfig { - /// Host of the IMAP server (needs to be hostname for TLS) - pub server: String, + /// Host of the IMAP server (needs to be hostname for TLS) + pub server: String, - /// Port of the IMAP server - pub port: u16, + /// Port of the IMAP server + pub port: u16, - /// TLS - pub tls: TlsMethod, + /// TLS + pub tls: TlsMethod, - /// Auth - #[serde(flatten)] - pub auth: ImapAuth, + /// Auth + #[serde(flatten)] + pub auth: ImapAuth, } /// Method of authentication for the IMAP server @@ -65,28 +65,28 @@ pub struct ImapConfig { #[derivative(Debug)] #[serde(tag = "auth")] pub enum ImapAuth { - /// Use plain username/password authentication - #[serde(rename = "plain")] - Plain { - username: String, + /// Use plain username/password authentication + #[serde(rename = "plain")] + Plain { + username: String, - #[derivative(Debug = "ignore")] - password: String, - }, + #[derivative(Debug = "ignore")] + password: String, + }, } /// Describes when to perform the TLS handshake #[derive(Serialize, Deserialize, Clone, Debug)] pub enum TlsMethod { - /// Perform TLS handshake immediately upon connection - #[serde(rename = "on")] - On, + /// Perform TLS handshake immediately upon connection + #[serde(rename = "on")] + On, - /// Perform TLS handshake after issuing the STARTTLS command - #[serde(rename = "starttls")] - Starttls, + /// Perform TLS handshake after issuing the STARTTLS command + #[serde(rename = "starttls")] + Starttls, - /// Don't perform TLS handshake at all (unsecured) - #[serde(rename = "off")] - Off, + /// Don't perform TLS handshake at all (unsecured) + #[serde(rename = "off")] + Off, } diff --git a/daemon/src/config/watcher.rs b/daemon/src/config/watcher.rs index 235e235..1e6e32f 100644 --- a/daemon/src/config/watcher.rs +++ b/daemon/src/config/watcher.rs @@ -5,8 +5,8 @@ use std::sync::mpsc as stdmpsc; use anyhow::{Context, Result}; use futures::future::TryFutureExt; use notify::{ - recommended_watcher, Event as NotifyEvent, RecommendedWatcher, - RecursiveMode, Watcher, + recommended_watcher, Event as NotifyEvent, RecommendedWatcher, RecursiveMode, + Watcher, }; use tokio::{sync::watch, task::JoinHandle}; use xdg::BaseDirectories; @@ -20,91 +20,86 @@ pub type ConfigWatcher = watch::Receiver; /// config update events. pub fn spawn_config_watcher_system() -> Result<(JoinHandle<()>, ConfigWatcher)> { - let (tx, rx) = stdmpsc::channel(); - let mut dir_watcher = recommended_watcher(move |res| match res { - Ok(event) => { - tx.send(event).unwrap(); - } - Err(_) => {} - })?; - - let xdg = BaseDirectories::new()?; - let config_home = xdg.get_config_home().join("panorama"); - if !config_home.exists() { - fs::create_dir_all(&config_home)?; + let (tx, rx) = stdmpsc::channel(); + let mut dir_watcher = recommended_watcher(move |res| match res { + Ok(event) => { + tx.send(event).unwrap(); } + Err(_) => {} + })?; - dir_watcher - .watch(&config_home, RecursiveMode::Recursive) - .context("adding watch for config home")?; + let xdg = BaseDirectories::new()?; + let config_home = xdg.get_config_home().join("panorama"); + if !config_home.exists() { + fs::create_dir_all(&config_home)?; + } - debug!("watching {:?}", config_home); - let (config_tx, config_update) = watch::channel(Config::default()); - let handle = tokio::spawn( - start_notify_stream(dir_watcher, rx, config_home, config_tx) - .unwrap_or_else(|_err| todo!()), - ); - Ok((handle, config_update)) + dir_watcher + .watch(&config_home, RecursiveMode::Recursive) + .context("adding watch for config home")?; + + debug!("watching {:?}", config_home); + let (config_tx, config_update) = watch::channel(Config::default()); + let handle = tokio::spawn( + start_notify_stream(dir_watcher, rx, config_home, config_tx) + .unwrap_or_else(|_err| todo!()), + ); + Ok((handle, config_update)) } async fn start_notify_stream( - _watcher: RecommendedWatcher, - rx: stdmpsc::Receiver, - config_home: impl AsRef, - config_tx: watch::Sender, + _watcher: RecommendedWatcher, + rx: stdmpsc::Receiver, + config_home: impl AsRef, + config_tx: watch::Sender, ) -> Result<()> { - let config_home = config_home.as_ref().to_path_buf(); - let config_path = config_home.join("panorama.toml"); + let config_home = config_home.as_ref().to_path_buf(); + let config_path = config_home.join("panorama.toml"); - // first shot - { - let config = Config::from_file(&config_path).await?; - config_tx.send(config)?; - } + // first shot + { + let config = Config::from_file(&config_path).await?; + config_tx.send(config)?; + } - debug!("listening for inotify events"); - loop { - use notify::EventKind; + debug!("listening for inotify events"); + loop { + use notify::EventKind; - let event = rx.recv()?; - debug!("notify event: {:?}", event); + let event = rx.recv()?; + debug!("notify event: {:?}", event); - match event.kind { - EventKind::Create(_) | EventKind::Modify(_) => { - let path_expect = config_home - .clone() - .join("panorama.toml") - .canonicalize() - .context("osu")?; - if !path_expect.exists() { - debug!("path {:?} doesn't exist", path_expect); - continue; - } - - match event.paths.iter().find(|p| *p == &path_expect) { - Some(path) => path.to_path_buf(), - None => continue, - }; - - // TODO: any better way to do this? - let config_path_c = - config_path.canonicalize().context("cfg_path")?; - if config_path_c != path_expect { - debug!( - "did not match {:?} {:?}", - config_path_c, path_expect - ); - continue; - } - - debug!("reading config from {:?}", path_expect); - let config = - Config::from_file(path_expect).await.context("read")?; - // debug!("sending config {:?}", config); - config_tx.send(config)?; - } - - _ => {} + match event.kind { + EventKind::Create(_) | EventKind::Modify(_) => { + let path_expect = config_home + .clone() + .join("panorama.toml") + .canonicalize() + .context("osu")?; + if !path_expect.exists() { + debug!("path {:?} doesn't exist", path_expect); + continue; } + + match event.paths.iter().find(|p| *p == &path_expect) { + Some(path) => path.to_path_buf(), + None => continue, + }; + + // TODO: any better way to do this? + let config_path_c = config_path.canonicalize().context("cfg_path")?; + if config_path_c != path_expect { + debug!("did not match {:?} {:?}", config_path_c, path_expect); + continue; + } + + debug!("reading config from {:?}", path_expect); + let config = Config::from_file(path_expect).await.context("read")?; + // debug!("sending config {:?}", config); + config_tx.send(config)?; + } + + _ => {} } + } } diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index 2a5ab54..53cfb35 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -16,8 +16,8 @@ use anyhow::{Context, Result}; use clap::Parser; use futures::future::FutureExt; use tokio::{ - fs::{self, OpenOptions}, - sync::{mpsc, oneshot}, + fs::{self, OpenOptions}, + sync::{mpsc, oneshot}, }; use xdg::BaseDirectories; @@ -30,89 +30,88 @@ type ExitListener = oneshot::Receiver<()>; /// panorama components over Unix sockets. #[derive(Debug, Parser)] struct Options { - // /// Config file path (defaults to XDG) - // #[clap(long = "config", short = 'c')] - // config_file: Option, - /// Verbose mode (-v, -vv, -vvv, etc) - #[clap(short = 'v', long = "verbose", parse(from_occurrences))] - verbose: usize, + // /// Config file path (defaults to XDG) + // #[clap(long = "config", short = 'c')] + // config_file: Option, + /// Verbose mode (-v, -vv, -vvv, etc) + #[clap(short = 'v', long = "verbose", parse(from_occurrences))] + verbose: usize, } #[tokio::main] pub async fn run() -> Result<()> { - let opt = Options::parse(); + let opt = Options::parse(); - stderrlog::new() - .module(module_path!()) - .module("panorama_daemon") - .module("panorama_imap") - .verbosity(opt.verbose) - .init() - .unwrap(); + stderrlog::new() + .module(module_path!()) + .module("panorama_daemon") + .module("panorama_imap") + .verbosity(opt.verbose) + .init() + .unwrap(); - // if we're using a config-watcher, then start the watcher system - #[cfg(feature = "config-watch")] - { - let (_, mut config_watcher) = config::spawn_config_watcher_system()?; + // if we're using a config-watcher, then start the watcher system + #[cfg(feature = "config-watch")] + { + let (_, mut config_watcher) = config::spawn_config_watcher_system()?; - loop { - let (exit_tx, exit_rx) = oneshot::channel(); - let new_config = config_watcher.borrow().clone(); - tokio::spawn(run_with_config(new_config, exit_rx)); + loop { + let (exit_tx, exit_rx) = oneshot::channel(); + let new_config = config_watcher.borrow().clone(); + tokio::spawn(run_with_config(new_config, exit_rx)); - // wait till the config has changed, then tell the current thread to - // stop - config_watcher.changed().await?; - let _ = exit_tx.send(()); - } + // wait till the config has changed, then tell the current thread to + // stop + config_watcher.changed().await?; + let _ = exit_tx.send(()); + } + } + + // TODO: handle SIGHUP on Unix? pretty common for systemd to send this when + // reloading config files + + // if not, just read the config once and run the daemon + #[cfg(not(feature = "config-watch"))] + { + let xdg = BaseDirectories::new()?; + let config_home = xdg.get_config_home().join("panorama"); + if !config_home.exists() { + fs::create_dir_all(&config_home).await?; } - // TODO: handle SIGHUP on Unix? pretty common for systemd to send this when - // reloading config files + let config_path = config_home.join("panorama.toml"); + let config_path_c = config_path.canonicalize().context("cfg_path")?; + let config = Config::from_file(config_path_c).await.context("read")?; - // if not, just read the config once and run the daemon - #[cfg(not(feature = "config-watch"))] - { - let xdg = BaseDirectories::new()?; - let config_home = xdg.get_config_home().join("panorama"); - if !config_home.exists() { - fs::create_dir_all(&config_home).await?; - } - - let config_path = config_home.join("panorama.toml"); - let config_path_c = config_path.canonicalize().context("cfg_path")?; - let config = Config::from_file(config_path_c).await.context("read")?; - - let (_, exit_rx) = oneshot::channel(); - run_with_config(config, exit_rx).await - } + let (_, exit_rx) = oneshot::channel(); + run_with_config(config, exit_rx).await + } } async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> { - debug!("New config: {:?}", config); + debug!("New config: {:?}", config); - // keep track of which threads need to be stopped when this function is - // stopped - let mut notify_mail_threads = Vec::new(); + // keep track of which threads need to be stopped when this function is + // stopped + let mut notify_mail_threads = Vec::new(); - for (account_name, account) in config.mail_accounts { - let (exit_tx, exit_rx) = oneshot::channel(); - tokio::spawn(async { - match run_single_mail_account(account_name, account, exit_rx).await - { - Ok(_) => {} - Err(err) => panic!("failed: {:?}", err), - } - }); - notify_mail_threads.push(exit_tx); - } + for (account_name, account) in config.mail_accounts { + let (exit_tx, exit_rx) = oneshot::channel(); + tokio::spawn(async { + match run_single_mail_account(account_name, account, exit_rx).await { + Ok(_) => {} + Err(err) => panic!("failed: {:?}", err), + } + }); + notify_mail_threads.push(exit_tx); + } - exit.await?; - for exit_tx in notify_mail_threads { - let _ = exit_tx.send(()); - } + exit.await?; + for exit_tx in notify_mail_threads { + let _ = exit_tx.send(()); + } - Ok(()) + Ok(()) } /// The main loop for a single mail account. @@ -120,68 +119,68 @@ async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> { /// This loop is restarted each time the config watcher gets a new config, at /// which point `exit` is sent a single value telling us to break the loop. async fn run_single_mail_account( - account_name: String, - account: MailAccountConfig, - exit: ExitListener, + account_name: String, + account: MailAccountConfig, + exit: ExitListener, ) -> Result<()> { - debug!("run_single_mail_account({}, {:?})", account_name, account); - let mut exit = exit.fuse(); + debug!("run_single_mail_account({}, {:?})", account_name, account); + let mut exit = exit.fuse(); - let xdg = BaseDirectories::new()?; - let data_home = xdg.get_data_home().join("panorama"); - if !data_home.exists() { - fs::create_dir_all(&data_home) - .await - .context("could not create config directory")?; + let xdg = BaseDirectories::new()?; + let data_home = xdg.get_data_home().join("panorama"); + if !data_home.exists() { + fs::create_dir_all(&data_home) + .await + .context("could not create config directory")?; + } + let db_path = data_home.join("db.sqlite3"); + debug!("Opening database at path: {:?}", db_path); + if !db_path.exists() { + OpenOptions::new() + .create(true) + .write(true) + .open(&db_path) + .await + .context("could not touch db path")?; + } + let db_path = db_path + .canonicalize() + .context("could not canonicalize db path")?; + let store = + MailStore::open(format!("sqlite://{}", db_path.to_string_lossy())) + .await + .context("couldn't open mail store")?; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + let sync_fut = sync_main(&account_name, account, tx, store).fuse(); + pin_mut!(sync_fut); + + debug!("Mail account loop for {}.", account_name); + loop { + select! { + res = sync_fut => match res { + Ok(_) => {}, + Err(err) => { + error!("sync_main died with: {:?}", err); + break; + } + }, + + evt_opt = rx.recv().fuse() => { + let evt = match evt_opt { + Some(evt) => evt, + None => break, + }; + + debug!("Event: {:?}", evt); + }, + + // we're being told to exit the loop + _ = exit => break, } - let db_path = data_home.join("db.sqlite3"); - debug!("Opening database at path: {:?}", db_path); - if !db_path.exists() { - OpenOptions::new() - .create(true) - .write(true) - .open(&db_path) - .await - .context("could not touch db path")?; - } - let db_path = db_path - .canonicalize() - .context("could not canonicalize db path")?; - let store = - MailStore::open(format!("sqlite://{}", db_path.to_string_lossy())) - .await - .context("couldn't open mail store")?; + } - let (tx, mut rx) = mpsc::unbounded_channel(); - - let sync_fut = sync_main(&account_name, account, tx, store).fuse(); - pin_mut!(sync_fut); - - debug!("Mail account loop for {}.", account_name); - loop { - select! { - res = sync_fut => match res { - Ok(_) => {}, - Err(err) => { - error!("sync_main died with: {:?}", err); - break; - } - }, - - evt_opt = rx.recv().fuse() => { - let evt = match evt_opt { - Some(evt) => evt, - None => break, - }; - - debug!("Event: {:?}", evt); - }, - - // we're being told to exit the loop - _ = exit => break, - } - } - - debug!("disconnecting from account {}", account_name); - Ok(()) + debug!("disconnecting from account {}", account_name); + Ok(()) } diff --git a/daemon/src/mail/mod.rs b/daemon/src/mail/mod.rs index 7147196..ba70404 100644 --- a/daemon/src/mail/mod.rs +++ b/daemon/src/mail/mod.rs @@ -6,11 +6,11 @@ mod store; use anyhow::{Context, Result}; use futures::stream::StreamExt; use panorama_imap::{ - client::{auth::Login, ConfigBuilder}, - proto::{ - command::FetchItems, - response::{MailboxData, Response}, - }, + client::{auth::Login, ConfigBuilder}, + proto::{ + command::FetchItems, + response::{MailboxData, Response}, + }, }; use sqlx::migrate::Migrator; use tokio::sync::mpsc::UnboundedSender; @@ -24,197 +24,190 @@ static MIGRATOR: Migrator = sqlx::migrate!(); /// The main function for the IMAP syncing thread pub async fn sync_main( - acct_name: impl AsRef, - acct: MailAccountConfig, - _mail2ui_tx: UnboundedSender, - _mail_store: MailStore, + acct_name: impl AsRef, + acct: MailAccountConfig, + _mail2ui_tx: UnboundedSender, + _mail_store: MailStore, ) -> Result<()> { - let acct_name = acct_name.as_ref(); - debug!("Starting main synchronization procedure for {}", acct_name); + let acct_name = acct_name.as_ref(); + debug!("Starting main synchronization procedure for {}", acct_name); - // loop ensures that the connection is retried after it dies - loop { - let client = ConfigBuilder::default() - .hostname(acct.imap.server.clone()) - .port(acct.imap.port) - .tls(matches!(acct.imap.tls, TlsMethod::On)) - .open() + // loop ensures that the connection is retried after it dies + loop { + let client = ConfigBuilder::default() + .hostname(acct.imap.server.clone()) + .port(acct.imap.port) + .tls(matches!(acct.imap.tls, TlsMethod::On)) + .open() + .await + .map_err(|err| anyhow!("err: {}", err))?; + debug!("Connected to {}:{}.", &acct.imap.server, acct.imap.port); + + debug!("TLS Upgrade option: {:?}", acct.imap.tls); + let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { + debug!("attempting to upgrade"); + let client = client + .upgrade() + .await + .context("could not upgrade connection")?; + debug!("upgrade successful"); + client + } else { + warn!("Continuing with unencrypted connection!"); + client + }; + + debug!("preparing to auth"); + // check if the authentication method is supported + let mut authed = match &acct.imap.auth { + ImapAuth::Plain { username, password } => { + let login = Login { + username: username.clone(), + password: password.clone(), + }; + unauth.auth(login).await? + } + }; + + debug!("authentication successful!"); + let folder_list = authed.list().await?; + // let _ = mail2ui_tx.send(MailEvent::FolderList( + // acct_name.clone(), + // folder_list.clone(), + // )); + + debug!("mailbox list: {:?}", folder_list); + for folder in folder_list.iter() { + debug!("folder: {:?}", folder); + let select = authed.select("INBOX").await?; + debug!("select response: {:?}", select); + if let (Some(_exists), Some(_uidvalidity)) = + (select.exists, select.uid_validity) + { + // figure out which uids don't exist locally yet + let new_uids = vec![]; + // let new_uids = + // stream::iter(1..exists).map(Ok).try_filter_map(|uid| { + // todo!() + // // mail_store.try_identify_email(&acct_name, &folder, + // uid, uidvalidity, None) // // + // invert the option to only select uids that + // haven't been downloaded // + // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) + // // .map_err(|err| err.context("error checking if + // the email is already downloaded + // [try_identify_email]")) }).try_collect:: + // >().await?; + if !new_uids.is_empty() { + debug!("fetching uids {:?}", new_uids); + let _fetched = authed + .uid_fetch(&new_uids, &[], FetchItems::Envelope) .await - .map_err(|err| anyhow!("err: {}", err))?; - debug!("Connected to {}:{}.", &acct.imap.server, acct.imap.port); - - debug!("TLS Upgrade option: {:?}", acct.imap.tls); - let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { - debug!("attempting to upgrade"); - let client = client - .upgrade() - .await - .context("could not upgrade connection")?; - debug!("upgrade successful"); - client - } else { - warn!("Continuing with unencrypted connection!"); - client - }; - - debug!("preparing to auth"); - // check if the authentication method is supported - let mut authed = match &acct.imap.auth { - ImapAuth::Plain { username, password } => { - let login = Login { - username: username.clone(), - password: password.clone(), - }; - unauth.auth(login).await? - } - }; - - debug!("authentication successful!"); - let folder_list = authed.list().await?; - // let _ = mail2ui_tx.send(MailEvent::FolderList( - // acct_name.clone(), - // folder_list.clone(), - // )); - - debug!("mailbox list: {:?}", folder_list); - for folder in folder_list.iter() { - debug!("folder: {:?}", folder); - let select = authed.select("INBOX").await?; - debug!("select response: {:?}", select); - if let (Some(_exists), Some(_uidvalidity)) = - (select.exists, select.uid_validity) - { - // figure out which uids don't exist locally yet - let new_uids = vec![]; - // let new_uids = - // stream::iter(1..exists).map(Ok).try_filter_map(|uid| { - // todo!() - // // mail_store.try_identify_email(&acct_name, &folder, - // uid, uidvalidity, None) // // - // invert the option to only select uids that - // haven't been downloaded // - // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) - // // .map_err(|err| err.context("error checking if - // the email is already downloaded - // [try_identify_email]")) }).try_collect:: - // >().await?; - if !new_uids.is_empty() { - debug!("fetching uids {:?}", new_uids); - let _fetched = authed - .uid_fetch(&new_uids, &[], FetchItems::Envelope) - .await - .context("error fetching uids")?; - // fetched - // .map(Ok) - // .try_for_each_concurrent(None, |(uid, attrs)| { - // mail_store.store_email(&acct_name, &folder, uid, - // uidvalidity, attrs) }) - // .await - // .context("error during fetch-store")?; - } - } + .context("error fetching uids")?; + // fetched + // .map(Ok) + // .try_for_each_concurrent(None, |(uid, attrs)| { + // mail_store.store_email(&acct_name, &folder, uid, + // uidvalidity, attrs) }) + // .await + // .context("error during fetch-store")?; } - tokio::time::sleep(std::time::Duration::from_secs(50)).await; + } + } + tokio::time::sleep(std::time::Duration::from_secs(50)).await; - // TODO: remove this later - // continue; - // let's just select INBOX for now, maybe have a config for default - // mailbox later? + // TODO: remove this later + // continue; - debug!("selecting the INBOX mailbox"); - let select = authed.select("INBOX").await?; - debug!("select result: {:?}", select); + // let's just select INBOX for now, maybe have a config for default + // mailbox later? + + debug!("selecting the INBOX mailbox"); + let select = authed.select("INBOX").await?; + debug!("select result: {:?}", select); + + loop { + let message_uids = authed.uid_search().await?; + let message_uids = message_uids.into_iter().take(30).collect::>(); + // let _ = mail2ui_tx.send(MailEvent::MessageUids( + // acct_name.clone(), + // message_uids.clone(), + // )); + // TODO: make this happen concurrently with the main loop? + let mut message_list = authed + .uid_fetch(&message_uids, &[], FetchItems::All) + .await + .unwrap(); + while let Some((_uid, _attrs)) = message_list.next().await { + // let evt = MailEvent::UpdateUid(acct_name.clone(), uid, + // attrs); TODO: probably odn't care about this? + // let _ = mail2ui_tx.send(evt); + } + + // TODO: check if IDLE is supported + let supports_idle = true; // authed.has_capability("IDLE").await?; + + if supports_idle { + let mut idle_stream = authed.idle().await?; loop { - let message_uids = authed.uid_search().await?; - let message_uids = - message_uids.into_iter().take(30).collect::>(); - // let _ = mail2ui_tx.send(MailEvent::MessageUids( - // acct_name.clone(), - // message_uids.clone(), - // )); - // TODO: make this happen concurrently with the main loop? - let mut message_list = authed + let evt = match idle_stream.next().await { + Some(v) => v, + None => break, + }; + + debug!("got an event: {:?}", evt); + match evt { + Response::MailboxData(MailboxData::Exists(uid)) => { + debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); + // send DONE to stop the idle + std::mem::drop(idle_stream); + // let handle = Notification::new() + // .summary("New Email") + // .body("TODO") + // .icon("firefox") + // .timeout(Timeout::Milliseconds(6000)) + // .show()?; + let message_uids = authed.uid_search().await?; + let message_uids = + message_uids.into_iter().take(20).collect::>(); + // let _ = mail2ui_tx.send(MailEvent::MessageUids( + // acct_name.clone(), + // message_uids.clone(), + // )); + // TODO: make this happen concurrently with the main + // loop? + let mut message_list = authed .uid_fetch(&message_uids, &[], FetchItems::All) .await .unwrap(); - while let Some((_uid, _attrs)) = message_list.next().await { - // let evt = MailEvent::UpdateUid(acct_name.clone(), uid, - // attrs); TODO: probably odn't care about this? - // let _ = mail2ui_tx.send(evt); - } - - // TODO: check if IDLE is supported - let supports_idle = true; // authed.has_capability("IDLE").await?; - - if supports_idle { - let mut idle_stream = authed.idle().await?; - loop { - let evt = match idle_stream.next().await { - Some(v) => v, - None => break, - }; - - debug!("got an event: {:?}", evt); - match evt { - Response::MailboxData(MailboxData::Exists(uid)) => { - debug!( - "NEW MESSAGE WITH UID {:?}, droping everything", - uid - ); - // send DONE to stop the idle - std::mem::drop(idle_stream); - // let handle = Notification::new() - // .summary("New Email") - // .body("TODO") - // .icon("firefox") - // .timeout(Timeout::Milliseconds(6000)) - // .show()?; - let message_uids = authed.uid_search().await?; - let message_uids = message_uids - .into_iter() - .take(20) - .collect::>(); - // let _ = mail2ui_tx.send(MailEvent::MessageUids( - // acct_name.clone(), - // message_uids.clone(), - // )); - // TODO: make this happen concurrently with the main - // loop? - let mut message_list = authed - .uid_fetch(&message_uids, &[], FetchItems::All) - .await - .unwrap(); - while let Some((_uid, _attrs)) = - message_list.next().await - { - // let evt = MailEvent::UpdateUid(acct_name. - // clone(), uid, attrs); - // debug!("sent {:?}", evt); - // mail2ui_tx.send(evt); - } - - idle_stream = authed.idle().await?; - } - _ => {} - } - } - } else { - loop { - tokio::time::sleep(std::time::Duration::from_secs(20)) - .await; - debug!("heartbeat"); - } - } - if false { - break; + while let Some((_uid, _attrs)) = message_list.next().await { + // let evt = MailEvent::UpdateUid(acct_name. + // clone(), uid, attrs); + // debug!("sent {:?}", evt); + // mail2ui_tx.send(evt); + } + + idle_stream = authed.idle().await?; } + _ => {} + } } - - // wait a bit so we're not hitting the server really fast if the fail - // happens early on - // - // TODO: some kind of smart exponential backoff that considers some time - // threshold to be a failing case? - tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } else { + loop { + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + debug!("heartbeat"); + } + } + if false { + break; + } } + + // wait a bit so we're not hitting the server really fast if the fail + // happens early on + // + // TODO: some kind of smart exponential backoff that considers some time + // threshold to be a failing case? + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } } diff --git a/daemon/src/mail/store.rs b/daemon/src/mail/store.rs index 38e2f83..94d747f 100644 --- a/daemon/src/mail/store.rs +++ b/daemon/src/mail/store.rs @@ -4,20 +4,20 @@ use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; use super::MIGRATOR; pub struct MailStore { - pool: SqlitePool, + pool: SqlitePool, } impl MailStore { - /// Creates a new connection to a SQLite database. - pub async fn open(uri: impl AsRef) -> Result { - let pool = SqlitePoolOptions::new().connect(uri.as_ref()).await?; + /// Creates a new store tied to a SQLite database. + pub async fn open(uri: impl AsRef) -> Result { + let pool = SqlitePoolOptions::new().connect(uri.as_ref()).await?; - // run migrations, if available - MIGRATOR - .run(&pool) - .await - .context("could not run migrations on the pool")?; + // run migrations, if available + MIGRATOR + .run(&pool) + .await + .context("could not run migrations on the pool")?; - Ok(MailStore { pool }) - } + Ok(MailStore { pool }) + } } diff --git a/imap/src/client/auth.rs b/imap/src/client/auth.rs index 6c70964..0f4e6db 100644 --- a/imap/src/client/auth.rs +++ b/imap/src/client/auth.rs @@ -7,40 +7,40 @@ use crate::client::inner::Inner; use crate::proto::command::{Command, CommandLogin}; pub trait Client: - AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static + AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static { } impl Client for C where - C: Send + Sync + Unpin + AsyncWrite + AsyncRead + 'static + C: Send + Sync + Unpin + AsyncWrite + AsyncRead + 'static { } #[async_trait] pub trait AuthMethod { - async fn perform_auth(&self, inner: &mut Inner) -> Result<()> - where - C: Client; + async fn perform_auth(&self, inner: &mut Inner) -> Result<()> + where + C: Client; } pub struct Login { - pub username: String, - pub password: String, + pub username: String, + pub password: String, } #[async_trait] impl AuthMethod for Login { - async fn perform_auth(&self, inner: &mut Inner) -> Result<()> - where - C: Client, - { - let command = Command::Login(CommandLogin { - userid: Bytes::from(self.username.clone()), - password: Bytes::from(self.password.clone()), - }); + async fn perform_auth(&self, inner: &mut Inner) -> Result<()> + where + C: Client, + { + let command = Command::Login(CommandLogin { + userid: Bytes::from(self.username.clone()), + password: Bytes::from(self.password.clone()), + }); - let result = inner.execute(command).await?; - info!("result: {:?}", result.wait().await?); + let result = inner.execute(command).await?; + info!("result: {:?}", result.wait().await?); - Ok(()) - } + Ok(()) + } } diff --git a/imap/src/client/client.rs b/imap/src/client/client.rs index d68feb6..5dbb0ac 100644 --- a/imap/src/client/client.rs +++ b/imap/src/client/client.rs @@ -4,22 +4,22 @@ use std::task::{Context, Poll}; use anyhow::Result; use futures::{ - future::{self, FutureExt}, - stream::{Stream, StreamExt}, + future::{self, FutureExt}, + stream::{Stream, StreamExt}, }; use panorama_proto_common::Bytes; use tokio::net::TcpStream; use tokio_rustls::client::TlsStream; use crate::proto::{ - command::{ - Command, CommandFetch, CommandList, CommandSearch, CommandSelect, - FetchItems, SearchCriteria, Sequence, - }, - response::{ - Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute, - Response, ResponseCode, Status, - }, + command::{ + Command, CommandFetch, CommandList, CommandSearch, CommandSelect, + FetchItems, SearchCriteria, Sequence, + }, + response::{ + Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute, + Response, ResponseCode, Status, + }, }; use super::auth::AuthMethod; @@ -31,278 +31,270 @@ use super::tls::wrap_tls; #[derive(Builder, Clone, Debug)] #[builder(build_fn(private))] pub struct Config { - /// The hostname of the IMAP server. If using TLS, must be an address - pub hostname: String, + /// The hostname of the IMAP server. If using TLS, must be an address + pub hostname: String, - /// The port of the IMAP server. - pub port: u16, + /// The port of the IMAP server. + pub port: u16, - /// Whether or not the client is using an encrypted stream. - /// - /// To upgrade the connection later, use the upgrade method. - #[builder(default = "true")] - pub tls: bool, + /// Whether or not the client is using an encrypted stream. + /// + /// To upgrade the connection later, use the upgrade method. + #[builder(default = "true")] + pub tls: bool, - /// Whether or not to verify hostname - #[builder(default = "true")] - pub verify_hostname: bool, + /// Whether or not to verify hostname + #[builder(default = "true")] + pub verify_hostname: bool, } impl Config { - pub fn builder() -> ConfigBuilder { ConfigBuilder::default() } + pub fn builder() -> ConfigBuilder { ConfigBuilder::default() } } impl ConfigBuilder { - pub async fn open(&self) -> Result { - let config = self.build()?; + pub async fn open(&self) -> Result { + let config = self.build()?; - let hostname = config.hostname.as_ref(); - let port = config.port; - trace!("connecting to {}:{}...", hostname, port); - let conn = TcpStream::connect((hostname, port)).await?; - trace!("connected."); + let hostname = config.hostname.as_ref(); + let port = config.port; + trace!("connecting to {}:{}...", hostname, port); + let conn = TcpStream::connect((hostname, port)).await?; + trace!("connected."); - if config.tls { - let conn = wrap_tls(conn, hostname).await?; - let mut inner = Inner::new(conn, config).await?; + if config.tls { + let conn = wrap_tls(conn, hostname).await?; + let mut inner = Inner::new(conn, config).await?; - inner.wait_for_greeting().await?; - debug!("received greeting"); + inner.wait_for_greeting().await?; + debug!("received greeting"); - return Ok(ClientUnauthenticated::Encrypted(inner)); - } else { - let mut inner = Inner::new(conn, config).await?; + return Ok(ClientUnauthenticated::Encrypted(inner)); + } else { + let mut inner = Inner::new(conn, config).await?; - inner.wait_for_greeting().await?; - debug!("received greeting"); + inner.wait_for_greeting().await?; + debug!("received greeting"); - return Ok(ClientUnauthenticated::Unencrypted(inner)); - } + return Ok(ClientUnauthenticated::Unencrypted(inner)); } + } } /// A client that hasn't been authenticated. pub enum ClientUnauthenticated { - Encrypted(Inner>), - Unencrypted(Inner), + Encrypted(Inner>), + Unencrypted(Inner), } impl ClientUnauthenticated { - pub async fn upgrade(self) -> Result { - match self { - // this is a no-op, we don't need to upgrade - ClientUnauthenticated::Encrypted(_) => Ok(self), - ClientUnauthenticated::Unencrypted(e) => { - Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?)) - } - } + pub async fn upgrade(self) -> Result { + match self { + // this is a no-op, we don't need to upgrade + ClientUnauthenticated::Encrypted(_) => Ok(self), + ClientUnauthenticated::Unencrypted(e) => { + Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?)) + } } + } - pub async fn auth( - self, - auth: impl AuthMethod, - ) -> Result { - match self { - // this is a no-op, we don't need to upgrade - ClientUnauthenticated::Encrypted(mut inner) => { - auth.perform_auth(&mut inner).await?; - Ok(ClientAuthenticated::Encrypted(inner)) - } - ClientUnauthenticated::Unencrypted(mut inner) => { - auth.perform_auth(&mut inner).await?; - Ok(ClientAuthenticated::Unencrypted(inner)) - } - } + pub async fn auth( + self, + auth: impl AuthMethod, + ) -> Result { + match self { + // this is a no-op, we don't need to upgrade + ClientUnauthenticated::Encrypted(mut inner) => { + auth.perform_auth(&mut inner).await?; + Ok(ClientAuthenticated::Encrypted(inner)) + } + ClientUnauthenticated::Unencrypted(mut inner) => { + auth.perform_auth(&mut inner).await?; + Ok(ClientAuthenticated::Unencrypted(inner)) + } } + } - client_expose!(async execute(cmd: Command) -> Result); - client_expose!(async has_capability(cap: impl AsRef) -> Result); + client_expose!(async execute(cmd: Command) -> Result); + client_expose!(async has_capability(cap: impl AsRef) -> Result); } /// A client that has been authenticated. pub enum ClientAuthenticated { - Encrypted(Inner>), - Unencrypted(Inner), + Encrypted(Inner>), + Unencrypted(Inner), } impl ClientAuthenticated { - client_expose!(async execute(cmd: Command) -> Result); - client_expose!(async has_capability(cap: impl AsRef) -> Result); + client_expose!(async execute(cmd: Command) -> Result); + client_expose!(async has_capability(cap: impl AsRef) -> Result); - pub async fn search(&mut self) -> Result<()> { - let cmd = Command::Examine; - let res = self.execute(cmd).await?; - let (done, data) = res.wait().await?; - println!("done = {:?}", done); - println!("data = {:?}", data); + pub async fn search(&mut self) -> Result<()> { + let cmd = Command::Examine; + let res = self.execute(cmd).await?; + let (done, data) = res.wait().await?; + println!("done = {:?}", done); + println!("data = {:?}", data); - Ok(()) + Ok(()) + } + + /// Runs the LIST command + pub async fn list(&mut self) -> Result> { + let cmd = Command::List(CommandList { + reference: Bytes::from(""), + mailbox: Bytes::from("*"), + }); + + let res = self.execute(cmd).await?; + let (_, data) = res.wait().await?; + + let mut folders = Vec::new(); + for resp in data { + if let Response::MailboxData(MailboxData::List(MailboxList { + mailbox, + .. + })) = resp + { + folders.push(mailbox); + } } - /// Runs the LIST command - pub async fn list(&mut self) -> Result> { - let cmd = Command::List(CommandList { - reference: Bytes::from(""), - mailbox: Bytes::from("*"), - }); + Ok(folders) + } - let res = self.execute(cmd).await?; - let (_, data) = res.wait().await?; + /// Runs the SELECT command + pub async fn select( + &mut self, + mailbox: impl AsRef, + ) -> Result { + let cmd = Command::Select(CommandSelect { + mailbox: Bytes::from(mailbox.as_ref().to_owned()), + }); - let mut folders = Vec::new(); - for resp in data { - if let Response::MailboxData(MailboxData::List(MailboxList { - mailbox, - .. - })) = resp - { - folders.push(mailbox); - } + let stream = self.execute(cmd).await?; + let (_, data) = stream.wait().await?; + + let mut select = SelectResponse::default(); + for resp in data { + match resp { + Response::MailboxData(MailboxData::Flags(flags)) => { + select.flags = flags } - - Ok(folders) + Response::MailboxData(MailboxData::Exists(exists)) => { + select.exists = Some(exists) + } + Response::MailboxData(MailboxData::Recent(recent)) => { + select.recent = Some(recent) + } + Response::Tagged( + _, + Condition { + status: Status::Ok, + code: Some(code), + .. + }, + ) + | Response::Condition(Condition { + status: Status::Ok, + code: Some(code), + .. + }) => match code { + ResponseCode::Unseen(value) => select.unseen = Some(value), + ResponseCode::UidNext(value) => select.uid_next = Some(value), + ResponseCode::UidValidity(value) => select.uid_validity = Some(value), + _ => {} + }, + _ => warn!("unknown response {:?}", resp), + } } - /// Runs the SELECT command - pub async fn select( - &mut self, - mailbox: impl AsRef, - ) -> Result { - let cmd = Command::Select(CommandSelect { - mailbox: Bytes::from(mailbox.as_ref().to_owned()), - }); + Ok(select) + } - let stream = self.execute(cmd).await?; - let (_, data) = stream.wait().await?; - - let mut select = SelectResponse::default(); - for resp in data { - match resp { - Response::MailboxData(MailboxData::Flags(flags)) => { - select.flags = flags - } - Response::MailboxData(MailboxData::Exists(exists)) => { - select.exists = Some(exists) - } - Response::MailboxData(MailboxData::Recent(recent)) => { - select.recent = Some(recent) - } - Response::Tagged( - _, - Condition { - status: Status::Ok, - code: Some(code), - .. - }, - ) - | Response::Condition(Condition { - status: Status::Ok, - code: Some(code), - .. - }) => match code { - ResponseCode::Unseen(value) => select.unseen = Some(value), - ResponseCode::UidNext(value) => { - select.uid_next = Some(value) - } - ResponseCode::UidValidity(value) => { - select.uid_validity = Some(value) - } - _ => {} - }, - _ => warn!("unknown response {:?}", resp), - } - } - - Ok(select) + /// Runs the SEARCH command + pub async fn uid_search(&mut self) -> Result> { + let cmd = Command::UidSearch(CommandSearch { + criteria: SearchCriteria::all(), + }); + let stream = self.execute(cmd).await?; + let (_, data) = stream.wait().await?; + for resp in data { + if let Response::MailboxData(MailboxData::Search(uids)) = resp { + return Ok(uids); + } } + bail!("could not find the SEARCH response") + } - /// Runs the SEARCH command - pub async fn uid_search(&mut self) -> Result> { - let cmd = Command::UidSearch(CommandSearch { - criteria: SearchCriteria::all(), - }); - let stream = self.execute(cmd).await?; - let (_, data) = stream.wait().await?; - for resp in data { - if let Response::MailboxData(MailboxData::Search(uids)) = resp { - return Ok(uids); - } - } - bail!("could not find the SEARCH response") + /// Runs the FETCH command + pub async fn fetch( + &mut self, + uids: &[u32], + uid_seqs: &[Range], + items: FetchItems, + ) -> Result)>> { + let mut ids = Vec::new(); + for uid in uids { + ids.push(Sequence::Single(*uid)); } + for seq in uid_seqs { + ids.push(Sequence::Range(seq.start, seq.end)); + } + let cmd = Command::Fetch(CommandFetch { ids, items }); + debug!("fetch: {:?}", cmd); + let stream = self.execute(cmd).await?; + // let (done, data) = stream.wait().await?; + Ok(stream.filter_map(|resp| match resp { + Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), + Response::Done(_) => future::ready(None).boxed(), + _ => future::pending().boxed(), + })) + } - /// Runs the FETCH command - pub async fn fetch( - &mut self, - uids: &[u32], - uid_seqs: &[Range], - items: FetchItems, - ) -> Result)>> { - let mut ids = Vec::new(); - for uid in uids { - ids.push(Sequence::Single(*uid)); - } - for seq in uid_seqs { - ids.push(Sequence::Range(seq.start, seq.end)); - } - let cmd = Command::Fetch(CommandFetch { ids, items }); - debug!("fetch: {:?}", cmd); - let stream = self.execute(cmd).await?; - // let (done, data) = stream.wait().await?; - Ok(stream.filter_map(|resp| match resp { - Response::Fetch(n, attrs) => { - future::ready(Some((n, attrs))).boxed() - } - Response::Done(_) => future::ready(None).boxed(), - _ => future::pending().boxed(), - })) + /// Runs the UID FETCH command + pub async fn uid_fetch( + &mut self, + uids: &[u32], + uid_seqs: &[Range], + items: FetchItems, + ) -> Result)>> { + let mut ids = Vec::new(); + for uid in uids { + ids.push(Sequence::Single(*uid)); } + for seq in uid_seqs { + ids.push(Sequence::Range(seq.start, seq.end)); + } + let cmd = Command::UidFetch(CommandFetch { ids, items }); + debug!("uid fetch: {:?}", cmd); + let stream = self.execute(cmd).await?; + // let (done, data) = stream.wait().await?; + Ok(stream.filter_map(|resp| match resp { + Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), + Response::Done(_) => future::ready(None).boxed(), + _ => future::pending().boxed(), + })) + } - /// Runs the UID FETCH command - pub async fn uid_fetch( - &mut self, - uids: &[u32], - uid_seqs: &[Range], - items: FetchItems, - ) -> Result)>> { - let mut ids = Vec::new(); - for uid in uids { - ids.push(Sequence::Single(*uid)); - } - for seq in uid_seqs { - ids.push(Sequence::Range(seq.start, seq.end)); - } - let cmd = Command::UidFetch(CommandFetch { ids, items }); - debug!("uid fetch: {:?}", cmd); - let stream = self.execute(cmd).await?; - // let (done, data) = stream.wait().await?; - Ok(stream.filter_map(|resp| match resp { - Response::Fetch(n, attrs) => { - future::ready(Some((n, attrs))).boxed() - } - Response::Done(_) => future::ready(None).boxed(), - _ => future::pending().boxed(), - })) - } - - /// Runs the IDLE command - #[cfg(feature = "rfc2177")] - #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] - pub async fn idle(&mut self) -> Result { - let cmd = Command::Idle; - let stream = self.execute(cmd).await?; - Ok(IdleToken { stream }) - } + /// Runs the IDLE command + #[cfg(feature = "rfc2177")] + #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] + pub async fn idle(&mut self) -> Result { + let cmd = Command::Idle; + let stream = self.execute(cmd).await?; + Ok(IdleToken { stream }) + } } #[derive(Debug, Default)] pub struct SelectResponse { - pub flags: Vec, - pub exists: Option, - pub recent: Option, - pub uid_next: Option, - pub uid_validity: Option, - pub unseen: Option, + pub flags: Vec, + pub exists: Option, + pub recent: Option, + pub uid_next: Option, + pub uid_validity: Option, + pub unseen: Option, } /// A token that represents an idling connection. @@ -312,28 +304,28 @@ pub struct SelectResponse { #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] pub struct IdleToken { - pub stream: ResponseStream, - // sender: mpsc::UnboundedSender, + pub stream: ResponseStream, + // sender: mpsc::UnboundedSender, } #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] impl Drop for IdleToken { - fn drop(&mut self) { - // TODO: put this into a channel instead - // tokio::spawn(self.client.execute(Command::Done)); - } + fn drop(&mut self) { + // TODO: put this into a channel instead + // tokio::spawn(self.client.execute(Command::Done)); + } } #[cfg(feature = "rfc2177")] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] impl Stream for IdleToken { - type Item = Response; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll> { - let stream = Pin::new(&mut self.stream); - Stream::poll_next(stream, cx) - } + type Item = Response; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + let stream = Pin::new(&mut self.stream); + Stream::poll_next(stream, cx) + } } diff --git a/imap/src/client/codec.rs b/imap/src/client/codec.rs index a03160c..b623061 100644 --- a/imap/src/client/codec.rs +++ b/imap/src/client/codec.rs @@ -6,90 +6,90 @@ use panorama_proto_common::{convert_error, Bytes}; use tokio_util::codec::{Decoder, Encoder}; use crate::proto::{ - command::Command, - response::{Response, Tag}, - rfc3501::response as parse_response, + command::Command, + response::{Response, Tag}, + rfc3501::response as parse_response, }; /// A codec that can be used for decoding `Response`s and encoding `Command`s. #[derive(Default)] pub struct ImapCodec { - decode_need_message_bytes: usize, + decode_need_message_bytes: usize, } impl<'a> Decoder for ImapCodec { - type Item = Response; - type Error = io::Error; + type Item = Response; + type Error = io::Error; - fn decode( - &mut self, - buf: &mut BytesMut, - ) -> Result, io::Error> { - use nom::Err; + fn decode( + &mut self, + buf: &mut BytesMut, + ) -> Result, io::Error> { + use nom::Err; - if self.decode_need_message_bytes > buf.len() { - return Ok(None); - } - - // this is a pretty hot mess so here's my best attempt at explaining - // buf, or buf1, is the original message - - // "split" mutably removes all the bytes from the self, and returns a - // new BytesMut with the contents. so buf2 now has all the - // original contents and buf1 is now empty - let buf2 = buf.split(); - - // now we're going to clone buf2 here, calling "freeze" turns the - // BytesMut back into Bytes so we can manipulate it. remember, - // none of this should be actually copying anything - let buf3 = buf2.clone().freeze(); - debug!("going to parse a response since buffer len: {}", buf3.len()); - // trace!("buf: {:?}", buf3); - - // we don't know how long the message is going to be yet, so parse it - // out of the Bytes right now, and since the buffer is being - // consumed, subtracting the remainder of the string from the - // original total (buf4_len) will tell us how long the payload - // was. this also avoids unnecessary cloning - let buf4: Bytes = buf3.clone().into(); - let buf4_len = buf4.len(); - let (response, len) = match parse_response(buf4) { - Ok((remaining, response)) => (response, buf4_len - remaining.len()), - - // the incomplete cases: set the decoded bytes and quit early - Err(nom::Err::Incomplete(Needed::Size(min))) => { - self.decode_need_message_bytes = min.get(); - return Ok(None); - } - Err(nom::Err::Incomplete(_)) => { - return Ok(None); - } - - // shit - Err(Err::Error(err)) | Err(Err::Failure(err)) => { - let buf4 = buf3.clone().into(); - error!("failed to parse: {:?}", buf4); - error!("code: {}", convert_error(buf4, &err)); - return Err(io::Error::new( - io::ErrorKind::Other, - format!("error during parsing of {:?}", buf), - )); - } - }; - - info!("success, parsed as {:?}", response); - // "unsplit" is the opposite of split, we're getting back the original - // data here - buf.unsplit(buf2); - - // and then move to after the message we just parsed - let first_part = buf.split_to(len); - trace!("parsed from: {:?}", first_part); - - // since we're done parsing a complete message, set this to zero - self.decode_need_message_bytes = 0; - Ok(Some(response)) + if self.decode_need_message_bytes > buf.len() { + return Ok(None); } + + // this is a pretty hot mess so here's my best attempt at explaining + // buf, or buf1, is the original message + + // "split" mutably removes all the bytes from the self, and returns a + // new BytesMut with the contents. so buf2 now has all the + // original contents and buf1 is now empty + let buf2 = buf.split(); + + // now we're going to clone buf2 here, calling "freeze" turns the + // BytesMut back into Bytes so we can manipulate it. remember, + // none of this should be actually copying anything + let buf3 = buf2.clone().freeze(); + debug!("going to parse a response since buffer len: {}", buf3.len()); + // trace!("buf: {:?}", buf3); + + // we don't know how long the message is going to be yet, so parse it + // out of the Bytes right now, and since the buffer is being + // consumed, subtracting the remainder of the string from the + // original total (buf4_len) will tell us how long the payload + // was. this also avoids unnecessary cloning + let buf4: Bytes = buf3.clone().into(); + let buf4_len = buf4.len(); + let (response, len) = match parse_response(buf4) { + Ok((remaining, response)) => (response, buf4_len - remaining.len()), + + // the incomplete cases: set the decoded bytes and quit early + Err(nom::Err::Incomplete(Needed::Size(min))) => { + self.decode_need_message_bytes = min.get(); + return Ok(None); + } + Err(nom::Err::Incomplete(_)) => { + return Ok(None); + } + + // shit + Err(Err::Error(err)) | Err(Err::Failure(err)) => { + let buf4 = buf3.clone().into(); + error!("failed to parse: {:?}", buf4); + error!("code: {}", convert_error(buf4, &err)); + return Err(io::Error::new( + io::ErrorKind::Other, + format!("error during parsing of {:?}", buf), + )); + } + }; + + info!("success, parsed as {:?}", response); + // "unsplit" is the opposite of split, we're getting back the original + // data here + buf.unsplit(buf2); + + // and then move to after the message we just parsed + let first_part = buf.split_to(len); + trace!("parsed from: {:?}", first_part); + + // since we're done parsing a complete message, set this to zero + self.decode_need_message_bytes = 0; + Ok(Some(response)) + } } /// A command with its accompanying tag. @@ -97,24 +97,24 @@ impl<'a> Decoder for ImapCodec { pub struct TaggedCommand(pub Tag, pub Command); impl<'a> Encoder<&'a TaggedCommand> for ImapCodec { - type Error = io::Error; + type Error = io::Error; - fn encode( - &mut self, - tagged_cmd: &TaggedCommand, - dst: &mut BytesMut, - ) -> Result<(), io::Error> { - let tag = &*tagged_cmd.0 .0; - let command = &tagged_cmd.1; + fn encode( + &mut self, + tagged_cmd: &TaggedCommand, + dst: &mut BytesMut, + ) -> Result<(), io::Error> { + let tag = &*tagged_cmd.0 .0; + let command = &tagged_cmd.1; - dst.put(tag); - dst.put_u8(b' '); + dst.put(tag); + dst.put_u8(b' '); - // TODO: don't allocate here! use a stream writer - let cmd_bytes = format_bytes!(b"{}", command); - dst.extend_from_slice(cmd_bytes.as_slice()); + // TODO: don't allocate here! use a stream writer + let cmd_bytes = format_bytes!(b"{}", command); + dst.extend_from_slice(cmd_bytes.as_slice()); - // debug!("C>>>S: {:?}", dst); - Ok(()) - } + // debug!("C>>>S: {:?}", dst); + Ok(()) + } } diff --git a/imap/src/client/inner.rs b/imap/src/client/inner.rs index 6a328e9..2128fb8 100644 --- a/imap/src/client/inner.rs +++ b/imap/src/client/inner.rs @@ -1,30 +1,29 @@ use std::collections::HashSet; use std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, + atomic::{AtomicU32, Ordering}, + Arc, }; use anyhow::{Context, Result}; use futures::{ - future::{self, FutureExt, TryFutureExt}, - stream::StreamExt, + future::{self, FutureExt, TryFutureExt}, + stream::StreamExt, }; use panorama_proto_common::Bytes; use tokio::{ - io::{ - split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, - WriteHalf, - }, - sync::{mpsc, oneshot, RwLock}, - task::JoinHandle, + io::{ + split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf, + }, + sync::{mpsc, oneshot, RwLock}, + task::JoinHandle, }; use tokio_rustls::client::TlsStream; use tokio_util::codec::FramedRead; use crate::proto::{ - command::Command, - response::{Capability, Condition, Response, Status, Tag}, - rfc3501::capability as parse_capability, + command::Command, + response::{Capability, Condition, Response, Status, Tag}, + rfc3501::capability as parse_capability, }; use super::client::Config; @@ -42,193 +41,187 @@ type GreetingWaiter = oneshot::Receiver<()>; /// Low-level client, can directly read from and write to the stream /// without the additional type-safety of the higher-level state machine. pub struct Inner { - config: Config, - tag_number: AtomicU32, - command_tx: mpsc::UnboundedSender, + config: Config, + tag_number: AtomicU32, + command_tx: mpsc::UnboundedSender, - read_exit: ExitSender, - read_handle: JoinHandle>, + read_exit: ExitSender, + read_handle: JoinHandle>, - write_exit: ExitSender, - write_handle: JoinHandle>, - _write_tx: mpsc::UnboundedSender, + write_exit: ExitSender, + write_handle: JoinHandle>, + _write_tx: mpsc::UnboundedSender, - greeting_rx: Option, - capabilities: Arc>>>, + greeting_rx: Option, + capabilities: Arc>>>, } #[derive(Debug)] struct CommandContainer { - tag: Tag, - command: Command, - channel: mpsc::UnboundedSender, + tag: Tag, + command: Command, + channel: mpsc::UnboundedSender, } impl Inner where - C: AsyncRead + AsyncWrite + Unpin + Send + 'static, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - pub async fn new(c: C, config: Config) -> Result { - let (command_tx, command_rx) = mpsc::unbounded_channel(); + pub async fn new(c: C, config: Config) -> Result { + let (command_tx, command_rx) = mpsc::unbounded_channel(); - // break the stream of bytes into a reader and a writer - // the read_half represents the server->client connection - // the write_half represents the client->server connection - let (read_half, write_half) = split(c); + // break the stream of bytes into a reader and a writer + // the read_half represents the server->client connection + // the write_half represents the client->server connection + let (read_half, write_half) = split(c); - // this channel is used to inform clients when we receive the - // initial greeting from the server - let (greeting_tx, greeting_rx) = oneshot::channel(); + // this channel is used to inform clients when we receive the + // initial greeting from the server + let (greeting_tx, greeting_rx) = oneshot::channel(); - // spawn the server->client loop - let (read_exit, exit_rx) = oneshot::channel(); - let (write_tx, write_rx) = mpsc::unbounded_channel(); // TODO: maybe an arbitrary/configurable limit here would be better? - let read_handle = tokio::spawn(read_loop( - read_half, - exit_rx, - greeting_tx, - write_tx.clone(), - command_rx, - )); + // spawn the server->client loop + let (read_exit, exit_rx) = oneshot::channel(); + let (write_tx, write_rx) = mpsc::unbounded_channel(); // TODO: maybe an arbitrary/configurable limit here would be better? + let read_handle = tokio::spawn(read_loop( + read_half, + exit_rx, + greeting_tx, + write_tx.clone(), + command_rx, + )); - // spawn the client->server loop - let (write_exit, exit_rx) = oneshot::channel(); - let write_handle = - tokio::spawn(write_loop(write_half, exit_rx, write_rx)); + // spawn the client->server loop + let (write_exit, exit_rx) = oneshot::channel(); + let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx)); - let tag_number = AtomicU32::new(0); - let capabilities = Arc::new(RwLock::new(None)); - Ok(Inner { - config, - tag_number, - command_tx, - read_exit, - read_handle, - write_exit, - write_handle, - _write_tx: write_tx, - greeting_rx: Some(greeting_rx), - capabilities, - }) - } + let tag_number = AtomicU32::new(0); + let capabilities = Arc::new(RwLock::new(None)); + Ok(Inner { + config, + tag_number, + command_tx, + read_exit, + read_handle, + write_exit, + write_handle, + _write_tx: write_tx, + greeting_rx: Some(greeting_rx), + capabilities, + }) + } - pub async fn execute( - &mut self, - command: Command, - ) -> Result { - let id = self.tag_number.fetch_add(1, Ordering::SeqCst); - let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id))); + pub async fn execute(&mut self, command: Command) -> Result { + let id = self.tag_number.fetch_add(1, Ordering::SeqCst); + let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id))); - let (channel, rx) = mpsc::unbounded_channel(); - self.command_tx.send(CommandContainer { - tag, - command, - channel, - })?; + let (channel, rx) = mpsc::unbounded_channel(); + self.command_tx.send(CommandContainer { + tag, + command, + channel, + })?; - let stream = ResponseStream { inner: rx }; - Ok(stream) - } + let stream = ResponseStream { inner: rx }; + Ok(stream) + } - pub async fn has_capability( - &mut self, - cap: impl AsRef, - ) -> Result { - let mut cap_slice = cap.as_ref().as_bytes().to_vec(); + pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { + let mut cap_slice = cap.as_ref().as_bytes().to_vec(); - // since we're doing incremental parsing, we have to finish this off - // with something that's invalid - cap_slice.push(b'\n'); + // since we're doing incremental parsing, we have to finish this off + // with something that's invalid + cap_slice.push(b'\n'); - let cap_bytes = Bytes::from(cap_slice); - trace!("CAP_BYTES: {:?}", cap_bytes); + let cap_bytes = Bytes::from(cap_slice); + trace!("CAP_BYTES: {:?}", cap_bytes); - let (_, cap) = parse_capability(cap_bytes) - .context("could not parse capability")?; + let (_, cap) = + parse_capability(cap_bytes).context("could not parse capability")?; - let contains = { - let read = self.capabilities.read().await; - if let Some(read) = &*read { - read.contains(&cap) - } else { - std::mem::drop(read); + let contains = { + let read = self.capabilities.read().await; + if let Some(read) = &*read { + read.contains(&cap) + } else { + std::mem::drop(read); - let cmd = self.execute(Command::Capability).await?; - let (_, res) = cmd.wait().await?; + let cmd = self.execute(Command::Capability).await?; + let (_, res) = cmd.wait().await?; - let mut capabilities = HashSet::new(); - // todo!("done: {:?} {:?}", done, res); + let mut capabilities = HashSet::new(); + // todo!("done: {:?} {:?}", done, res); - for caps in res.iter().filter_map(|res| match res { - Response::Capabilities(caps) => Some(caps), - _ => None, - }) { - capabilities.extend(caps.clone()); - } - - let mut write = self.capabilities.write().await; - *write = Some(capabilities); - true - } - }; - - Ok(contains) - } - - pub async fn upgrade(mut self) -> Result>> { - debug!("preparing to upgrade using STARTTLS"); - - // check that this capability exists - // if it doesn't exist, then it's not an IMAP4-compliant server - if !self - .has_capability("STARTTLS") - .await - .context("could not check starttls capability")? - { - bail!("Server does not have the STARTTLS capability"); + for caps in res.iter().filter_map(|res| match res { + Response::Capabilities(caps) => Some(caps), + _ => None, + }) { + capabilities.extend(caps.clone()); } - // issue the STARTTLS command to the server - let resp = self - .execute(Command::Starttls) - .await - .context("could not send starttls command")?; - resp.wait() - .await - .context("could not receive starttls response")?; - debug!("received OK from server"); + let mut write = self.capabilities.write().await; + *write = Some(capabilities); + true + } + }; - // issue exit to the read loop and retrieve the read half - let _ = self.read_exit.send(()); - let read_half = self - .read_handle - .await - .context("could not retrieve read half of connection")?; + Ok(contains) + } - // issue exit to the write loop and retrieve the write half - let _ = self.write_exit.send(()); - let write_half = self - .write_handle - .await - .context("could not retrieve write half of connection")?; + pub async fn upgrade(mut self) -> Result>> { + debug!("preparing to upgrade using STARTTLS"); - // put the read half and write half back together - let stream = read_half.unsplit(write_half); - let tls_stream = wrap_tls(stream, &self.config.hostname) - .await - .context("could not initialize tls stream")?; - - Inner::new(tls_stream, self.config) - .await - .context("could not construct new client") + // check that this capability exists + // if it doesn't exist, then it's not an IMAP4-compliant server + if !self + .has_capability("STARTTLS") + .await + .context("could not check starttls capability")? + { + bail!("Server does not have the STARTTLS capability"); } - pub async fn wait_for_greeting(&mut self) -> Result<()> { - if let Some(greeting_rx) = self.greeting_rx.take() { - greeting_rx.await?; - } - Ok(()) + // issue the STARTTLS command to the server + let resp = self + .execute(Command::Starttls) + .await + .context("could not send starttls command")?; + resp + .wait() + .await + .context("could not receive starttls response")?; + debug!("received OK from server"); + + // issue exit to the read loop and retrieve the read half + let _ = self.read_exit.send(()); + let read_half = self + .read_handle + .await + .context("could not retrieve read half of connection")?; + + // issue exit to the write loop and retrieve the write half + let _ = self.write_exit.send(()); + let write_half = self + .write_handle + .await + .context("could not retrieve write half of connection")?; + + // put the read half and write half back together + let stream = read_half.unsplit(write_half); + let tls_stream = wrap_tls(stream, &self.config.hostname) + .await + .context("could not initialize tls stream")?; + + Inner::new(tls_stream, self.config) + .await + .context("could not construct new client") + } + + pub async fn wait_for_greeting(&mut self) -> Result<()> { + if let Some(greeting_rx) = self.greeting_rx.take() { + greeting_rx.await?; } + Ok(()) + } } // exit is a channel that will notify this loop when some external @@ -236,133 +229,133 @@ where // // when the loop exits, the read half of the stream will be returned async fn read_loop( - stream: ReadHalf, - exit: ExitListener, - greeting_tx: GreetingSender, - write_tx: mpsc::UnboundedSender, - mut command_rx: mpsc::UnboundedReceiver, + stream: ReadHalf, + exit: ExitListener, + greeting_tx: GreetingSender, + write_tx: mpsc::UnboundedSender, + mut command_rx: mpsc::UnboundedReceiver, ) -> ReadHalf where - C: AsyncRead, + C: AsyncRead, { - // this lets us "use up" the greeting sender - let mut greeting_tx = Some(greeting_tx); + // this lets us "use up" the greeting sender + let mut greeting_tx = Some(greeting_tx); - let mut curr_cmd: Option = None; + let mut curr_cmd: Option = None; - // set up framed communication - let codec = ImapCodec::default(); - let mut framed = FramedRead::new(stream, codec); + // set up framed communication + let codec = ImapCodec::default(); + let mut framed = FramedRead::new(stream, codec); - let exit = exit.fuse(); - pin_mut!(exit); - loop { - debug!("READ LOOP ITER"); - let next = framed.next().fuse(); - pin_mut!(next); + let exit = exit.fuse(); + pin_mut!(exit); + loop { + debug!("READ LOOP ITER"); + let next = framed.next().fuse(); + pin_mut!(next); - // only listen for a new command if there isn't one already - let mut cmd_fut = if let Some(ref cmd) = curr_cmd { - debug!("current command: {:?}", cmd); - // if there is one, just make a future that never resolves so it'll - // always pick the other options in the select. - future::pending().boxed().fuse() - } else { - command_rx.recv().boxed().fuse() - }; + // only listen for a new command if there isn't one already + let mut cmd_fut = if let Some(ref cmd) = curr_cmd { + debug!("current command: {:?}", cmd); + // if there is one, just make a future that never resolves so it'll + // always pick the other options in the select. + future::pending().boxed().fuse() + } else { + command_rx.recv().boxed().fuse() + }; - select! { - // read a command from the command list - command = cmd_fut => { - if curr_cmd.is_none() { - if let Some(CommandContainer { ref tag, ref command, .. }) = command { - let _ = write_tx.send(TaggedCommand(tag.clone(), command.clone())); - // let cmd_str = format!("{} {:?}\r\n", tag, cmd); - // write_tx.send(cmd_str); - } - curr_cmd = command; - debug!("new command: {:?}", curr_cmd); + select! { + // read a command from the command list + command = cmd_fut => { + if curr_cmd.is_none() { + if let Some(CommandContainer { ref tag, ref command, .. }) = command { + let _ = write_tx.send(TaggedCommand(tag.clone(), command.clone())); + // let cmd_str = format!("{} {:?}\r\n", tag, cmd); + // write_tx.send(cmd_str); } + curr_cmd = command; + debug!("new command: {:?}", curr_cmd); } - - // new message from the server - resp = next => { - let resp = match resp { - Some(Ok(v)) => v, - a => { error!("failed: {:?}", a); todo!("fuck"); }, - }; - trace!("S>>>C: {:?}", resp); - - // if this is the very first response, then it's a greeting - if let Some(greeting_tx) = greeting_tx.take() { - greeting_tx.send(()).unwrap(); - } - - if let Response::Done(_) = resp { - // since this is the DONE message, clear curr_cmd so another one can be sent - if let Some(CommandContainer { channel, .. }) = curr_cmd.take() { - let _ = channel.send(resp); - // debug!("res0: {:?}", res); - } - } else if let Response::Tagged(_, Condition { status: Status::Ok, ..}) = resp { - // clear curr_cmd so another one can be sent - if let Some(CommandContainer { channel, .. }) = curr_cmd.take() { - let _ = channel.send(resp); - // debug!("res0: {:?}", res); - } - } else if let Some(CommandContainer { channel, .. }) = curr_cmd.as_mut() { - // we got a response from the server for this command, so send it over the - // channel - - // debug!("sending {:?} to tag {}", resp, tag); - let _res = channel.send(resp); - // debug!("res1: {:?}", res); - } - } - - _ = exit => break, } - } - framed.into_inner() + // new message from the server + resp = next => { + let resp = match resp { + Some(Ok(v)) => v, + a => { error!("failed: {:?}", a); todo!("fuck"); }, + }; + trace!("S>>>C: {:?}", resp); + + // if this is the very first response, then it's a greeting + if let Some(greeting_tx) = greeting_tx.take() { + greeting_tx.send(()).unwrap(); + } + + if let Response::Done(_) = resp { + // since this is the DONE message, clear curr_cmd so another one can be sent + if let Some(CommandContainer { channel, .. }) = curr_cmd.take() { + let _ = channel.send(resp); + // debug!("res0: {:?}", res); + } + } else if let Response::Tagged(_, Condition { status: Status::Ok, ..}) = resp { + // clear curr_cmd so another one can be sent + if let Some(CommandContainer { channel, .. }) = curr_cmd.take() { + let _ = channel.send(resp); + // debug!("res0: {:?}", res); + } + } else if let Some(CommandContainer { channel, .. }) = curr_cmd.as_mut() { + // we got a response from the server for this command, so send it over the + // channel + + // debug!("sending {:?} to tag {}", resp, tag); + let _res = channel.send(resp); + // debug!("res1: {:?}", res); + } + } + + _ = exit => break, + } + } + + framed.into_inner() } async fn write_loop( - stream: WriteHalf, - exit_rx: ExitListener, - mut command_rx: mpsc::UnboundedReceiver, + stream: WriteHalf, + exit_rx: ExitListener, + mut command_rx: mpsc::UnboundedReceiver, ) -> WriteHalf where - C: AsyncWrite, + C: AsyncWrite, { - // set up framed communication - // let codec = ImapCodec::default(); - let mut stream = BufWriter::new(stream); - // let mut framed = FramedWrite::new(stream, codec); + // set up framed communication + // let codec = ImapCodec::default(); + let mut stream = BufWriter::new(stream); + // let mut framed = FramedWrite::new(stream, codec); - let mut exit_rx = exit_rx.map_err(|_| ()).shared(); - loop { - let command_fut = command_rx.recv().fuse(); - pin_mut!(command_fut); + let mut exit_rx = exit_rx.map_err(|_| ()).shared(); + loop { + let command_fut = command_rx.recv().fuse(); + pin_mut!(command_fut); - select! { - command = command_fut => { - // TODO: handle errors here - if let Some(command) = command { - let cmd = format_bytes!(b"{} {}\r\n", &*command.0.0, command.1); - debug!("sending command: {:?}", String::from_utf8_lossy(&cmd)); - let _ = stream.write_all(&cmd).await; - let _ = stream.flush().await; - // let _ = framed.send(&command).await; - // let _ = framed.flush().await; - } - // let _ = stream.write_all(line.as_bytes()).await; - // let _ = stream.flush().await; + select! { + command = command_fut => { + // TODO: handle errors here + if let Some(command) = command { + let cmd = format_bytes!(b"{} {}\r\n", &*command.0.0, command.1); + debug!("sending command: {:?}", String::from_utf8_lossy(&cmd)); + let _ = stream.write_all(&cmd).await; + let _ = stream.flush().await; + // let _ = framed.send(&command).await; + // let _ = framed.flush().await; } - _ = exit_rx => break, + // let _ = stream.write_all(line.as_bytes()).await; + // let _ = stream.flush().await; } + _ = exit_rx => break, } + } - // framed.into_inner() - stream.into_inner() + // framed.into_inner() + stream.into_inner() } diff --git a/imap/src/client/mod.rs b/imap/src/client/mod.rs index b68aff0..8192863 100644 --- a/imap/src/client/mod.rs +++ b/imap/src/client/mod.rs @@ -24,7 +24,7 @@ mod inner; mod tls; pub use self::client::{ - ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder, + ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder, }; pub use self::codec::{ImapCodec, TaggedCommand}; diff --git a/imap/src/client/response_stream.rs b/imap/src/client/response_stream.rs index 4ca0d50..ce4cc7d 100644 --- a/imap/src/client/response_stream.rs +++ b/imap/src/client/response_stream.rs @@ -9,45 +9,43 @@ use crate::proto::response::{Response, ResponseDone}; /// A series of responses that follow an pub struct ResponseStream { - pub(crate) inner: mpsc::UnboundedReceiver, + pub(crate) inner: mpsc::UnboundedReceiver, } impl ResponseStream { - /// Retrieves just the DONE item in the stream, discarding the rest - pub async fn done(mut self) -> Result> { - while let Some(resp) = self.inner.recv().await { - if let Response::Done(done) = resp { - return Ok(Some(done)); - } - } - Ok(None) + /// Retrieves just the DONE item in the stream, discarding the rest + pub async fn done(mut self) -> Result> { + while let Some(resp) = self.inner.recv().await { + if let Response::Done(done) = resp { + return Ok(Some(done)); + } } + Ok(None) + } - /// Waits for the entire stream to finish, returning the DONE status and the - /// stream - pub async fn wait( - mut self, - ) -> Result<(Option, Vec)> { - let mut done = None; - let mut vec = Vec::new(); - while let Some(resp) = self.inner.recv().await { - if let Response::Done(d) = resp { - done = Some(d); - break; - } else { - vec.push(resp); - } - } - Ok((done, vec)) + /// Waits for the entire stream to finish, returning the DONE status and the + /// stream + pub async fn wait(mut self) -> Result<(Option, Vec)> { + let mut done = None; + let mut vec = Vec::new(); + while let Some(resp) = self.inner.recv().await { + if let Response::Done(d) = resp { + done = Some(d); + break; + } else { + vec.push(resp); + } } + Ok((done, vec)) + } } impl Stream for ResponseStream { - type Item = Response; - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll> { - self.inner.poll_recv(cx) - } + type Item = Response; + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context, + ) -> Poll> { + self.inner.poll_recv(cx) + } } diff --git a/imap/src/client/tls.rs b/imap/src/client/tls.rs index fbd2ea7..a289d30 100644 --- a/imap/src/client/tls.rs +++ b/imap/src/client/tls.rs @@ -3,43 +3,42 @@ use std::{convert::TryFrom, sync::Arc}; use anyhow::Result; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_rustls::{ - client::TlsStream, - rustls::{ - ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore, - ServerName, - }, - TlsConnector, + client::TlsStream, + rustls::{ + ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore, ServerName, + }, + TlsConnector, }; /// Wraps the given async stream in TLS with the given hostname (required) pub async fn wrap_tls( - c: C, - hostname: impl AsRef, + c: C, + hostname: impl AsRef, ) -> Result> where - C: AsyncRead + AsyncWrite + Unpin, + C: AsyncRead + AsyncWrite + Unpin, { - let server_name = hostname.as_ref(); + let server_name = hostname.as_ref(); - let mut root_store = RootCertStore::empty(); - root_store.add_server_trust_anchors( - webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| { - OwnedTrustAnchor::from_subject_spki_name_constraints( - ta.subject, - ta.spki, - ta.name_constraints, - ) - }), - ); + let mut root_store = RootCertStore::empty(); + root_store.add_server_trust_anchors( + webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| { + OwnedTrustAnchor::from_subject_spki_name_constraints( + ta.subject, + ta.spki, + ta.name_constraints, + ) + }), + ); - let tls_config = RustlsConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_store) - .with_no_client_auth(); + let tls_config = RustlsConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_store) + .with_no_client_auth(); - let tls_config = TlsConnector::from(Arc::new(tls_config)); - let server_name = ServerName::try_from(server_name).unwrap(); - let stream = tls_config.connect(server_name, c).await?; + let tls_config = TlsConnector::from(Arc::new(tls_config)); + let server_name = ServerName::try_from(server_name).unwrap(); + let stream = tls_config.connect(server_name, c).await?; - Ok(stream) + Ok(stream) } diff --git a/imap/src/proto/command.rs b/imap/src/proto/command.rs index f11047f..d4143a5 100644 --- a/imap/src/proto/command.rs +++ b/imap/src/proto/command.rs @@ -1,7 +1,7 @@ use std::{ - collections::HashSet, - io::{self, Write}, - ops::{Bound, RangeBounds}, + collections::HashSet, + io::{self, Write}, + ops::{Bound, RangeBounds}, }; use format_bytes::DisplayBytes; @@ -11,169 +11,169 @@ use super::rfc3501::is_quoted_specials; #[derive(Clone, Debug)] pub enum Command { - // Any state - Capability, - Noop, - Logout, + // Any state + Capability, + Noop, + Logout, - // Not authenticated - Login(CommandLogin), - Starttls, - Authenticate, + // Not authenticated + Login(CommandLogin), + Starttls, + Authenticate, - // Authenticated - Select(CommandSelect), - Examine, - Create, - Delete, - Rename, - Subscribe, - Unsubscribe, - List(CommandList), - Lsub, - Status, - Append, + // Authenticated + Select(CommandSelect), + Examine, + Create, + Delete, + Rename, + Subscribe, + Unsubscribe, + List(CommandList), + Lsub, + Status, + Append, - // Selected - Check, - Close, - Expunge, - Search, - Copy, - Fetch(CommandFetch), - Store, - UidCopy, - UidFetch(CommandFetch), - UidStore, - UidSearch(CommandSearch), + // Selected + Check, + Close, + Expunge, + Search, + Copy, + Fetch(CommandFetch), + Store, + UidCopy, + UidFetch(CommandFetch), + UidStore, + UidSearch(CommandSearch), - // Extensions - #[cfg(feature = "rfc2177")] - Idle, - #[cfg(feature = "rfc2177")] - Done, + // Extensions + #[cfg(feature = "rfc2177")] + Idle, + #[cfg(feature = "rfc2177")] + Done, } impl DisplayBytes for Command { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - let quote = quote_string(b'\x22', b'\\', is_quoted_specials); - match self { - // command-any - Command::Capability => write_bytes!(w, b"CAPABILITY"), - Command::Logout => write_bytes!(w, b"LOGOUT"), - Command::Noop => write_bytes!(w, b"NOOP"), + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + let quote = quote_string(b'\x22', b'\\', is_quoted_specials); + match self { + // command-any + Command::Capability => write_bytes!(w, b"CAPABILITY"), + Command::Logout => write_bytes!(w, b"LOGOUT"), + Command::Noop => write_bytes!(w, b"NOOP"), - // command-nonauth - Command::Login(login) => { - write_bytes!( - w, - b"LOGIN {} {}", - quote(&login.userid), - quote(&login.password) - ) - } - Command::Starttls => write_bytes!(w, b"STARTTLS"), + // command-nonauth + Command::Login(login) => { + write_bytes!( + w, + b"LOGIN {} {}", + quote(&login.userid), + quote(&login.password) + ) + } + Command::Starttls => write_bytes!(w, b"STARTTLS"), - // command-auth - Command::List(list) => { - write_bytes!( - w, - b"LIST {} {}", - quote(&list.reference), - quote(&list.mailbox) - ) - } - Command::Select(select) => { - write_bytes!(w, b"SELECT {}", quote(&select.mailbox)) - } + // command-auth + Command::List(list) => { + write_bytes!( + w, + b"LIST {} {}", + quote(&list.reference), + quote(&list.mailbox) + ) + } + Command::Select(select) => { + write_bytes!(w, b"SELECT {}", quote(&select.mailbox)) + } - // selected - Command::UidFetch(fetch) => write_bytes!(w, b"UID FETCH {}", fetch), + // selected + Command::UidFetch(fetch) => write_bytes!(w, b"UID FETCH {}", fetch), - // extensions - #[cfg(feature = "rfc2177")] - Command::Idle => write_bytes!(w, b"IDLE"), + // extensions + #[cfg(feature = "rfc2177")] + Command::Idle => write_bytes!(w, b"IDLE"), - _ => todo!("unimplemented command: {:?}", self), - } + _ => todo!("unimplemented command: {:?}", self), } + } } #[derive(Clone, Debug)] pub struct CommandFetch { - pub ids: Vec, - pub items: FetchItems, + pub ids: Vec, + pub items: FetchItems, } impl DisplayBytes for CommandFetch { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - for (i, seq) in self.ids.iter().enumerate() { - if i != 0 { - write_bytes!(w, b",")?; - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + for (i, seq) in self.ids.iter().enumerate() { + if i != 0 { + write_bytes!(w, b",")?; + } - match seq { - Sequence::Single(n) => write_bytes!(w, b"{}", n)?, - Sequence::Range(m, n) => write_bytes!(w, b"{}:{}", m, n)?, - } - } - - write_bytes!(w, b" {}", self.items) + match seq { + Sequence::Single(n) => write_bytes!(w, b"{}", n)?, + Sequence::Range(m, n) => write_bytes!(w, b"{}:{}", m, n)?, + } } + + write_bytes!(w, b" {}", self.items) + } } #[derive(Clone, Debug)] pub enum Sequence { - Single(u32), - Range(u32, u32), + Single(u32), + Range(u32, u32), } #[derive(Clone, Debug)] pub struct CommandList { - pub reference: Bytes, - pub mailbox: Bytes, + pub reference: Bytes, + pub mailbox: Bytes, } #[derive(Clone, Derivative)] #[derivative(Debug)] pub struct CommandLogin { - pub userid: Bytes, + pub userid: Bytes, - #[derivative(Debug = "ignore")] - pub password: Bytes, + #[derivative(Debug = "ignore")] + pub password: Bytes, } #[derive(Clone, Debug)] pub struct CommandSearch { - pub criteria: SearchCriteria, + pub criteria: SearchCriteria, } #[derive(Clone, Debug)] pub struct CommandSelect { - pub mailbox: Bytes, + pub mailbox: Bytes, } // #[derive(Clone, Debug)] pub enum FetchItems { - All, - Fast, - Full, - Flags, - Envelope, + All, + Fast, + Full, + Flags, + Envelope, } impl DisplayBytes for FetchItems { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - FetchItems::All => write_bytes!(w, b"ALL"), - FetchItems::Fast => write_bytes!(w, b"FAST"), - FetchItems::Full => write_bytes!(w, b"FULL"), - FetchItems::Flags => write_bytes!(w, b"FLAGS"), - FetchItems::Envelope => write_bytes!(w, b"ENVELOPE"), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + FetchItems::All => write_bytes!(w, b"ALL"), + FetchItems::Fast => write_bytes!(w, b"FAST"), + FetchItems::Full => write_bytes!(w, b"FULL"), + FetchItems::Flags => write_bytes!(w, b"FLAGS"), + FetchItems::Envelope => write_bytes!(w, b"ENVELOPE"), } + } } #[derive(Clone, Debug)] @@ -185,74 +185,74 @@ pub enum FetchAttr {} pub struct SearchCriteria(Vec<(Bound, Bound)>); impl SearchCriteria { - pub fn all() -> Self { - let mut set = Vec::new(); - set.push((Bound::Unbounded, Bound::Unbounded)); - SearchCriteria(set) + pub fn all() -> Self { + let mut set = Vec::new(); + set.push((Bound::Unbounded, Bound::Unbounded)); + SearchCriteria(set) + } + + pub fn contains(&self, n: u32) -> bool { + for range in self.0.iter() { + if range.contains(&n) { + return true; + } } - pub fn contains(&self, n: u32) -> bool { - for range in self.0.iter() { - if range.contains(&n) { - return true; - } - } + false + } - false - } - - pub fn with_range(&mut self, range: impl RangeBounds) -> &mut Self { - let range = (range.start_bound().cloned(), range.end_bound().cloned()); - self.0.push(range); - self - } + pub fn with_range(&mut self, range: impl RangeBounds) -> &mut Self { + let range = (range.start_bound().cloned(), range.end_bound().cloned()); + self.0.push(range); + self + } } impl DisplayBytes for SearchCriteria { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - // TODO: is it faster to batch these up or not? - for (i, range) in self.0.iter().enumerate() { - if i != 0 { - write_bytes!(w, b",")?; - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + // TODO: is it faster to batch these up or not? + for (i, range) in self.0.iter().enumerate() { + if i != 0 { + write_bytes!(w, b",")?; + } - match range.0 { - Bound::Excluded(n) => write_bytes!(w, b"{}", &(n + 1))?, - Bound::Included(n) => write_bytes!(w, b"{}", &n)?, - Bound::Unbounded => write_bytes!(w, b"1")?, - } + match range.0 { + Bound::Excluded(n) => write_bytes!(w, b"{}", &(n + 1))?, + Bound::Included(n) => write_bytes!(w, b"{}", &n)?, + Bound::Unbounded => write_bytes!(w, b"1")?, + } - write_bytes!(w, b":")?; + write_bytes!(w, b":")?; - match range.1 { - Bound::Excluded(n) => write_bytes!(w, b"{}", &(n - 1))?, - Bound::Included(n) => write_bytes!(w, b"{}", &n)?, - Bound::Unbounded => write_bytes!(w, b"*")?, - } - } - - Ok(()) + match range.1 { + Bound::Excluded(n) => write_bytes!(w, b"{}", &(n - 1))?, + Bound::Included(n) => write_bytes!(w, b"{}", &n)?, + Bound::Unbounded => write_bytes!(w, b"*")?, + } } + + Ok(()) + } } #[cfg(test)] mod tests { - use super::*; + use super::*; - #[test] - fn display_search_criteria() { - // TODO: is there a trivial case? - assert_eq!(format_bytes!(b"{}", SearchCriteria::all()), b"1:*"); + #[test] + fn display_search_criteria() { + // TODO: is there a trivial case? + assert_eq!(format_bytes!(b"{}", SearchCriteria::all()), b"1:*"); - assert_eq!( - format_bytes!( - b"{}", - SearchCriteria(vec![ - (Bound::Unbounded, Bound::Included(4)), - (Bound::Excluded(5), Bound::Unbounded), - ]) - ), - b"1:4,6:*" - ); - } + assert_eq!( + format_bytes!( + b"{}", + SearchCriteria(vec![ + (Bound::Unbounded, Bound::Included(4)), + (Bound::Excluded(5), Bound::Unbounded), + ]) + ), + b"1:4,6:*" + ); + } } diff --git a/imap/src/proto/response.rs b/imap/src/proto/response.rs index 4dcef75..bbb51be 100644 --- a/imap/src/proto/response.rs +++ b/imap/src/proto/response.rs @@ -17,9 +17,9 @@ pub type Atom = Bytes; pub struct Tag(pub Bytes); impl DisplayBytes for Tag { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - write_bytes!(w, b"{}", self.0) - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + write_bytes!(w, b"{}", self.0) + } } #[derive(Clone, Debug, PartialEq, Eq, Hash)] @@ -27,366 +27,366 @@ pub struct Timestamp(pub(crate) DateTime); #[cfg(feature = "fuzzing")] impl<'a> Arbitrary<'a> for Timestamp { - fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result { - let (y, m, d, H, M, S) = ( - // TODO: year range? - u.int_in_range(-4000..=4000i32)?, - u.int_in_range(1..=12u32)?, - u.int_in_range(1..=28u32)?, - u.int_in_range(0..=23u32)?, - u.int_in_range(0..=59u32)?, - u.int_in_range(0..=59u32)?, - ); - println!("{:?}", (y, m, d, H, M, S)); - Ok(Timestamp( - // TODO: introduce offset - FixedOffset::west(0).ymd(y, m, d).and_hms(H, M, S), - )) - } + fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result { + let (y, m, d, H, M, S) = ( + // TODO: year range? + u.int_in_range(-4000..=4000i32)?, + u.int_in_range(1..=12u32)?, + u.int_in_range(1..=28u32)?, + u.int_in_range(0..=23u32)?, + u.int_in_range(0..=59u32)?, + u.int_in_range(0..=59u32)?, + ); + println!("{:?}", (y, m, d, H, M, S)); + Ok(Timestamp( + // TODO: introduce offset + FixedOffset::west(0).ymd(y, m, d).and_hms(H, M, S), + )) + } } #[derive(Debug)] #[non_exhaustive] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum Response { - Capabilities(Vec), - Continue(ResponseText), - Condition(Condition), - Done(ResponseDone), - MailboxData(MailboxData), - Fetch(u32, Vec), - Expunge(u32), - Fatal(Condition), - Tagged(Tag, Condition), + Capabilities(Vec), + Continue(ResponseText), + Condition(Condition), + Done(ResponseDone), + MailboxData(MailboxData), + Fetch(u32, Vec), + Expunge(u32), + Fatal(Condition), + Tagged(Tag, Condition), } impl DisplayBytes for Response { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - #[allow(unreachable_patterns)] - match self { - Response::Capabilities(caps) => { - write_bytes!(w, b"CAPABILITY")?; - for cap in caps { - write_bytes!(w, b" {}", cap)?; - } - Ok(()) - } - Response::Continue(cont) => write_bytes!(w, b"+ {}\r\n", cont), - Response::Condition(cond) => write_bytes!(w, b"* {}\r\n", cond), - Response::Done(_) => write_bytes!(w, b""), - Response::MailboxData(data) => write_bytes!(w, b"* {}\r\n", data), - Response::Fetch(n, attrs) => { - write_bytes!(w, b"{} FETCH (", n)?; - for (i, attr) in attrs.iter().enumerate() { - if i != 0 { - write_bytes!(w, b" ")?; - } - write_bytes!(w, b"{}", attr)?; - } - write_bytes!(w, b")\r\n") - } - Response::Expunge(n) => write_bytes!(w, b"{} EXPUNGE\r\n", n), - Response::Fatal(cond) => write_bytes!(w, b"* {}\r\n", cond), - Response::Tagged(tag, cond) => { - write_bytes!(w, b"{} {}\r\n", tag, cond) - } - _ => todo!(), + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + #[allow(unreachable_patterns)] + match self { + Response::Capabilities(caps) => { + write_bytes!(w, b"CAPABILITY")?; + for cap in caps { + write_bytes!(w, b" {}", cap)?; } + Ok(()) + } + Response::Continue(cont) => write_bytes!(w, b"+ {}\r\n", cont), + Response::Condition(cond) => write_bytes!(w, b"* {}\r\n", cond), + Response::Done(_) => write_bytes!(w, b""), + Response::MailboxData(data) => write_bytes!(w, b"* {}\r\n", data), + Response::Fetch(n, attrs) => { + write_bytes!(w, b"{} FETCH (", n)?; + for (i, attr) in attrs.iter().enumerate() { + if i != 0 { + write_bytes!(w, b" ")?; + } + write_bytes!(w, b"{}", attr)?; + } + write_bytes!(w, b")\r\n") + } + Response::Expunge(n) => write_bytes!(w, b"{} EXPUNGE\r\n", n), + Response::Fatal(cond) => write_bytes!(w, b"* {}\r\n", cond), + Response::Tagged(tag, cond) => { + write_bytes!(w, b"{} {}\r\n", tag, cond) + } + _ => todo!(), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub struct ResponseText { - pub code: Option, - pub info: Bytes, + pub code: Option, + pub info: Bytes, } impl DisplayBytes for ResponseText { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - if let Some(code) = &self.code { - write_bytes!(w, b"[{}] ", code)?; - } - write_bytes!(w, b"{}", self.info) + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + if let Some(code) = &self.code { + write_bytes!(w, b"[{}] ", code)?; } + write_bytes!(w, b"{}", self.info) + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum MessageAttribute { - BodySection, - BodyStructure, - Envelope(Envelope), - Flags(Vec), - InternalDate(Timestamp), - ModSeq(u64), // RFC 4551, section 3.3.2 - Rfc822(Option), - Rfc822Header(Option), - Rfc822Size(u32), - Rfc822Text(Option), - Uid(u32), + BodySection, + BodyStructure, + Envelope(Envelope), + Flags(Vec), + InternalDate(Timestamp), + ModSeq(u64), // RFC 4551, section 3.3.2 + Rfc822(Option), + Rfc822Header(Option), + Rfc822Size(u32), + Rfc822Text(Option), + Uid(u32), } impl DisplayBytes for MessageAttribute { - fn display_bytes(&self, _: &mut dyn Write) -> io::Result<()> { - match self { - _ => todo!(), - } + fn display_bytes(&self, _: &mut dyn Write) -> io::Result<()> { + match self { + _ => todo!(), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub struct Envelope { - pub date: Option, - pub subject: Option, - pub from: Option>, - pub sender: Option>, - pub reply_to: Option>, - pub to: Option>, - pub cc: Option>, - pub bcc: Option>, - pub in_reply_to: Option, - pub message_id: Option, + pub date: Option, + pub subject: Option, + pub from: Option>, + pub sender: Option>, + pub reply_to: Option>, + pub to: Option>, + pub cc: Option>, + pub bcc: Option>, + pub in_reply_to: Option, + pub message_id: Option, } #[derive(Debug, PartialEq, Eq)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub struct Address { - pub name: Option, - pub adl: Option, - pub mailbox: Option, - pub host: Option, + pub name: Option, + pub adl: Option, + pub mailbox: Option, + pub host: Option, } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub struct ResponseDone { - pub tag: Tag, - pub status: Status, - pub code: Option, - pub info: Option, + pub tag: Tag, + pub status: Status, + pub code: Option, + pub info: Option, } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub struct Condition { - pub status: Status, - pub code: Option, - pub info: Bytes, + pub status: Status, + pub code: Option, + pub info: Bytes, } impl DisplayBytes for Condition { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - write_bytes!(w, b"{} ", self.status)?; - if let Some(code) = &self.code { - write_bytes!(w, b"[{}] ", code)?; - } - write_bytes!(w, b"{}", self.info) + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + write_bytes!(w, b"{} ", self.status)?; + if let Some(code) = &self.code { + write_bytes!(w, b"[{}] ", code)?; } + write_bytes!(w, b"{}", self.info) + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum Status { - Ok, - No, - Bad, - PreAuth, - Bye, + Ok, + No, + Bad, + PreAuth, + Bye, } impl DisplayBytes for Status { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - Status::Ok => write_bytes!(w, b"OK"), - Status::No => write_bytes!(w, b"NO"), - Status::Bad => write_bytes!(w, b"BAD"), - Status::PreAuth => write_bytes!(w, b"PREAUTH"), - Status::Bye => write_bytes!(w, b"BYE"), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + Status::Ok => write_bytes!(w, b"OK"), + Status::No => write_bytes!(w, b"NO"), + Status::Bad => write_bytes!(w, b"BAD"), + Status::PreAuth => write_bytes!(w, b"PREAUTH"), + Status::Bye => write_bytes!(w, b"BYE"), } + } } #[derive(Debug)] #[non_exhaustive] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum ResponseCode { - Alert, - BadCharset(Option>), - Capabilities(Vec), - HighestModSeq(u64), // RFC 4551, section 3.1.1 - Parse, - PermanentFlags(Vec), - ReadOnly, - ReadWrite, - TryCreate, - UidNext(u32), - UidValidity(u32), - Unseen(u32), - AppendUid(u32, Vec), - CopyUid(u32, Vec, Vec), - UidNotSticky, - Other(Bytes, Option), + Alert, + BadCharset(Option>), + Capabilities(Vec), + HighestModSeq(u64), // RFC 4551, section 3.1.1 + Parse, + PermanentFlags(Vec), + ReadOnly, + ReadWrite, + TryCreate, + UidNext(u32), + UidValidity(u32), + Unseen(u32), + AppendUid(u32, Vec), + CopyUid(u32, Vec, Vec), + UidNotSticky, + Other(Bytes, Option), } impl DisplayBytes for ResponseCode { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - ResponseCode::Alert => write_bytes!(w, b"ALERT"), - _ => todo!(), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + ResponseCode::Alert => write_bytes!(w, b"ALERT"), + _ => todo!(), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum UidSetMember { - UidRange(RangeInclusive), - Uid(u32), + UidRange(RangeInclusive), + Uid(u32), } #[derive(Clone, Debug, PartialEq, Eq, Hash)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum Capability { - Imap4rev1, - Auth(Atom), - Atom(Atom), + Imap4rev1, + Auth(Atom), + Atom(Atom), } impl DisplayBytes for Capability { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - Capability::Imap4rev1 => write_bytes!(w, b"IMAP4rev1"), - Capability::Auth(a) => write_bytes!(w, b"AUTH={}", a), - Capability::Atom(a) => write_bytes!(w, b"{}", a), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + Capability::Imap4rev1 => write_bytes!(w, b"IMAP4rev1"), + Capability::Auth(a) => write_bytes!(w, b"AUTH={}", a), + Capability::Atom(a) => write_bytes!(w, b"{}", a), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum MailboxData { - Flags(Vec), - List(MailboxList), - Lsub(MailboxList), - Search(Vec), - Status, - Exists(u32), - Recent(u32), + Flags(Vec), + List(MailboxList), + Lsub(MailboxList), + Search(Vec), + Status, + Exists(u32), + Recent(u32), } impl DisplayBytes for MailboxData { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - MailboxData::Flags(flags) => { - write_bytes!(w, b"(")?; - for (i, flag) in flags.iter().enumerate() { - if i != 0 { - write_bytes!(w, b" ")?; - } - write_bytes!(w, b"{}", flag)?; - } - write_bytes!(w, b")") - } - MailboxData::List(list) => write_bytes!(w, b"{}", list), - MailboxData::Exists(n) => write_bytes!(w, b"{} EXISTS", n), - MailboxData::Recent(n) => write_bytes!(w, b"{} RECENT", n), - _ => todo!(), + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + MailboxData::Flags(flags) => { + write_bytes!(w, b"(")?; + for (i, flag) in flags.iter().enumerate() { + if i != 0 { + write_bytes!(w, b" ")?; + } + write_bytes!(w, b"{}", flag)?; } + write_bytes!(w, b")") + } + MailboxData::List(list) => write_bytes!(w, b"{}", list), + MailboxData::Exists(n) => write_bytes!(w, b"{} EXISTS", n), + MailboxData::Recent(n) => write_bytes!(w, b"{} RECENT", n), + _ => todo!(), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum Mailbox { - Inbox, - Name(Bytes), + Inbox, + Name(Bytes), } impl DisplayBytes for Mailbox { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - Mailbox::Inbox => write_bytes!(w, b"INBOX"), - Mailbox::Name(b) => write_bytes!(w, b"{}", b), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + Mailbox::Inbox => write_bytes!(w, b"INBOX"), + Mailbox::Name(b) => write_bytes!(w, b"{}", b), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum Flag { - Answered, - Flagged, - Deleted, - Seen, - Draft, - Recent, - Keyword(Atom), - Extension(Atom), - SpecialCreate, + Answered, + Flagged, + Deleted, + Seen, + Draft, + Recent, + Keyword(Atom), + Extension(Atom), + SpecialCreate, } impl DisplayBytes for Flag { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - Flag::Answered => write_bytes!(w, b"\\Answered"), - Flag::Flagged => write_bytes!(w, b"\\Flagged"), - Flag::Deleted => write_bytes!(w, b"\\Deleted"), - Flag::Seen => write_bytes!(w, b"\\Seen"), - Flag::Draft => write_bytes!(w, b"\\Draft"), - Flag::Recent => write_bytes!(w, b"\\Recent"), - Flag::Keyword(atom) => write_bytes!(w, b"{}", atom), - Flag::Extension(atom) => write_bytes!(w, b"\\{}", atom), - Flag::SpecialCreate => write_bytes!(w, b"\\*"), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + Flag::Answered => write_bytes!(w, b"\\Answered"), + Flag::Flagged => write_bytes!(w, b"\\Flagged"), + Flag::Deleted => write_bytes!(w, b"\\Deleted"), + Flag::Seen => write_bytes!(w, b"\\Seen"), + Flag::Draft => write_bytes!(w, b"\\Draft"), + Flag::Recent => write_bytes!(w, b"\\Recent"), + Flag::Keyword(atom) => write_bytes!(w, b"{}", atom), + Flag::Extension(atom) => write_bytes!(w, b"\\{}", atom), + Flag::SpecialCreate => write_bytes!(w, b"\\*"), } + } } #[derive(Debug)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub struct MailboxList { - pub flags: Vec, - pub delimiter: Option, - pub mailbox: Mailbox, + pub flags: Vec, + pub delimiter: Option, + pub mailbox: Mailbox, } impl DisplayBytes for MailboxList { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - write_bytes!(w, b"(")?; - for (i, flag) in self.flags.iter().enumerate() { - if i != 0 { - write_bytes!(w, b" ")?; - } - write_bytes!(w, b"{}", flag)?; - } - write_bytes!(w, b") ")?; - match self.delimiter { - Some(d) => write_bytes!(w, b"\"{}\"", d)?, - None => write_bytes!(w, b"NIL")?, - } - write_bytes!(w, b" {}", self.mailbox) + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + write_bytes!(w, b"(")?; + for (i, flag) in self.flags.iter().enumerate() { + if i != 0 { + write_bytes!(w, b" ")?; + } + write_bytes!(w, b"{}", flag)?; } + write_bytes!(w, b") ")?; + match self.delimiter { + Some(d) => write_bytes!(w, b"\"{}\"", d)?, + None => write_bytes!(w, b"NIL")?, + } + write_bytes!(w, b" {}", self.mailbox) + } } #[derive(Debug, PartialEq, Eq)] #[cfg_attr(feature = "fuzzing", derive(Arbitrary))] pub enum MailboxListFlag { - NoInferiors, - NoSelect, - Marked, - Unmarked, - Extension(Atom), + NoInferiors, + NoSelect, + Marked, + Unmarked, + Extension(Atom), } impl DisplayBytes for MailboxListFlag { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - match self { - MailboxListFlag::NoInferiors => write_bytes!(w, b"\\Noinferiors"), - MailboxListFlag::NoSelect => write_bytes!(w, b"\\NoSelect"), - MailboxListFlag::Marked => write_bytes!(w, b"\\Marked"), - MailboxListFlag::Unmarked => write_bytes!(w, b"\\Unmarked"), - MailboxListFlag::Extension(a) => write_bytes!(w, b"\\{}", a), - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + match self { + MailboxListFlag::NoInferiors => write_bytes!(w, b"\\Noinferiors"), + MailboxListFlag::NoSelect => write_bytes!(w, b"\\NoSelect"), + MailboxListFlag::Marked => write_bytes!(w, b"\\Marked"), + MailboxListFlag::Unmarked => write_bytes!(w, b"\\Unmarked"), + MailboxListFlag::Extension(a) => write_bytes!(w, b"\\{}", a), } + } } diff --git a/imap/src/proto/rfc2234.rs b/imap/src/proto/rfc2234.rs index 38cc070..c67adda 100644 --- a/imap/src/proto/rfc2234.rs +++ b/imap/src/proto/rfc2234.rs @@ -4,9 +4,9 @@ //! Grammar from use nom::{ - branch::alt, - multi::many0, - sequence::{pair, preceded}, + branch::alt, + multi::many0, + sequence::{pair, preceded}, }; use panorama_proto_common::{byte, satisfy, skip}; diff --git a/imap/src/proto/rfc3501/mod.rs b/imap/src/proto/rfc3501/mod.rs index c3b76ba..69e67f3 100644 --- a/imap/src/proto/rfc3501/mod.rs +++ b/imap/src/proto/rfc3501/mod.rs @@ -8,30 +8,30 @@ pub mod tests; use chrono::{FixedOffset, NaiveTime, TimeZone}; use nom::{ - branch::alt, - combinator::{map, map_res, opt, verify}, - multi::{count, many0, many1, many_m_n}, - sequence::{delimited, pair, preceded, separated_pair, terminated, tuple}, + branch::alt, + combinator::{map, map_res, opt, verify}, + multi::{count, many0, many1, many_m_n}, + sequence::{delimited, pair, preceded, separated_pair, terminated, tuple}, }; use panorama_proto_common::{ - byte, never, parse_num, satisfy, tagi, take, take_while1, Bytes, VResult, + byte, never, parse_num, satisfy, tagi, take, take_while1, Bytes, VResult, }; use super::response::{ - Address, Atom, Capability, Condition, Envelope, Flag, Mailbox, MailboxData, - MailboxList, MailboxListFlag, MessageAttribute, Response, ResponseCode, - ResponseText, Status, Tag, Timestamp, + Address, Atom, Capability, Condition, Envelope, Flag, Mailbox, MailboxData, + MailboxList, MailboxListFlag, MessageAttribute, Response, ResponseCode, + ResponseText, Status, Tag, Timestamp, }; use super::rfc2234::{ - is_char, is_cr, is_ctl, is_digit, is_dquote, is_lf, is_sp, CRLF, DIGIT, - DQUOTE, SP, + is_char, is_cr, is_ctl, is_digit, is_dquote, is_lf, is_sp, CRLF, DIGIT, + DQUOTE, SP, }; /// Grammar rule `T / nil` produces `Option` macro_rules! opt_nil { - ($t:expr) => { - alt((map($t, Some), map(crate::proto::rfc3501::nil, |_| None))) - }; + ($t:expr) => { + alt((map($t, Some), map(crate::proto::rfc3501::nil, |_| None))) + }; } rule!(pub address : Address => map(paren!(tuple(( @@ -52,7 +52,7 @@ rule!(pub addr_name : Option => nstring); rule!(pub astring : Bytes => alt((take_while1(is_astring_char), string))); pub(crate) fn is_astring_char(c: u8) -> bool { - is_atom_char(c) || is_resp_specials(c) + is_atom_char(c) || is_resp_specials(c) } rule!(pub ASTRING_CHAR : u8 => alt((ATOM_CHAR, resp_specials))); @@ -66,14 +66,14 @@ pub(crate) fn is_atom_char(c: u8) -> bool { is_char(c) && !is_atom_specials(c) } rule!(pub ATOM_CHAR : u8 => satisfy(is_atom_char)); pub(crate) fn is_atom_specials(c: u8) -> bool { - c == b'(' - || c == b')' - || c == b'{' - || is_sp(c) - || is_ctl(c) - || is_list_wildcards(c) - || is_quoted_specials(c) - || is_resp_specials(c) + c == b'(' + || c == b')' + || c == b'{' + || is_sp(c) + || is_ctl(c) + || is_list_wildcards(c) + || is_quoted_specials(c) + || is_resp_specials(c) } rule!(pub atom_specials : u8 => satisfy(is_atom_specials)); @@ -215,11 +215,11 @@ rule!(pub list_wildcards : u8 => satisfy(is_list_wildcards)); // determined to exceed a certain threshold so we don't have insane amounts of // data in memory pub fn literal(i: Bytes) -> VResult { - let mut length_of = - terminated(delimited(byte(b'{'), number, byte(b'}')), CRLF); - let (i, length) = length_of(i)?; - debug!("length is: {:?}", length); - map(take(length), Bytes::from)(i) + let mut length_of = + terminated(delimited(byte(b'{'), number, byte(b'}')), CRLF); + let (i, length) = length_of(i)?; + debug!("length is: {:?}", length); + map(take(length), Bytes::from)(i) } rule!(pub mailbox : Mailbox => alt(( @@ -292,7 +292,7 @@ rule!(pub nil : Bytes => tagi(b"NIL")); rule!(pub nstring : Option => opt_nil!(string)); pub(crate) fn number(i: Bytes) -> VResult { - map_res(take_while1(is_digit), parse_num::<_, u32>)(i) + map_res(take_while1(is_digit), parse_num::<_, u32>)(i) } rule!(pub nz_number : u32 => verify(number, |n| *n != 0)); @@ -378,7 +378,7 @@ rule!(pub tag : Tag => map(take_while1(is_tag_char), Tag)); rule!(pub text : Bytes => map(take_while1(is_text_char), Bytes::from)); pub(crate) fn is_text_char(c: u8) -> bool { - is_char(c) && !is_cr(c) && !is_lf(c) + is_char(c) && !is_cr(c) && !is_lf(c) } rule!(pub TEXT_CHAR : u8 => satisfy(is_text_char)); diff --git a/imap/src/proto/rfc3501/tests.rs b/imap/src/proto/rfc3501/tests.rs index 82273f3..28a507d 100644 --- a/imap/src/proto/rfc3501/tests.rs +++ b/imap/src/proto/rfc3501/tests.rs @@ -10,114 +10,114 @@ use crate::proto::rfc3501::*; #[test] fn test_literal() { - assert_eq!( - literal(Bytes::from(b"{13}\r\nHello, world!")) - .unwrap() - .1 - .as_ref(), - b"Hello, world!" - ); + assert_eq!( + literal(Bytes::from(b"{13}\r\nHello, world!")) + .unwrap() + .1 + .as_ref(), + b"Hello, world!" + ); } #[test] fn from_afl() { - let _ = response(Bytes::from(b"* 4544444444 444 ")); + let _ = response(Bytes::from(b"* 4544444444 444 ")); - let _ = response(Bytes::from(b"* 8045 FETCH (UID 8225 ENVELOPE (\"Sun, 21 Mar 2021 18:44:10 -0700\" \"SUBJECT\" ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"norepjy\" NIL \"noreply\" \"example.com\")) ((\"NAME\" NIL \"user\" \"gmail.com\")) NIL NIL NIL \"\") FLAGS () INTERNALDATE \"22-Mar-2021 01:64:12 \x7f0000\" RFC822.SIZE 13503)".to_vec())); + let _ = response(Bytes::from(b"* 8045 FETCH (UID 8225 ENVELOPE (\"Sun, 21 Mar 2021 18:44:10 -0700\" \"SUBJECT\" ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"norepjy\" NIL \"noreply\" \"example.com\")) ((\"NAME\" NIL \"user\" \"gmail.com\")) NIL NIL NIL \"\") FLAGS () INTERNALDATE \"22-Mar-2021 01:64:12 \x7f0000\" RFC822.SIZE 13503)".to_vec())); } #[test] fn test_date() { - assert_eq!(date_year(Bytes::from(b"2021")).unwrap().1, 2021); + assert_eq!(date_year(Bytes::from(b"2021")).unwrap().1, 2021); - assert_eq!( - date_time(Bytes::from(b"\"22-Mar-2021 01:44:12 +0000\"")) - .unwrap() - .1, - Timestamp(FixedOffset::east(0).ymd(2021, 3, 22).and_hms(1, 44, 12)), - ); + assert_eq!( + date_time(Bytes::from(b"\"22-Mar-2021 01:44:12 +0000\"")) + .unwrap() + .1, + Timestamp(FixedOffset::east(0).ymd(2021, 3, 22).and_hms(1, 44, 12)), + ); } #[test] fn test_fetch() { - assert!(flag_list(Bytes::from(b"()")).unwrap().1.is_empty()); + assert!(flag_list(Bytes::from(b"()")).unwrap().1.is_empty()); - use nom::Err; - use panorama_proto_common::convert_error; - let buf = Bytes::from(b"* 8045 FETCH (UID 8225 ENVELOPE (\"Sun, 21 Mar 2021 18:44:10 -0700\" \"SUBJECT\" ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"noreply\" NIL \"noreply\" \"example.com\")) ((\"NAME\" NIL \"user\" \"gmail.com\")) NIL NIL NIL \"\") FLAGS () INTERNALDATE \"22-Mar-2021 01:44:12 +0000\" RFC822.SIZE 13503)\r\n".to_vec()); - let res = response(buf.clone()); - println!("response: {:?}", res); - assert!(matches!(res.unwrap().1, Response::Fetch(8045, _))); + use nom::Err; + use panorama_proto_common::convert_error; + let buf = Bytes::from(b"* 8045 FETCH (UID 8225 ENVELOPE (\"Sun, 21 Mar 2021 18:44:10 -0700\" \"SUBJECT\" ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"SENDER\" NIL \"sender\" \"example.com\")) ((\"noreply\" NIL \"noreply\" \"example.com\")) ((\"NAME\" NIL \"user\" \"gmail.com\")) NIL NIL NIL \"\") FLAGS () INTERNALDATE \"22-Mar-2021 01:44:12 +0000\" RFC822.SIZE 13503)\r\n".to_vec()); + let res = response(buf.clone()); + println!("response: {:?}", res); + assert!(matches!(res.unwrap().1, Response::Fetch(8045, _))); } #[test] fn test_capabilities() { - assert_eq!( - capability(Bytes::from(b"UNSELECT\r\n")).unwrap().1, - Capability::Atom(Bytes::from(b"UNSELECT")) - ); + assert_eq!( + capability(Bytes::from(b"UNSELECT\r\n")).unwrap().1, + Capability::Atom(Bytes::from(b"UNSELECT")) + ); - // trivial case - assert_eq!( - capability_data(Bytes::from(b"CAPABILITY IMAP4rev1\r\n")) - .unwrap() - .1, - vec![] - ); + // trivial case + assert_eq!( + capability_data(Bytes::from(b"CAPABILITY IMAP4rev1\r\n")) + .unwrap() + .1, + vec![] + ); - assert_eq!( - capability_data(Bytes::from( - b"CAPABILITY UNSELECT IMAP4rev1 NAMESPACE\r\n" - )) - .unwrap() - .1, - vec![ - Capability::Atom(Bytes::from(b"UNSELECT")), - Capability::Atom(Bytes::from(b"NAMESPACE")) - ] - ); + assert_eq!( + capability_data(Bytes::from( + b"CAPABILITY UNSELECT IMAP4rev1 NAMESPACE\r\n" + )) + .unwrap() + .1, + vec![ + Capability::Atom(Bytes::from(b"UNSELECT")), + Capability::Atom(Bytes::from(b"NAMESPACE")) + ] + ); } #[test] fn test_list() { - assert!(matches!( - response(Bytes::from( - b"* LIST (\\HasChildren \\UnMarked \\Trash) \".\" Trash\r\n", - )) - .unwrap() - .1, - Response::MailboxData(MailboxData::List(MailboxList { - flags, - delimiter: Some(b'.'), - mailbox: Mailbox::Name(mailbox), - }) ) if flags.len() == 3 && - flags.contains(&MailboxListFlag::Extension(Atom::from(b"HasChildren"))) && - flags.contains(&MailboxListFlag::Extension(Atom::from(b"UnMarked"))) && - flags.contains(&MailboxListFlag::Extension(Atom::from(b"Trash"))) && - &*mailbox == &b"Trash"[..] - )); + assert!(matches!( + response(Bytes::from( + b"* LIST (\\HasChildren \\UnMarked \\Trash) \".\" Trash\r\n", + )) + .unwrap() + .1, + Response::MailboxData(MailboxData::List(MailboxList { + flags, + delimiter: Some(b'.'), + mailbox: Mailbox::Name(mailbox), + }) ) if flags.len() == 3 && + flags.contains(&MailboxListFlag::Extension(Atom::from(b"HasChildren"))) && + flags.contains(&MailboxListFlag::Extension(Atom::from(b"UnMarked"))) && + flags.contains(&MailboxListFlag::Extension(Atom::from(b"Trash"))) && + &*mailbox == &b"Trash"[..] + )); } #[test] fn test_gmail_is_shit() { - // FUCK YOU GMAIL! - let res = response(Bytes::from(b"* OK [HIGHESTMODSEQ 694968]\r\n")) - .unwrap() - .1; - assert!(matches!(res, - Response::Condition(Condition { - status: Status::Ok, - code: Some(ResponseCode::Other(c, Some(d))), - info: e, - }) - if c == Bytes::from(b"HIGHESTMODSEQ") && d == Bytes::from(b"694968") && e.is_empty() - )); + // FUCK YOU GMAIL! + let res = response(Bytes::from(b"* OK [HIGHESTMODSEQ 694968]\r\n")) + .unwrap() + .1; + assert!(matches!(res, + Response::Condition(Condition { + status: Status::Ok, + code: Some(ResponseCode::Other(c, Some(d))), + info: e, + }) + if c == Bytes::from(b"HIGHESTMODSEQ") && d == Bytes::from(b"694968") && e.is_empty() + )); - let res = resp_text(Bytes::from(b"[PERMANENTFLAGS (\\Answered \\Flagged \\Draft \\Deleted \\Seen $NotPhishing $Phishing \\*)] Flags permitted.\r".to_vec())).unwrap().1; - eprintln!("{:?}", res); - eprintln!(); + let res = resp_text(Bytes::from(b"[PERMANENTFLAGS (\\Answered \\Flagged \\Draft \\Deleted \\Seen $NotPhishing $Phishing \\*)] Flags permitted.\r".to_vec())).unwrap().1; + eprintln!("{:?}", res); + eprintln!(); - let res = response(Bytes::from(b"* OK [PERMANENTFLAGS (\\Answered \\Flagged \\Draft \\Deleted \\Seen $NotPhishing $Phishing \\*)] Flags permitted.\r\n".to_vec())).unwrap().1; - eprintln!("{:?}", res); - assert!(matches!(res, Response::Condition(_))); + let res = response(Bytes::from(b"* OK [PERMANENTFLAGS (\\Answered \\Flagged \\Draft \\Deleted \\Seen $NotPhishing $Phishing \\*)] Flags permitted.\r\n".to_vec())).unwrap().1; + eprintln!("{:?}", res); + assert!(matches!(res, Response::Condition(_))); } diff --git a/proto-common/src/bytes.rs b/proto-common/src/bytes.rs index 120e6a6..a3e6ea4 100644 --- a/proto-common/src/bytes.rs +++ b/proto-common/src/bytes.rs @@ -3,8 +3,8 @@ use std::ops::{Deref, RangeBounds}; use format_bytes::DisplayBytes; use nom::{ - error::{ErrorKind, ParseError}, - CompareResult, Err, HexDisplay, IResult, InputLength, Needed, + error::{ErrorKind, ParseError}, + CompareResult, Err, HexDisplay, IResult, InputLength, Needed, }; #[cfg(feature = "fuzzing")] @@ -16,254 +16,254 @@ pub struct Bytes(bytes::Bytes); #[cfg(feature = "fuzzing")] impl<'a> Arbitrary<'a> for Bytes { - fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result { - Ok(Bytes::from(>::arbitrary(u)?)) - } + fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result { + Ok(Bytes::from(>::arbitrary(u)?)) + } } impl Bytes { - /// Length of the internal `Bytes`. - /// - /// # Example - /// - /// ``` - /// # use panorama_proto_common::Bytes; - /// // the &, [..] is needed since &[u8; N] doesn't coerce automatically to &[u8] - /// let b = Bytes::from(&b"hello"[..]); - /// assert_eq!(b.len(), 5); - /// ``` - pub fn len(&self) -> usize { self.0.len() } + /// Length of the internal `Bytes`. + /// + /// # Example + /// + /// ``` + /// # use panorama_proto_common::Bytes; + /// // the &, [..] is needed since &[u8; N] doesn't coerce automatically to &[u8] + /// let b = Bytes::from(&b"hello"[..]); + /// assert_eq!(b.len(), 5); + /// ``` + pub fn len(&self) -> usize { self.0.len() } - /// Consumes the wrapper, returning the original `Bytes`. - pub fn inner(self) -> bytes::Bytes { self.0 } + /// Consumes the wrapper, returning the original `Bytes`. + pub fn inner(self) -> bytes::Bytes { self.0 } } impl DisplayBytes for Bytes { - fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { - w.write(&*self.0).map(|_| ()) - } + fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { + w.write(&*self.0).map(|_| ()) + } } static CHARS: &[u8] = b"0123456789abcdef"; impl HexDisplay for Bytes { - fn to_hex(&self, chunk_size: usize) -> String { - self.to_hex_from(chunk_size, 0) - } + fn to_hex(&self, chunk_size: usize) -> String { + self.to_hex_from(chunk_size, 0) + } - fn to_hex_from(&self, chunk_size: usize, from: usize) -> String { - let mut v = Vec::with_capacity(self.len() * 3); - let mut i = from; - for chunk in self.chunks(chunk_size) { - let s = format!("{:08x}", i); - for &ch in s.as_bytes().iter() { - v.push(ch); - } - v.push(b'\t'); + fn to_hex_from(&self, chunk_size: usize, from: usize) -> String { + let mut v = Vec::with_capacity(self.len() * 3); + let mut i = from; + for chunk in self.chunks(chunk_size) { + let s = format!("{:08x}", i); + for &ch in s.as_bytes().iter() { + v.push(ch); + } + v.push(b'\t'); - i += chunk_size; + i += chunk_size; - for &byte in chunk { - v.push(CHARS[(byte >> 4) as usize]); - v.push(CHARS[(byte & 0xf) as usize]); - v.push(b' '); - } - if chunk_size > chunk.len() { - for _ in 0..(chunk_size - chunk.len()) { - v.push(b' '); - v.push(b' '); - v.push(b' '); - } - } - v.push(b'\t'); - - for &byte in chunk { - if (byte >= 32 && byte <= 126) || byte >= 128 { - v.push(byte); - } else { - v.push(b'.'); - } - } - v.push(b'\n'); + for &byte in chunk { + v.push(CHARS[(byte >> 4) as usize]); + v.push(CHARS[(byte & 0xf) as usize]); + v.push(b' '); + } + if chunk_size > chunk.len() { + for _ in 0..(chunk_size - chunk.len()) { + v.push(b' '); + v.push(b' '); + v.push(b' '); } + } + v.push(b'\t'); - String::from_utf8_lossy(&v[..]).into_owned() + for &byte in chunk { + if (byte >= 32 && byte <= 126) || byte >= 128 { + v.push(byte); + } else { + v.push(b'.'); + } + } + v.push(b'\n'); } + + String::from_utf8_lossy(&v[..]).into_owned() + } } impl From for Bytes { - fn from(b: bytes::Bytes) -> Self { Bytes(b) } + fn from(b: bytes::Bytes) -> Self { Bytes(b) } } impl From<&'static [u8]> for Bytes { - fn from(slice: &'static [u8]) -> Self { Bytes(bytes::Bytes::from(slice)) } + fn from(slice: &'static [u8]) -> Self { Bytes(bytes::Bytes::from(slice)) } } impl From> for Bytes { - fn from(slice: Vec) -> Self { Bytes(bytes::Bytes::from(slice)) } + fn from(slice: Vec) -> Self { Bytes(bytes::Bytes::from(slice)) } } impl From<&'static str> for Bytes { - fn from(s: &'static str) -> Self { Bytes(bytes::Bytes::from(s.as_bytes())) } + fn from(s: &'static str) -> Self { Bytes(bytes::Bytes::from(s.as_bytes())) } } impl From for Bytes { - fn from(slice: String) -> Self { Bytes(bytes::Bytes::from(slice)) } + fn from(slice: String) -> Self { Bytes(bytes::Bytes::from(slice)) } } pub trait ShitNeededForParsing: Sized { - type Item; - type Sliced; - fn slice>(&self, range: R) -> Self::Sliced; + type Item; + type Sliced; + fn slice>(&self, range: R) -> Self::Sliced; - fn first(&self) -> Option; - fn slice_index(&self, count: usize) -> Result; + fn first(&self) -> Option; + fn slice_index(&self, count: usize) -> Result; - // InputTake - fn take(&self, count: usize) -> Self; - fn take_split(&self, count: usize) -> (Self, Self); + // InputTake + fn take(&self, count: usize) -> Self; + fn take_split(&self, count: usize) -> (Self, Self); - // InputTakeAtPosition - fn split_at_position>( - &self, - predicate: P, - ) -> IResult - where - P: Fn(Self::Item) -> bool; - fn split_at_position1>( - &self, - predicate: P, - e: ErrorKind, - ) -> IResult - where - P: Fn(Self::Item) -> bool; - fn split_at_position_complete>( - &self, - predicate: P, - ) -> IResult - where - P: Fn(Self::Item) -> bool; - fn split_at_position1_complete>( - &self, - predicate: P, - e: ErrorKind, - ) -> IResult - where - P: Fn(Self::Item) -> bool; + // InputTakeAtPosition + fn split_at_position>( + &self, + predicate: P, + ) -> IResult + where + P: Fn(Self::Item) -> bool; + fn split_at_position1>( + &self, + predicate: P, + e: ErrorKind, + ) -> IResult + where + P: Fn(Self::Item) -> bool; + fn split_at_position_complete>( + &self, + predicate: P, + ) -> IResult + where + P: Fn(Self::Item) -> bool; + fn split_at_position1_complete>( + &self, + predicate: P, + e: ErrorKind, + ) -> IResult + where + P: Fn(Self::Item) -> bool; } impl ShitNeededForParsing for Bytes { - type Item = u8; + type Item = u8; - type Sliced = Bytes; - fn slice>(&self, range: R) -> Self::Sliced { - Self(self.0.slice(range)) + type Sliced = Bytes; + fn slice>(&self, range: R) -> Self::Sliced { + Self(self.0.slice(range)) + } + + fn first(&self) -> Option { self.0.first().copied() } + fn slice_index(&self, count: usize) -> Result { + if self.len() >= count { + Ok(count) + } else { + Err(Needed::new(count - self.len())) } + } - fn first(&self) -> Option { self.0.first().copied() } - fn slice_index(&self, count: usize) -> Result { - if self.len() >= count { - Ok(count) + // InputTake + fn take(&self, count: usize) -> Self { self.slice(..count) } + fn take_split(&self, count: usize) -> (Self, Self) { + let mut prefix = self.clone(); + let suffix = Self(prefix.0.split_off(count)); + (suffix, prefix) + } + + // InputTakeAtPosition + fn split_at_position>( + &self, + predicate: P, + ) -> IResult + where + P: Fn(Self::Item) -> bool, + { + match (0..self.len()).find(|b| predicate(self[*b])) { + Some(i) => Ok((self.slice(i..), self.slice(..i))), + None => Err(Err::Incomplete(Needed::new(1))), + } + } + + fn split_at_position1>( + &self, + predicate: P, + e: ErrorKind, + ) -> IResult + where + P: Fn(Self::Item) -> bool, + { + match (0..self.len()).find(|b| predicate(self[*b])) { + Some(0) => Err(Err::Error(E::from_error_kind(self.clone(), e))), + Some(i) => Ok((self.slice(i..), self.slice(..i))), + None => Err(Err::Incomplete(Needed::new(1))), + } + } + + fn split_at_position_complete>( + &self, + predicate: P, + ) -> IResult + where + P: Fn(Self::Item) -> bool, + { + match (0..self.len()).find(|b| predicate(self[*b])) { + Some(i) => Ok((self.slice(i..), self.slice(..i))), + None => Ok(self.take_split(self.input_len())), + } + } + + fn split_at_position1_complete>( + &self, + predicate: P, + e: ErrorKind, + ) -> IResult + where + P: Fn(Self::Item) -> bool, + { + match (0..self.len()).find(|b| predicate(self[*b])) { + Some(0) => Err(Err::Error(E::from_error_kind(self.clone(), e))), + Some(i) => Ok((self.slice(i..), self.slice(..i))), + None => { + if self.is_empty() { + Err(Err::Error(E::from_error_kind(self.clone(), e))) } else { - Err(Needed::new(count - self.len())) - } - } - - // InputTake - fn take(&self, count: usize) -> Self { self.slice(..count) } - fn take_split(&self, count: usize) -> (Self, Self) { - let mut prefix = self.clone(); - let suffix = Self(prefix.0.split_off(count)); - (suffix, prefix) - } - - // InputTakeAtPosition - fn split_at_position>( - &self, - predicate: P, - ) -> IResult - where - P: Fn(Self::Item) -> bool, - { - match (0..self.len()).find(|b| predicate(self[*b])) { - Some(i) => Ok((self.slice(i..), self.slice(..i))), - None => Err(Err::Incomplete(Needed::new(1))), - } - } - - fn split_at_position1>( - &self, - predicate: P, - e: ErrorKind, - ) -> IResult - where - P: Fn(Self::Item) -> bool, - { - match (0..self.len()).find(|b| predicate(self[*b])) { - Some(0) => Err(Err::Error(E::from_error_kind(self.clone(), e))), - Some(i) => Ok((self.slice(i..), self.slice(..i))), - None => Err(Err::Incomplete(Needed::new(1))), - } - } - - fn split_at_position_complete>( - &self, - predicate: P, - ) -> IResult - where - P: Fn(Self::Item) -> bool, - { - match (0..self.len()).find(|b| predicate(self[*b])) { - Some(i) => Ok((self.slice(i..), self.slice(..i))), - None => Ok(self.take_split(self.input_len())), - } - } - - fn split_at_position1_complete>( - &self, - predicate: P, - e: ErrorKind, - ) -> IResult - where - P: Fn(Self::Item) -> bool, - { - match (0..self.len()).find(|b| predicate(self[*b])) { - Some(0) => Err(Err::Error(E::from_error_kind(self.clone(), e))), - Some(i) => Ok((self.slice(i..), self.slice(..i))), - None => { - if self.is_empty() { - Err(Err::Error(E::from_error_kind(self.clone(), e))) - } else { - Ok(self.take_split(self.input_len())) - } - } + Ok(self.take_split(self.input_len())) } + } } + } } pub trait ShitCompare { - fn compare(&self, t: T) -> CompareResult; - fn compare_no_case(&self, t: T) -> CompareResult; + fn compare(&self, t: T) -> CompareResult; + fn compare_no_case(&self, t: T) -> CompareResult; } impl ShitCompare<&[u8]> for Bytes { - fn compare(&self, other: &[u8]) -> CompareResult { - match self.iter().zip(other.iter()).any(|(a, b)| a != b) { - true => CompareResult::Error, - false if self.len() < other.len() => CompareResult::Incomplete, - false => CompareResult::Ok, - } + fn compare(&self, other: &[u8]) -> CompareResult { + match self.iter().zip(other.iter()).any(|(a, b)| a != b) { + true => CompareResult::Error, + false if self.len() < other.len() => CompareResult::Incomplete, + false => CompareResult::Ok, } - fn compare_no_case(&self, other: &[u8]) -> CompareResult { - match self - .iter() - .zip(other.iter()) - .any(|(a, b)| (a | 0x20) != (b | 0x20)) - { - true => CompareResult::Error, - false if self.len() < other.len() => CompareResult::Incomplete, - false => CompareResult::Ok, - } + } + fn compare_no_case(&self, other: &[u8]) -> CompareResult { + match self + .iter() + .zip(other.iter()) + .any(|(a, b)| (a | 0x20) != (b | 0x20)) + { + true => CompareResult::Error, + false if self.len() < other.len() => CompareResult::Incomplete, + false => CompareResult::Ok, } + } } macro_rules! array_impls { @@ -308,14 +308,14 @@ array_impls! { } impl Deref for Bytes { - type Target = [u8]; - fn deref(&self) -> &Self::Target { &*self.0 } + type Target = [u8]; + fn deref(&self) -> &Self::Target { &*self.0 } } impl AsRef<[u8]> for Bytes { - fn as_ref(&self) -> &[u8] { &*self.0 } + fn as_ref(&self) -> &[u8] { &*self.0 } } impl InputLength for Bytes { - fn input_len(&self) -> usize { self.0.len() } + fn input_len(&self) -> usize { self.0.len() } } diff --git a/proto-common/src/convert_error.rs b/proto-common/src/convert_error.rs index e77db7b..bfca185 100644 --- a/proto-common/src/convert_error.rs +++ b/proto-common/src/convert_error.rs @@ -4,155 +4,146 @@ use std::ops::Deref; use bstr::ByteSlice; use nom::{ - error::{VerboseError, VerboseErrorKind}, - Err, HexDisplay, Offset, + error::{VerboseError, VerboseErrorKind}, + Err, HexDisplay, Offset, }; use crate::VResult; /// Same as nom's dbg_dmp, except operates on Bytes pub fn dbg_dmp<'a, T, F, O>( - mut f: F, - context: &'static str, + mut f: F, + context: &'static str, ) -> impl FnMut(T) -> VResult where - F: FnMut(T) -> VResult, - T: AsRef<[u8]> + HexDisplay + Clone + Debug + Deref, + F: FnMut(T) -> VResult, + T: AsRef<[u8]> + HexDisplay + Clone + Debug + Deref, { - move |i: T| match f(i.clone()) { - Err(Err::Failure(e)) => { - println!( - "{}: Error({}) at:\n{}", - context, - convert_error(i.clone(), &e), - i.to_hex(16) - ); - Err(Err::Failure(e)) - } - a => a, + move |i: T| match f(i.clone()) { + Err(Err::Failure(e)) => { + println!( + "{}: Error({}) at:\n{}", + context, + convert_error(i.clone(), &e), + i.to_hex(16) + ); + Err(Err::Failure(e)) } + a => a, + } } /// Same as nom's convert_error, except operates on u8 pub fn convert_error + Debug>( - input: I, - e: &VerboseError, + input: I, + e: &VerboseError, ) -> String { - let mut result = String::new(); - debug!("e: {:?}", e); + let mut result = String::new(); + debug!("e: {:?}", e); - for (i, (substring, kind)) in e.errors.iter().enumerate() { - let offset = input.offset(substring); + for (i, (substring, kind)) in e.errors.iter().enumerate() { + let offset = input.offset(substring); - if input.is_empty() { - match kind { - VerboseErrorKind::Char(c) => { - write!( - &mut result, - "{}: expected '{}', got empty input\n\n", - i, c - ) - } - VerboseErrorKind::Context(s) => { - write!(&mut result, "{}: in {}, got empty input\n\n", i, s) - } - VerboseErrorKind::Nom(e) => { - write!( - &mut result, - "{}: in {:?}, got empty input\n\n", - i, e - ) - } - } - } else { - let prefix = &input.as_bytes()[..offset]; + if input.is_empty() { + match kind { + VerboseErrorKind::Char(c) => { + write!(&mut result, "{}: expected '{}', got empty input\n\n", i, c) + } + VerboseErrorKind::Context(s) => { + write!(&mut result, "{}: in {}, got empty input\n\n", i, s) + } + VerboseErrorKind::Nom(e) => { + write!(&mut result, "{}: in {:?}, got empty input\n\n", i, e) + } + } + } else { + let prefix = &input.as_bytes()[..offset]; - // Count the number of newlines in the first `offset` bytes of input - let line_number = - prefix.iter().filter(|&&b| b == b'\n').count() + 1; + // Count the number of newlines in the first `offset` bytes of input + let line_number = prefix.iter().filter(|&&b| b == b'\n').count() + 1; - // Find the line that includes the subslice: - // Find the *last* newline before the substring starts - let line_begin = prefix - .iter() - .rev() - .position(|&b| b == b'\n') - .map(|pos| offset - pos) - .unwrap_or(0); + // Find the line that includes the subslice: + // Find the *last* newline before the substring starts + let line_begin = prefix + .iter() + .rev() + .position(|&b| b == b'\n') + .map(|pos| offset - pos) + .unwrap_or(0); - // Find the full line after that newline - let line = input[line_begin..] - .lines() - .next() - .unwrap_or(&input[line_begin..]) - .trim_end(); + // Find the full line after that newline + let line = input[line_begin..] + .lines() + .next() + .unwrap_or(&input[line_begin..]) + .trim_end(); - // The (1-indexed) column number is the offset of our substring into - // that line - let column_number = line.offset(substring) + 1; + // The (1-indexed) column number is the offset of our substring into + // that line + let column_number = line.offset(substring) + 1; - match kind { - VerboseErrorKind::Char(c) => { - if let Some(actual) = substring.chars().next() { - write!( - &mut result, - "{i}: at line {line_number}:\n\ + match kind { + VerboseErrorKind::Char(c) => { + if let Some(actual) = substring.chars().next() { + write!( + &mut result, + "{i}: at line {line_number}:\n\ {line}\n\ {caret:>column$}\n\ expected '{expected}', found {actual}\n\n", - i = i, - line_number = line_number, - line = String::from_utf8_lossy(line), - caret = '^', - column = column_number, - expected = c, - actual = actual, - ) - } else { - write!( - &mut result, - "{i}: at line {line_number}:\n\ + i = i, + line_number = line_number, + line = String::from_utf8_lossy(line), + caret = '^', + column = column_number, + expected = c, + actual = actual, + ) + } else { + write!( + &mut result, + "{i}: at line {line_number}:\n\ {line}\n\ {caret:>column$}\n\ expected '{expected}', got end of input\n\n", - i = i, - line_number = line_number, - line = String::from_utf8_lossy(line), - caret = '^', - column = column_number, - expected = c, - ) - } - } - VerboseErrorKind::Context(s) => write!( - &mut result, - "{i}: at line {line_number}, in {context}:\n\ - {line}\n\ - {caret:>column$}\n\n", - i = i, - line_number = line_number, - context = s, - line = String::from_utf8_lossy(line), - caret = '^', - column = column_number, - ), - VerboseErrorKind::Nom(e) => write!( - &mut result, - "{i}: at line {line_number}, in {nom_err:?}:\n\ - {line}\n\ - {caret:>column$}\n\n", - i = i, - line_number = line_number, - nom_err = e, - line = String::from_utf8_lossy(line), - caret = '^', - column = column_number, - ), - } + i = i, + line_number = line_number, + line = String::from_utf8_lossy(line), + caret = '^', + column = column_number, + expected = c, + ) + } } - // Because `write!` to a `String` is infallible, this `unwrap` is fine. - .unwrap(); + VerboseErrorKind::Context(s) => write!( + &mut result, + "{i}: at line {line_number}, in {context}:\n\ + {line}\n\ + {caret:>column$}\n\n", + i = i, + line_number = line_number, + context = s, + line = String::from_utf8_lossy(line), + caret = '^', + column = column_number, + ), + VerboseErrorKind::Nom(e) => write!( + &mut result, + "{i}: at line {line_number}, in {nom_err:?}:\n\ + {line}\n\ + {caret:>column$}\n\n", + i = i, + line_number = line_number, + nom_err = e, + line = String::from_utf8_lossy(line), + caret = '^', + column = column_number, + ), + } } + // Because `write!` to a `String` is infallible, this `unwrap` is fine. + .unwrap(); + } - result + result } diff --git a/proto-common/src/formatter.rs b/proto-common/src/formatter.rs index 1c78ad5..ae8bf4d 100644 --- a/proto-common/src/formatter.rs +++ b/proto-common/src/formatter.rs @@ -8,27 +8,27 @@ /// assert_eq!(quote(b"hello \"' world"), b"\"hello \\\"\\' world\""); /// ``` pub fn quote_string( - quote: u8, - escape: u8, - should_escape: F, + quote: u8, + escape: u8, + should_escape: F, ) -> impl Fn(B) -> Vec where - B: AsRef<[u8]>, - F: Fn(u8) -> bool, + B: AsRef<[u8]>, + F: Fn(u8) -> bool, { - move |input: B| { - let input = input.as_ref(); - let mut ret = Vec::with_capacity(input.len() + 2); + move |input: B| { + let input = input.as_ref(); + let mut ret = Vec::with_capacity(input.len() + 2); - ret.push(quote); - for c in input { - if should_escape(*c) { - ret.push(escape); - } - ret.push(*c); - } - ret.push(quote); - - ret + ret.push(quote); + for c in input { + if should_escape(*c) { + ret.push(escape); + } + ret.push(*c); } + ret.push(quote); + + ret + } } diff --git a/proto-common/src/lib.rs b/proto-common/src/lib.rs index 2f7d1d7..d6a1c30 100644 --- a/proto-common/src/lib.rs +++ b/proto-common/src/lib.rs @@ -13,5 +13,5 @@ pub use crate::bytes::{Bytes, ShitCompare, ShitNeededForParsing}; pub use crate::convert_error::{convert_error, dbg_dmp}; pub use crate::formatter::quote_string; pub use crate::parsers::{ - byte, never, parse_num, satisfy, skip, tagi, take, take_while1, VResult, + byte, never, parse_num, satisfy, skip, tagi, take, take_while1, VResult, }; diff --git a/proto-common/src/parsers.rs b/proto-common/src/parsers.rs index f68bbed..3e5087d 100644 --- a/proto-common/src/parsers.rs +++ b/proto-common/src/parsers.rs @@ -1,7 +1,7 @@ use anyhow::Result; use nom::{ - error::{ErrorKind, ParseError, VerboseError}, - CompareResult, Err, IResult, InputLength, Needed, Parser, ToUsize, + error::{ErrorKind, ParseError, VerboseError}, + CompareResult, Err, IResult, InputLength, Needed, Parser, ToUsize, }; use num_traits::{CheckedAdd, CheckedMul, FromPrimitive, Zero}; @@ -19,83 +19,85 @@ pub type VResult = IResult>; /// If `d` is not passed then it defaults to `SP`. #[macro_export] macro_rules! sep_list { - ($t:expr) => { - map( - pair( - $t, - many0(preceded(panorama_proto_common::byte(b'\x20'), $t)), - ), - |(hd, mut tl)| { - tl.insert(0, hd); - tl - }, - ) - }; - ($t:expr, $d:expr) => { - map(pair($t, many0(preceded($d, $t))), |(hd, mut tl)| { + ($t:expr) => { + map( + pair( + $t, + many0(preceded(panorama_proto_common::byte(b'\x20'), $t)), + ), + |(hd, mut tl)| { + tl.insert(0, hd); + tl + }, + ) + }; + ($t:expr, $d:expr) => { + map(pair($t, many0(preceded($d, $t))), |(hd, mut tl)| { + tl.insert(0, hd); + tl + }) + }; + (? $t:expr) => { + map( + opt(pair( + $t, + many0(preceded(panorama_proto_common::byte(b'\x20'), $t)), + )), + |opt| { + opt + .map(|(hd, mut tl)| { tl.insert(0, hd); tl + }) + .unwrap_or_else(Vec::new) + }, + ) + }; + (? $t:expr, $d:expr) => { + map(opt(pair($t, many0(preceded($d, $t)))), |opt| { + opt + .map(|(hd, mut tl)| { + tl.insert(0, hd); + tl }) - }; - (? $t:expr) => { - map( - opt(pair( - $t, - many0(preceded(panorama_proto_common::byte(b'\x20'), $t)), - )), - |opt| { - opt.map(|(hd, mut tl)| { - tl.insert(0, hd); - tl - }) - .unwrap_or_else(Vec::new) - }, - ) - }; - (? $t:expr, $d:expr) => { - map(opt(pair($t, many0(preceded($d, $t)))), |opt| { - opt.map(|(hd, mut tl)| { - tl.insert(0, hd); - tl - }) - .unwrap_or_else(Vec::new) - }) - }; + .unwrap_or_else(Vec::new) + }) + }; } /// Helper macro for wrapping a parser in parentheses. #[macro_export] macro_rules! paren { - ($t:expr) => { - delimited(byte(b'('), $t, byte(b')')) - }; + ($t:expr) => { + delimited(byte(b'('), $t, byte(b')')) + }; } /// Parse from a [u8] into a u32 without first decoding it to UTF-8. pub fn parse_num(s: S) -> Result where - S: AsRef<[u8]>, - T: CheckedMul + Zero + CheckedAdd + FromPrimitive, + S: AsRef<[u8]>, + T: CheckedMul + Zero + CheckedAdd + FromPrimitive, { - let mut total = T::zero(); - let ten = T::from_u8(10).unwrap(); - let s = s.as_ref(); - for digit in s.iter() { - let digit = *digit; - total = match total.checked_mul(&ten) { - Some(v) => v, - None => bail!("number {:?} overflow", s), - }; - if !(digit >= b'0' && digit <= b'9') { - bail!("invalid digit {}", digit) - } - let new_digit = T::from_u8(digit - b'\x30').unwrap(); - total = match total.checked_add(&new_digit) { - Some(v) => v, - None => bail!("number {:?} overflow", s), - }; + let mut total = T::zero(); + let ten = T::from_u8(10).unwrap(); + let s = s.as_ref(); + for digit in s.iter() { + let digit = *digit; + total = match total.checked_mul(&ten) { + Some(v) => v, + None => bail!("number {:?} overflow", s), + }; + if !(digit >= b'0' && digit <= b'9') { + bail!("invalid digit {}", digit) } - Ok(total) + let new_digit = T::from_u8(digit - b'\x30').unwrap(); + total = match total.checked_add(&new_digit) { + Some(v) => v, + None => bail!("number {:?} overflow", s), + }; + } + Ok(total) } // /// Parse from a [u8] into a u32 without first decoding it to UTF-8. @@ -119,96 +121,96 @@ where /// Always fails, used as a no-op. pub fn never(i: I) -> IResult where - E: ParseError, + E: ParseError, { - Err(Err::Error(E::from_error_kind(i, ErrorKind::Not))) + Err(Err::Error(E::from_error_kind(i, ErrorKind::Not))) } /// Skip the part of the input matched by the given parser. pub fn skip(mut f: F) -> impl FnMut(I) -> IResult where - I: Clone, - F: Parser, + I: Clone, + F: Parser, { - move |i: I| match f.parse(i.clone()) { - Ok(_) => Ok((i, ())), - Err(err) => Err(err), - } + move |i: I| match f.parse(i.clone()) { + Ok(_) => Ok((i, ())), + Err(err) => Err(err), + } } /// Same as nom's streaming take, but operates on Bytes pub fn take(count: C) -> impl Fn(I) -> IResult where - I: ShitNeededForParsing + InputLength, - E: ParseError, - C: ToUsize, + I: ShitNeededForParsing + InputLength, + E: ParseError, + C: ToUsize, { - let c = count.to_usize(); - move |i: I| match i.slice_index(c) { - Err(i) => Err(Err::Incomplete(i)), - Ok(index) => Ok(i.take_split(index)), - } + let c = count.to_usize(); + move |i: I| match i.slice_index(c) { + Err(i) => Err(Err::Incomplete(i)), + Ok(index) => Ok(i.take_split(index)), + } } /// Same as nom's streaming take_while1, but operates on Bytes pub fn take_while1(cond: F) -> impl Fn(I) -> IResult where - I: ShitNeededForParsing, - E: ParseError, - F: Fn(T) -> bool, + I: ShitNeededForParsing, + E: ParseError, + F: Fn(T) -> bool, { - move |i: I| { - let e: ErrorKind = ErrorKind::TakeWhile1; - i.split_at_position1(|c| !cond(c), e) - } + move |i: I| { + let e: ErrorKind = ErrorKind::TakeWhile1; + i.split_at_position1(|c| !cond(c), e) + } } /// Tag (case-)Insensitive: Same as nom's tag_no_case, but operates on Bytes pub fn tagi(tag: T) -> impl Fn(I) -> IResult where - I: ShitNeededForParsing + InputLength + ShitCompare, - T: InputLength + Clone, - E: ParseError, + I: ShitNeededForParsing + InputLength + ShitCompare, + T: InputLength + Clone, + E: ParseError, { - let tag_len = tag.input_len(); - move |i: I| match i.compare_no_case(tag.clone()) { - CompareResult::Ok => { - let fst = i.slice(0..tag_len); - let snd = i.slice(tag_len..); - Ok((snd, fst)) - } - CompareResult::Incomplete => { - Err(Err::Incomplete(Needed::new(tag_len - i.input_len()))) - } - CompareResult::Error => { - let e: ErrorKind = ErrorKind::Tag; - Err(Err::Error(E::from_error_kind(i, e))) - } + let tag_len = tag.input_len(); + move |i: I| match i.compare_no_case(tag.clone()) { + CompareResult::Ok => { + let fst = i.slice(0..tag_len); + let snd = i.slice(tag_len..); + Ok((snd, fst)) } + CompareResult::Incomplete => { + Err(Err::Incomplete(Needed::new(tag_len - i.input_len()))) + } + CompareResult::Error => { + let e: ErrorKind = ErrorKind::Tag; + Err(Err::Error(E::from_error_kind(i, e))) + } + } } /// Same as nom's satisfy, but operates on `Bytes` instead of `&str`. pub fn satisfy(f: F) -> impl Fn(I) -> IResult where - I: ShitNeededForParsing, - F: Fn(T) -> bool, - E: ParseError, - T: Copy, + I: ShitNeededForParsing, + F: Fn(T) -> bool, + E: ParseError, + T: Copy, { - move |i: I| match i.first().map(|t| (f(t), t)) { - Some((true, ft)) => Ok((i.slice(1..), ft)), - Some((false, _)) => Err(Err::Error(E::from_error_kind( - i.slice(1..), - ErrorKind::Satisfy, - ))), - None => Err(Err::Incomplete(Needed::Unknown)), - } + move |i: I| match i.first().map(|t| (f(t), t)) { + Some((true, ft)) => Ok((i.slice(1..), ft)), + Some((false, _)) => Err(Err::Error(E::from_error_kind( + i.slice(1..), + ErrorKind::Satisfy, + ))), + None => Err(Err::Incomplete(Needed::Unknown)), + } } /// Match a single byte exactly. pub fn byte>(b: u8) -> impl Fn(I) -> IResult where - I: ShitNeededForParsing, + I: ShitNeededForParsing, { - satisfy(move |c| c == b) + satisfy(move |c| c == b) } diff --git a/rustfmt.toml b/rustfmt.toml index 5106ca8..0c2b72c 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,4 @@ fn_single_line = true max_width = 80 wrap_comments = true +tab_spaces = 2 diff --git a/tui/src/main.rs b/tui/src/main.rs index e7a11a9..80a1832 100644 --- a/tui/src/main.rs +++ b/tui/src/main.rs @@ -1,3 +1,3 @@ fn main() { - println!("Hello, world!"); + println!("Hello, world!"); }