diff --git a/daemon/migrations/20211013053733_initial.up.sql b/daemon/migrations/20211013053733_initial.up.sql index bc7e94c..f1e118e 100644 --- a/daemon/migrations/20211013053733_initial.up.sql +++ b/daemon/migrations/20211013053733_initial.up.sql @@ -11,7 +11,11 @@ CREATE TABLE "mailboxes" ( ); CREATE TABLE "messages" ( - "id" TEXT PRIMARY KEY, + -- "id" TEXT PRIMARY KEY, + "id" INTEGER PRIMARY KEY, + "mailbox_acct" TEXT NOT NULL, + "mailbox" TEXT NOT NULL, + "uid" INTEGER NOT NULL, "date" DATETIME, "subject" TEXT, "from" JSON, @@ -22,5 +26,8 @@ CREATE TABLE "messages" ( "bcc" JSON, "in_reply_to" TEXT, "message_id" TEXT, - "mbox" BLOB + "mbox" BLOB, + + FOREIGN KEY ("mailbox_acct") REFERENCES "mailboxes" ("account"), + FOREIGN KEY ("mailbox") REFERENCES "mailboxes" ("name") ); diff --git a/daemon/src/mail/mod.rs b/daemon/src/mail/mod.rs index 91aa4fb..3792ade 100644 --- a/daemon/src/mail/mod.rs +++ b/daemon/src/mail/mod.rs @@ -2,21 +2,20 @@ mod sessions; mod store; +mod sync; -use anyhow::{Context, Result}; -use futures::stream::StreamExt; +use anyhow::Result; use panorama_imap::{ - client::{ClientAuthenticated, ConfigBuilder}, + client::ConfigBuilder, pool::{ImapPool, PoolConfig}, - proto::{ - command::FetchItems, - response::{MailboxData, Response}, - }, }; use sqlx::migrate::Migrator; use tokio::sync::mpsc::UnboundedSender; -use crate::config::{MailAccountConfig, TlsMethod}; +use crate::{ + config::{MailAccountConfig, TlsMethod}, + mail::sync::run_sync_loop, +}; // pub use self::event::MailEvent; #[derive(Debug)] @@ -34,7 +33,7 @@ pub async fn mail_main( acct_name: impl AsRef, acct: MailAccountConfig, _mail2ui_tx: UnboundedSender, - _mail_store: MailStore, + mail_store: MailStore, ) -> Result<()> { let acct_name = acct_name.as_ref(); debug!( @@ -58,7 +57,7 @@ pub async fn mail_main( // grab one connection from that pool and start running a background // synchronization thread let sync_conn = pool.acquire().await?; - let sync_loop = run_sync_loop(sync_conn); + let sync_loop = run_sync_loop(acct_name, sync_conn, mail_store); // let the rest of the pool respond to requests coming from the channel // TODO: @@ -67,150 +66,3 @@ pub async fn mail_main( Ok(()) } - -async fn run_sync_loop(mut conn: ClientAuthenticated) -> Result<()> { - // get the list of folders first - debug!("Retrieving folder list..."); - let folder_list = conn.list().await?; - debug!("Mailbox list: {:?}", folder_list); - - // let _ = mail2ui_tx.send(MailEvent::FolderList( - // acct_name.clone(), - // folder_list.clone(), - // )); - - for folder in folder_list.iter() { - debug!("folder: {:?}", folder); - let select = conn.select("INBOX").await?; - debug!("select response: {:?}", select); - if let (Some(_exists), Some(_uidvalidity)) = - (select.exists, select.uid_validity) - { - // figure out which uids don't exist locally yet - let new_uids = vec![]; - // let new_uids = - // stream::iter(1..exists).map(Ok).try_filter_map(|uid| { - // todo!() - // // mail_store.try_identify_email(&acct_name, &folder, - // uid, uidvalidity, None) // // - // invert the option to only select uids that - // haven't been downloaded // - // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) - // // .map_err(|err| err.context("error checking if - // the email is already downloaded - // [try_identify_email]")) }).try_collect:: - // >().await?; - if !new_uids.is_empty() { - debug!("fetching uids {:?}", new_uids); - let _fetched = conn - .uid_fetch(&new_uids, &[], FetchItems::Envelope) - .await - .context("error fetching uids")?; - // fetched - // .map(Ok) - // .try_for_each_concurrent(None, |(uid, attrs)| { - // mail_store.store_email(&acct_name, &folder, uid, - // uidvalidity, attrs) }) - // .await - // .context("error during fetch-store")?; - } - } - } - tokio::time::sleep(std::time::Duration::from_secs(50)).await; - - // TODO: remove this later - // continue; - - // let's just select INBOX for now, maybe have a config for default - // mailbox later? - - debug!("selecting the INBOX mailbox"); - let select = conn.select("INBOX").await?; - debug!("select result: {:?}", select); - - loop { - let message_uids = conn.uid_search().await?; - let message_uids = message_uids.into_iter().take(30).collect::>(); - // let _ = mail2ui_tx.send(MailEvent::MessageUids( - // acct_name.clone(), - // message_uids.clone(), - // )); - // TODO: make this happen concurrently with the main loop? - let mut message_list = conn - .uid_fetch(&message_uids, &[], FetchItems::All) - .await - .unwrap(); - while let Some((_uid, _attrs)) = message_list.next().await { - // let evt = MailEvent::UpdateUid(acct_name.clone(), uid, - // attrs); TODO: probably odn't care about this? - // let _ = mail2ui_tx.send(evt); - } - - // TODO: check if IDLE is supported - let supports_idle = true; // authed.has_capability("IDLE").await?; - - if supports_idle { - let mut idle_stream = conn.idle().await?; - loop { - let evt = match idle_stream.next().await { - Some(v) => v, - None => break, - }; - - debug!("got an event: {:?}", evt); - match evt { - Response::MailboxData(MailboxData::Exists(uid)) => { - debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); - // send DONE to stop the idle - std::mem::drop(idle_stream); - // let handle = Notification::new() - // .summary("New Email") - // .body("TODO") - // .icon("firefox") - // .timeout(Timeout::Milliseconds(6000)) - // .show()?; - let message_uids = conn.uid_search().await?; - let message_uids = - message_uids.into_iter().take(20).collect::>(); - // let _ = mail2ui_tx.send(MailEvent::MessageUids( - // acct_name.clone(), - // message_uids.clone(), - // )); - // TODO: make this happen concurrently with the main - // loop? - let mut message_list = conn - .uid_fetch(&message_uids, &[], FetchItems::All) - .await - .unwrap(); - while let Some((_uid, _attrs)) = message_list.next().await { - // let evt = MailEvent::UpdateUid(acct_name. - // clone(), uid, attrs); - // debug!("sent {:?}", evt); - // mail2ui_tx.send(evt); - } - - idle_stream = conn.idle().await?; - } - _ => {} - } - } - } else { - loop { - tokio::time::sleep(std::time::Duration::from_secs(20)).await; - debug!("heartbeat"); - } - } - if false { - break; - } - } - - // wait a bit so we're not hitting the server really fast if the fail - // happens early on - // - // TODO: some kind of smart exponential backoff that considers some time - // threshold to be a failing case? - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - - Ok(()) -} diff --git a/daemon/src/mail/store.rs b/daemon/src/mail/store.rs index d7cff7e..07c0313 100644 --- a/daemon/src/mail/store.rs +++ b/daemon/src/mail/store.rs @@ -1,8 +1,10 @@ use anyhow::{Context, Result}; +use panorama_imap::proto::response::{Mailbox, MessageAttribute}; use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; use super::MIGRATOR; +#[derive(Clone)] pub struct MailStore { pool: SqlitePool, } @@ -20,4 +22,30 @@ impl MailStore { Ok(MailStore { pool }) } + + pub async fn store_email( + &self, + acct_name: impl AsRef, + mailbox: &Mailbox, + uid: u32, + uidvalidity: u32, + attrs: Vec, + ) -> Result<()> { + let acct_name = acct_name.as_ref(); + sqlx::query( + r#" + INSERT INTO messages + (mailbox_acct, mailbox, uid, uidvalidity) + VALUES ($1, $2, $3, $4) + "#, + ) + .bind(acct_name) + .bind(mailbox.to_string()) + .bind(uid) + .bind(uidvalidity) + .execute(&self.pool) + .await + .context("could not insert message into store")?; + Ok(()) + } } diff --git a/daemon/src/mail/sync.rs b/daemon/src/mail/sync.rs new file mode 100644 index 0000000..f6a2a82 --- /dev/null +++ b/daemon/src/mail/sync.rs @@ -0,0 +1,165 @@ +use anyhow::{Context, Result}; +use futures::{StreamExt, TryStreamExt}; +use panorama_imap::{ + client::ClientAuthenticated, + proto::{ + command::FetchItems, + response::{MailboxData, Response}, + }, +}; + +use super::MailStore; + +pub async fn run_sync_loop( + acct_name: impl AsRef, + mut conn: ClientAuthenticated, + mail_store: MailStore, +) -> Result<()> { + let acct_name = acct_name.as_ref(); + // get the list of folders first + debug!("Retrieving folder list..."); + let folder_list = conn.list().await?; + debug!("Mailbox list: {:?}", folder_list); + + // let _ = mail2ui_tx.send(MailEvent::FolderList( + // acct_name.clone(), + // folder_list.clone(), + // )); + + for folder in folder_list.iter() { + debug!("folder: {:?}", folder); + let select = conn.select("INBOX").await?; + debug!("select response: {:?}", select); + if let (Some(_exists), Some(uidvalidity)) = + (select.exists, select.uid_validity) + { + // figure out which uids don't exist locally yet + // let new_uids = vec![]; + // let new_uids = + // stream::iter(1..exists).map(Ok).try_filter_map(|uid| { + // todo!() + // // mail_store.try_identify_email(&acct_name, &folder, + // uid, uidvalidity, None) // // + // invert the option to only select uids that + // haven't been downloaded // + // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) + // // .map_err(|err| err.context("error checking if + // the email is already downloaded + // [try_identify_email]")) }).try_collect:: + // >().await?; + if true { + // !new_uids.is_empty() { + // debug!("fetching uids {:?}", new_uids); + let fetched = conn + .uid_fetch(&[], &[1..200], FetchItems::Envelope) + .await + .context("error fetching uids")?; + fetched + .map(Ok) + .try_for_each_concurrent(None, |(uid, attrs)| { + debug!("GOT MESSAGE {} {:?}", uid, attrs); + mail_store.store_email(&acct_name, folder, uid, uidvalidity, attrs) + }) + .await + .context("error during fetch-store")?; + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(50)).await; + + // TODO: remove this later + // continue; + + // let's just select INBOX for now, maybe have a config for default + // mailbox later? + + debug!("selecting the INBOX mailbox"); + let select = conn.select("INBOX").await?; + debug!("select result: {:?}", select); + + loop { + let message_uids = conn.uid_search().await?; + let message_uids = message_uids.into_iter().take(30).collect::>(); + // let _ = mail2ui_tx.send(MailEvent::MessageUids( + // acct_name.clone(), + // message_uids.clone(), + // )); + // TODO: make this happen concurrently with the main loop? + let mut message_list = conn + .uid_fetch(&message_uids, &[], FetchItems::All) + .await + .unwrap(); + while let Some((_uid, _attrs)) = message_list.next().await { + // let evt = MailEvent::UpdateUid(acct_name.clone(), uid, + // attrs); TODO: probably odn't care about this? + // let _ = mail2ui_tx.send(evt); + } + + // TODO: check if IDLE is supported + let supports_idle = true; // authed.has_capability("IDLE").await?; + + if supports_idle { + let mut idle_stream = conn.idle().await?; + loop { + let evt = match idle_stream.next().await { + Some(v) => v, + None => break, + }; + + debug!("got an event: {:?}", evt); + match evt { + Response::MailboxData(MailboxData::Exists(uid)) => { + debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); + // send DONE to stop the idle + std::mem::drop(idle_stream); + // let handle = Notification::new() + // .summary("New Email") + // .body("TODO") + // .icon("firefox") + // .timeout(Timeout::Milliseconds(6000)) + // .show()?; + let message_uids = conn.uid_search().await?; + let message_uids = + message_uids.into_iter().take(20).collect::>(); + // let _ = mail2ui_tx.send(MailEvent::MessageUids( + // acct_name.clone(), + // message_uids.clone(), + // )); + // TODO: make this happen concurrently with the main + // loop? + let mut message_list = conn + .uid_fetch(&message_uids, &[], FetchItems::All) + .await + .unwrap(); + while let Some((_uid, _attrs)) = message_list.next().await { + // let evt = MailEvent::UpdateUid(acct_name. + // clone(), uid, attrs); + // debug!("sent {:?}", evt); + // mail2ui_tx.send(evt); + } + + idle_stream = conn.idle().await?; + } + _ => {} + } + } + } else { + loop { + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + debug!("heartbeat"); + } + } + if false { + break; + } + } + + // wait a bit so we're not hitting the server really fast if the fail + // happens early on + // + // TODO: some kind of smart exponential backoff that considers some time + // threshold to be a failing case? + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + Ok(()) +} diff --git a/imap/src/client/auth.rs b/imap/src/client/auth.rs index d79b896..29bb7ea 100644 --- a/imap/src/client/auth.rs +++ b/imap/src/client/auth.rs @@ -37,8 +37,8 @@ impl ImapAuth { password: Bytes::from(password.clone()), }); - let result = inner.execute(command).await?; - todo!() + inner.execute(command).await?; + Ok(()) } } } diff --git a/imap/src/pool/mod.rs b/imap/src/pool/mod.rs index 77e9df8..885e0fc 100644 --- a/imap/src/pool/mod.rs +++ b/imap/src/pool/mod.rs @@ -64,11 +64,11 @@ impl InnerPool { pub async fn acquire(&self) -> Result { debug!("Trying to acquire a connection from the pool..."); - let guard = match self.connections.pop() { + + // TODO: concurrency shit + match self.connections.pop() { // we can reuse - Some(conn) => { - return Ok(conn); - } + Some(conn) => Ok(conn), // no existing connection, time to make a new one None => { @@ -81,10 +81,11 @@ impl InnerPool { debug!("Client connected to {}", self.config.client_config.hostname); // authenticate - let client_auth = client.auth(&self.config.auth_config); - } - }; + let client_auth = client.auth(&self.config.auth_config).await?; + debug!("Authenticated using {:?}", self.config.auth_config); - todo!() + Ok(client_auth) + } + } } } diff --git a/imap/src/proto/response.rs b/imap/src/proto/response.rs index bbb51be..f756533 100644 --- a/imap/src/proto/response.rs +++ b/imap/src/proto/response.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::io::{self, Write}; use std::ops::RangeInclusive; @@ -304,6 +305,15 @@ pub enum Mailbox { Name(Bytes), } +impl fmt::Display for Mailbox { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Mailbox::Inbox => write!(f, "INBOX"), + Mailbox::Name(b) => write!(f, "{}", String::from_utf8_lossy(b)), + } + } +} + impl DisplayBytes for Mailbox { fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { match self {