From 0899cf32763d572d86d6120a368286e8b2d1f48f Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Thu, 4 Nov 2021 03:44:41 -0500 Subject: [PATCH] Begin service work... --- .editorconfig | 1 + Justfile | 10 +- daemon/src/config/mod.rs | 2 + daemon/src/exit.rs | 15 ++ daemon/src/lib.rs | 19 ++- daemon/src/mail/mod.rs | 339 ++++++++++++++++++--------------------- daemon/src/service.rs | 4 + 7 files changed, 202 insertions(+), 188 deletions(-) create mode 100644 daemon/src/exit.rs create mode 100644 daemon/src/service.rs diff --git a/.editorconfig b/.editorconfig index 4d87686..9611619 100644 --- a/.editorconfig +++ b/.editorconfig @@ -1,3 +1,4 @@ [*.rs] indent_style = space indent_size = 2 +max_line_length = 80 diff --git a/Justfile b/Justfile index 839d4b7..366fac2 100644 --- a/Justfile +++ b/Justfile @@ -1,11 +1,17 @@ +set dotenv-load := false + ct: tokei fmt: cargo +nightly fmt --all -doc: - cargo doc --workspace --no-deps +doc shouldOpen="": + #!/bin/bash + if [[ -n "{{shouldOpen}}" ]]; then + OPEN=--open + fi + cargo doc --workspace --document-private-items --no-deps $OPEN test: cargo test --all diff --git a/daemon/src/config/mod.rs b/daemon/src/config/mod.rs index a9f564b..14f333c 100644 --- a/daemon/src/config/mod.rs +++ b/daemon/src/config/mod.rs @@ -1,3 +1,5 @@ +//! Everything related to config handling within the panorama daemon. + #[cfg(feature = "config-watch")] mod watcher; diff --git a/daemon/src/exit.rs b/daemon/src/exit.rs new file mode 100644 index 0000000..b07c5f4 --- /dev/null +++ b/daemon/src/exit.rs @@ -0,0 +1,15 @@ +use tokio::sync::oneshot; + +/// Receiver whose sole purpose is to receive an "exit" notification. +/// +/// This exit "pattern", along with message passing over channels, allows +/// panorama to introspect on running loops. For example, a server listen loop +/// would select over the `ExitListener`, and break the loop once it receives +/// something on that channel. Then it would perform any clean up it needs and +/// exit, returning any resources that can be reused. +/// +/// This is useful, for example, during TLS upgrade with STARTTLS, where the +/// connection needs to stay open. We can send the existing loop an "exit" +/// notification, then it returns the open channel so we can perform TLS +/// negotiation and create a new tunneled connection. +pub type ExitListener = oneshot::Receiver<()>; diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index bcd6abd..be75f68 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -1,3 +1,7 @@ +//! Library functions used by the panorama daemon. + +// #![deny(missing_docs)] + #[macro_use] extern crate serde; #[macro_use] @@ -12,6 +16,9 @@ extern crate derivative; pub mod config; pub mod mail; +mod exit; +mod service; + use std::path::PathBuf; use anyhow::{Context, Result}; @@ -24,10 +31,9 @@ use tokio::{ use xdg::BaseDirectories; use crate::config::{Config, MailAccountConfig}; +pub use crate::exit::ExitListener; use crate::mail::{mail_main, MailStore}; -type ExitListener = oneshot::Receiver<()>; - /// The panorama daemon runs in the background and communicates with other /// panorama components over Unix sockets. #[derive(Debug, Parser)] @@ -41,6 +47,11 @@ struct Options { verbose: usize, } +/// Primary entrypoint; this is the function that is called by main with no +/// arguments. +/// +/// The purpose of this function is to parse command line arguments +/// and set up config watching, then call [`run_with_config`] with the config. pub async fn run() -> Result<()> { let opt = Options::parse(); @@ -118,8 +129,8 @@ async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> { /// The main loop for a single mail account. /// -/// This loop is restarted each time the config watcher gets a new config, at -/// which point `exit` is sent a single value telling us to break the loop. +/// The exit listener may allow the loop to be interrupted by the outside. When +/// anything is received on the exit listener, the loop exits gracefully. async fn run_single_mail_account( account_name: String, account: MailAccountConfig, diff --git a/daemon/src/mail/mod.rs b/daemon/src/mail/mod.rs index 371e320..dfabc59 100644 --- a/daemon/src/mail/mod.rs +++ b/daemon/src/mail/mod.rs @@ -6,7 +6,7 @@ mod store; use anyhow::{Context, Result}; use futures::stream::StreamExt; use panorama_imap::{ - client::{auth::Login, ConfigBuilder}, + client::{ClientAuthenticated, ConfigBuilder}, pool::{ImapPool, PoolConfig}, proto::{ command::FetchItems, @@ -16,14 +16,18 @@ use panorama_imap::{ use sqlx::migrate::Migrator; use tokio::sync::mpsc::UnboundedSender; -use crate::config::{ImapAuth, MailAccountConfig, TlsMethod}; +use crate::config::{MailAccountConfig, TlsMethod}; pub use self::event::MailEvent; pub use self::store::MailStore; static MIGRATOR: Migrator = sqlx::migrate!(); -/// The main function for the IMAP syncing thread +/// The main function for the IMAP thread. +/// +/// This spawns two processes: +/// - A background synchronization thread +/// - An event listener / handler pub async fn mail_main( acct_name: impl AsRef, acct: MailAccountConfig, @@ -50,188 +54,159 @@ pub async fn mail_main( // grab one connection from that pool and start running a background // synchronization thread let sync_conn = pool.acquire().await?; + let sync_loop = run_sync_loop(sync_conn); // let the rest of the pool respond to requests coming from the channel // TODO: - return Ok(()); + sync_loop.await?; - // loop ensures that the connection is retried after it dies - loop { - let client = ConfigBuilder::default() - .hostname(acct.imap.server.clone()) - .port(acct.imap.port) - .tls(matches!(acct.imap.tls, TlsMethod::On)) - .open() - .await - .map_err(|err| anyhow!("err: {}", err))?; - debug!("Connected to {}:{}.", &acct.imap.server, acct.imap.port); - - debug!("TLS Upgrade option: {:?}", acct.imap.tls); - let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { - debug!("attempting to upgrade"); - let client = client - .upgrade() - .await - .context("could not upgrade connection")?; - debug!("upgrade successful"); - client - } else { - warn!("Continuing with unencrypted connection!"); - client - }; - - debug!("preparing to auth"); - // check if the authentication method is supported - let mut authed = match &acct.imap.auth { - ImapAuth::Plain { username, password } => { - let login = Login { - username: username.clone(), - password: password.clone(), - }; - unauth.auth(login).await? - } - }; - - debug!("authentication successful!"); - let folder_list = authed.list().await?; - // let _ = mail2ui_tx.send(MailEvent::FolderList( - // acct_name.clone(), - // folder_list.clone(), - // )); - - debug!("mailbox list: {:?}", folder_list); - for folder in folder_list.iter() { - debug!("folder: {:?}", folder); - let select = authed.select("INBOX").await?; - debug!("select response: {:?}", select); - if let (Some(_exists), Some(_uidvalidity)) = - (select.exists, select.uid_validity) - { - // figure out which uids don't exist locally yet - let new_uids = vec![]; - // let new_uids = - // stream::iter(1..exists).map(Ok).try_filter_map(|uid| { - // todo!() - // // mail_store.try_identify_email(&acct_name, &folder, - // uid, uidvalidity, None) // // - // invert the option to only select uids that - // haven't been downloaded // - // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) - // // .map_err(|err| err.context("error checking if - // the email is already downloaded - // [try_identify_email]")) }).try_collect:: - // >().await?; - if !new_uids.is_empty() { - debug!("fetching uids {:?}", new_uids); - let _fetched = authed - .uid_fetch(&new_uids, &[], FetchItems::Envelope) - .await - .context("error fetching uids")?; - // fetched - // .map(Ok) - // .try_for_each_concurrent(None, |(uid, attrs)| { - // mail_store.store_email(&acct_name, &folder, uid, - // uidvalidity, attrs) }) - // .await - // .context("error during fetch-store")?; - } - } - } - tokio::time::sleep(std::time::Duration::from_secs(50)).await; - - // TODO: remove this later - // continue; - - // let's just select INBOX for now, maybe have a config for default - // mailbox later? - - debug!("selecting the INBOX mailbox"); - let select = authed.select("INBOX").await?; - debug!("select result: {:?}", select); - - loop { - let message_uids = authed.uid_search().await?; - let message_uids = message_uids.into_iter().take(30).collect::>(); - // let _ = mail2ui_tx.send(MailEvent::MessageUids( - // acct_name.clone(), - // message_uids.clone(), - // )); - // TODO: make this happen concurrently with the main loop? - let mut message_list = authed - .uid_fetch(&message_uids, &[], FetchItems::All) - .await - .unwrap(); - while let Some((_uid, _attrs)) = message_list.next().await { - // let evt = MailEvent::UpdateUid(acct_name.clone(), uid, - // attrs); TODO: probably odn't care about this? - // let _ = mail2ui_tx.send(evt); - } - - // TODO: check if IDLE is supported - let supports_idle = true; // authed.has_capability("IDLE").await?; - - if supports_idle { - let mut idle_stream = authed.idle().await?; - loop { - let evt = match idle_stream.next().await { - Some(v) => v, - None => break, - }; - - debug!("got an event: {:?}", evt); - match evt { - Response::MailboxData(MailboxData::Exists(uid)) => { - debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); - // send DONE to stop the idle - std::mem::drop(idle_stream); - // let handle = Notification::new() - // .summary("New Email") - // .body("TODO") - // .icon("firefox") - // .timeout(Timeout::Milliseconds(6000)) - // .show()?; - let message_uids = authed.uid_search().await?; - let message_uids = - message_uids.into_iter().take(20).collect::>(); - // let _ = mail2ui_tx.send(MailEvent::MessageUids( - // acct_name.clone(), - // message_uids.clone(), - // )); - // TODO: make this happen concurrently with the main - // loop? - let mut message_list = authed - .uid_fetch(&message_uids, &[], FetchItems::All) - .await - .unwrap(); - while let Some((_uid, _attrs)) = message_list.next().await { - // let evt = MailEvent::UpdateUid(acct_name. - // clone(), uid, attrs); - // debug!("sent {:?}", evt); - // mail2ui_tx.send(evt); - } - - idle_stream = authed.idle().await?; - } - _ => {} - } - } - } else { - loop { - tokio::time::sleep(std::time::Duration::from_secs(20)).await; - debug!("heartbeat"); - } - } - if false { - break; - } - } - - // wait a bit so we're not hitting the server really fast if the fail - // happens early on - // - // TODO: some kind of smart exponential backoff that considers some time - // threshold to be a failing case? - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } + Ok(()) +} + +async fn run_sync_loop(mut conn: ClientAuthenticated) -> Result<()> { + // get the list of folders first + debug!(target: "run_sync_loop", "Retrieving folder list..."); + let folder_list = conn.list().await?; + + // let _ = mail2ui_tx.send(MailEvent::FolderList( + // acct_name.clone(), + // folder_list.clone(), + // )); + + debug!("mailbox list: {:?}", folder_list); + for folder in folder_list.iter() { + debug!("folder: {:?}", folder); + let select = conn.select("INBOX").await?; + debug!("select response: {:?}", select); + if let (Some(_exists), Some(_uidvalidity)) = + (select.exists, select.uid_validity) + { + // figure out which uids don't exist locally yet + let new_uids = vec![]; + // let new_uids = + // stream::iter(1..exists).map(Ok).try_filter_map(|uid| { + // todo!() + // // mail_store.try_identify_email(&acct_name, &folder, + // uid, uidvalidity, None) // // + // invert the option to only select uids that + // haven't been downloaded // + // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) + // // .map_err(|err| err.context("error checking if + // the email is already downloaded + // [try_identify_email]")) }).try_collect:: + // >().await?; + if !new_uids.is_empty() { + debug!("fetching uids {:?}", new_uids); + let _fetched = conn + .uid_fetch(&new_uids, &[], FetchItems::Envelope) + .await + .context("error fetching uids")?; + // fetched + // .map(Ok) + // .try_for_each_concurrent(None, |(uid, attrs)| { + // mail_store.store_email(&acct_name, &folder, uid, + // uidvalidity, attrs) }) + // .await + // .context("error during fetch-store")?; + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(50)).await; + + // TODO: remove this later + // continue; + + // let's just select INBOX for now, maybe have a config for default + // mailbox later? + + debug!("selecting the INBOX mailbox"); + let select = conn.select("INBOX").await?; + debug!("select result: {:?}", select); + + loop { + let message_uids = conn.uid_search().await?; + let message_uids = message_uids.into_iter().take(30).collect::>(); + // let _ = mail2ui_tx.send(MailEvent::MessageUids( + // acct_name.clone(), + // message_uids.clone(), + // )); + // TODO: make this happen concurrently with the main loop? + let mut message_list = conn + .uid_fetch(&message_uids, &[], FetchItems::All) + .await + .unwrap(); + while let Some((_uid, _attrs)) = message_list.next().await { + // let evt = MailEvent::UpdateUid(acct_name.clone(), uid, + // attrs); TODO: probably odn't care about this? + // let _ = mail2ui_tx.send(evt); + } + + // TODO: check if IDLE is supported + let supports_idle = true; // authed.has_capability("IDLE").await?; + + if supports_idle { + let mut idle_stream = conn.idle().await?; + loop { + let evt = match idle_stream.next().await { + Some(v) => v, + None => break, + }; + + debug!("got an event: {:?}", evt); + match evt { + Response::MailboxData(MailboxData::Exists(uid)) => { + debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); + // send DONE to stop the idle + std::mem::drop(idle_stream); + // let handle = Notification::new() + // .summary("New Email") + // .body("TODO") + // .icon("firefox") + // .timeout(Timeout::Milliseconds(6000)) + // .show()?; + let message_uids = conn.uid_search().await?; + let message_uids = + message_uids.into_iter().take(20).collect::>(); + // let _ = mail2ui_tx.send(MailEvent::MessageUids( + // acct_name.clone(), + // message_uids.clone(), + // )); + // TODO: make this happen concurrently with the main + // loop? + let mut message_list = conn + .uid_fetch(&message_uids, &[], FetchItems::All) + .await + .unwrap(); + while let Some((_uid, _attrs)) = message_list.next().await { + // let evt = MailEvent::UpdateUid(acct_name. + // clone(), uid, attrs); + // debug!("sent {:?}", evt); + // mail2ui_tx.send(evt); + } + + idle_stream = conn.idle().await?; + } + _ => {} + } + } + } else { + loop { + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + debug!("heartbeat"); + } + } + if false { + break; + } + } + + // wait a bit so we're not hitting the server really fast if the fail + // happens early on + // + // TODO: some kind of smart exponential backoff that considers some time + // threshold to be a failing case? + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + Ok(()) } diff --git a/daemon/src/service.rs b/daemon/src/service.rs new file mode 100644 index 0000000..b65a2b8 --- /dev/null +++ b/daemon/src/service.rs @@ -0,0 +1,4 @@ +pub trait Service { + type Response; + type Error; +}