Testing with sync loop again

This commit is contained in:
Michael Zhang 2021-11-05 08:51:58 -05:00
parent 6f504dc71c
commit fdf86c7167
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
7 changed files with 232 additions and 169 deletions

View file

@ -11,7 +11,11 @@ CREATE TABLE "mailboxes" (
); );
CREATE TABLE "messages" ( CREATE TABLE "messages" (
"id" TEXT PRIMARY KEY, -- "id" TEXT PRIMARY KEY,
"id" INTEGER PRIMARY KEY,
"mailbox_acct" TEXT NOT NULL,
"mailbox" TEXT NOT NULL,
"uid" INTEGER NOT NULL,
"date" DATETIME, "date" DATETIME,
"subject" TEXT, "subject" TEXT,
"from" JSON, "from" JSON,
@ -22,5 +26,8 @@ CREATE TABLE "messages" (
"bcc" JSON, "bcc" JSON,
"in_reply_to" TEXT, "in_reply_to" TEXT,
"message_id" TEXT, "message_id" TEXT,
"mbox" BLOB "mbox" BLOB,
FOREIGN KEY ("mailbox_acct") REFERENCES "mailboxes" ("account"),
FOREIGN KEY ("mailbox") REFERENCES "mailboxes" ("name")
); );

View file

@ -2,21 +2,20 @@
mod sessions; mod sessions;
mod store; mod store;
mod sync;
use anyhow::{Context, Result}; use anyhow::Result;
use futures::stream::StreamExt;
use panorama_imap::{ use panorama_imap::{
client::{ClientAuthenticated, ConfigBuilder}, client::ConfigBuilder,
pool::{ImapPool, PoolConfig}, pool::{ImapPool, PoolConfig},
proto::{
command::FetchItems,
response::{MailboxData, Response},
},
}; };
use sqlx::migrate::Migrator; use sqlx::migrate::Migrator;
use tokio::sync::mpsc::UnboundedSender; use tokio::sync::mpsc::UnboundedSender;
use crate::config::{MailAccountConfig, TlsMethod}; use crate::{
config::{MailAccountConfig, TlsMethod},
mail::sync::run_sync_loop,
};
// pub use self::event::MailEvent; // pub use self::event::MailEvent;
#[derive(Debug)] #[derive(Debug)]
@ -34,7 +33,7 @@ pub async fn mail_main(
acct_name: impl AsRef<str>, acct_name: impl AsRef<str>,
acct: MailAccountConfig, acct: MailAccountConfig,
_mail2ui_tx: UnboundedSender<MailEvent>, _mail2ui_tx: UnboundedSender<MailEvent>,
_mail_store: MailStore, mail_store: MailStore,
) -> Result<()> { ) -> Result<()> {
let acct_name = acct_name.as_ref(); let acct_name = acct_name.as_ref();
debug!( debug!(
@ -58,7 +57,7 @@ pub async fn mail_main(
// grab one connection from that pool and start running a background // grab one connection from that pool and start running a background
// synchronization thread // synchronization thread
let sync_conn = pool.acquire().await?; let sync_conn = pool.acquire().await?;
let sync_loop = run_sync_loop(sync_conn); let sync_loop = run_sync_loop(acct_name, sync_conn, mail_store);
// let the rest of the pool respond to requests coming from the channel // let the rest of the pool respond to requests coming from the channel
// TODO: // TODO:
@ -67,150 +66,3 @@ pub async fn mail_main(
Ok(()) Ok(())
} }
async fn run_sync_loop(mut conn: ClientAuthenticated) -> Result<()> {
// get the list of folders first
debug!("Retrieving folder list...");
let folder_list = conn.list().await?;
debug!("Mailbox list: {:?}", folder_list);
// let _ = mail2ui_tx.send(MailEvent::FolderList(
// acct_name.clone(),
// folder_list.clone(),
// ));
for folder in folder_list.iter() {
debug!("folder: {:?}", folder);
let select = conn.select("INBOX").await?;
debug!("select response: {:?}", select);
if let (Some(_exists), Some(_uidvalidity)) =
(select.exists, select.uid_validity)
{
// figure out which uids don't exist locally yet
let new_uids = vec![];
// let new_uids =
// stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
// todo!()
// // mail_store.try_identify_email(&acct_name, &folder,
// uid, uidvalidity, None) // //
// invert the option to only select uids that
// haven't been downloaded //
// .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
// // .map_err(|err| err.context("error checking if
// the email is already downloaded
// [try_identify_email]")) }).try_collect::
// <Vec<_>>().await?;
if !new_uids.is_empty() {
debug!("fetching uids {:?}", new_uids);
let _fetched = conn
.uid_fetch(&new_uids, &[], FetchItems::Envelope)
.await
.context("error fetching uids")?;
// fetched
// .map(Ok)
// .try_for_each_concurrent(None, |(uid, attrs)| {
// mail_store.store_email(&acct_name, &folder, uid,
// uidvalidity, attrs) })
// .await
// .context("error during fetch-store")?;
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// TODO: remove this later
// continue;
// let's just select INBOX for now, maybe have a config for default
// mailbox later?
debug!("selecting the INBOX mailbox");
let select = conn.select("INBOX").await?;
debug!("select result: {:?}", select);
loop {
let message_uids = conn.uid_search().await?;
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(),
// message_uids.clone(),
// ));
// TODO: make this happen concurrently with the main loop?
let mut message_list = conn
.uid_fetch(&message_uids, &[], FetchItems::All)
.await
.unwrap();
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.clone(), uid,
// attrs); TODO: probably odn't care about this?
// let _ = mail2ui_tx.send(evt);
}
// TODO: check if IDLE is supported
let supports_idle = true; // authed.has_capability("IDLE").await?;
if supports_idle {
let mut idle_stream = conn.idle().await?;
loop {
let evt = match idle_stream.next().await {
Some(v) => v,
None => break,
};
debug!("got an event: {:?}", evt);
match evt {
Response::MailboxData(MailboxData::Exists(uid)) => {
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
// send DONE to stop the idle
std::mem::drop(idle_stream);
// let handle = Notification::new()
// .summary("New Email")
// .body("TODO")
// .icon("firefox")
// .timeout(Timeout::Milliseconds(6000))
// .show()?;
let message_uids = conn.uid_search().await?;
let message_uids =
message_uids.into_iter().take(20).collect::<Vec<_>>();
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(),
// message_uids.clone(),
// ));
// TODO: make this happen concurrently with the main
// loop?
let mut message_list = conn
.uid_fetch(&message_uids, &[], FetchItems::All)
.await
.unwrap();
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.
// clone(), uid, attrs);
// debug!("sent {:?}", evt);
// mail2ui_tx.send(evt);
}
idle_stream = conn.idle().await?;
}
_ => {}
}
}
} else {
loop {
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
debug!("heartbeat");
}
}
if false {
break;
}
}
// wait a bit so we're not hitting the server really fast if the fail
// happens early on
//
// TODO: some kind of smart exponential backoff that considers some time
// threshold to be a failing case?
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok(())
}

View file

@ -1,8 +1,10 @@
use anyhow::{Context, Result}; use anyhow::{Context, Result};
use panorama_imap::proto::response::{Mailbox, MessageAttribute};
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions}; use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
use super::MIGRATOR; use super::MIGRATOR;
#[derive(Clone)]
pub struct MailStore { pub struct MailStore {
pool: SqlitePool, pool: SqlitePool,
} }
@ -20,4 +22,30 @@ impl MailStore {
Ok(MailStore { pool }) Ok(MailStore { pool })
} }
pub async fn store_email(
&self,
acct_name: impl AsRef<str>,
mailbox: &Mailbox,
uid: u32,
uidvalidity: u32,
attrs: Vec<MessageAttribute>,
) -> Result<()> {
let acct_name = acct_name.as_ref();
sqlx::query(
r#"
INSERT INTO messages
(mailbox_acct, mailbox, uid, uidvalidity)
VALUES ($1, $2, $3, $4)
"#,
)
.bind(acct_name)
.bind(mailbox.to_string())
.bind(uid)
.bind(uidvalidity)
.execute(&self.pool)
.await
.context("could not insert message into store")?;
Ok(())
}
} }

165
daemon/src/mail/sync.rs Normal file
View file

@ -0,0 +1,165 @@
use anyhow::{Context, Result};
use futures::{StreamExt, TryStreamExt};
use panorama_imap::{
client::ClientAuthenticated,
proto::{
command::FetchItems,
response::{MailboxData, Response},
},
};
use super::MailStore;
pub async fn run_sync_loop(
acct_name: impl AsRef<str>,
mut conn: ClientAuthenticated,
mail_store: MailStore,
) -> Result<()> {
let acct_name = acct_name.as_ref();
// get the list of folders first
debug!("Retrieving folder list...");
let folder_list = conn.list().await?;
debug!("Mailbox list: {:?}", folder_list);
// let _ = mail2ui_tx.send(MailEvent::FolderList(
// acct_name.clone(),
// folder_list.clone(),
// ));
for folder in folder_list.iter() {
debug!("folder: {:?}", folder);
let select = conn.select("INBOX").await?;
debug!("select response: {:?}", select);
if let (Some(_exists), Some(uidvalidity)) =
(select.exists, select.uid_validity)
{
// figure out which uids don't exist locally yet
// let new_uids = vec![];
// let new_uids =
// stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
// todo!()
// // mail_store.try_identify_email(&acct_name, &folder,
// uid, uidvalidity, None) // //
// invert the option to only select uids that
// haven't been downloaded //
// .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
// // .map_err(|err| err.context("error checking if
// the email is already downloaded
// [try_identify_email]")) }).try_collect::
// <Vec<_>>().await?;
if true {
// !new_uids.is_empty() {
// debug!("fetching uids {:?}", new_uids);
let fetched = conn
.uid_fetch(&[], &[1..200], FetchItems::Envelope)
.await
.context("error fetching uids")?;
fetched
.map(Ok)
.try_for_each_concurrent(None, |(uid, attrs)| {
debug!("GOT MESSAGE {} {:?}", uid, attrs);
mail_store.store_email(&acct_name, folder, uid, uidvalidity, attrs)
})
.await
.context("error during fetch-store")?;
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// TODO: remove this later
// continue;
// let's just select INBOX for now, maybe have a config for default
// mailbox later?
debug!("selecting the INBOX mailbox");
let select = conn.select("INBOX").await?;
debug!("select result: {:?}", select);
loop {
let message_uids = conn.uid_search().await?;
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(),
// message_uids.clone(),
// ));
// TODO: make this happen concurrently with the main loop?
let mut message_list = conn
.uid_fetch(&message_uids, &[], FetchItems::All)
.await
.unwrap();
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.clone(), uid,
// attrs); TODO: probably odn't care about this?
// let _ = mail2ui_tx.send(evt);
}
// TODO: check if IDLE is supported
let supports_idle = true; // authed.has_capability("IDLE").await?;
if supports_idle {
let mut idle_stream = conn.idle().await?;
loop {
let evt = match idle_stream.next().await {
Some(v) => v,
None => break,
};
debug!("got an event: {:?}", evt);
match evt {
Response::MailboxData(MailboxData::Exists(uid)) => {
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
// send DONE to stop the idle
std::mem::drop(idle_stream);
// let handle = Notification::new()
// .summary("New Email")
// .body("TODO")
// .icon("firefox")
// .timeout(Timeout::Milliseconds(6000))
// .show()?;
let message_uids = conn.uid_search().await?;
let message_uids =
message_uids.into_iter().take(20).collect::<Vec<_>>();
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(),
// message_uids.clone(),
// ));
// TODO: make this happen concurrently with the main
// loop?
let mut message_list = conn
.uid_fetch(&message_uids, &[], FetchItems::All)
.await
.unwrap();
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.
// clone(), uid, attrs);
// debug!("sent {:?}", evt);
// mail2ui_tx.send(evt);
}
idle_stream = conn.idle().await?;
}
_ => {}
}
}
} else {
loop {
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
debug!("heartbeat");
}
}
if false {
break;
}
}
// wait a bit so we're not hitting the server really fast if the fail
// happens early on
//
// TODO: some kind of smart exponential backoff that considers some time
// threshold to be a failing case?
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
Ok(())
}

View file

@ -37,8 +37,8 @@ impl ImapAuth {
password: Bytes::from(password.clone()), password: Bytes::from(password.clone()),
}); });
let result = inner.execute(command).await?; inner.execute(command).await?;
todo!() Ok(())
} }
} }
} }

View file

@ -64,11 +64,11 @@ impl InnerPool {
pub async fn acquire(&self) -> Result<ClientAuthenticated> { pub async fn acquire(&self) -> Result<ClientAuthenticated> {
debug!("Trying to acquire a connection from the pool..."); debug!("Trying to acquire a connection from the pool...");
let guard = match self.connections.pop() {
// TODO: concurrency shit
match self.connections.pop() {
// we can reuse // we can reuse
Some(conn) => { Some(conn) => Ok(conn),
return Ok(conn);
}
// no existing connection, time to make a new one // no existing connection, time to make a new one
None => { None => {
@ -81,10 +81,11 @@ impl InnerPool {
debug!("Client connected to {}", self.config.client_config.hostname); debug!("Client connected to {}", self.config.client_config.hostname);
// authenticate // authenticate
let client_auth = client.auth(&self.config.auth_config); let client_auth = client.auth(&self.config.auth_config).await?;
} debug!("Authenticated using {:?}", self.config.auth_config);
};
todo!() Ok(client_auth)
}
}
} }
} }

View file

@ -1,3 +1,4 @@
use std::fmt;
use std::io::{self, Write}; use std::io::{self, Write};
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
@ -304,6 +305,15 @@ pub enum Mailbox {
Name(Bytes), Name(Bytes),
} }
impl fmt::Display for Mailbox {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Mailbox::Inbox => write!(f, "INBOX"),
Mailbox::Name(b) => write!(f, "{}", String::from_utf8_lossy(b)),
}
}
}
impl DisplayBytes for Mailbox { impl DisplayBytes for Mailbox {
fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> {
match self { match self {