add some docs

This commit is contained in:
Michael Zhang 2021-08-09 01:28:54 -05:00
parent 206b9750e0
commit e2e606f324
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
8 changed files with 174 additions and 65 deletions

View file

@ -19,6 +19,7 @@ required-features = ["stderrlog"]
[features] [features]
default = ["rfc2177", "rfc6154"] default = ["rfc2177", "rfc6154"]
low-level = []
rfc2087 = [] # quota rfc2087 = [] # quota
rfc2177 = [] # idle rfc2177 = [] # idle
rfc6154 = [] # list rfc6154 = [] # list

View file

@ -6,7 +6,7 @@ use futures::{
future::{self, FutureExt}, future::{self, FutureExt},
stream::{Stream, StreamExt}, stream::{Stream, StreamExt},
}; };
use tokio::{net::TcpStream, sync::mpsc}; use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream; use tokio_rustls::client::TlsStream;
use crate::proto::{ use crate::proto::{
@ -46,6 +46,10 @@ pub struct Config {
pub verify_hostname: bool, pub verify_hostname: bool,
} }
impl Config {
pub fn builder() -> ConfigBuilder { ConfigBuilder::default() }
}
impl ConfigBuilder { impl ConfigBuilder {
pub async fn open(&self) -> Result<ClientUnauthenticated> { pub async fn open(&self) -> Result<ClientUnauthenticated> {
let config = self.build()?; let config = self.build()?;
@ -67,6 +71,7 @@ impl ConfigBuilder {
} }
} }
/// A client that hasn't been authenticated.
pub enum ClientUnauthenticated { pub enum ClientUnauthenticated {
Encrypted(Inner<TlsStream<TcpStream>>), Encrypted(Inner<TlsStream<TcpStream>>),
Unencrypted(Inner<TcpStream>), Unencrypted(Inner<TcpStream>),
@ -87,6 +92,7 @@ impl ClientUnauthenticated {
client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>); client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>);
} }
/// A client that has been authenticated.
pub enum ClientAuthenticated { pub enum ClientAuthenticated {
Encrypted(Inner<TlsStream<TcpStream>>), Encrypted(Inner<TlsStream<TcpStream>>),
Unencrypted(Inner<TcpStream>), Unencrypted(Inner<TcpStream>),
@ -96,13 +102,6 @@ impl ClientAuthenticated {
client_expose!(async execute(cmd: Command) -> Result<ResponseStream>); client_expose!(async execute(cmd: Command) -> Result<ResponseStream>);
client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>); client_expose!(async has_capability(cap: impl AsRef<str>) -> Result<bool>);
fn sender(&self) -> mpsc::UnboundedSender<String> {
match self {
ClientAuthenticated::Encrypted(e) => e.write_tx.clone(),
ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(),
}
}
/// Runs the LIST command /// Runs the LIST command
pub async fn list(&mut self) -> Result<Vec<Mailbox>> { pub async fn list(&mut self) -> Result<Vec<Mailbox>> {
let cmd = Command::List(CommandList { let cmd = Command::List(CommandList {
@ -216,11 +215,13 @@ impl ClientAuthenticated {
/// Runs the IDLE command /// Runs the IDLE command
#[cfg(feature = "rfc2177")] #[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
pub async fn idle(&mut self) -> Result<IdleToken> { pub async fn idle<'a>(&'a mut self) -> Result<IdleToken<'a>> {
let cmd = Command::Idle; let cmd = Command::Idle;
let stream = self.execute(cmd).await?; let stream = self.execute(cmd).await?;
let sender = self.sender(); Ok(IdleToken {
Ok(IdleToken { stream, sender }) stream,
_client: self,
})
} }
} }
@ -240,23 +241,24 @@ pub struct SelectResponse {
/// DONE command will be sent to the server as a result. /// DONE command will be sent to the server as a result.
#[cfg(feature = "rfc2177")] #[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
pub struct IdleToken { pub struct IdleToken<'a> {
pub stream: ResponseStream, pub stream: ResponseStream,
sender: mpsc::UnboundedSender<String>, // sender: mpsc::UnboundedSender<TaggedCommand>,
_client: &'a mut ClientAuthenticated,
} }
#[cfg(feature = "rfc2177")] #[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl Drop for IdleToken { impl<'a> Drop for IdleToken<'a> {
fn drop(&mut self) { fn drop(&mut self) {
// TODO: should ignore this? // TODO: put this into a channel instead
self.sender.send(format!("DONE\r\n")).unwrap(); // tokio::spawn(self.client.execute(Command::Done));
} }
} }
#[cfg(feature = "rfc2177")] #[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))] #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl Stream for IdleToken { impl<'a> Stream for IdleToken<'a> {
type Item = Response; type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let stream = Pin::new(&mut self.stream); let stream = Pin::new(&mut self.stream);

View file

@ -1,11 +1,16 @@
use std::io; use std::io;
use bytes::{Buf, BytesMut}; use bytes::{Buf, BufMut, BytesMut};
use nom::Needed; use nom::Needed;
use tokio_util::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
use crate::proto::{command::Command, response::Response, rfc3501::response as parse_response}; use crate::proto::{
command::Command,
response::{Response, Tag},
rfc3501::response as parse_response,
};
/// A codec that can be used for decoding `Response`s and encoding `Command`s.
#[derive(Default)] #[derive(Default)]
pub struct ImapCodec { pub struct ImapCodec {
decode_need_message_bytes: usize, decode_need_message_bytes: usize,
@ -47,14 +52,19 @@ impl<'a> Decoder for ImapCodec {
} }
} }
impl<'a> Encoder<&'a Command> for ImapCodec { /// A command with its accompanying tag.
pub struct TaggedCommand(pub Tag, pub Command);
impl<'a> Encoder<&'a TaggedCommand> for ImapCodec {
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, _msg: &Command, _dst: &mut BytesMut) -> Result<(), io::Error> { fn encode(&mut self, tagged_cmd: &TaggedCommand, dst: &mut BytesMut) -> Result<(), io::Error> {
todo!() let tag = &tagged_cmd.0;
// dst.put(&*msg.0); let _command = &tagged_cmd.1;
// dst.put_u8(b' ');
// dst.put_slice(&*msg.1); dst.put(&*tag.0);
// dst.put_slice(b"\r\n"); dst.put_u8(b' ');
// Ok(()) // TODO: write command
dst.put_slice(b"\r\n");
Ok(())
} }
} }

View file

@ -1,27 +1,33 @@
use std::sync::atomic::AtomicU32; use std::sync::atomic::{AtomicU32, Ordering};
use anyhow::Result; use anyhow::Result;
use futures::{ use futures::{
future::{FutureExt, TryFutureExt}, future::{self, FutureExt, TryFutureExt},
sink::SinkExt,
stream::StreamExt, stream::StreamExt,
}; };
use tokio::{ use tokio::{
io::{split, AsyncRead, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}, io::{split, AsyncRead, AsyncWrite, ReadHalf, WriteHalf},
sync::{mpsc, oneshot}, sync::{mpsc, oneshot},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_rustls::client::TlsStream; use tokio_rustls::client::TlsStream;
use tokio_util::codec::FramedRead; use tokio_util::codec::{FramedRead, FramedWrite};
use crate::codec::ImapCodec;
use crate::proto::{ use crate::proto::{
bytes::Bytes, command::Command, response::Response, rfc3501::capability as parse_capability, bytes::Bytes,
command::Command,
response::{Response, Tag},
rfc3501::capability as parse_capability,
}; };
use super::client::Config; use super::client::Config;
use super::codec::{ImapCodec, TaggedCommand};
use super::response_stream::ResponseStream; use super::response_stream::ResponseStream;
use super::upgrade::upgrade; use super::upgrade::upgrade;
const TAG_PREFIX: &str = "panotag";
type ExitSender = oneshot::Sender<()>; type ExitSender = oneshot::Sender<()>;
type ExitListener = oneshot::Receiver<()>; type ExitListener = oneshot::Receiver<()>;
type GreetingSender = oneshot::Sender<()>; type GreetingSender = oneshot::Sender<()>;
@ -32,19 +38,23 @@ type GreetingWaiter = oneshot::Receiver<()>;
pub struct Inner<C> { pub struct Inner<C> {
config: Config, config: Config,
tag_number: AtomicU32, tag_number: AtomicU32,
command_tx: mpsc::UnboundedSender<CommandContainer>,
read_exit: ExitSender, read_exit: ExitSender,
read_handle: JoinHandle<ReadHalf<C>>, read_handle: JoinHandle<ReadHalf<C>>,
write_exit: ExitSender, write_exit: ExitSender,
pub(crate) write_tx: mpsc::UnboundedSender<String>,
write_handle: JoinHandle<WriteHalf<C>>, write_handle: JoinHandle<WriteHalf<C>>,
_write_tx: mpsc::UnboundedSender<TaggedCommand>,
greeting_rx: Option<GreetingWaiter>, greeting_rx: Option<GreetingWaiter>,
} }
#[derive(Debug)]
struct CommandContainer { struct CommandContainer {
tag: Tag,
command: Command, command: Command,
channel: mpsc::UnboundedSender<Response>,
} }
impl<C> Inner<C> impl<C> Inner<C>
@ -52,6 +62,8 @@ where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static, C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{ {
pub async fn new(c: C, config: Config) -> Result<Self> { pub async fn new(c: C, config: Config) -> Result<Self> {
let (command_tx, command_rx) = mpsc::unbounded_channel();
// break the stream of bytes into a reader and a writer // break the stream of bytes into a reader and a writer
// the read_half represents the server->client connection // the read_half represents the server->client connection
// the write_half represents the client->server connection // the write_half represents the client->server connection
@ -63,28 +75,47 @@ where
// spawn the server->client loop // spawn the server->client loop
let (read_exit, exit_rx) = oneshot::channel(); let (read_exit, exit_rx) = oneshot::channel();
let read_handle = tokio::spawn(read_loop(read_half, exit_rx, greeting_tx)); let (write_tx, write_rx) = mpsc::unbounded_channel(); // TODO: maybe an arbitrary/configurable limit here would be better?
let read_handle = tokio::spawn(read_loop(
read_half,
exit_rx,
greeting_tx,
write_tx.clone(),
command_rx,
));
// spawn the client->server loop // spawn the client->server loop
let (write_exit, exit_rx) = oneshot::channel(); let (write_exit, exit_rx) = oneshot::channel();
// TODO: maybe an arbitrary/configurable limit here would be better?
let (write_tx, write_rx) = mpsc::unbounded_channel();
let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx)); let write_handle = tokio::spawn(write_loop(write_half, exit_rx, write_rx));
let tag_number = AtomicU32::new(0); let tag_number = AtomicU32::new(0);
Ok(Inner { Ok(Inner {
config, config,
tag_number, tag_number,
command_tx,
read_exit, read_exit,
read_handle, read_handle,
write_exit, write_exit,
write_tx,
write_handle, write_handle,
_write_tx: write_tx,
greeting_rx: Some(greeting_rx), greeting_rx: Some(greeting_rx),
}) })
} }
pub async fn execute(&mut self, _command: Command) -> Result<ResponseStream> { todo!() } pub async fn execute(&mut self, command: Command) -> Result<ResponseStream> {
let id = self.tag_number.fetch_add(1, Ordering::SeqCst);
let tag = Tag(Bytes::from(format!("{}{}", TAG_PREFIX, id)));
let (channel, rx) = mpsc::unbounded_channel();
self.command_tx.send(CommandContainer {
tag,
command,
channel,
})?;
let stream = ResponseStream { inner: rx };
Ok(stream)
}
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> { pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
// TODO: cache capabilities if needed? // TODO: cache capabilities if needed?
@ -139,6 +170,8 @@ async fn read_loop<C>(
stream: ReadHalf<C>, stream: ReadHalf<C>,
exit: ExitListener, exit: ExitListener,
greeting_tx: GreetingSender, greeting_tx: GreetingSender,
write_tx: mpsc::UnboundedSender<TaggedCommand>,
mut command_rx: mpsc::UnboundedReceiver<CommandContainer>,
) -> ReadHalf<C> ) -> ReadHalf<C>
where where
C: AsyncRead, C: AsyncRead,
@ -158,14 +191,55 @@ where
let next = framed.next().fuse(); let next = framed.next().fuse();
pin_mut!(next); pin_mut!(next);
// only listen for a new command if there isn't one already
let mut cmd_fut = if let Some(_) = curr_cmd {
// if there is one, just make a future that never resolves so it'll always pick
// the other options in the select.
future::pending().boxed().fuse()
} else {
command_rx.recv().boxed().fuse()
};
select! { select! {
msg = next => { // read a command from the command list
println!("hellosu {:?}", msg); mut command = cmd_fut => {
if curr_cmd.is_none() {
if let Some(CommandContainer { tag, command, .. }) = command.take() {
let _ = write_tx.send(TaggedCommand(tag, command));
// let cmd_str = format!("{} {:?}\r\n", tag, cmd);
// write_tx.send(cmd_str);
}
curr_cmd = command;
}
}
// new message from the server
resp = next => {
let resp = match resp {
Some(Ok(v)) => v,
a => { error!("failed: {:?}", a); todo!("fuck"); },
};
trace!("S>>>C: {:?}", resp);
// if this is the very first response, then it's a greeting // if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() { if let Some(greeting_tx) = greeting_tx.take() {
greeting_tx.send(()).unwrap(); greeting_tx.send(()).unwrap();
} }
if let Response::Done(_) = resp {
// since this is the DONE message, clear curr_cmd so another one can be sent
if let Some(CommandContainer { channel, .. }) = curr_cmd.take() {
let _ = channel.send(resp);
// debug!("res0: {:?}", res);
}
} else if let Some(CommandContainer { channel, .. }) = curr_cmd.as_mut() {
// we got a response from the server for this command, so send it over the
// channel
// debug!("sending {:?} to tag {}", resp, tag);
let _res = channel.send(resp);
// debug!("res1: {:?}", res);
}
} }
_ = exit => break, _ = exit => break,
} }
@ -175,30 +249,35 @@ where
} }
async fn write_loop<C>( async fn write_loop<C>(
mut stream: WriteHalf<C>, stream: WriteHalf<C>,
exit_rx: ExitListener, exit_rx: ExitListener,
mut write_rx: mpsc::UnboundedReceiver<String>, mut command_rx: mpsc::UnboundedReceiver<TaggedCommand>,
) -> WriteHalf<C> ) -> WriteHalf<C>
where where
C: AsyncWrite, C: AsyncWrite,
{ {
// set up framed communication
let codec = ImapCodec::default();
let mut framed = FramedWrite::new(stream, codec);
let mut exit_rx = exit_rx.map_err(|_| ()).shared(); let mut exit_rx = exit_rx.map_err(|_| ()).shared();
loop { loop {
let write_fut = write_rx.recv().fuse(); let command_fut = command_rx.recv().fuse();
pin_mut!(write_fut); pin_mut!(command_fut);
select! { select! {
line = write_fut => { command = command_fut => {
if let Some(line) = line {
// TODO: handle errors here // TODO: handle errors here
let _ = stream.write_all(line.as_bytes()).await; if let Some(command) = command {
let _ = stream.flush().await; let _ = framed.send(&command).await;
trace!("C>>>S: {:?}", line);
} }
// let _ = stream.write_all(line.as_bytes()).await;
// let _ = stream.flush().await;
// trace!("C>>>S: {:?}", line);
} }
_ = exit_rx => break, _ = exit_rx => break,
} }
} }
stream framed.into_inner()
} }

View file

@ -1,13 +1,27 @@
//! High-level IMAP Client
//! ---
//!
//! ```no_run
//! let client = Config::builder()
//! .hostname("your.domain")
//! .port(993)
//! .open().await?;
//! ```
#[macro_use] #[macro_use]
mod macros; mod macros;
pub mod auth; pub mod auth;
pub mod client;
pub mod configurable_cert_verifier; pub mod configurable_cert_verifier;
pub mod response_stream; pub mod response_stream;
pub mod upgrade;
mod client;
mod codec;
mod inner; mod inner;
mod upgrade;
pub use self::client::ConfigBuilder; pub use self::client::{ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder};
pub use self::codec::{ImapCodec, TaggedCommand};
#[cfg(feature = "low-level")]
pub use self::inner::Inner; pub use self::inner::Inner;

View file

@ -8,10 +8,9 @@ extern crate log;
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
extern crate derive_builder; extern crate derive_builder;
#[macro_use] // #[macro_use]
extern crate bitflags; // extern crate bitflags;
pub mod client; pub mod client;
pub mod codec; // pub mod events;
pub mod events;
pub mod proto; pub mod proto;

View file

@ -1,6 +1,6 @@
use crate::proto::bytes::Bytes; use crate::proto::bytes::Bytes;
#[derive(Debug)] #[derive(Clone, Debug)]
pub enum Command { pub enum Command {
// Any state // Any state
Capability, Capability,
@ -41,32 +41,34 @@ pub enum Command {
// Extensions // Extensions
#[cfg(feature = "rfc2177")] #[cfg(feature = "rfc2177")]
Idle, Idle,
#[cfg(feature = "rfc2177")]
Done,
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct CommandFetch { pub struct CommandFetch {
pub ids: Vec<u32>, pub ids: Vec<u32>,
pub items: FetchItems, pub items: FetchItems,
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct CommandList { pub struct CommandList {
pub reference: Bytes, pub reference: Bytes,
pub mailbox: Bytes, pub mailbox: Bytes,
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct CommandLogin { pub struct CommandLogin {
pub username: Bytes, pub username: Bytes,
pub password: Bytes, pub password: Bytes,
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct CommandSearch { pub struct CommandSearch {
pub criteria: SearchCriteria, pub criteria: SearchCriteria,
} }
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct CommandSelect { pub struct CommandSelect {
pub mailbox: Bytes, pub mailbox: Bytes,
} }

View file

@ -1,3 +1,5 @@
//! Helper functions for manipulating the wire protocol.
#![allow(non_snake_case, dead_code)] #![allow(non_snake_case, dead_code)]
// utils // utils