From 5ad3a844e11f0eff33a9f43c374a8c967f998193 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Sun, 14 Feb 2021 07:20:35 -0600 Subject: [PATCH] i hate code --- src/config.rs | 26 +++++++++++++------------- src/lib.rs | 2 +- src/mail.rs | 18 +++++++++++++++--- src/main.rs | 27 +++++++++++++++++++-------- src/ui/mod.rs | 3 ++- 5 files changed, 50 insertions(+), 26 deletions(-) diff --git a/src/config.rs b/src/config.rs index 964f44d..e75a978 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,14 +3,14 @@ //! One of the primary goals of panorama is to be able to always hot-reload configuration files. use std::fs::File; -use std::sync::mpsc::{self, Receiver}; -use std::time::Duration; use std::io::Read; use std::path::Path; +use std::sync::mpsc::{self, Receiver}; +use std::time::Duration; -use anyhow::{Result, Context}; +use anyhow::{Context, Result}; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher}; -use tokio::sync::watch; +use tokio::{sync::watch, task::JoinHandle}; use xdg::BaseDirectories; /// Alias for a MailConfig receiver. @@ -34,17 +34,16 @@ pub struct MailConfig { /// Spawns a notify::RecommendedWatcher to watch the XDG config directory. Whenever the config file /// is updated, the config file is parsed and sent to the receiver. -fn start_watcher() -> Result<( - RecommendedWatcher, - Receiver, -)> { +fn start_watcher() -> Result<(RecommendedWatcher, Receiver)> { let (tx, rx) = mpsc::channel(); let mut watcher = RecommendedWatcher::new(tx, Duration::from_secs(5))?; let xdg = BaseDirectories::new()?; let config_home = xdg.get_config_home(); debug!("config_home: {:?}", config_home); - watcher.watch(config_home.join("panorama"), RecursiveMode::Recursive).context("could not watch config_home")?; + watcher + .watch(config_home.join("panorama"), RecursiveMode::Recursive) + .context("could not watch config_home")?; Ok((watcher, rx)) } @@ -85,11 +84,12 @@ async fn watcher_loop( /// Start the entire config watcher system, and return a [ConfigWatcher][self::ConfigWatcher], /// which is a cloneable receiver of config update events. -pub fn spawn_config_watcher() -> Result { - let (_watcher, config_rx) = start_watcher()?; +pub fn spawn_config_watcher() -> Result<(JoinHandle<()>, ConfigWatcher)> { + let (watcher, config_rx) = start_watcher()?; let (config_tx, config_update) = watch::channel(None); - tokio::spawn(async move { + let config_thread = tokio::spawn(async move { + let _watcher = watcher; match watcher_loop(config_rx, config_tx).await { Ok(_) => {} Err(err) => { @@ -98,5 +98,5 @@ pub fn spawn_config_watcher() -> Result { } }); - Ok(config_update) + Ok((config_thread, config_update)) } diff --git a/src/lib.rs b/src/lib.rs index 84b6f59..3a91e7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,4 +17,4 @@ pub mod mail; pub mod ui; /// A cloneable type that allows sending an exit-"signal" to stop the application. -pub type ExitSender = tokio::sync::oneshot::Sender<()>; +pub type ExitSender = tokio::sync::mpsc::Sender<()>; diff --git a/src/mail.rs b/src/mail.rs index 638bfc8..ca80f2d 100644 --- a/src/mail.rs +++ b/src/mail.rs @@ -19,12 +19,13 @@ use panorama_imap::{ use tokio::{ net::TcpStream, sync::mpsc::{self, UnboundedReceiver}, + task::JoinHandle, }; use tokio_rustls::{rustls::ClientConfig, webpki::DNSNameRef, TlsConnector}; use tokio_stream::wrappers::WatchStream; use tokio_util::codec::{Decoder, LinesCodec, LinesCodecError}; -use crate::config::{MailConfig, ConfigWatcher}; +use crate::config::{ConfigWatcher, MailConfig}; /// Command sent to the mail thread by something else (i.e. UI) pub enum MailCommand { @@ -40,7 +41,7 @@ pub async fn run_mail( config_watcher: ConfigWatcher, cmd_in: UnboundedReceiver, ) -> Result<()> { - let mut curr_conn = None; + let mut curr_conn: Option> = None; let mut config_watcher = WatchStream::new(config_watcher); loop { @@ -49,6 +50,13 @@ pub async fn run_mail( _ => break, }; + // TODO: gracefully shut down connection + // just gonna drop the connection for now + if let Some(mut curr_conn) = curr_conn.take() { + debug!("dropping connection..."); + curr_conn.abort(); + } + let handle = tokio::spawn(open_imap_connection(config)); curr_conn = Some(handle); } @@ -61,6 +69,7 @@ async fn open_imap_connection(config: MailConfig) -> Result<()> { "Opening imap connection to {}:{}", config.server, config.port ); + let server = config.server.as_str(); let port = config.port; @@ -71,6 +80,7 @@ async fn open_imap_connection(config: MailConfig) -> Result<()> { let (sink, stream) = framed.split::(); let result = listen_loop(config.clone(), &mut state, sink, stream, false).await?; + if let LoopExit::NegotiateTls(stream, sink) = result { debug!("negotiating tls"); let mut tls_config = ClientConfig::new(); @@ -97,7 +107,9 @@ async fn open_imap_connection(config: MailConfig) -> Result<()> { /// Action that should be taken after the connection loop quits. enum LoopExit { /// Used in case the STARTTLS command is issued; perform TLS negotiation on top of the current - /// stream + /// stream. + /// + /// S and S2 are stream and sink types, respectively. NegotiateTls(S, S2), Closed, } diff --git a/src/main.rs b/src/main.rs index aa2a783..7999dd7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,19 @@ +#[macro_use] +extern crate log; + use std::fs::File; use std::io::Read; use std::path::PathBuf; use anyhow::Result; use futures::future::TryFutureExt; +use panorama::{ + config::{spawn_config_watcher, MailConfig}, + mail, ui, +}; use structopt::StructOpt; use tokio::sync::{mpsc, oneshot}; use xdg::BaseDirectories; -use panorama::{config::{spawn_config_watcher, MailConfig}, mail, ui}; #[derive(Debug, StructOpt)] #[structopt(author, about)] @@ -30,7 +36,7 @@ async fn main() -> Result<()> { setup_logger(&opt)?; let xdg = BaseDirectories::new()?; - let config_update = spawn_config_watcher()?; + let (config_thread, config_update) = spawn_config_watcher()?; // let config: MailConfig = { // let config_path = opt @@ -44,17 +50,22 @@ async fn main() -> Result<()> { // }; // used to notify the runtime that the process should exit - let (exit_tx, exit_rx) = oneshot::channel::<()>(); + let (exit_tx, mut exit_rx) = mpsc::channel::<()>(1); // used to send commands to the mail service let (mail_tx, mail_rx) = mpsc::unbounded_channel(); - tokio::spawn(mail::run_mail(config_update.clone(), mail_rx).unwrap_or_else(report_err)); + let mail_thread = tokio::spawn(mail::run_mail(config_update.clone(), mail_rx).unwrap_or_else(report_err)); let stdout = std::io::stdout(); - tokio::spawn(ui::run_ui(stdout, exit_tx).unwrap_or_else(report_err)); + let ui_thread = tokio::spawn(ui::run_ui(stdout, exit_tx).unwrap_or_else(report_err)); - exit_rx.await?; - Ok(()) + exit_rx.recv().await; + + // TODO: graceful shutdown + // yada yada create a background process and pass off the connections so they can be safely + // shutdown + std::process::exit(0); + // Ok(()) } fn setup_logger(opt: &Opt) -> Result<()> { @@ -79,5 +90,5 @@ fn setup_logger(opt: &Opt) -> Result<()> { } fn report_err(err: anyhow::Error) { - log::error!("error: {:?}", err); + error!("error: {:?}", err); } diff --git a/src/ui/mod.rs b/src/ui/mod.rs index ad13c42..46e567b 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -76,6 +76,7 @@ pub async fn run_ui(mut w: impl Write, exit: ExitSender) -> Result<()> { )?; terminal::disable_raw_mode()?; - exit.send(()).expect("fake news?"); + exit.send(()).await?; + debug!("sent exit"); Ok(()) }