- reformat with 80char
- add SQLx to daemon
- add backtrace to anyhow
This commit is contained in:
Michael Zhang 2021-10-13 00:50:48 -05:00
parent 9d89a47e8b
commit 3f09e8b55f
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
32 changed files with 1056 additions and 234 deletions

819
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -4,7 +4,7 @@ version = "0.0.1"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
anyhow = "1.0.42" anyhow = { version = "1.0.42", features = ["backtrace"] }
async-trait = "0.1.50" async-trait = "0.1.50"
clap = "3.0.0-beta.2" clap = "3.0.0-beta.2"
derivative = "2.2.0" derivative = "2.2.0"
@ -20,3 +20,7 @@ tokio = { version = "1.9.0", features = ["full"] }
tokio-rustls = "0.22.0" tokio-rustls = "0.22.0"
toml = "0.5.8" toml = "0.5.8"
xdg = "2.2.0" xdg = "2.2.0"
[dependencies.sqlx]
version = "0.5.9"
features = ["runtime-tokio-rustls", "sqlite", "json", "chrono"]

View file

@ -0,0 +1,3 @@
DROP TABLE "messages";
DROP TABLE "mailboxes";
DROP TABLE "accounts";

View file

@ -0,0 +1,14 @@
CREATE TABLE "accounts" (
"id" PRIMARY KEY AUTOINCREMENT
);
CREATE TABLE "mailboxes" (
"account" INTEGER,
"name" TEXT,
PRIMARY KEY ("account", "name")
);
CREATE TABLE "messages" (
"id" TEXT PRIMARY KEY
);

View file

@ -14,7 +14,8 @@ pub type ConfigWatcher = watch::Receiver<Config>;
/// Start the entire config watcher system, and return a /// Start the entire config watcher system, and return a
/// [ConfigWatcher][self::ConfigWatcher], which is a cloneable receiver of /// [ConfigWatcher][self::ConfigWatcher], which is a cloneable receiver of
/// config update events. /// config update events.
pub fn spawn_config_watcher_system() -> Result<(JoinHandle<()>, ConfigWatcher)> { pub fn spawn_config_watcher_system() -> Result<(JoinHandle<()>, ConfigWatcher)>
{
let mut inotify = Inotify::init()?; let mut inotify = Inotify::init()?;
let xdg = BaseDirectories::new()?; let xdg = BaseDirectories::new()?;
let config_home = xdg.get_config_home().join("panorama"); let config_home = xdg.get_config_home().join("panorama");
@ -29,7 +30,8 @@ pub fn spawn_config_watcher_system() -> Result<(JoinHandle<()>, ConfigWatcher)>
debug!("watching {:?}", config_home); debug!("watching {:?}", config_home);
let (config_tx, config_update) = watch::channel(Config::default()); let (config_tx, config_update) = watch::channel(Config::default());
let handle = tokio::spawn( let handle = tokio::spawn(
start_inotify_stream(inotify, config_home, config_tx).unwrap_or_else(|_err| todo!()), start_inotify_stream(inotify, config_home, config_tx)
.unwrap_or_else(|_err| todo!()),
); );
Ok((handle, config_update)) Ok((handle, config_update))
} }
@ -66,7 +68,8 @@ async fn start_inotify_stream(
continue; continue;
} }
// TODO: any better way to do this? // TODO: any better way to do this?
let config_path_c = config_path.canonicalize().context("cfg_path")?; let config_path_c =
config_path.canonicalize().context("cfg_path")?;
if config_path_c != path_c { if config_path_c != path_c {
debug!("did not match {:?} {:?}", config_path_c, path_c); debug!("did not match {:?} {:?}", config_path_c, path_c);
continue; continue;

15
daemon/src/lib.rs Normal file
View file

@ -0,0 +1,15 @@
#[macro_use]
extern crate serde;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
extern crate derivative;
pub mod config;
pub mod mail;
use sqlx::migrate::Migrator;
static MIGRATOR: Migrator = sqlx::migrate!();

View file

@ -68,18 +68,23 @@ pub async fn sync_main(
debug!("folder: {:?}", folder); debug!("folder: {:?}", folder);
let select = authed.select("INBOX").await?; let select = authed.select("INBOX").await?;
debug!("select response: {:?}", select); debug!("select response: {:?}", select);
if let (Some(_exists), Some(_uidvalidity)) = (select.exists, select.uid_validity) { if let (Some(_exists), Some(_uidvalidity)) =
(select.exists, select.uid_validity)
{
// figure out which uids don't exist locally yet // figure out which uids don't exist locally yet
let new_uids = vec![]; let new_uids = vec![];
// let new_uids = stream::iter(1..exists).map(Ok).try_filter_map(|uid| { // let new_uids =
// stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
// todo!() // todo!()
// // mail_store.try_identify_email(&acct_name, &folder, uid, // // mail_store.try_identify_email(&acct_name, &folder,
// uidvalidity, None) // // invert the option to // uid, uidvalidity, None) // //
// only select uids that haven't been downloaded // // invert the option to only select uids that
// haven't been downloaded //
// .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
// // .map_err(|err| err.context("error checking if the email is // // .map_err(|err| err.context("error checking if
// already downloaded [try_identify_email]")) // the email is already downloaded
// }).try_collect::<Vec<_>>().await?; // [try_identify_email]")) }).try_collect::
// <Vec<_>>().await?;
if !new_uids.is_empty() { if !new_uids.is_empty() {
debug!("fetching uids {:?}", new_uids); debug!("fetching uids {:?}", new_uids);
let _fetched = authed let _fetched = authed
@ -99,14 +104,15 @@ pub async fn sync_main(
tokio::time::sleep(std::time::Duration::from_secs(50)).await; tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// TODO: remove this later // TODO: remove this later
// continue; // continue;
// let's just select INBOX for now, maybe have a config for default mailbox // let's just select INBOX for now, maybe have a config for default
// later? // mailbox later?
debug!("selecting the INBOX mailbox"); debug!("selecting the INBOX mailbox");
let select = authed.select("INBOX").await?; let select = authed.select("INBOX").await?;
debug!("select result: {:?}", select); debug!("select result: {:?}", select);
loop { loop {
let message_uids = authed.uid_search().await?; let message_uids = authed.uid_search().await?;
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>(); let message_uids =
message_uids.into_iter().take(30).collect::<Vec<_>>();
// let _ = mail2ui_tx.send(MailEvent::MessageUids( // let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(), // acct_name.clone(),
// message_uids.clone(), // message_uids.clone(),
@ -133,7 +139,10 @@ pub async fn sync_main(
debug!("got an event: {:?}", evt); debug!("got an event: {:?}", evt);
match evt { match evt {
Response::MailboxData(MailboxData::Exists(uid)) => { Response::MailboxData(MailboxData::Exists(uid)) => {
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); debug!(
"NEW MESSAGE WITH UID {:?}, droping everything",
uid
);
// send DONE to stop the idle // send DONE to stop the idle
std::mem::drop(idle_stream); std::mem::drop(idle_stream);
// let handle = Notification::new() // let handle = Notification::new()
@ -143,18 +152,23 @@ pub async fn sync_main(
// .timeout(Timeout::Milliseconds(6000)) // .timeout(Timeout::Milliseconds(6000))
// .show()?; // .show()?;
let message_uids = authed.uid_search().await?; let message_uids = authed.uid_search().await?;
let message_uids = let message_uids = message_uids
message_uids.into_iter().take(20).collect::<Vec<_>>(); .into_iter()
.take(20)
.collect::<Vec<_>>();
// let _ = mail2ui_tx.send(MailEvent::MessageUids( // let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(), // acct_name.clone(),
// message_uids.clone(), // message_uids.clone(),
// )); // ));
// TODO: make this happen concurrently with the main loop? // TODO: make this happen concurrently with the main
// loop?
let mut message_list = authed let mut message_list = authed
.uid_fetch(&message_uids, &[], FetchItems::All) .uid_fetch(&message_uids, &[], FetchItems::All)
.await .await
.unwrap(); .unwrap();
while let Some((_uid, _attrs)) = message_list.next().await { while let Some((_uid, _attrs)) =
message_list.next().await
{
// let evt = MailEvent::UpdateUid(acct_name. // let evt = MailEvent::UpdateUid(acct_name.
// clone(), uid, attrs); // clone(), uid, attrs);
// debug!("sent {:?}", evt); // debug!("sent {:?}", evt);
@ -168,7 +182,8 @@ pub async fn sync_main(
} }
} else { } else {
loop { loop {
tokio::time::sleep(std::time::Duration::from_secs(20)).await; tokio::time::sleep(std::time::Duration::from_secs(20))
.await;
debug!("heartbeat"); debug!("heartbeat");
} }
} }
@ -177,8 +192,8 @@ pub async fn sync_main(
} }
} }
// wait a bit so we're not hitting the server really fast if the fail happens // wait a bit so we're not hitting the server really fast if the fail
// early on // happens early on
// //
// TODO: some kind of smart exponential backoff that considers some time // TODO: some kind of smart exponential backoff that considers some time
// threshold to be a failing case? // threshold to be a failing case?

View file

@ -1 +1,20 @@
pub struct MailStore; use anyhow::Result;
use sqlx::sqlite::{SqlitePool, SqlitePoolOptions};
use crate::MIGRATOR;
pub struct MailStore {
pool: SqlitePool,
}
impl MailStore {
/// Creates a new connection to a SQLite database.
pub async fn open(uri: impl AsRef<str>) -> Result<Self> {
let pool = SqlitePoolOptions::new().connect(uri.as_ref()).await?;
// run migrations, if available
MIGRATOR.run(&pool).await?;
Ok(MailStore { pool })
}
}

View file

@ -1,16 +1,7 @@
#[macro_use] #[macro_use]
extern crate serde;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
#[macro_use]
extern crate derivative;
mod config;
mod mail;
use anyhow::Result; use anyhow::Result;
use clap::Clap; use clap::Clap;
@ -19,11 +10,10 @@ use futures::future::{
Either::{Left, Right}, Either::{Left, Right},
FutureExt, FutureExt,
}; };
use panorama_daemon::config::{self, Config, MailAccountConfig, TlsMethod};
use panorama_imap::client::ConfigBuilder; use panorama_imap::client::ConfigBuilder;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use crate::config::{Config, MailAccountConfig, TlsMethod};
type ExitListener = oneshot::Receiver<()>; type ExitListener = oneshot::Receiver<()>;
/// The panorama daemon runs in the background and communicates with other /// The panorama daemon runs in the background and communicates with other
@ -55,7 +45,8 @@ async fn main() -> Result<()> {
let new_config = config_watcher.borrow().clone(); let new_config = config_watcher.borrow().clone();
tokio::spawn(run_with_config(new_config, exit_rx)); tokio::spawn(run_with_config(new_config, exit_rx));
// wait till the config has changed, then tell the current thread to stop // wait till the config has changed, then tell the current thread to
// stop
config_watcher.changed().await?; config_watcher.changed().await?;
let _ = exit_tx.send(()); let _ = exit_tx.send(());
} }
@ -79,6 +70,10 @@ async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> {
Ok(()) Ok(())
} }
/// The main loop for a single mail account.
///
/// This loop is restarted each time the config watcher gets a new config, at
/// which point `exit` is sent a single value telling us to break the loop.
async fn run_single_mail_account( async fn run_single_mail_account(
account_name: String, account_name: String,
account: MailAccountConfig, account: MailAccountConfig,
@ -108,6 +103,7 @@ async fn run_single_mail_account(
loop { loop {
select! { select! {
// we're being told to exit the loop
_ = exit => break, _ = exit => break,
} }
} }

View file

@ -22,7 +22,7 @@ rfc6154 = [] # list
fuzzing = ["arbitrary", "panorama-proto-common/fuzzing"] fuzzing = ["arbitrary", "panorama-proto-common/fuzzing"]
[dependencies] [dependencies]
anyhow = "1.0.42" anyhow = { version = "1.0.42", features = ["backtrace"] }
async-trait = "0.1.51" async-trait = "0.1.51"
bitflags = "1.2.1" bitflags = "1.2.1"
bytes = "1.0.1" bytes = "1.0.1"

View file

@ -6,8 +6,14 @@ use panorama_proto_common::Bytes;
use crate::client::inner::Inner; use crate::client::inner::Inner;
use crate::proto::command::{Command, CommandLogin}; use crate::proto::command::{Command, CommandLogin};
pub trait Client: AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static {} pub trait Client:
impl<C> Client for C where C: Send + Sync + Unpin + AsyncWrite + AsyncRead + 'static {} AsyncRead + AsyncWrite + Unpin + Sync + Send + 'static
{
}
impl<C> Client for C where
C: Send + Sync + Unpin + AsyncWrite + AsyncRead + 'static
{
}
#[async_trait] #[async_trait]
pub trait AuthMethod { pub trait AuthMethod {

View file

@ -13,12 +13,12 @@ use tokio_rustls::client::TlsStream;
use crate::proto::{ use crate::proto::{
command::{ command::{
Command, CommandFetch, CommandList, CommandSearch, CommandSelect, FetchItems, Command, CommandFetch, CommandList, CommandSearch, CommandSelect,
SearchCriteria, Sequence, FetchItems, SearchCriteria, Sequence,
}, },
response::{ response::{
Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute, Response, Condition, Flag, Mailbox, MailboxData, MailboxList, MessageAttribute,
ResponseCode, Status, Response, ResponseCode, Status,
}, },
}; };
@ -98,7 +98,10 @@ impl ClientUnauthenticated {
} }
} }
pub async fn auth(self, auth: impl AuthMethod) -> Result<ClientAuthenticated> { pub async fn auth(
self,
auth: impl AuthMethod,
) -> Result<ClientAuthenticated> {
match self { match self {
// this is a no-op, we don't need to upgrade // this is a no-op, we don't need to upgrade
ClientUnauthenticated::Encrypted(mut inner) => { ClientUnauthenticated::Encrypted(mut inner) => {
@ -148,7 +151,11 @@ impl ClientAuthenticated {
let mut folders = Vec::new(); let mut folders = Vec::new();
for resp in data { for resp in data {
if let Response::MailboxData(MailboxData::List(MailboxList { mailbox, .. })) = resp { if let Response::MailboxData(MailboxData::List(MailboxList {
mailbox,
..
})) = resp
{
folders.push(mailbox); folders.push(mailbox);
} }
} }
@ -157,7 +164,10 @@ impl ClientAuthenticated {
} }
/// Runs the SELECT command /// Runs the SELECT command
pub async fn select(&mut self, mailbox: impl AsRef<str>) -> Result<SelectResponse> { pub async fn select(
&mut self,
mailbox: impl AsRef<str>,
) -> Result<SelectResponse> {
let cmd = Command::Select(CommandSelect { let cmd = Command::Select(CommandSelect {
mailbox: Bytes::from(mailbox.as_ref().to_owned()), mailbox: Bytes::from(mailbox.as_ref().to_owned()),
}); });
@ -168,9 +178,15 @@ impl ClientAuthenticated {
let mut select = SelectResponse::default(); let mut select = SelectResponse::default();
for resp in data { for resp in data {
match resp { match resp {
Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags, Response::MailboxData(MailboxData::Flags(flags)) => {
Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists), select.flags = flags
Response::MailboxData(MailboxData::Recent(recent)) => select.recent = Some(recent), }
Response::MailboxData(MailboxData::Exists(exists)) => {
select.exists = Some(exists)
}
Response::MailboxData(MailboxData::Recent(recent)) => {
select.recent = Some(recent)
}
Response::Tagged( Response::Tagged(
_, _,
Condition { Condition {
@ -185,8 +201,12 @@ impl ClientAuthenticated {
.. ..
}) => match code { }) => match code {
ResponseCode::Unseen(value) => select.unseen = Some(value), ResponseCode::Unseen(value) => select.unseen = Some(value),
ResponseCode::UidNext(value) => select.uid_next = Some(value), ResponseCode::UidNext(value) => {
ResponseCode::UidValidity(value) => select.uid_validity = Some(value), select.uid_next = Some(value)
}
ResponseCode::UidValidity(value) => {
select.uid_validity = Some(value)
}
_ => {} _ => {}
}, },
_ => warn!("unknown response {:?}", resp), _ => warn!("unknown response {:?}", resp),
@ -230,7 +250,9 @@ impl ClientAuthenticated {
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?; // let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp { Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), Response::Fetch(n, attrs) => {
future::ready(Some((n, attrs))).boxed()
}
Response::Done(_) => future::ready(None).boxed(), Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(), _ => future::pending().boxed(),
})) }))
@ -255,7 +277,9 @@ impl ClientAuthenticated {
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?; // let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp { Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), Response::Fetch(n, attrs) => {
future::ready(Some((n, attrs))).boxed()
}
Response::Done(_) => future::ready(None).boxed(), Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(), _ => future::pending().boxed(),
})) }))
@ -305,7 +329,10 @@ impl Drop for IdleToken {
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl Stream for IdleToken { impl Stream for IdleToken {
type Item = Response; type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
let stream = Pin::new(&mut self.stream); let stream = Pin::new(&mut self.stream);
Stream::poll_next(stream, cx) Stream::poll_next(stream, cx)
} }

View file

@ -21,7 +21,10 @@ impl<'a> Decoder for ImapCodec {
type Item = Response; type Item = Response;
type Error = io::Error; type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> { fn decode(
&mut self,
buf: &mut BytesMut,
) -> Result<Option<Self::Item>, io::Error> {
use nom::Err; use nom::Err;
if self.decode_need_message_bytes > buf.len() { if self.decode_need_message_bytes > buf.len() {
@ -31,22 +34,23 @@ impl<'a> Decoder for ImapCodec {
// this is a pretty hot mess so here's my best attempt at explaining // this is a pretty hot mess so here's my best attempt at explaining
// buf, or buf1, is the original message // buf, or buf1, is the original message
// "split" mutably removes all the bytes from the self, and returns a new // "split" mutably removes all the bytes from the self, and returns a
// BytesMut with the contents. so buf2 now has all the original contents // new BytesMut with the contents. so buf2 now has all the
// and buf1 is now empty // original contents and buf1 is now empty
let buf2 = buf.split(); let buf2 = buf.split();
// now we're going to clone buf2 here, calling "freeze" turns the BytesMut // now we're going to clone buf2 here, calling "freeze" turns the
// back into Bytes so we can manipulate it. remember, none of this should be // BytesMut back into Bytes so we can manipulate it. remember,
// actually copying anything // none of this should be actually copying anything
let buf3 = buf2.clone().freeze(); let buf3 = buf2.clone().freeze();
debug!("going to parse a response since buffer len: {}", buf3.len()); debug!("going to parse a response since buffer len: {}", buf3.len());
// trace!("buf: {:?}", buf3); // trace!("buf: {:?}", buf3);
// we don't know how long the message is going to be yet, so parse it out of the // we don't know how long the message is going to be yet, so parse it
// Bytes right now, and since the buffer is being consumed, subtracting the // out of the Bytes right now, and since the buffer is being
// remainder of the string from the original total (buf4_len) will tell us how // consumed, subtracting the remainder of the string from the
// long the payload was. this also avoids unnecessary cloning // original total (buf4_len) will tell us how long the payload
// was. this also avoids unnecessary cloning
let buf4: Bytes = buf3.clone().into(); let buf4: Bytes = buf3.clone().into();
let buf4_len = buf4.len(); let buf4_len = buf4.len();
let (response, len) = match parse_response(buf4) { let (response, len) = match parse_response(buf4) {
@ -74,7 +78,8 @@ impl<'a> Decoder for ImapCodec {
}; };
info!("success, parsed as {:?}", response); info!("success, parsed as {:?}", response);
// "unsplit" is the opposite of split, we're getting back the original data here // "unsplit" is the opposite of split, we're getting back the original
// data here
buf.unsplit(buf2); buf.unsplit(buf2);
// and then move to after the message we just parsed // and then move to after the message we just parsed
@ -94,7 +99,11 @@ pub struct TaggedCommand(pub Tag, pub Command);
impl<'a> Encoder<&'a TaggedCommand> for ImapCodec { impl<'a> Encoder<&'a TaggedCommand> for ImapCodec {
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, tagged_cmd: &TaggedCommand, dst: &mut BytesMut) -> Result<(), io::Error> { fn encode(
&mut self,
tagged_cmd: &TaggedCommand,
dst: &mut BytesMut,
) -> Result<(), io::Error> {
let tag = &*tagged_cmd.0 .0; let tag = &*tagged_cmd.0 .0;
let command = &tagged_cmd.1; let command = &tagged_cmd.1;

View file

@ -5,11 +5,12 @@
use tokio_rustls::{ use tokio_rustls::{
rustls::{ rustls::{
Certificate, OwnedTrustAnchor, RootCertStore, ServerCertVerified, ServerCertVerifier, Certificate, OwnedTrustAnchor, RootCertStore, ServerCertVerified,
TLSError, ServerCertVerifier, TLSError,
}, },
webpki::{ webpki::{
self, DNSNameRef, EndEntityCert, SignatureAlgorithm, TLSServerTrustAnchors, TrustAnchor, self, DNSNameRef, EndEntityCert, SignatureAlgorithm,
TLSServerTrustAnchors, TrustAnchor,
}, },
}; };
@ -75,7 +76,8 @@ impl ServerCertVerifier for ConfigurableCertVerifier {
} }
} }
type CertChainAndRoots<'a, 'b> = (EndEntityCert<'a>, Vec<&'a [u8]>, Vec<TrustAnchor<'b>>); type CertChainAndRoots<'a, 'b> =
(EndEntityCert<'a>, Vec<&'a [u8]>, Vec<TrustAnchor<'b>>);
fn prepare<'a, 'b>( fn prepare<'a, 'b>(
roots: &'b RootCertStore, roots: &'b RootCertStore,
@ -86,7 +88,8 @@ fn prepare<'a, 'b>(
} }
// EE cert must appear first. // EE cert must appear first.
let cert = EndEntityCert::from(&presented_certs[0].0).map_err(TLSError::WebPKIError)?; let cert = EndEntityCert::from(&presented_certs[0].0)
.map_err(TLSError::WebPKIError)?;
let chain: Vec<&'a [u8]> = presented_certs let chain: Vec<&'a [u8]> = presented_certs
.iter() .iter()

View file

@ -11,7 +11,10 @@ use futures::{
}; };
use panorama_proto_common::Bytes; use panorama_proto_common::Bytes;
use tokio::{ use tokio::{
io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf}, io::{
split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf,
WriteHalf,
},
sync::{mpsc, oneshot, RwLock}, sync::{mpsc, oneshot, RwLock},
task::JoinHandle, task::JoinHandle,
}; };
@ -90,7 +93,8 @@ where
// spawn the client->server loop // spawn the client->server loop
let (write_exit, exit_rx) = oneshot::channel(); let (write_exit, exit_rx) = oneshot::channel();
let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx)); let write_handle =
tokio::spawn(write_loop(write_half, exit_rx, write_rx));
let tag_number = AtomicU32::new(0); let tag_number = AtomicU32::new(0);
let capabilities = Arc::new(RwLock::new(None)); let capabilities = Arc::new(RwLock::new(None));
@ -108,7 +112,10 @@ where
}) })
} }
pub async fn execute(&mut self, command: Command) -> Result<ResponseStream> { pub async fn execute(
&mut self,
command: Command,
) -> Result<ResponseStream> {
let id = self.tag_number.fetch_add(1, Ordering::SeqCst); let id = self.tag_number.fetch_add(1, Ordering::SeqCst);
let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id))); let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id)));
@ -123,7 +130,10 @@ where
Ok(stream) Ok(stream)
} }
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> { pub async fn has_capability(
&mut self,
cap: impl AsRef<str>,
) -> Result<bool> {
let cap_bytes = cap.as_ref().as_bytes().to_vec(); let cap_bytes = cap.as_ref().as_bytes().to_vec();
let (_, cap) = parse_capability(Bytes::from(cap_bytes))?; let (_, cap) = parse_capability(Bytes::from(cap_bytes))?;
@ -215,8 +225,8 @@ where
// only listen for a new command if there isn't one already // only listen for a new command if there isn't one already
let mut cmd_fut = if let Some(ref cmd) = curr_cmd { let mut cmd_fut = if let Some(ref cmd) = curr_cmd {
debug!("current command: {:?}", cmd); debug!("current command: {:?}", cmd);
// if there is one, just make a future that never resolves so it'll always pick // if there is one, just make a future that never resolves so it'll
// the other options in the select. // always pick the other options in the select.
future::pending().boxed().fuse() future::pending().boxed().fuse()
} else { } else {
command_rx.recv().boxed().fuse() command_rx.recv().boxed().fuse()

View file

@ -24,7 +24,9 @@ mod codec;
mod inner; mod inner;
mod tls; mod tls;
pub use self::client::{ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder}; pub use self::client::{
ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder,
};
pub use self::codec::{ImapCodec, TaggedCommand}; pub use self::codec::{ImapCodec, TaggedCommand};
#[cfg(feature = "low-level")] #[cfg(feature = "low-level")]

View file

@ -25,7 +25,9 @@ impl ResponseStream {
/// Waits for the entire stream to finish, returning the DONE status and the /// Waits for the entire stream to finish, returning the DONE status and the
/// stream /// stream
pub async fn wait(mut self) -> Result<(Option<ResponseDone>, Vec<Response>)> { pub async fn wait(
mut self,
) -> Result<(Option<ResponseDone>, Vec<Response>)> {
let mut done = None; let mut done = None;
let mut vec = Vec::new(); let mut vec = Vec::new();
while let Some(resp) = self.inner.recv().await { while let Some(resp) = self.inner.recv().await {
@ -42,7 +44,10 @@ impl ResponseStream {
impl Stream for ResponseStream { impl Stream for ResponseStream {
type Item = Response; type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx) self.inner.poll_recv(cx)
} }
} }

View file

@ -3,11 +3,15 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::{ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, client::TlsStream, rustls::ClientConfig as RustlsConfig,
webpki::DNSNameRef, TlsConnector,
}; };
/// Wraps the given async stream in TLS with the given hostname (required) /// Wraps the given async stream in TLS with the given hostname (required)
pub async fn wrap_tls<C>(c: C, hostname: impl AsRef<str>) -> Result<TlsStream<C>> pub async fn wrap_tls<C>(
c: C,
hostname: impl AsRef<str>,
) -> Result<TlsStream<C>>
where where
C: AsyncRead + AsyncWrite + Unpin, C: AsyncRead + AsyncWrite + Unpin,
{ {

View file

@ -79,7 +79,9 @@ impl DisplayBytes for Command {
quote(&list.mailbox) quote(&list.mailbox)
) )
} }
Command::Select(select) => write_bytes!(w, b"SELECT {}", quote(&select.mailbox)), Command::Select(select) => {
write_bytes!(w, b"SELECT {}", quote(&select.mailbox))
}
// selected // selected
Command::UidFetch(fetch) => write_bytes!(w, b"UID FETCH {}", fetch), Command::UidFetch(fetch) => write_bytes!(w, b"UID FETCH {}", fetch),

View file

@ -17,7 +17,9 @@ pub type Atom = Bytes;
pub struct Tag(pub Bytes); pub struct Tag(pub Bytes);
impl DisplayBytes for Tag { impl DisplayBytes for Tag {
fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { write_bytes!(w, b"{}", self.0) } fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> {
write_bytes!(w, b"{}", self.0)
}
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
@ -84,7 +86,9 @@ impl DisplayBytes for Response {
} }
Response::Expunge(n) => write_bytes!(w, b"{} EXPUNGE\r\n", n), Response::Expunge(n) => write_bytes!(w, b"{} EXPUNGE\r\n", n),
Response::Fatal(cond) => write_bytes!(w, b"* {}\r\n", cond), Response::Fatal(cond) => write_bytes!(w, b"* {}\r\n", cond),
Response::Tagged(tag, cond) => write_bytes!(w, b"{} {}\r\n", tag, cond), Response::Tagged(tag, cond) => {
write_bytes!(w, b"{} {}\r\n", tag, cond)
}
_ => todo!(), _ => todo!(),
} }
} }

View file

@ -18,12 +18,13 @@ use panorama_proto_common::{
}; };
use super::response::{ use super::response::{
Address, Atom, Capability, Condition, Envelope, Flag, Mailbox, MailboxData, MailboxList, Address, Atom, Capability, Condition, Envelope, Flag, Mailbox, MailboxData,
MailboxListFlag, MessageAttribute, Response, ResponseCode, ResponseText, Status, Tag, MailboxList, MailboxListFlag, MessageAttribute, Response, ResponseCode,
Timestamp, ResponseText, Status, Tag, Timestamp,
}; };
use super::rfc2234::{ use super::rfc2234::{
is_char, is_cr, is_ctl, is_digit, is_dquote, is_lf, is_sp, CRLF, DIGIT, DQUOTE, SP, is_char, is_cr, is_ctl, is_digit, is_dquote, is_lf, is_sp, CRLF, DIGIT,
DQUOTE, SP,
}; };
/// Grammar rule `T / nil` produces `Option<T>` /// Grammar rule `T / nil` produces `Option<T>`
@ -50,7 +51,9 @@ rule!(pub addr_name : Option<Bytes> => nstring);
rule!(pub astring : Bytes => alt((take_while1(is_astring_char), string))); rule!(pub astring : Bytes => alt((take_while1(is_astring_char), string)));
pub(crate) fn is_astring_char(c: u8) -> bool { is_atom_char(c) || is_resp_specials(c) } pub(crate) fn is_astring_char(c: u8) -> bool {
is_atom_char(c) || is_resp_specials(c)
}
rule!(pub ASTRING_CHAR : u8 => alt((ATOM_CHAR, resp_specials))); rule!(pub ASTRING_CHAR : u8 => alt((ATOM_CHAR, resp_specials)));
// really odd behavior about take_while1 is that if there isn't a character // really odd behavior about take_while1 is that if there isn't a character
@ -212,7 +215,8 @@ rule!(pub list_wildcards : u8 => satisfy(is_list_wildcards));
// determined to exceed a certain threshold so we don't have insane amounts of // determined to exceed a certain threshold so we don't have insane amounts of
// data in memory // data in memory
pub fn literal(i: Bytes) -> VResult<Bytes, Bytes> { pub fn literal(i: Bytes) -> VResult<Bytes, Bytes> {
let mut length_of = terminated(delimited(byte(b'{'), number, byte(b'}')), CRLF); let mut length_of =
terminated(delimited(byte(b'{'), number, byte(b'}')), CRLF);
let (i, length) = length_of(i)?; let (i, length) = length_of(i)?;
debug!("length is: {:?}", length); debug!("length is: {:?}", length);
map(take(length), Bytes::from)(i) map(take(length), Bytes::from)(i)
@ -373,7 +377,9 @@ rule!(pub tag : Tag => map(take_while1(is_tag_char), Tag));
rule!(pub text : Bytes => map(take_while1(is_text_char), Bytes::from)); rule!(pub text : Bytes => map(take_while1(is_text_char), Bytes::from));
pub(crate) fn is_text_char(c: u8) -> bool { is_char(c) && !is_cr(c) && !is_lf(c) } pub(crate) fn is_text_char(c: u8) -> bool {
is_char(c) && !is_cr(c) && !is_lf(c)
}
rule!(pub TEXT_CHAR : u8 => satisfy(is_text_char)); rule!(pub TEXT_CHAR : u8 => satisfy(is_text_char));
rule!(pub time : NaiveTime => map_res( rule!(pub time : NaiveTime => map_res(

View file

@ -66,7 +66,9 @@ fn test_capabilities() {
); );
assert_eq!( assert_eq!(
capability_data(Bytes::from(b"CAPABILITY UNSELECT IMAP4rev1 NAMESPACE\r\n")) capability_data(Bytes::from(
b"CAPABILITY UNSELECT IMAP4rev1 NAMESPACE\r\n"
))
.unwrap() .unwrap()
.1, .1,
vec![ vec![

View file

@ -12,7 +12,7 @@ readme = "README.md"
workspace = ".." workspace = ".."
[dependencies] [dependencies]
anyhow = "1.0.42" anyhow = { version = "1.0.42", features = ["backtrace"] }
bitflags = "1.2.1" bitflags = "1.2.1"
clap = "3.0.0-beta.2" clap = "3.0.0-beta.2"
derivative = "2.2.0" derivative = "2.2.0"

View file

@ -152,12 +152,16 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
}, },
"subfolders" => match current_section.as_mut() { "subfolders" => match current_section.as_mut() {
Some(Section::MaildirStore(ref mut builder)) => { Some(Section::MaildirStore(ref mut builder)) => {
builder.subfolders(match parts[1].to_lowercase().as_str() { builder.subfolders(
match parts[1].to_lowercase().as_str() {
"verbatim" => MaildirSubfolderStyle::Verbatim, "verbatim" => MaildirSubfolderStyle::Verbatim,
"maildir++" => MaildirSubfolderStyle::Maildirpp, "maildir++" => MaildirSubfolderStyle::Maildirpp,
"legacy" => MaildirSubfolderStyle::Legacy, "legacy" => MaildirSubfolderStyle::Legacy,
unknown => panic!("unknown subfolder style '{}'", unknown), unknown => {
}); panic!("unknown subfolder style '{}'", unknown)
}
},
);
} }
_ => panic!("unexpected subfolders keyword"), _ => panic!("unexpected subfolders keyword"),
}, },
@ -187,7 +191,9 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
}, },
"sync" => match current_section.as_mut() { "sync" => match current_section.as_mut() {
Some(Section::Channel(ref mut builder)) => { Some(Section::Channel(ref mut builder)) => {
builder.sync(parts[1..].iter().fold(ChannelSyncOps::empty(), |a, b| { builder.sync(parts[1..].iter().fold(
ChannelSyncOps::empty(),
|a, b| {
a | match b.to_lowercase().as_str() { a | match b.to_lowercase().as_str() {
"none" => ChannelSyncOps::empty(), "none" => ChannelSyncOps::empty(),
"pull" => ChannelSyncOps::PULL, "pull" => ChannelSyncOps::PULL,
@ -197,9 +203,12 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
"delete" => ChannelSyncOps::DELETE, "delete" => ChannelSyncOps::DELETE,
"flags" => ChannelSyncOps::FLAGS, "flags" => ChannelSyncOps::FLAGS,
"all" => ChannelSyncOps::all(), "all" => ChannelSyncOps::all(),
unknown => panic!("unknown sync op '{}'", unknown), unknown => {
panic!("unknown sync op '{}'", unknown)
} }
})); }
},
));
} }
_ => panic!("unexpected near keyword"), _ => panic!("unexpected near keyword"),
}, },

View file

@ -43,7 +43,9 @@ pub trait IStore {
async fn delete_mailbox(&mut self) -> Result<()> { todo!() } async fn delete_mailbox(&mut self) -> Result<()> { todo!() }
async fn prepare_load_mailbox(&mut self, opts: u32) -> Result<()> { todo!() } async fn prepare_load_mailbox(&mut self, opts: u32) -> Result<()> {
todo!()
}
async fn close_mailbox(&mut self) -> Result<()> { todo!() } async fn close_mailbox(&mut self) -> Result<()> { todo!() }
@ -112,7 +114,8 @@ impl IStore for ImapStore {
use futures::stream::StreamExt; use futures::stream::StreamExt;
use panorama_imap::proto::command::FetchItems; use panorama_imap::proto::command::FetchItems;
let mut result = client.uid_fetch(&[8225], &[], FetchItems::All).await?; let mut result =
client.uid_fetch(&[8225], &[], FetchItems::All).await?;
while let Some(item) = result.next().await { while let Some(item) = result.next().await {
println!("epic: {:?}", item); println!("epic: {:?}", item);
} }
@ -128,7 +131,9 @@ pub struct MaildirStore {
} }
impl MaildirStore { impl MaildirStore {
pub fn new(config: MaildirStoreConfig) -> Box<dyn IStore> { Box::new(MaildirStore { config }) } pub fn new(config: MaildirStoreConfig) -> Box<dyn IStore> {
Box::new(MaildirStore { config })
}
} }
#[async_trait] #[async_trait]

View file

@ -13,7 +13,7 @@ default = []
fuzzing = ["arbitrary"] fuzzing = ["arbitrary"]
[dependencies] [dependencies]
anyhow = "1.0.42" anyhow = { version = "1.0.42", features = ["backtrace"] }
bstr = "0.2.15" bstr = "0.2.15"
bytes = "1.0.1" bytes = "1.0.1"
format-bytes = "0.2.2" format-bytes = "0.2.2"

View file

@ -39,12 +39,16 @@ impl Bytes {
} }
impl DisplayBytes for Bytes { impl DisplayBytes for Bytes {
fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> { w.write(&*self.0).map(|_| ()) } fn display_bytes(&self, w: &mut dyn Write) -> io::Result<()> {
w.write(&*self.0).map(|_| ())
}
} }
static CHARS: &[u8] = b"0123456789abcdef"; static CHARS: &[u8] = b"0123456789abcdef";
impl HexDisplay for Bytes { impl HexDisplay for Bytes {
fn to_hex(&self, chunk_size: usize) -> String { self.to_hex_from(chunk_size, 0) } fn to_hex(&self, chunk_size: usize) -> String {
self.to_hex_from(chunk_size, 0)
}
fn to_hex_from(&self, chunk_size: usize, from: usize) -> String { fn to_hex_from(&self, chunk_size: usize, from: usize) -> String {
let mut v = Vec::with_capacity(self.len() * 3); let mut v = Vec::with_capacity(self.len() * 3);
@ -119,7 +123,10 @@ pub trait ShitNeededForParsing: Sized {
fn take_split(&self, count: usize) -> (Self, Self); fn take_split(&self, count: usize) -> (Self, Self);
// InputTakeAtPosition // InputTakeAtPosition
fn split_at_position<P, E: ParseError<Self>>(&self, predicate: P) -> IResult<Self, Self, E> fn split_at_position<P, E: ParseError<Self>>(
&self,
predicate: P,
) -> IResult<Self, Self, E>
where where
P: Fn(Self::Item) -> bool; P: Fn(Self::Item) -> bool;
fn split_at_position1<P, E: ParseError<Self>>( fn split_at_position1<P, E: ParseError<Self>>(
@ -148,7 +155,9 @@ impl ShitNeededForParsing for Bytes {
type Item = u8; type Item = u8;
type Sliced = Bytes; type Sliced = Bytes;
fn slice<R: RangeBounds<usize>>(&self, range: R) -> Self::Sliced { Self(self.0.slice(range)) } fn slice<R: RangeBounds<usize>>(&self, range: R) -> Self::Sliced {
Self(self.0.slice(range))
}
fn first(&self) -> Option<Self::Item> { self.0.first().copied() } fn first(&self) -> Option<Self::Item> { self.0.first().copied() }
fn slice_index(&self, count: usize) -> Result<usize, Needed> { fn slice_index(&self, count: usize) -> Result<usize, Needed> {
@ -168,7 +177,10 @@ impl ShitNeededForParsing for Bytes {
} }
// InputTakeAtPosition // InputTakeAtPosition
fn split_at_position<P, E: ParseError<Self>>(&self, predicate: P) -> IResult<Self, Self, E> fn split_at_position<P, E: ParseError<Self>>(
&self,
predicate: P,
) -> IResult<Self, Self, E>
where where
P: Fn(Self::Item) -> bool, P: Fn(Self::Item) -> bool,
{ {

View file

@ -11,7 +11,10 @@ use nom::{
use crate::VResult; use crate::VResult;
/// Same as nom's dbg_dmp, except operates on Bytes /// Same as nom's dbg_dmp, except operates on Bytes
pub fn dbg_dmp<'a, T, F, O>(mut f: F, context: &'static str) -> impl FnMut(T) -> VResult<T, O> pub fn dbg_dmp<'a, T, F, O>(
mut f: F,
context: &'static str,
) -> impl FnMut(T) -> VResult<T, O>
where where
F: FnMut(T) -> VResult<T, O>, F: FnMut(T) -> VResult<T, O>,
T: AsRef<[u8]> + HexDisplay + Clone + Debug + Deref<Target = [u8]>, T: AsRef<[u8]> + HexDisplay + Clone + Debug + Deref<Target = [u8]>,
@ -31,7 +34,10 @@ where
} }
/// Same as nom's convert_error, except operates on u8 /// Same as nom's convert_error, except operates on u8
pub fn convert_error<I: Deref<Target = [u8]> + Debug>(input: I, e: &VerboseError<I>) -> String { pub fn convert_error<I: Deref<Target = [u8]> + Debug>(
input: I,
e: &VerboseError<I>,
) -> String {
let mut result = String::new(); let mut result = String::new();
debug!("e: {:?}", e); debug!("e: {:?}", e);
@ -41,20 +47,29 @@ pub fn convert_error<I: Deref<Target = [u8]> + Debug>(input: I, e: &VerboseError
if input.is_empty() { if input.is_empty() {
match kind { match kind {
VerboseErrorKind::Char(c) => { VerboseErrorKind::Char(c) => {
write!(&mut result, "{}: expected '{}', got empty input\n\n", i, c) write!(
&mut result,
"{}: expected '{}', got empty input\n\n",
i, c
)
} }
VerboseErrorKind::Context(s) => { VerboseErrorKind::Context(s) => {
write!(&mut result, "{}: in {}, got empty input\n\n", i, s) write!(&mut result, "{}: in {}, got empty input\n\n", i, s)
} }
VerboseErrorKind::Nom(e) => { VerboseErrorKind::Nom(e) => {
write!(&mut result, "{}: in {:?}, got empty input\n\n", i, e) write!(
&mut result,
"{}: in {:?}, got empty input\n\n",
i, e
)
} }
} }
} else { } else {
let prefix = &input.as_bytes()[..offset]; let prefix = &input.as_bytes()[..offset];
// Count the number of newlines in the first `offset` bytes of input // Count the number of newlines in the first `offset` bytes of input
let line_number = prefix.iter().filter(|&&b| b == b'\n').count() + 1; let line_number =
prefix.iter().filter(|&&b| b == b'\n').count() + 1;
// Find the line that includes the subslice: // Find the line that includes the subslice:
// Find the *last* newline before the substring starts // Find the *last* newline before the substring starts
@ -72,7 +87,8 @@ pub fn convert_error<I: Deref<Target = [u8]> + Debug>(input: I, e: &VerboseError
.unwrap_or(&input[line_begin..]) .unwrap_or(&input[line_begin..])
.trim_end(); .trim_end();
// The (1-indexed) column number is the offset of our substring into that line // The (1-indexed) column number is the offset of our substring into
// that line
let column_number = line.offset(substring) + 1; let column_number = line.offset(substring) + 1;
match kind { match kind {

View file

@ -7,7 +7,11 @@
/// let quote = quote_string(b'\x22', b'\\', |c| c == b'\x22' || c == b'\x27'); /// let quote = quote_string(b'\x22', b'\\', |c| c == b'\x22' || c == b'\x27');
/// assert_eq!(quote(b"hello \"' world"), b"\"hello \\\"\\' world\""); /// assert_eq!(quote(b"hello \"' world"), b"\"hello \\\"\\' world\"");
/// ``` /// ```
pub fn quote_string<B, F>(quote: u8, escape: u8, should_escape: F) -> impl Fn(B) -> Vec<u8> pub fn quote_string<B, F>(
quote: u8,
escape: u8,
should_escape: F,
) -> impl Fn(B) -> Vec<u8>
where where
B: AsRef<[u8]>, B: AsRef<[u8]>,
F: Fn(u8) -> bool, F: Fn(u8) -> bool,

View file

@ -12,4 +12,6 @@ mod rule;
pub use crate::bytes::{Bytes, ShitCompare, ShitNeededForParsing}; pub use crate::bytes::{Bytes, ShitCompare, ShitNeededForParsing};
pub use crate::convert_error::{convert_error, dbg_dmp}; pub use crate::convert_error::{convert_error, dbg_dmp};
pub use crate::formatter::quote_string; pub use crate::formatter::quote_string;
pub use crate::parsers::{byte, never, parse_num, satisfy, skip, tagi, take, take_while1, VResult}; pub use crate::parsers::{
byte, never, parse_num, satisfy, skip, tagi, take, take_while1, VResult,
};

View file

@ -177,7 +177,9 @@ where
let snd = i.slice(tag_len..); let snd = i.slice(tag_len..);
Ok((snd, fst)) Ok((snd, fst))
} }
CompareResult::Incomplete => Err(Err::Incomplete(Needed::new(tag_len - i.input_len()))), CompareResult::Incomplete => {
Err(Err::Incomplete(Needed::new(tag_len - i.input_len())))
}
CompareResult::Error => { CompareResult::Error => {
let e: ErrorKind = ErrorKind::Tag; let e: ErrorKind = ErrorKind::Tag;
Err(Err::Error(E::from_error_kind(i, e))) Err(Err::Error(E::from_error_kind(i, e)))

View file

@ -1,3 +1,3 @@
fn_single_line = true fn_single_line = true
max_width = 100 max_width = 80
wrap_comments = true wrap_comments = true