Move everything into lib.

This commit is contained in:
Michael Zhang 2021-10-28 17:37:35 -05:00
parent 3c51289429
commit aa4e336d32
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
5 changed files with 184 additions and 183 deletions

3
Cargo.lock generated
View file

@ -163,7 +163,8 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "git+https://github.com/chronotope/chrono?rev=fea3392034e146c041f0ad940c1fe95aee314d38#fea3392034e146c041f0ad940c1fe95aee314d38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",

View file

@ -3,9 +3,185 @@ extern crate serde;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate log;
#[macro_use]
extern crate derivative;
pub mod config;
pub mod mail;
use anyhow::{Context, Result};
use clap::Parser;
use futures::future::FutureExt;
use tokio::{
fs::{self, OpenOptions},
sync::{mpsc, oneshot},
};
use xdg::BaseDirectories;
use crate::config::{Config, MailAccountConfig};
use crate::mail::{sync_main, MailStore};
type ExitListener = oneshot::Receiver<()>;
/// The panorama daemon runs in the background and communicates with other
/// panorama components over Unix sockets.
#[derive(Debug, Parser)]
struct Options {
// /// Config file path (defaults to XDG)
// #[clap(long = "config", short = 'c')]
// config_file: Option<PathBuf>,
/// Verbose mode (-v, -vv, -vvv, etc)
#[clap(short = 'v', long = "verbose", parse(from_occurrences))]
verbose: usize,
}
#[tokio::main]
pub async fn run() -> Result<()> {
let opt = Options::parse();
stderrlog::new()
.module(module_path!())
.module("panorama_daemon")
.module("panorama_imap")
.verbosity(opt.verbose)
.init()
.unwrap();
// if we're using a config-watcher, then start the watcher system
#[cfg(feature = "config-watch")]
{
let (_, mut config_watcher) = config::spawn_config_watcher_system()?;
loop {
let (exit_tx, exit_rx) = oneshot::channel();
let new_config = config_watcher.borrow().clone();
tokio::spawn(run_with_config(new_config, exit_rx));
// wait till the config has changed, then tell the current thread to
// stop
config_watcher.changed().await?;
let _ = exit_tx.send(());
}
}
// TODO: handle SIGHUP on Unix? pretty common for systemd to send this when
// reloading config files
// if not, just read the config once and run the daemon
#[cfg(not(feature = "config-watch"))]
{
let xdg = BaseDirectories::new()?;
let config_home = xdg.get_config_home().join("panorama");
if !config_home.exists() {
fs::create_dir_all(&config_home).await?;
}
let config_path = config_home.join("panorama.toml");
let config_path_c = config_path.canonicalize().context("cfg_path")?;
let config = Config::from_file(config_path_c).await.context("read")?;
let (_, exit_rx) = oneshot::channel();
run_with_config(config, exit_rx).await
}
}
async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> {
debug!("New config: {:?}", config);
// keep track of which threads need to be stopped when this function is
// stopped
let mut notify_mail_threads = Vec::new();
for (account_name, account) in config.mail_accounts {
let (exit_tx, exit_rx) = oneshot::channel();
tokio::spawn(async {
match run_single_mail_account(account_name, account, exit_rx).await
{
Ok(_) => {}
Err(err) => panic!("failed: {:?}", err),
}
});
notify_mail_threads.push(exit_tx);
}
exit.await?;
for exit_tx in notify_mail_threads {
let _ = exit_tx.send(());
}
Ok(())
}
/// The main loop for a single mail account.
///
/// This loop is restarted each time the config watcher gets a new config, at
/// which point `exit` is sent a single value telling us to break the loop.
async fn run_single_mail_account(
account_name: String,
account: MailAccountConfig,
exit: ExitListener,
) -> Result<()> {
debug!("run_single_mail_account({}, {:?})", account_name, account);
let mut exit = exit.fuse();
let xdg = BaseDirectories::new()?;
let data_home = xdg.get_data_home().join("panorama");
if !data_home.exists() {
fs::create_dir_all(&data_home)
.await
.context("could not create config directory")?;
}
let db_path = data_home.join("db.sqlite3");
debug!("Opening database at path: {:?}", db_path);
if !db_path.exists() {
OpenOptions::new()
.create(true)
.write(true)
.open(&db_path)
.await
.context("could not touch db path")?;
}
let db_path = db_path
.canonicalize()
.context("could not canonicalize db path")?;
let store =
MailStore::open(format!("sqlite://{}", db_path.to_string_lossy()))
.await
.context("couldn't open mail store")?;
let (tx, mut rx) = mpsc::unbounded_channel();
let sync_fut = sync_main(&account_name, account, tx, store).fuse();
pin_mut!(sync_fut);
debug!("Mail account loop for {}.", account_name);
loop {
select! {
res = sync_fut => match res {
Ok(_) => {},
Err(err) => {
error!("sync_main died with: {:?}", err);
break;
}
},
evt_opt = rx.recv().fuse() => {
let evt = match evt_opt {
Some(evt) => evt,
None => break,
};
debug!("Event: {:?}", evt);
},
// we're being told to exit the loop
_ = exit => break,
}
}
debug!("disconnecting from account {}", account_name);
Ok(())
}

2
daemon/src/mail/event.rs Normal file
View file

@ -0,0 +1,2 @@
#[derive(Debug)]
pub enum MailEvent {}

View file

@ -1,5 +1,6 @@
#![allow(dead_code)]
mod event;
mod store;
use anyhow::{Context, Result};
@ -16,13 +17,11 @@ use tokio::sync::mpsc::UnboundedSender;
use crate::config::{ImapAuth, MailAccountConfig, TlsMethod};
pub use self::event::MailEvent;
pub use self::store::MailStore;
static MIGRATOR: Migrator = sqlx::migrate!();
#[derive(Debug)]
pub enum MailEvent {}
/// The main function for the IMAP syncing thread
pub async fn sync_main(
acct_name: impl AsRef<str>,

View file

@ -1,181 +1,4 @@
#[macro_use]
extern crate log;
#[macro_use]
extern crate futures;
use anyhow::{Context, Result};
use clap::Parser;
use futures::future::FutureExt;
use panorama_daemon::{
config::{Config, MailAccountConfig},
mail::{sync_main, MailStore},
};
use tokio::{
fs::{self, OpenOptions},
sync::{mpsc, oneshot},
};
use xdg::BaseDirectories;
type ExitListener = oneshot::Receiver<()>;
/// The panorama daemon runs in the background and communicates with other
/// panorama components over Unix sockets.
#[derive(Debug, Parser)]
struct Options {
// /// Config file path (defaults to XDG)
// #[clap(long = "config", short = 'c')]
// config_file: Option<PathBuf>,
/// Verbose mode (-v, -vv, -vvv, etc)
#[clap(short = 'v', long = "verbose", parse(from_occurrences))]
verbose: usize,
}
use anyhow::Result;
#[tokio::main]
async fn main() -> Result<()> {
let opt = Options::parse();
stderrlog::new()
.module(module_path!())
.module("panorama_daemon")
.module("panorama_imap")
.verbosity(opt.verbose)
.init()
.unwrap();
// if we're using a config-watcher, then start the watcher system
#[cfg(feature = "config-watch")]
{
use panorama_daemon::config;
let (_, mut config_watcher) = config::spawn_config_watcher_system()?;
loop {
let (exit_tx, exit_rx) = oneshot::channel();
let new_config = config_watcher.borrow().clone();
tokio::spawn(run_with_config(new_config, exit_rx));
// wait till the config has changed, then tell the current thread to
// stop
config_watcher.changed().await?;
let _ = exit_tx.send(());
}
}
// TODO: handle SIGHUP on Unix? pretty common for systemd to send this when
// reloading config files
// if not, just read the config once and run the daemon
#[cfg(not(feature = "config-watch"))]
{
let xdg = BaseDirectories::new()?;
let config_home = xdg.get_config_home().join("panorama");
if !config_home.exists() {
fs::create_dir_all(&config_home).await?;
}
let config_path = config_home.join("panorama.toml");
let config_path_c = config_path.canonicalize().context("cfg_path")?;
let config = Config::from_file(config_path_c).await.context("read")?;
let (_, exit_rx) = oneshot::channel();
run_with_config(config, exit_rx).await
}
}
async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> {
debug!("New config: {:?}", config);
// keep track of which threads need to be stopped when this function is
// stopped
let mut notify_mail_threads = Vec::new();
for (account_name, account) in config.mail_accounts {
let (exit_tx, exit_rx) = oneshot::channel();
tokio::spawn(async {
match run_single_mail_account(account_name, account, exit_rx).await
{
Ok(_) => {}
Err(err) => panic!("failed: {:?}", err),
}
});
notify_mail_threads.push(exit_tx);
}
exit.await?;
for exit_tx in notify_mail_threads {
let _ = exit_tx.send(());
}
Ok(())
}
/// The main loop for a single mail account.
///
/// This loop is restarted each time the config watcher gets a new config, at
/// which point `exit` is sent a single value telling us to break the loop.
async fn run_single_mail_account(
account_name: String,
account: MailAccountConfig,
exit: ExitListener,
) -> Result<()> {
debug!("run_single_mail_account({}, {:?})", account_name, account);
let mut exit = exit.fuse();
let xdg = BaseDirectories::new()?;
let data_home = xdg.get_data_home().join("panorama");
if !data_home.exists() {
fs::create_dir_all(&data_home)
.await
.context("could not create config directory")?;
}
let db_path = data_home.join("db.sqlite3");
debug!("Opening database at path: {:?}", db_path);
if !db_path.exists() {
OpenOptions::new()
.create(true)
.write(true)
.open(&db_path)
.await
.context("could not touch db path")?;
}
let db_path = db_path
.canonicalize()
.context("could not canonicalize db path")?;
let store =
MailStore::open(format!("sqlite://{}", db_path.to_string_lossy()))
.await
.context("couldn't open mail store")?;
let (tx, mut rx) = mpsc::unbounded_channel();
let sync_fut = sync_main(&account_name, account, tx, store).fuse();
pin_mut!(sync_fut);
debug!("Mail account loop for {}.", account_name);
loop {
select! {
res = sync_fut => match res {
Ok(_) => {},
Err(err) => {
error!("sync_main died with: {:?}", err);
break;
}
},
evt_opt = rx.recv().fuse() => {
let evt = match evt_opt {
Some(evt) => evt,
None => break,
};
debug!("Event: {:?}", evt);
},
// we're being told to exit the loop
_ = exit => break,
}
}
debug!("disconnecting from account {}", account_name);
Ok(())
}
async fn main() -> Result<()> { panorama_daemon::run() }