had some trouble with capability matching IMAP4rev1 as an atom

This commit is contained in:
Michael Zhang 2021-08-23 00:01:05 -05:00
parent ff98685afb
commit b0c8423968
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
19 changed files with 219 additions and 56 deletions

75
Cargo.lock generated
View file

@ -2,6 +2,15 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "aho-corasick"
version = "0.7.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5"
dependencies = [
"memchr",
]
[[package]]
name = "anyhow"
version = "1.0.43"
@ -222,6 +231,19 @@ dependencies = [
"syn",
]
[[package]]
name = "env_logger"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
dependencies = [
"atty",
"humantime",
"log",
"regex",
"termcolor",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -427,6 +449,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440"
[[package]]
name = "humantime"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.11"
@ -555,19 +583,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "mbsync"
version = "0.1.0"
dependencies = [
"anyhow",
"bitflags",
"clap",
"derivative",
"derive_builder",
"panorama-imap",
"tokio",
]
[[package]]
name = "memchr"
version = "2.3.4"
@ -690,6 +705,7 @@ dependencies = [
"bitflags",
"bytes",
"chrono",
"derivative",
"derive_builder",
"format-bytes",
"futures",
@ -703,6 +719,22 @@ dependencies = [
"webpki-roots",
]
[[package]]
name = "panorama-mbsync"
version = "0.1.0"
dependencies = [
"anyhow",
"bitflags",
"clap",
"derivative",
"derive_builder",
"env_logger",
"log",
"panorama-imap",
"serde",
"tokio",
]
[[package]]
name = "panorama-proto-common"
version = "0.0.1"
@ -829,12 +861,29 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a26af418b574bd56588335b3a3659a65725d4e636eb1016c2f9e3b38c7cc759"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "ring"
version = "0.16.20"

View file

@ -25,6 +25,7 @@ async-trait = "0.1.51"
bitflags = "1.2.1"
bytes = "1.0.1"
chrono = "0.4.19"
derivative = "2.2.0"
derive_builder = "0.10.2"
format-bytes = "0.2.2"
futures = "0.3.16"

View file

@ -24,7 +24,7 @@ use crate::proto::{
use super::auth::AuthMethod;
use super::inner::Inner;
use super::response_stream::ResponseStream;
use super::upgrade::upgrade;
use super::tls::wrap_tls;
/// An IMAP client that hasn't been connected yet.
#[derive(Builder, Clone, Debug)]
@ -57,10 +57,12 @@ impl ConfigBuilder {
let hostname = config.hostname.as_ref();
let port = config.port;
trace!("connecting to {}:{}...", hostname, port);
let conn = TcpStream::connect((hostname, port)).await?;
trace!("connected.");
if config.tls {
let conn = upgrade(conn, hostname).await?;
let conn = wrap_tls(conn, hostname).await?;
let mut inner = Inner::new(conn, config).await?;
inner.wait_for_greeting().await?;

View file

@ -28,14 +28,31 @@ impl<'a> Decoder for ImapCodec {
return Ok(None);
}
// this is a pretty hot mess so here's my best attempt at explaining
// buf, or buf1, is the original message
// "split" mutably removes all the bytes from the self, and returns a new
// BytesMut with the contents. so buf2 now has all the original contents
// and buf1 is now empty
let buf2 = buf.split();
// now we're going to clone buf2 here, calling "freeze" turns the BytesMut
// back into Bytes so we can manipulate it. remember, none of this should be
// actually copying anything
let buf3 = buf2.clone().freeze();
debug!("going to parse a response since buffer len: {}", buf3.len());
// trace!("buf: {:?}", buf3);
// we don't know how long the message is going to be yet, so parse it out of the
// Bytes right now, and since the buffer is being consumed, subtracting the
// remainder of the string from the original total (buf4_len) will tell us how
// long the payload was. this also avoids unnecessary cloning
let buf4: Bytes = buf3.clone().into();
let buf4_len = buf4.len();
let (response, len) = match parse_response(buf4) {
Ok((remaining, response)) => (response, buf4_len - remaining.len()),
// the incomplete cases: set the decoded bytes and quit early
Err(nom::Err::Incomplete(Needed::Size(min))) => {
self.decode_need_message_bytes = min.get();
return Ok(None);
@ -43,6 +60,8 @@ impl<'a> Decoder for ImapCodec {
Err(nom::Err::Incomplete(_)) => {
return Ok(None);
}
// shit
Err(Err::Error(err)) | Err(Err::Failure(err)) => {
let buf4 = buf3.clone().into();
error!("failed to parse: {:?}", buf4);
@ -55,10 +74,14 @@ impl<'a> Decoder for ImapCodec {
};
info!("success, parsed as {:?}", response);
// "unsplit" is the opposite of split, we're getting back the original data here
buf.unsplit(buf2);
// and then move to after the message we just parsed
let _ = buf.split_to(len);
debug!("buf: {:?}", buf);
// since we're done parsing a complete message, set this to zero
self.decode_need_message_bytes = 0;
Ok(Some(response))
}

View file

@ -1,4 +1,8 @@
use std::sync::atomic::{AtomicU32, Ordering};
use std::collections::HashSet;
use std::sync::{
atomic::{AtomicU32, Ordering},
Arc,
};
use anyhow::Result;
use futures::{
@ -8,7 +12,7 @@ use futures::{
use panorama_proto_common::Bytes;
use tokio::{
io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter, ReadHalf, WriteHalf},
sync::{mpsc, oneshot},
sync::{mpsc, oneshot, RwLock},
task::JoinHandle,
};
use tokio_rustls::client::TlsStream;
@ -16,14 +20,14 @@ use tokio_util::codec::FramedRead;
use crate::proto::{
command::Command,
response::{Condition, Response, Status, Tag},
response::{Capability, Condition, Response, Status, Tag},
rfc3501::capability as parse_capability,
};
use super::client::Config;
use super::codec::{ImapCodec, TaggedCommand};
use super::response_stream::ResponseStream;
use super::upgrade::upgrade;
use super::tls::wrap_tls;
const TAG_PREFIX: &str = "panotag";
@ -47,6 +51,7 @@ pub struct Inner<C> {
_write_tx: mpsc::UnboundedSender<TaggedCommand>,
greeting_rx: Option<GreetingWaiter>,
capabilities: Arc<RwLock<Option<HashSet<Capability>>>>,
}
#[derive(Debug)]
@ -88,6 +93,7 @@ where
let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx));
let tag_number = AtomicU32::new(0);
let capabilities = Arc::new(RwLock::new(None));
Ok(Inner {
config,
tag_number,
@ -98,6 +104,7 @@ where
write_handle,
_write_tx: write_tx,
greeting_rx: Some(greeting_rx),
capabilities,
})
}
@ -117,27 +124,37 @@ where
}
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
// TODO: cache capabilities if needed?
let cap = cap.as_ref().to_owned();
let (_, cap) = parse_capability(Bytes::from(cap))?;
let cap_bytes = cap.as_ref().as_bytes().to_vec();
let (_, cap) = parse_capability(Bytes::from(cap_bytes))?;
let resp = self.execute(Command::Capability).await?;
let (_, data) = resp.wait().await?;
let contains = {
let read = self.capabilities.read().await;
if let Some(read) = &*read {
read.contains(&cap)
} else {
std::mem::drop(read);
for resp in data {
if let Response::Capabilities(caps) = resp {
return Ok(caps.contains(&cap));
}
// debug!("cap: {:?}", resp);
let cmd = self.execute(Command::Capability).await?;
let done = cmd.done().await?;
todo!("done: {:?}", done);
// todo!()
}
};
Ok(false)
Ok(contains)
}
pub async fn upgrade(mut self) -> Result<Inner<TlsStream<C>>> {
debug!("preparing to upgrade using STARTTLS");
// TODO: check that this capability exists??
// TODO: issue the STARTTLS command to the server
// check that this capability exists
// if it doesn't exist, then it's not an IMAP4-compliant server
if !self.has_capability("STARTTLS").await? {
bail!("Server does not have the STARTTLS capability");
}
// issue the STARTTLS command to the server
let resp = self.execute(Command::Starttls).await?;
dbg!(resp.wait().await?);
debug!("received OK from server");
@ -152,7 +169,7 @@ where
// put the read half and write half back together
let stream = read_half.unsplit(write_half);
let tls_stream = upgrade(stream, &self.config.hostname).await?;
let tls_stream = wrap_tls(stream, &self.config.hostname).await?;
Inner::new(tls_stream, self.config).await
}

View file

@ -22,7 +22,7 @@ pub mod response_stream;
mod client;
mod codec;
mod inner;
mod upgrade;
mod tls;
pub use self::client::{ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder};
pub use self::codec::{ImapCodec, TaggedCommand};

View file

@ -6,7 +6,8 @@ use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
};
pub async fn upgrade<C>(c: C, hostname: impl AsRef<str>) -> Result<TlsStream<C>>
/// Wraps the given async stream in TLS with the given hostname (required)
pub async fn wrap_tls<C>(c: C, hostname: impl AsRef<str>) -> Result<TlsStream<C>>
where
C: AsyncRead + AsyncWrite + Unpin,
{

View file

@ -5,6 +5,8 @@ extern crate async_trait;
#[macro_use]
extern crate derive_builder;
#[macro_use]
extern crate derivative;
#[macro_use]
extern crate format_bytes;
#[macro_use]
extern crate futures;

View file

@ -101,9 +101,12 @@ pub struct CommandList {
pub mailbox: Bytes,
}
#[derive(Clone, Debug)]
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct CommandLogin {
pub userid: Bytes,
#[derivative(Debug = "ignore")]
pub password: Bytes,
}

View file

@ -116,7 +116,7 @@ pub enum UidSetMember {
Uid(u32),
}
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum Capability {
Imap4rev1,
Auth(Atom),

View file

@ -46,6 +46,10 @@ rule!(pub astring : Bytes => alt((take_while1(is_astring_char), string)));
pub(crate) fn is_astring_char(c: u8) -> bool { is_atom_char(c) || is_resp_specials(c) }
rule!(pub ASTRING_CHAR : u8 => alt((ATOM_CHAR, resp_specials)));
// really odd behavior about take_while1 is that if there isn't a character that's
// not is_atom_char, then it's actually going to error out and require another character
// in case there's more. makes sense, just need to keep in mind that we need more
// content in order to satisfy this
rule!(pub atom : Bytes => take_while1(is_atom_char));
pub(crate) fn is_atom_char(c: u8) -> bool { is_char(c) && !is_atom_specials(c) }
@ -67,14 +71,14 @@ rule!(pub auth_type : Atom => atom);
rule!(pub capability : Capability => alt((
map(preceded(tagi(b"AUTH="), auth_type), Capability::Auth),
map(atom, Capability::Atom),
map(verify(atom, |s| &s[..] != b"IMAP4rev1"), Capability::Atom),
)));
rule!(pub capability_data : Vec<Capability> => preceded(tagi(b"CAPABILITY"), {
map(separated_pair(
many0(preceded(SP, capability)),
pair(SP, tagi(b"IMAP4rev1")),
many0(preceded(SP, capability))
many0(preceded(SP, capability)),
), |(mut a, b)| { a.extend(b); a })
}));

View file

@ -1,7 +1,11 @@
use panorama_proto_common::Bytes;
#![allow(unused_imports)]
use panorama_proto_common::*;
use nom::{sequence::*, multi::*};
use super::response::*;
use super::rfc3501::*;
use super::rfc2234::*;
#[test]
fn test_literal() {
@ -14,6 +18,17 @@ fn test_literal() {
);
}
#[test]
fn test_capabilities() {
assert_eq!(capability(Bytes::from(b"UNSELECT\r\n")).unwrap().1, Capability::Atom(Bytes::from(b"UNSELECT")));
// trivial case
assert_eq!(capability_data(Bytes::from(b"CAPABILITY IMAP4rev1\r\n")).unwrap().1, vec![]);
assert_eq!(capability_data(Bytes::from(b"CAPABILITY UNSELECT IMAP4rev1 NAMESPACE\r\n")).unwrap().1,
vec![Capability::Atom(Bytes::from(b"UNSELECT")), Capability::Atom(Bytes::from(b"NAMESPACE"))]);
}
#[test]
fn test_list() {
assert!(matches!(

View file

@ -1,7 +1,15 @@
[package]
name = "mbsync"
name = "panorama-mbsync"
version = "0.1.0"
edition = "2018"
description = "mbsync written using panorama's IMAP library"
authors = ["Michael Zhang <mail@mzhang.io>"]
keywords = ["imap", "email", "mbsync"]
license = "GPL-3.0-or-later"
categories = ["email"]
repository = "https://git.mzhang.io/michael/panorama"
readme = "README.md"
workspace = ".."
[dependencies]
anyhow = "1.0.42"
@ -10,4 +18,7 @@ clap = "3.0.0-beta.2"
derivative = "2.2.0"
derive_builder = "0.10.2"
panorama-imap = { path = "../imap" }
serde = { version = "1.0.127", features = ["derive"] }
tokio = { version = "1.9.0", features = ["full"] }
log = "0.4.14"
env_logger = "0.9.0"

2
mbsync/README.md Normal file
View file

@ -0,0 +1,2 @@
mbsync
===

View file

@ -14,6 +14,7 @@ pub fn read_from_file(path: impl AsRef<Path>) -> Result<Config> {
pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
let r = BufReader::new(r);
// store each element here
let mut accounts = HashMap::new();
let mut stores = HashMap::new();
let mut channels = HashMap::new();
@ -25,6 +26,9 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
Channel(ChannelBuilder),
}
let mut current_section = None;
// this is called whenever we encounter a new section
// it wraps up the current builder and adds it to the appropriate table
let finish_section = |accounts: &mut HashMap<_, _>,
stores: &mut HashMap<_, _>,
channels: &mut HashMap<_, _>,
@ -55,6 +59,8 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
for line in r.lines() {
let line = line?.trim().to_string();
// ignore empty lines and comments
if line.is_empty() || line.starts_with('#') {
continue;
}
@ -202,6 +208,7 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
}
}
// finish again at the end
finish_section(
&mut accounts,
&mut stores,
@ -218,6 +225,7 @@ pub fn read_from_reader<R: Read>(r: R) -> Result<Config> {
Ok(config)
}
/// Double check that all the names being referred to actually exist
fn check_config(config: &Config) -> Result<()> {
for store in config.stores.values() {
if let Store::Imap(store) = store {
@ -233,19 +241,19 @@ fn check_config(config: &Config) -> Result<()> {
Ok(())
}
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub struct Config {
pub accounts: HashMap<String, Account>,
pub stores: HashMap<String, Store>,
pub channels: HashMap<String, Channel>,
}
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub enum Account {
Imap(ImapAccount),
}
#[derive(Derivative, Builder)]
#[derive(Derivative, Builder, Serialize, Deserialize)]
#[derivative(Debug)]
pub struct ImapAccount {
pub name: String,
@ -261,20 +269,20 @@ pub struct ImapAccount {
pub pass: String,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SslType {
None,
Starttls,
Imaps,
}
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize)]
pub enum Store {
Maildir(MaildirStore),
Imap(ImapStore),
}
#[derive(Debug, Builder)]
#[derive(Debug, Builder, Serialize, Deserialize)]
pub struct MaildirStore {
pub name: String,
pub path: PathBuf,
@ -282,20 +290,20 @@ pub struct MaildirStore {
pub subfolders: MaildirSubfolderStyle,
}
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MaildirSubfolderStyle {
Verbatim,
Maildirpp,
Legacy,
}
#[derive(Debug, Builder)]
#[derive(Debug, Builder, Serialize, Deserialize)]
pub struct ImapStore {
pub name: String,
pub account: String,
}
#[derive(Debug, Builder)]
#[derive(Debug, Builder, Serialize, Deserialize)]
pub struct Channel {
pub name: String,
pub far: String,
@ -304,6 +312,7 @@ pub struct Channel {
}
bitflags! {
#[derive(Serialize, Deserialize)]
pub struct ChannelSyncOps: u32 {
const PULL = 1 << 1;
const PUSH = 1 << 2;

View file

@ -6,6 +6,10 @@ extern crate bitflags;
extern crate derivative;
#[macro_use]
extern crate derive_builder;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate log;
pub mod config;
pub mod store;

View file

@ -1,8 +1,11 @@
#[macro_use]
extern crate log;
use std::path::PathBuf;
use anyhow::Result;
use clap::Clap;
use mbsync::{
use panorama_mbsync::{
config::{self, ChannelSyncOps},
store,
};
@ -12,24 +15,30 @@ struct Opt {
/// The path to the config file (defaults to ~/.mbsyncrc).
#[clap(name = "config", long = "config", short = 'c')]
config_path: Option<PathBuf>,
/// Verbose mode (-v, -vv, -vvv, etc)
#[clap(short = 'v', long = "verbose", parse(from_occurrences))]
verbose: usize,
}
#[tokio::main]
async fn main() -> Result<()> {
let opt = Opt::parse();
println!("opts: {:?}", opt);
info!("opts: {:?}", opt);
env_logger::init();
let config_path = match opt.config_path {
Some(path) => path,
None => PathBuf::from(env!("HOME")).join(".mbsyncrc"),
};
println!("config path: {:?}", config_path);
info!("config path: {:?}", config_path);
let config = config::read_from_file(&config_path)?;
println!("config: {:?}", config);
info!("config: {:?}", config);
for channel in config.channels.values() {
println!("beginning to sync {}", channel.name);
info!("beginning to sync {}", channel.name);
let far = store::open(&config, &channel.far).await?;
let near = store::open(&config, &channel.near).await?;

View file

@ -17,6 +17,8 @@ pub async fn open(config: &Config, store_name: impl AsRef<str>) -> Result<Opened
.accounts
.get(&store.account)
.expect("already checked by config reader");
debug!("account: {:?}", account);
match account {
Account::Imap(account) => {
let port = account.port.unwrap_or_else(|| match account.ssltype {
@ -27,15 +29,20 @@ pub async fn open(config: &Config, store_name: impl AsRef<str>) -> Result<Opened
SslType::None | SslType::Starttls => false,
SslType::Imaps => true,
};
trace!("opening client...");
let mut client = ConfigBuilder::default()
.hostname(account.host.clone())
.port(port)
.tls(tls)
.open()
.await?;
trace!("opened.");
if let SslType::Starttls = account.ssltype {
trace!("upgrading client...");
client = client.upgrade().await?;
trace!("upgraded.");
}
println!("connected");

View file

@ -8,7 +8,7 @@ use nom::{
};
/// Glue code between nom and Bytes so they work together.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub struct Bytes(bytes::Bytes);
impl Bytes {
@ -40,6 +40,10 @@ impl From<&'static [u8]> for Bytes {
fn from(slice: &'static [u8]) -> Self { Bytes(bytes::Bytes::from(slice)) }
}
impl From<Vec<u8>> for Bytes {
fn from(slice: Vec<u8>) -> Self { Bytes(bytes::Bytes::from(slice)) }
}
impl From<&'static str> for Bytes {
fn from(s: &'static str) -> Self { Bytes(bytes::Bytes::from(s.as_bytes())) }
}