From aa4e336d32468f450103f58b06c283a00da326a9 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Thu, 28 Oct 2021 17:37:35 -0500 Subject: [PATCH] Move everything into lib. --- Cargo.lock | 3 +- daemon/src/lib.rs | 176 +++++++++++++++++++++++++++++++++++++ daemon/src/mail/event.rs | 2 + daemon/src/mail/mod.rs | 5 +- daemon/src/main.rs | 181 +-------------------------------------- 5 files changed, 184 insertions(+), 183 deletions(-) create mode 100644 daemon/src/mail/event.rs diff --git a/Cargo.lock b/Cargo.lock index 6fd2284..1002174 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,8 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" version = "0.4.19" -source = "git+https://github.com/chronotope/chrono?rev=fea3392034e146c041f0ad940c1fe95aee314d38#fea3392034e146c041f0ad940c1fe95aee314d38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" dependencies = [ "libc", "num-integer", diff --git a/daemon/src/lib.rs b/daemon/src/lib.rs index bf632e5..2a5ab54 100644 --- a/daemon/src/lib.rs +++ b/daemon/src/lib.rs @@ -3,9 +3,185 @@ extern crate serde; #[macro_use] extern crate anyhow; #[macro_use] +extern crate futures; +#[macro_use] extern crate log; #[macro_use] extern crate derivative; pub mod config; pub mod mail; + +use anyhow::{Context, Result}; +use clap::Parser; +use futures::future::FutureExt; +use tokio::{ + fs::{self, OpenOptions}, + sync::{mpsc, oneshot}, +}; +use xdg::BaseDirectories; + +use crate::config::{Config, MailAccountConfig}; +use crate::mail::{sync_main, MailStore}; + +type ExitListener = oneshot::Receiver<()>; + +/// The panorama daemon runs in the background and communicates with other +/// panorama components over Unix sockets. +#[derive(Debug, Parser)] +struct Options { + // /// Config file path (defaults to XDG) + // #[clap(long = "config", short = 'c')] + // config_file: Option, + /// Verbose mode (-v, -vv, -vvv, etc) + #[clap(short = 'v', long = "verbose", parse(from_occurrences))] + verbose: usize, +} + +#[tokio::main] +pub async fn run() -> Result<()> { + let opt = Options::parse(); + + stderrlog::new() + .module(module_path!()) + .module("panorama_daemon") + .module("panorama_imap") + .verbosity(opt.verbose) + .init() + .unwrap(); + + // if we're using a config-watcher, then start the watcher system + #[cfg(feature = "config-watch")] + { + let (_, mut config_watcher) = config::spawn_config_watcher_system()?; + + loop { + let (exit_tx, exit_rx) = oneshot::channel(); + let new_config = config_watcher.borrow().clone(); + tokio::spawn(run_with_config(new_config, exit_rx)); + + // wait till the config has changed, then tell the current thread to + // stop + config_watcher.changed().await?; + let _ = exit_tx.send(()); + } + } + + // TODO: handle SIGHUP on Unix? pretty common for systemd to send this when + // reloading config files + + // if not, just read the config once and run the daemon + #[cfg(not(feature = "config-watch"))] + { + let xdg = BaseDirectories::new()?; + let config_home = xdg.get_config_home().join("panorama"); + if !config_home.exists() { + fs::create_dir_all(&config_home).await?; + } + + let config_path = config_home.join("panorama.toml"); + let config_path_c = config_path.canonicalize().context("cfg_path")?; + let config = Config::from_file(config_path_c).await.context("read")?; + + let (_, exit_rx) = oneshot::channel(); + run_with_config(config, exit_rx).await + } +} + +async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> { + debug!("New config: {:?}", config); + + // keep track of which threads need to be stopped when this function is + // stopped + let mut notify_mail_threads = Vec::new(); + + for (account_name, account) in config.mail_accounts { + let (exit_tx, exit_rx) = oneshot::channel(); + tokio::spawn(async { + match run_single_mail_account(account_name, account, exit_rx).await + { + Ok(_) => {} + Err(err) => panic!("failed: {:?}", err), + } + }); + notify_mail_threads.push(exit_tx); + } + + exit.await?; + for exit_tx in notify_mail_threads { + let _ = exit_tx.send(()); + } + + Ok(()) +} + +/// The main loop for a single mail account. +/// +/// This loop is restarted each time the config watcher gets a new config, at +/// which point `exit` is sent a single value telling us to break the loop. +async fn run_single_mail_account( + account_name: String, + account: MailAccountConfig, + exit: ExitListener, +) -> Result<()> { + debug!("run_single_mail_account({}, {:?})", account_name, account); + let mut exit = exit.fuse(); + + let xdg = BaseDirectories::new()?; + let data_home = xdg.get_data_home().join("panorama"); + if !data_home.exists() { + fs::create_dir_all(&data_home) + .await + .context("could not create config directory")?; + } + let db_path = data_home.join("db.sqlite3"); + debug!("Opening database at path: {:?}", db_path); + if !db_path.exists() { + OpenOptions::new() + .create(true) + .write(true) + .open(&db_path) + .await + .context("could not touch db path")?; + } + let db_path = db_path + .canonicalize() + .context("could not canonicalize db path")?; + let store = + MailStore::open(format!("sqlite://{}", db_path.to_string_lossy())) + .await + .context("couldn't open mail store")?; + + let (tx, mut rx) = mpsc::unbounded_channel(); + + let sync_fut = sync_main(&account_name, account, tx, store).fuse(); + pin_mut!(sync_fut); + + debug!("Mail account loop for {}.", account_name); + loop { + select! { + res = sync_fut => match res { + Ok(_) => {}, + Err(err) => { + error!("sync_main died with: {:?}", err); + break; + } + }, + + evt_opt = rx.recv().fuse() => { + let evt = match evt_opt { + Some(evt) => evt, + None => break, + }; + + debug!("Event: {:?}", evt); + }, + + // we're being told to exit the loop + _ = exit => break, + } + } + + debug!("disconnecting from account {}", account_name); + Ok(()) +} diff --git a/daemon/src/mail/event.rs b/daemon/src/mail/event.rs new file mode 100644 index 0000000..0a05f2c --- /dev/null +++ b/daemon/src/mail/event.rs @@ -0,0 +1,2 @@ +#[derive(Debug)] +pub enum MailEvent {} diff --git a/daemon/src/mail/mod.rs b/daemon/src/mail/mod.rs index 568d886..7147196 100644 --- a/daemon/src/mail/mod.rs +++ b/daemon/src/mail/mod.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] +mod event; mod store; use anyhow::{Context, Result}; @@ -16,13 +17,11 @@ use tokio::sync::mpsc::UnboundedSender; use crate::config::{ImapAuth, MailAccountConfig, TlsMethod}; +pub use self::event::MailEvent; pub use self::store::MailStore; static MIGRATOR: Migrator = sqlx::migrate!(); -#[derive(Debug)] -pub enum MailEvent {} - /// The main function for the IMAP syncing thread pub async fn sync_main( acct_name: impl AsRef, diff --git a/daemon/src/main.rs b/daemon/src/main.rs index 28f30fe..4f21b33 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -1,181 +1,4 @@ -#[macro_use] -extern crate log; -#[macro_use] -extern crate futures; - -use anyhow::{Context, Result}; -use clap::Parser; -use futures::future::FutureExt; -use panorama_daemon::{ - config::{Config, MailAccountConfig}, - mail::{sync_main, MailStore}, -}; -use tokio::{ - fs::{self, OpenOptions}, - sync::{mpsc, oneshot}, -}; -use xdg::BaseDirectories; - -type ExitListener = oneshot::Receiver<()>; - -/// The panorama daemon runs in the background and communicates with other -/// panorama components over Unix sockets. -#[derive(Debug, Parser)] -struct Options { - // /// Config file path (defaults to XDG) - // #[clap(long = "config", short = 'c')] - // config_file: Option, - /// Verbose mode (-v, -vv, -vvv, etc) - #[clap(short = 'v', long = "verbose", parse(from_occurrences))] - verbose: usize, -} +use anyhow::Result; #[tokio::main] -async fn main() -> Result<()> { - let opt = Options::parse(); - - stderrlog::new() - .module(module_path!()) - .module("panorama_daemon") - .module("panorama_imap") - .verbosity(opt.verbose) - .init() - .unwrap(); - - // if we're using a config-watcher, then start the watcher system - #[cfg(feature = "config-watch")] - { - use panorama_daemon::config; - - let (_, mut config_watcher) = config::spawn_config_watcher_system()?; - - loop { - let (exit_tx, exit_rx) = oneshot::channel(); - let new_config = config_watcher.borrow().clone(); - tokio::spawn(run_with_config(new_config, exit_rx)); - - // wait till the config has changed, then tell the current thread to - // stop - config_watcher.changed().await?; - let _ = exit_tx.send(()); - } - } - - // TODO: handle SIGHUP on Unix? pretty common for systemd to send this when - // reloading config files - - // if not, just read the config once and run the daemon - #[cfg(not(feature = "config-watch"))] - { - let xdg = BaseDirectories::new()?; - let config_home = xdg.get_config_home().join("panorama"); - if !config_home.exists() { - fs::create_dir_all(&config_home).await?; - } - - let config_path = config_home.join("panorama.toml"); - let config_path_c = config_path.canonicalize().context("cfg_path")?; - let config = Config::from_file(config_path_c).await.context("read")?; - - let (_, exit_rx) = oneshot::channel(); - run_with_config(config, exit_rx).await - } -} - -async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> { - debug!("New config: {:?}", config); - - // keep track of which threads need to be stopped when this function is - // stopped - let mut notify_mail_threads = Vec::new(); - - for (account_name, account) in config.mail_accounts { - let (exit_tx, exit_rx) = oneshot::channel(); - tokio::spawn(async { - match run_single_mail_account(account_name, account, exit_rx).await - { - Ok(_) => {} - Err(err) => panic!("failed: {:?}", err), - } - }); - notify_mail_threads.push(exit_tx); - } - - exit.await?; - for exit_tx in notify_mail_threads { - let _ = exit_tx.send(()); - } - - Ok(()) -} - -/// The main loop for a single mail account. -/// -/// This loop is restarted each time the config watcher gets a new config, at -/// which point `exit` is sent a single value telling us to break the loop. -async fn run_single_mail_account( - account_name: String, - account: MailAccountConfig, - exit: ExitListener, -) -> Result<()> { - debug!("run_single_mail_account({}, {:?})", account_name, account); - let mut exit = exit.fuse(); - - let xdg = BaseDirectories::new()?; - let data_home = xdg.get_data_home().join("panorama"); - if !data_home.exists() { - fs::create_dir_all(&data_home) - .await - .context("could not create config directory")?; - } - let db_path = data_home.join("db.sqlite3"); - debug!("Opening database at path: {:?}", db_path); - if !db_path.exists() { - OpenOptions::new() - .create(true) - .write(true) - .open(&db_path) - .await - .context("could not touch db path")?; - } - let db_path = db_path - .canonicalize() - .context("could not canonicalize db path")?; - let store = - MailStore::open(format!("sqlite://{}", db_path.to_string_lossy())) - .await - .context("couldn't open mail store")?; - - let (tx, mut rx) = mpsc::unbounded_channel(); - - let sync_fut = sync_main(&account_name, account, tx, store).fuse(); - pin_mut!(sync_fut); - - debug!("Mail account loop for {}.", account_name); - loop { - select! { - res = sync_fut => match res { - Ok(_) => {}, - Err(err) => { - error!("sync_main died with: {:?}", err); - break; - } - }, - - evt_opt = rx.recv().fuse() => { - let evt = match evt_opt { - Some(evt) => evt, - None => break, - }; - - debug!("Event: {:?}", evt); - }, - - // we're being told to exit the loop - _ = exit => break, - } - } - - debug!("disconnecting from account {}", account_name); - Ok(()) -} +async fn main() -> Result<()> { panorama_daemon::run() }