Begin service work...

This commit is contained in:
Michael Zhang 2021-11-04 03:44:41 -05:00
parent 7c36d9ef7d
commit 0899cf3276
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
7 changed files with 202 additions and 188 deletions

View file

@ -1,3 +1,4 @@
[*.rs] [*.rs]
indent_style = space indent_style = space
indent_size = 2 indent_size = 2
max_line_length = 80

View file

@ -1,11 +1,17 @@
set dotenv-load := false
ct: ct:
tokei tokei
fmt: fmt:
cargo +nightly fmt --all cargo +nightly fmt --all
doc: doc shouldOpen="":
cargo doc --workspace --no-deps #!/bin/bash
if [[ -n "{{shouldOpen}}" ]]; then
OPEN=--open
fi
cargo doc --workspace --document-private-items --no-deps $OPEN
test: test:
cargo test --all cargo test --all

View file

@ -1,3 +1,5 @@
//! Everything related to config handling within the panorama daemon.
#[cfg(feature = "config-watch")] #[cfg(feature = "config-watch")]
mod watcher; mod watcher;

15
daemon/src/exit.rs Normal file
View file

@ -0,0 +1,15 @@
use tokio::sync::oneshot;
/// Receiver whose sole purpose is to receive an "exit" notification.
///
/// This exit "pattern", along with message passing over channels, allows
/// panorama to introspect on running loops. For example, a server listen loop
/// would select over the `ExitListener`, and break the loop once it receives
/// something on that channel. Then it would perform any clean up it needs and
/// exit, returning any resources that can be reused.
///
/// This is useful, for example, during TLS upgrade with STARTTLS, where the
/// connection needs to stay open. We can send the existing loop an "exit"
/// notification, then it returns the open channel so we can perform TLS
/// negotiation and create a new tunneled connection.
pub type ExitListener = oneshot::Receiver<()>;

View file

@ -1,3 +1,7 @@
//! Library functions used by the panorama daemon.
// #![deny(missing_docs)]
#[macro_use] #[macro_use]
extern crate serde; extern crate serde;
#[macro_use] #[macro_use]
@ -12,6 +16,9 @@ extern crate derivative;
pub mod config; pub mod config;
pub mod mail; pub mod mail;
mod exit;
mod service;
use std::path::PathBuf; use std::path::PathBuf;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
@ -24,10 +31,9 @@ use tokio::{
use xdg::BaseDirectories; use xdg::BaseDirectories;
use crate::config::{Config, MailAccountConfig}; use crate::config::{Config, MailAccountConfig};
pub use crate::exit::ExitListener;
use crate::mail::{mail_main, MailStore}; use crate::mail::{mail_main, MailStore};
type ExitListener = oneshot::Receiver<()>;
/// The panorama daemon runs in the background and communicates with other /// The panorama daemon runs in the background and communicates with other
/// panorama components over Unix sockets. /// panorama components over Unix sockets.
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
@ -41,6 +47,11 @@ struct Options {
verbose: usize, verbose: usize,
} }
/// Primary entrypoint; this is the function that is called by main with no
/// arguments.
///
/// The purpose of this function is to parse command line arguments
/// and set up config watching, then call [`run_with_config`] with the config.
pub async fn run() -> Result<()> { pub async fn run() -> Result<()> {
let opt = Options::parse(); let opt = Options::parse();
@ -118,8 +129,8 @@ async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> {
/// The main loop for a single mail account. /// The main loop for a single mail account.
/// ///
/// This loop is restarted each time the config watcher gets a new config, at /// The exit listener may allow the loop to be interrupted by the outside. When
/// which point `exit` is sent a single value telling us to break the loop. /// anything is received on the exit listener, the loop exits gracefully.
async fn run_single_mail_account( async fn run_single_mail_account(
account_name: String, account_name: String,
account: MailAccountConfig, account: MailAccountConfig,

View file

@ -6,7 +6,7 @@ mod store;
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use panorama_imap::{ use panorama_imap::{
client::{auth::Login, ConfigBuilder}, client::{ClientAuthenticated, ConfigBuilder},
pool::{ImapPool, PoolConfig}, pool::{ImapPool, PoolConfig},
proto::{ proto::{
command::FetchItems, command::FetchItems,
@ -16,14 +16,18 @@ use panorama_imap::{
use sqlx::migrate::Migrator; use sqlx::migrate::Migrator;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use crate::config::{ImapAuth, MailAccountConfig, TlsMethod}; use crate::config::{MailAccountConfig, TlsMethod};
pub use self::event::MailEvent; pub use self::event::MailEvent;
pub use self::store::MailStore; pub use self::store::MailStore;
static MIGRATOR: Migrator = sqlx::migrate!(); static MIGRATOR: Migrator = sqlx::migrate!();
/// The main function for the IMAP syncing thread /// The main function for the IMAP thread.
///
/// This spawns two processes:
/// - A background synchronization thread
/// - An event listener / handler
pub async fn mail_main( pub async fn mail_main(
acct_name: impl AsRef<str>, acct_name: impl AsRef<str>,
acct: MailAccountConfig, acct: MailAccountConfig,
@ -50,188 +54,159 @@ pub async fn mail_main(
// grab one connection from that pool and start running a background // grab one connection from that pool and start running a background
// synchronization thread // synchronization thread
let sync_conn = pool.acquire().await?; let sync_conn = pool.acquire().await?;
let sync_loop = run_sync_loop(sync_conn);
// let the rest of the pool respond to requests coming from the channel // let the rest of the pool respond to requests coming from the channel
// TODO: // TODO:
return Ok(()); sync_loop.await?;
// loop ensures that the connection is retried after it dies Ok(())
loop { }
let client = ConfigBuilder::default()
.hostname(acct.imap.server.clone()) async fn run_sync_loop(mut conn: ClientAuthenticated) -> Result<()> {
.port(acct.imap.port) // get the list of folders first
.tls(matches!(acct.imap.tls, TlsMethod::On)) debug!(target: "run_sync_loop", "Retrieving folder list...");
.open() let folder_list = conn.list().await?;
.await
.map_err(|err| anyhow!("err: {}", err))?; // let _ = mail2ui_tx.send(MailEvent::FolderList(
debug!("Connected to {}:{}.", &acct.imap.server, acct.imap.port); // acct_name.clone(),
// folder_list.clone(),
debug!("TLS Upgrade option: {:?}", acct.imap.tls); // ));
let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) {
debug!("attempting to upgrade"); debug!("mailbox list: {:?}", folder_list);
let client = client for folder in folder_list.iter() {
.upgrade() debug!("folder: {:?}", folder);
.await let select = conn.select("INBOX").await?;
.context("could not upgrade connection")?; debug!("select response: {:?}", select);
debug!("upgrade successful"); if let (Some(_exists), Some(_uidvalidity)) =
client (select.exists, select.uid_validity)
} else { {
warn!("Continuing with unencrypted connection!"); // figure out which uids don't exist locally yet
client let new_uids = vec![];
}; // let new_uids =
// stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
debug!("preparing to auth"); // todo!()
// check if the authentication method is supported // // mail_store.try_identify_email(&acct_name, &folder,
let mut authed = match &acct.imap.auth { // uid, uidvalidity, None) // //
ImapAuth::Plain { username, password } => { // invert the option to only select uids that
let login = Login { // haven't been downloaded //
username: username.clone(), // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
password: password.clone(), // // .map_err(|err| err.context("error checking if
}; // the email is already downloaded
unauth.auth(login).await? // [try_identify_email]")) }).try_collect::
} // <Vec<_>>().await?;
}; if !new_uids.is_empty() {
debug!("fetching uids {:?}", new_uids);
debug!("authentication successful!"); let _fetched = conn
let folder_list = authed.list().await?; .uid_fetch(&new_uids, &[], FetchItems::Envelope)
// let _ = mail2ui_tx.send(MailEvent::FolderList( .await
// acct_name.clone(), .context("error fetching uids")?;
// folder_list.clone(), // fetched
// )); // .map(Ok)
// .try_for_each_concurrent(None, |(uid, attrs)| {
debug!("mailbox list: {:?}", folder_list); // mail_store.store_email(&acct_name, &folder, uid,
for folder in folder_list.iter() { // uidvalidity, attrs) })
debug!("folder: {:?}", folder); // .await
let select = authed.select("INBOX").await?; // .context("error during fetch-store")?;
debug!("select response: {:?}", select); }
if let (Some(_exists), Some(_uidvalidity)) = }
(select.exists, select.uid_validity) }
{ tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// figure out which uids don't exist locally yet
let new_uids = vec![]; // TODO: remove this later
// let new_uids = // continue;
// stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
// todo!() // let's just select INBOX for now, maybe have a config for default
// // mail_store.try_identify_email(&acct_name, &folder, // mailbox later?
// uid, uidvalidity, None) // //
// invert the option to only select uids that debug!("selecting the INBOX mailbox");
// haven't been downloaded // let select = conn.select("INBOX").await?;
// .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) debug!("select result: {:?}", select);
// // .map_err(|err| err.context("error checking if
// the email is already downloaded loop {
// [try_identify_email]")) }).try_collect:: let message_uids = conn.uid_search().await?;
// <Vec<_>>().await?; let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
if !new_uids.is_empty() { // let _ = mail2ui_tx.send(MailEvent::MessageUids(
debug!("fetching uids {:?}", new_uids); // acct_name.clone(),
let _fetched = authed // message_uids.clone(),
.uid_fetch(&new_uids, &[], FetchItems::Envelope) // ));
.await // TODO: make this happen concurrently with the main loop?
.context("error fetching uids")?; let mut message_list = conn
// fetched .uid_fetch(&message_uids, &[], FetchItems::All)
// .map(Ok) .await
// .try_for_each_concurrent(None, |(uid, attrs)| { .unwrap();
// mail_store.store_email(&acct_name, &folder, uid, while let Some((_uid, _attrs)) = message_list.next().await {
// uidvalidity, attrs) }) // let evt = MailEvent::UpdateUid(acct_name.clone(), uid,
// .await // attrs); TODO: probably odn't care about this?
// .context("error during fetch-store")?; // let _ = mail2ui_tx.send(evt);
} }
}
} // TODO: check if IDLE is supported
tokio::time::sleep(std::time::Duration::from_secs(50)).await; let supports_idle = true; // authed.has_capability("IDLE").await?;
// TODO: remove this later if supports_idle {
// continue; let mut idle_stream = conn.idle().await?;
loop {
// let's just select INBOX for now, maybe have a config for default let evt = match idle_stream.next().await {
// mailbox later? Some(v) => v,
None => break,
debug!("selecting the INBOX mailbox"); };
let select = authed.select("INBOX").await?;
debug!("select result: {:?}", select); debug!("got an event: {:?}", evt);
match evt {
loop { Response::MailboxData(MailboxData::Exists(uid)) => {
let message_uids = authed.uid_search().await?; debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>(); // send DONE to stop the idle
// let _ = mail2ui_tx.send(MailEvent::MessageUids( std::mem::drop(idle_stream);
// acct_name.clone(), // let handle = Notification::new()
// message_uids.clone(), // .summary("New Email")
// )); // .body("TODO")
// TODO: make this happen concurrently with the main loop? // .icon("firefox")
let mut message_list = authed // .timeout(Timeout::Milliseconds(6000))
.uid_fetch(&message_uids, &[], FetchItems::All) // .show()?;
.await let message_uids = conn.uid_search().await?;
.unwrap(); let message_uids =
while let Some((_uid, _attrs)) = message_list.next().await { message_uids.into_iter().take(20).collect::<Vec<_>>();
// let evt = MailEvent::UpdateUid(acct_name.clone(), uid, // let _ = mail2ui_tx.send(MailEvent::MessageUids(
// attrs); TODO: probably odn't care about this? // acct_name.clone(),
// let _ = mail2ui_tx.send(evt); // message_uids.clone(),
} // ));
// TODO: make this happen concurrently with the main
// TODO: check if IDLE is supported // loop?
let supports_idle = true; // authed.has_capability("IDLE").await?; let mut message_list = conn
.uid_fetch(&message_uids, &[], FetchItems::All)
if supports_idle { .await
let mut idle_stream = authed.idle().await?; .unwrap();
loop { while let Some((_uid, _attrs)) = message_list.next().await {
let evt = match idle_stream.next().await { // let evt = MailEvent::UpdateUid(acct_name.
Some(v) => v, // clone(), uid, attrs);
None => break, // debug!("sent {:?}", evt);
}; // mail2ui_tx.send(evt);
}
debug!("got an event: {:?}", evt);
match evt { idle_stream = conn.idle().await?;
Response::MailboxData(MailboxData::Exists(uid)) => { }
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); _ => {}
// send DONE to stop the idle }
std::mem::drop(idle_stream); }
// let handle = Notification::new() } else {
// .summary("New Email") loop {
// .body("TODO") tokio::time::sleep(std::time::Duration::from_secs(20)).await;
// .icon("firefox") debug!("heartbeat");
// .timeout(Timeout::Milliseconds(6000)) }
// .show()?; }
let message_uids = authed.uid_search().await?; if false {
let message_uids = break;
message_uids.into_iter().take(20).collect::<Vec<_>>(); }
// let _ = mail2ui_tx.send(MailEvent::MessageUids( }
// acct_name.clone(),
// message_uids.clone(), // wait a bit so we're not hitting the server really fast if the fail
// )); // happens early on
// TODO: make this happen concurrently with the main //
// loop? // TODO: some kind of smart exponential backoff that considers some time
let mut message_list = authed // threshold to be a failing case?
.uid_fetch(&message_uids, &[], FetchItems::All) tokio::time::sleep(std::time::Duration::from_secs(5)).await;
.await
.unwrap(); Ok(())
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.
// clone(), uid, attrs);
// debug!("sent {:?}", evt);
// mail2ui_tx.send(evt);
}
idle_stream = authed.idle().await?;
}
_ => {}
}
}
} else {
loop {
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
debug!("heartbeat");
}
}
if false {
break;
}
}
// wait a bit so we're not hitting the server really fast if the fail
// happens early on
//
// TODO: some kind of smart exponential backoff that considers some time
// threshold to be a failing case?
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
} }

4
daemon/src/service.rs Normal file
View file

@ -0,0 +1,4 @@
pub trait Service<Request> {
type Response;
type Error;
}