Change to 2 spaces
This commit is contained in:
parent
1ab69aadb9
commit
ff18e98eff
24 changed files with 2051 additions and 2088 deletions
|
@ -5,8 +5,8 @@ use std::sync::mpsc as stdmpsc;
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use futures::future::TryFutureExt;
|
use futures::future::TryFutureExt;
|
||||||
use notify::{
|
use notify::{
|
||||||
recommended_watcher, Event as NotifyEvent, RecommendedWatcher,
|
recommended_watcher, Event as NotifyEvent, RecommendedWatcher, RecursiveMode,
|
||||||
RecursiveMode, Watcher,
|
Watcher,
|
||||||
};
|
};
|
||||||
use tokio::{sync::watch, task::JoinHandle};
|
use tokio::{sync::watch, task::JoinHandle};
|
||||||
use xdg::BaseDirectories;
|
use xdg::BaseDirectories;
|
||||||
|
@ -87,19 +87,14 @@ async fn start_notify_stream(
|
||||||
};
|
};
|
||||||
|
|
||||||
// TODO: any better way to do this?
|
// TODO: any better way to do this?
|
||||||
let config_path_c =
|
let config_path_c = config_path.canonicalize().context("cfg_path")?;
|
||||||
config_path.canonicalize().context("cfg_path")?;
|
|
||||||
if config_path_c != path_expect {
|
if config_path_c != path_expect {
|
||||||
debug!(
|
debug!("did not match {:?} {:?}", config_path_c, path_expect);
|
||||||
"did not match {:?} {:?}",
|
|
||||||
config_path_c, path_expect
|
|
||||||
);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("reading config from {:?}", path_expect);
|
debug!("reading config from {:?}", path_expect);
|
||||||
let config =
|
let config = Config::from_file(path_expect).await.context("read")?;
|
||||||
Config::from_file(path_expect).await.context("read")?;
|
|
||||||
// debug!("sending config {:?}", config);
|
// debug!("sending config {:?}", config);
|
||||||
config_tx.send(config)?;
|
config_tx.send(config)?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -98,8 +98,7 @@ async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> {
|
||||||
for (account_name, account) in config.mail_accounts {
|
for (account_name, account) in config.mail_accounts {
|
||||||
let (exit_tx, exit_rx) = oneshot::channel();
|
let (exit_tx, exit_rx) = oneshot::channel();
|
||||||
tokio::spawn(async {
|
tokio::spawn(async {
|
||||||
match run_single_mail_account(account_name, account, exit_rx).await
|
match run_single_mail_account(account_name, account, exit_rx).await {
|
||||||
{
|
|
||||||
Ok(_) => {}
|
Ok(_) => {}
|
||||||
Err(err) => panic!("failed: {:?}", err),
|
Err(err) => panic!("failed: {:?}", err),
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,16 +118,17 @@ pub async fn sync_main(
|
||||||
|
|
||||||
// TODO: remove this later
|
// TODO: remove this later
|
||||||
// continue;
|
// continue;
|
||||||
|
|
||||||
// let's just select INBOX for now, maybe have a config for default
|
// let's just select INBOX for now, maybe have a config for default
|
||||||
// mailbox later?
|
// mailbox later?
|
||||||
|
|
||||||
debug!("selecting the INBOX mailbox");
|
debug!("selecting the INBOX mailbox");
|
||||||
let select = authed.select("INBOX").await?;
|
let select = authed.select("INBOX").await?;
|
||||||
debug!("select result: {:?}", select);
|
debug!("select result: {:?}", select);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let message_uids = authed.uid_search().await?;
|
let message_uids = authed.uid_search().await?;
|
||||||
let message_uids =
|
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
|
||||||
message_uids.into_iter().take(30).collect::<Vec<_>>();
|
|
||||||
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
|
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
|
||||||
// acct_name.clone(),
|
// acct_name.clone(),
|
||||||
// message_uids.clone(),
|
// message_uids.clone(),
|
||||||
|
@ -157,10 +158,7 @@ pub async fn sync_main(
|
||||||
debug!("got an event: {:?}", evt);
|
debug!("got an event: {:?}", evt);
|
||||||
match evt {
|
match evt {
|
||||||
Response::MailboxData(MailboxData::Exists(uid)) => {
|
Response::MailboxData(MailboxData::Exists(uid)) => {
|
||||||
debug!(
|
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
|
||||||
"NEW MESSAGE WITH UID {:?}, droping everything",
|
|
||||||
uid
|
|
||||||
);
|
|
||||||
// send DONE to stop the idle
|
// send DONE to stop the idle
|
||||||
std::mem::drop(idle_stream);
|
std::mem::drop(idle_stream);
|
||||||
// let handle = Notification::new()
|
// let handle = Notification::new()
|
||||||
|
@ -170,10 +168,8 @@ pub async fn sync_main(
|
||||||
// .timeout(Timeout::Milliseconds(6000))
|
// .timeout(Timeout::Milliseconds(6000))
|
||||||
// .show()?;
|
// .show()?;
|
||||||
let message_uids = authed.uid_search().await?;
|
let message_uids = authed.uid_search().await?;
|
||||||
let message_uids = message_uids
|
let message_uids =
|
||||||
.into_iter()
|
message_uids.into_iter().take(20).collect::<Vec<_>>();
|
||||||
.take(20)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
|
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
|
||||||
// acct_name.clone(),
|
// acct_name.clone(),
|
||||||
// message_uids.clone(),
|
// message_uids.clone(),
|
||||||
|
@ -184,9 +180,7 @@ pub async fn sync_main(
|
||||||
.uid_fetch(&message_uids, &[], FetchItems::All)
|
.uid_fetch(&message_uids, &[], FetchItems::All)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
while let Some((_uid, _attrs)) =
|
while let Some((_uid, _attrs)) = message_list.next().await {
|
||||||
message_list.next().await
|
|
||||||
{
|
|
||||||
// let evt = MailEvent::UpdateUid(acct_name.
|
// let evt = MailEvent::UpdateUid(acct_name.
|
||||||
// clone(), uid, attrs);
|
// clone(), uid, attrs);
|
||||||
// debug!("sent {:?}", evt);
|
// debug!("sent {:?}", evt);
|
||||||
|
@ -200,8 +194,7 @@ pub async fn sync_main(
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
loop {
|
loop {
|
||||||
tokio::time::sleep(std::time::Duration::from_secs(20))
|
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
|
||||||
.await;
|
|
||||||
debug!("heartbeat");
|
debug!("heartbeat");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ pub struct MailStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MailStore {
|
impl MailStore {
|
||||||
/// Creates a new connection to a SQLite database.
|
/// Creates a new store tied to a SQLite database.
|
||||||
pub async fn open(uri: impl AsRef<str>) -> Result<Self> {
|
pub async fn open(uri: impl AsRef<str>) -> Result<Self> {
|
||||||
let pool = SqlitePoolOptions::new().connect(uri.as_ref()).await?;
|
let pool = SqlitePoolOptions::new().connect(uri.as_ref()).await?;
|
||||||
|
|
||||||
|
|
|
@ -201,12 +201,8 @@ impl ClientAuthenticated {
|
||||||
..
|
..
|
||||||
}) => match code {
|
}) => match code {
|
||||||
ResponseCode::Unseen(value) => select.unseen = Some(value),
|
ResponseCode::Unseen(value) => select.unseen = Some(value),
|
||||||
ResponseCode::UidNext(value) => {
|
ResponseCode::UidNext(value) => select.uid_next = Some(value),
|
||||||
select.uid_next = Some(value)
|
ResponseCode::UidValidity(value) => select.uid_validity = Some(value),
|
||||||
}
|
|
||||||
ResponseCode::UidValidity(value) => {
|
|
||||||
select.uid_validity = Some(value)
|
|
||||||
}
|
|
||||||
_ => {}
|
_ => {}
|
||||||
},
|
},
|
||||||
_ => warn!("unknown response {:?}", resp),
|
_ => warn!("unknown response {:?}", resp),
|
||||||
|
@ -250,9 +246,7 @@ impl ClientAuthenticated {
|
||||||
let stream = self.execute(cmd).await?;
|
let stream = self.execute(cmd).await?;
|
||||||
// let (done, data) = stream.wait().await?;
|
// let (done, data) = stream.wait().await?;
|
||||||
Ok(stream.filter_map(|resp| match resp {
|
Ok(stream.filter_map(|resp| match resp {
|
||||||
Response::Fetch(n, attrs) => {
|
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
|
||||||
future::ready(Some((n, attrs))).boxed()
|
|
||||||
}
|
|
||||||
Response::Done(_) => future::ready(None).boxed(),
|
Response::Done(_) => future::ready(None).boxed(),
|
||||||
_ => future::pending().boxed(),
|
_ => future::pending().boxed(),
|
||||||
}))
|
}))
|
||||||
|
@ -277,9 +271,7 @@ impl ClientAuthenticated {
|
||||||
let stream = self.execute(cmd).await?;
|
let stream = self.execute(cmd).await?;
|
||||||
// let (done, data) = stream.wait().await?;
|
// let (done, data) = stream.wait().await?;
|
||||||
Ok(stream.filter_map(|resp| match resp {
|
Ok(stream.filter_map(|resp| match resp {
|
||||||
Response::Fetch(n, attrs) => {
|
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
|
||||||
future::ready(Some((n, attrs))).boxed()
|
|
||||||
}
|
|
||||||
Response::Done(_) => future::ready(None).boxed(),
|
Response::Done(_) => future::ready(None).boxed(),
|
||||||
_ => future::pending().boxed(),
|
_ => future::pending().boxed(),
|
||||||
}))
|
}))
|
||||||
|
|
|
@ -12,8 +12,7 @@ use futures::{
|
||||||
use panorama_proto_common::Bytes;
|
use panorama_proto_common::Bytes;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::{
|
io::{
|
||||||
split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf,
|
split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf,
|
||||||
WriteHalf,
|
|
||||||
},
|
},
|
||||||
sync::{mpsc, oneshot, RwLock},
|
sync::{mpsc, oneshot, RwLock},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
|
@ -93,8 +92,7 @@ where
|
||||||
|
|
||||||
// spawn the client->server loop
|
// spawn the client->server loop
|
||||||
let (write_exit, exit_rx) = oneshot::channel();
|
let (write_exit, exit_rx) = oneshot::channel();
|
||||||
let write_handle =
|
let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx));
|
||||||
tokio::spawn(write_loop(write_half, exit_rx, write_rx));
|
|
||||||
|
|
||||||
let tag_number = AtomicU32::new(0);
|
let tag_number = AtomicU32::new(0);
|
||||||
let capabilities = Arc::new(RwLock::new(None));
|
let capabilities = Arc::new(RwLock::new(None));
|
||||||
|
@ -112,10 +110,7 @@ where
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn execute(
|
pub async fn execute(&mut self, command: Command) -> Result<ResponseStream> {
|
||||||
&mut self,
|
|
||||||
command: Command,
|
|
||||||
) -> Result<ResponseStream> {
|
|
||||||
let id = self.tag_number.fetch_add(1, Ordering::SeqCst);
|
let id = self.tag_number.fetch_add(1, Ordering::SeqCst);
|
||||||
let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id)));
|
let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id)));
|
||||||
|
|
||||||
|
@ -130,10 +125,7 @@ where
|
||||||
Ok(stream)
|
Ok(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn has_capability(
|
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
|
||||||
&mut self,
|
|
||||||
cap: impl AsRef<str>,
|
|
||||||
) -> Result<bool> {
|
|
||||||
let mut cap_slice = cap.as_ref().as_bytes().to_vec();
|
let mut cap_slice = cap.as_ref().as_bytes().to_vec();
|
||||||
|
|
||||||
// since we're doing incremental parsing, we have to finish this off
|
// since we're doing incremental parsing, we have to finish this off
|
||||||
|
@ -143,8 +135,8 @@ where
|
||||||
let cap_bytes = Bytes::from(cap_slice);
|
let cap_bytes = Bytes::from(cap_slice);
|
||||||
trace!("CAP_BYTES: {:?}", cap_bytes);
|
trace!("CAP_BYTES: {:?}", cap_bytes);
|
||||||
|
|
||||||
let (_, cap) = parse_capability(cap_bytes)
|
let (_, cap) =
|
||||||
.context("could not parse capability")?;
|
parse_capability(cap_bytes).context("could not parse capability")?;
|
||||||
|
|
||||||
let contains = {
|
let contains = {
|
||||||
let read = self.capabilities.read().await;
|
let read = self.capabilities.read().await;
|
||||||
|
@ -193,7 +185,8 @@ where
|
||||||
.execute(Command::Starttls)
|
.execute(Command::Starttls)
|
||||||
.await
|
.await
|
||||||
.context("could not send starttls command")?;
|
.context("could not send starttls command")?;
|
||||||
resp.wait()
|
resp
|
||||||
|
.wait()
|
||||||
.await
|
.await
|
||||||
.context("could not receive starttls response")?;
|
.context("could not receive starttls response")?;
|
||||||
debug!("received OK from server");
|
debug!("received OK from server");
|
||||||
|
|
|
@ -25,9 +25,7 @@ impl ResponseStream {
|
||||||
|
|
||||||
/// Waits for the entire stream to finish, returning the DONE status and the
|
/// Waits for the entire stream to finish, returning the DONE status and the
|
||||||
/// stream
|
/// stream
|
||||||
pub async fn wait(
|
pub async fn wait(mut self) -> Result<(Option<ResponseDone>, Vec<Response>)> {
|
||||||
mut self,
|
|
||||||
) -> Result<(Option<ResponseDone>, Vec<Response>)> {
|
|
||||||
let mut done = None;
|
let mut done = None;
|
||||||
let mut vec = Vec::new();
|
let mut vec = Vec::new();
|
||||||
while let Some(resp) = self.inner.recv().await {
|
while let Some(resp) = self.inner.recv().await {
|
||||||
|
|
|
@ -5,8 +5,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tokio_rustls::{
|
use tokio_rustls::{
|
||||||
client::TlsStream,
|
client::TlsStream,
|
||||||
rustls::{
|
rustls::{
|
||||||
ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore,
|
ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore, ServerName,
|
||||||
ServerName,
|
|
||||||
},
|
},
|
||||||
TlsConnector,
|
TlsConnector,
|
||||||
};
|
};
|
||||||
|
|
|
@ -47,29 +47,20 @@ pub fn convert_error<I: Deref<Target = [u8]> + Debug>(
|
||||||
if input.is_empty() {
|
if input.is_empty() {
|
||||||
match kind {
|
match kind {
|
||||||
VerboseErrorKind::Char(c) => {
|
VerboseErrorKind::Char(c) => {
|
||||||
write!(
|
write!(&mut result, "{}: expected '{}', got empty input\n\n", i, c)
|
||||||
&mut result,
|
|
||||||
"{}: expected '{}', got empty input\n\n",
|
|
||||||
i, c
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
VerboseErrorKind::Context(s) => {
|
VerboseErrorKind::Context(s) => {
|
||||||
write!(&mut result, "{}: in {}, got empty input\n\n", i, s)
|
write!(&mut result, "{}: in {}, got empty input\n\n", i, s)
|
||||||
}
|
}
|
||||||
VerboseErrorKind::Nom(e) => {
|
VerboseErrorKind::Nom(e) => {
|
||||||
write!(
|
write!(&mut result, "{}: in {:?}, got empty input\n\n", i, e)
|
||||||
&mut result,
|
|
||||||
"{}: in {:?}, got empty input\n\n",
|
|
||||||
i, e
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let prefix = &input.as_bytes()[..offset];
|
let prefix = &input.as_bytes()[..offset];
|
||||||
|
|
||||||
// Count the number of newlines in the first `offset` bytes of input
|
// Count the number of newlines in the first `offset` bytes of input
|
||||||
let line_number =
|
let line_number = prefix.iter().filter(|&&b| b == b'\n').count() + 1;
|
||||||
prefix.iter().filter(|&&b| b == b'\n').count() + 1;
|
|
||||||
|
|
||||||
// Find the line that includes the subslice:
|
// Find the line that includes the subslice:
|
||||||
// Find the *last* newline before the substring starts
|
// Find the *last* newline before the substring starts
|
||||||
|
|
|
@ -44,7 +44,8 @@ macro_rules! sep_list {
|
||||||
many0(preceded(panorama_proto_common::byte(b'\x20'), $t)),
|
many0(preceded(panorama_proto_common::byte(b'\x20'), $t)),
|
||||||
)),
|
)),
|
||||||
|opt| {
|
|opt| {
|
||||||
opt.map(|(hd, mut tl)| {
|
opt
|
||||||
|
.map(|(hd, mut tl)| {
|
||||||
tl.insert(0, hd);
|
tl.insert(0, hd);
|
||||||
tl
|
tl
|
||||||
})
|
})
|
||||||
|
@ -54,7 +55,8 @@ macro_rules! sep_list {
|
||||||
};
|
};
|
||||||
(? $t:expr, $d:expr) => {
|
(? $t:expr, $d:expr) => {
|
||||||
map(opt(pair($t, many0(preceded($d, $t)))), |opt| {
|
map(opt(pair($t, many0(preceded($d, $t)))), |opt| {
|
||||||
opt.map(|(hd, mut tl)| {
|
opt
|
||||||
|
.map(|(hd, mut tl)| {
|
||||||
tl.insert(0, hd);
|
tl.insert(0, hd);
|
||||||
tl
|
tl
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
fn_single_line = true
|
fn_single_line = true
|
||||||
max_width = 80
|
max_width = 80
|
||||||
wrap_comments = true
|
wrap_comments = true
|
||||||
|
tab_spaces = 2
|
||||||
|
|
Loading…
Reference in a new issue