2021-08-05 update

This commit is contained in:
Michael Zhang 2021-08-05 21:26:39 -05:00
parent 5a9f847097
commit 1e2b1f5f5e
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
12 changed files with 1503 additions and 12 deletions

512
Cargo.lock generated
View file

@ -8,6 +8,23 @@ version = "1.0.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486"
[[package]]
name = "arrayvec"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b"
[[package]]
name = "async-trait"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
@ -25,18 +42,48 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "base64"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bitflags"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693"
[[package]]
name = "bitvec"
version = "0.19.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321"
dependencies = [
"funty",
"radium",
"tap",
"wyz",
]
[[package]]
name = "bumpalo"
version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631"
[[package]]
name = "bytes"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cc"
version = "1.0.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e70cc2f62c6ce1868963827bd677764c62d07c3d9a3e1fb1177ee1a9ab199eb2"
[[package]]
name = "cfg-if"
version = "1.0.0"
@ -88,6 +135,84 @@ dependencies = [
"syn",
]
[[package]]
name = "darling"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f2c43f534ea4b0b049015d00269734195e6d3f0f6635cb692251aca6f9f8b3c"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e91455b86830a1c21799d94524df0845183fa55bafd9aa137b01c7d1065fa36"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn",
]
[[package]]
name = "darling_macro"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29b5acf0dea37a7f66f7b25d2c5e93fd46f8f6968b1a5d7a3e02e97768afc95a"
dependencies = [
"darling_core",
"quote",
"syn",
]
[[package]]
name = "derive_builder"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d13202debe11181040ae9063d739fa32cfcaaebe2275fe387703460ae2365b30"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66e616858f6187ed828df7c64a6d71720d83767a7f19740b2d1b6fe6327b36e5"
dependencies = [
"darling",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "derive_builder_macro"
version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58a94ace95092c5acb1e97a7e846b310cfbd499652f72297da7493f618a98d73"
dependencies = [
"derive_builder_core",
"syn",
]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "funty"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7"
[[package]]
name = "futures"
version = "0.3.16"
@ -182,6 +307,25 @@ dependencies = [
"slab",
]
[[package]]
name = "h2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
@ -206,6 +350,95 @@ dependencies = [
"libc",
]
[[package]]
name = "http"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68"
[[package]]
name = "httpdate"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440"
[[package]]
name = "hyper"
version = "0.14.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b61cf2d1aebcf6e6352c97b81dc2244ca29194be1b276f5d8ad5c6330fffb11"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "imap"
version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"derive_builder",
"futures",
"imap-proto",
"log",
"nom",
"tokio",
"tokio-rustls",
"tokio-util",
"webpki-roots",
]
[[package]]
name = "imap-proto"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ad9b46a79efb6078e578ae04e51463d7c3e8767864687f7e63095b3cbefafbb"
dependencies = [
"nom",
]
[[package]]
name = "indexmap"
version = "1.7.0"
@ -247,12 +480,40 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "js-sys"
version = "0.3.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83bdfbace3a0e81a4253f73b49e960b053e396a11012cbd49b9b74d6a2b67062"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "lexical-core"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe"
dependencies = [
"arrayvec",
"bitflags",
"cfg-if",
"ryu",
"static_assertions",
]
[[package]]
name = "libc"
version = "0.2.98"
@ -279,9 +540,9 @@ dependencies = [
[[package]]
name = "memchr"
version = "2.4.0"
version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "mio"
@ -305,6 +566,19 @@ dependencies = [
"winapi",
]
[[package]]
name = "nom"
version = "6.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c5c51b9083a3c620fa67a2a635d1ce7d95b897e957d6b28ff9a5da960a103a6"
dependencies = [
"bitvec",
"funty",
"lexical-core",
"memchr",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -360,13 +634,17 @@ name = "panorama-daemon"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"clap",
"futures",
"hyper",
"imap",
"inotify",
"log",
"serde",
"stderrlog",
"tokio",
"tokio-rustls",
"toml",
"xdg",
]
@ -462,6 +740,12 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "radium"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8"
[[package]]
name = "redox_syscall"
version = "0.2.9"
@ -471,12 +755,56 @@ dependencies = [
"bitflags",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rustls"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64",
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "serde"
version = "1.0.126"
@ -518,6 +846,18 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "stderrlog"
version = "0.5.1"
@ -548,6 +888,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "tap"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369"
[[package]]
name = "termcolor"
version = "1.1.2"
@ -617,6 +963,31 @@ dependencies = [
"syn",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-util"
version = "0.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
dependencies = [
"bytes",
"futures-core",
"futures-sink",
"log",
"pin-project-lite",
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
@ -626,6 +997,38 @@ dependencies = [
"serde",
]
[[package]]
name = "tower-service"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052"
dependencies = [
"lazy_static",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
@ -644,6 +1047,12 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "vec_map"
version = "0.8.2"
@ -656,12 +1065,105 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.74"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f"
[[package]]
name = "web-sys"
version = "0.3.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940"
dependencies = [
"webpki",
]
[[package]]
name = "winapi"
version = "0.3.9"
@ -693,6 +1195,12 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "xdg"
version = "2.2.0"

View file

@ -1,4 +1,5 @@
[workspace]
members = [
"daemon",
"imap",
]

View file

@ -5,12 +5,16 @@ edition = "2018"
[dependencies]
anyhow = "1.0.42"
serde = { version = "1.0.126", features = ["derive"] }
tokio = { version = "1.9.0", features = ["full"] }
async-trait = "0.1.50"
clap = "3.0.0-beta.2"
futures = "0.3.16"
hyper = { version = "0.14.11", features = ["server", "http2", "stream"] }
imap = { path = "../imap" }
inotify = { version = "0.9.3", features = ["stream"] }
xdg = "2.2.0"
log = "0.4.14"
toml = "0.5.8"
serde = { version = "1.0.126", features = ["derive"] }
stderrlog = "0.5.1"
tokio = { version = "1.9.0", features = ["full"] }
tokio-rustls = "0.22.0"
toml = "0.5.8"
xdg = "2.2.0"

View file

@ -1 +0,0 @@
pub struct ImapClient {}

158
daemon/src/mail/mod.rs Normal file
View file

@ -0,0 +1,158 @@
/// The main function for the IMAP syncing thread
pub async fn sync_main(
acct_name: impl AsRef<str>,
acct: MailAccountConfig,
mail2ui_tx: UnboundedSender<MailEvent>,
mail_store: MailStore,
) -> Result<()> {
let acct_name = acct_name.as_ref().to_owned();
// loop ensures that the connection is retried after it dies
loop {
let builder: ClientConfig = ClientBuilder::default()
.hostname(acct.imap.server.clone())
.port(acct.imap.port)
.tls(matches!(acct.imap.tls, TlsMethod::On))
.build()
.map_err(|err| anyhow!("err: {}", err))?;
debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port);
let unauth = builder.open().await?;
let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) {
debug!("attempting to upgrade");
let client = unauth.upgrade().await?;
debug!("upgrade successful");
client
} else {
unauth
};
debug!("preparing to auth");
// check if the authentication method is supported
let mut authed = match &acct.imap.auth {
ImapAuth::Plain { username, password } => {
let auth = auth::Plain {
username: username.clone(),
password: password.clone(),
};
auth.perform_auth(unauth).await?
}
};
debug!("authentication successful!");
let folder_list = authed.list().await?;
let _ = mail2ui_tx.send(MailEvent::FolderList(
acct_name.clone(),
folder_list.clone(),
));
debug!("mailbox list: {:?}", folder_list);
for folder in folder_list.iter() {
debug!("folder: {}", folder);
let select = authed.select(folder).await?;
debug!("select response: {:?}", select);
if let (Some(exists), Some(uidvalidity)) = (select.exists, select.uid_validity) {
// figure out which uids don't exist locally yet
let new_uids = stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
mail_store.try_identify_email(&acct_name, &folder, uid, uidvalidity, None)
// invert the option to only select uids that haven't been downloaded
.map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
.map_err(|err| err.context("error checking if the email is already downloaded [try_identify_email]"))
}).try_collect::<Vec<_>>().await?;
if !new_uids.is_empty() {
debug!("fetching uids {:?}", new_uids);
let fetched = authed
.uid_fetch(&new_uids, FetchItems::PanoramaAll)
.await
.context("error fetching uids")?;
fetched
.map(Ok)
.try_for_each_concurrent(None, |(uid, attrs)| {
mail_store.store_email(&acct_name, &folder, uid, uidvalidity, attrs)
})
.await
.context("error during fetch-store")?;
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// TODO: remove this later
continue;
// let's just select INBOX for now, maybe have a config for default mailbox later?
debug!("selecting the INBOX mailbox");
let select = authed.select("INBOX").await?;
debug!("select result: {:?}", select);
loop {
let message_uids = authed.uid_search().await?;
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
let _ = mail2ui_tx.send(MailEvent::MessageUids(
acct_name.clone(),
message_uids.clone(),
));
// TODO: make this happen concurrently with the main loop?
let mut message_list = authed
.uid_fetch(&message_uids, FetchItems::All)
.await
.unwrap();
while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs);
// TODO: probably odn't care about this?
let _ = mail2ui_tx.send(evt);
}
// check if IDLE is supported
let supports_idle = authed.has_capability("IDLE").await?;
if supports_idle {
let mut idle_stream = authed.idle().await?;
loop {
let evt = match idle_stream.next().await {
Some(v) => v,
None => break,
};
debug!("got an event: {:?}", evt);
match evt {
Response::MailboxData(MailboxData::Exists(uid)) => {
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
// send DONE to stop the idle
std::mem::drop(idle_stream);
let handle = Notification::new()
.summary("New Email")
.body("holy Shit,")
.icon("firefox")
.timeout(Timeout::Milliseconds(6000))
.show()?;
let message_uids = authed.uid_search().await?;
let message_uids =
message_uids.into_iter().take(20).collect::<Vec<_>>();
let _ = mail2ui_tx.send(MailEvent::MessageUids(
acct_name.clone(),
message_uids.clone(),
));
// TODO: make this happen concurrently with the main loop?
let mut message_list = authed
.uid_fetch(&message_uids, FetchItems::All)
.await
.unwrap();
while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs);
// debug!("sent {:?}", evt);
mail2ui_tx.send(evt);
}
idle_stream = authed.idle().await?;
}
_ => {}
}
}
} else {
loop {
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
debug!("heartbeat");
}
}
if false {
break;
}
}
// wait a bit so we're not hitting the server really fast if the fail happens
// early on
//
// TODO: some kind of smart exponential backoff that considers some time
// threshold to be a failing case?
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
}

View file

@ -6,14 +6,18 @@ extern crate log;
extern crate futures;
mod config;
mod imap;
mod mail;
use anyhow::Result;
use clap::Clap;
use futures::future::FutureExt;
use futures::future::{
select,
Either::{Left, Right},
FutureExt,
};
use tokio::sync::oneshot;
use crate::config::Config;
use crate::config::{Config, MailAccountConfig, TlsMethod};
type ExitListener = oneshot::Receiver<()>;
@ -31,7 +35,6 @@ struct Options {
#[tokio::main]
async fn main() -> Result<()> {
let opt = Options::parse();
println!("{:?}", opt);
stderrlog::new()
.module(module_path!())
@ -53,14 +56,54 @@ async fn main() -> Result<()> {
}
async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> {
println!("run with config: {:?}", config);
debug!("new config");
let mut notify_mail_threads = Vec::new();
for (account_name, account) in config.mail_accounts {
let (exit_tx, exit_rx) = oneshot::channel();
tokio::spawn(run_single_mail_account(account_name, account, exit_rx));
notify_mail_threads.push(exit_tx);
}
exit.await?;
for exit_tx in notify_mail_threads {
let _ = exit_tx.send(());
}
Ok(())
}
async fn run_single_mail_account(
account_name: String,
account: MailAccountConfig,
exit: ExitListener,
) -> Result<()> {
debug!("connecting to account {}", account_name);
let imap_cookie = ImapClient::open(
&account.imap.server,
account.imap.port,
matches!(account.imap.tls, TlsMethod::On),
);
pin_mut!(imap_cookie);
pin_mut!(exit);
let (imap, exit) = match select(imap_cookie, exit).await {
Left(res) => res,
Right(_) => return Ok(()),
};
debug!("connected to {}", account.imap.server);
let imap = imap?;
let mut exit = exit.fuse();
loop {
select! {
_ = exit => break,
}
}
debug!("disconnecting from account {}", account_name);
Ok(())
}

20
imap/Cargo.toml Normal file
View file

@ -0,0 +1,20 @@
[package]
name = "imap"
version = "0.1.0"
edition = "2018"
[features]
rfc2177-idle = []
[dependencies]
anyhow = "1.0.42"
bytes = "1.0.1"
derive_builder = "0.10.2"
futures = "0.3.16"
imap-proto = "0.14.3"
log = "0.4.14"
nom = "6.2.1"
tokio = { version = "1.9.0", features = ["full"] }
tokio-rustls = "0.22.0"
tokio-util = { version = "0.6.7", features = ["codec"] }
webpki-roots = "0.21.1"

59
imap/src/auth.rs Normal file
View file

@ -0,0 +1,59 @@
use anyhow::Result;
use crate::command::Command;
use crate::response::{Response, ResponseDone, Status};
use super::{ClientAuthenticated, ClientUnauthenticated};
#[async_trait]
pub trait Auth {
/// Performs authentication, consuming the client
// TODO: return the unauthed client if failed?
async fn perform_auth(self, client: ClientUnauthenticated) -> Result<ClientAuthenticated>;
/// Converts the wrappers around the client once the authentication has happened. Should only
/// be called by the `perform_auth` function.
fn convert_client(client: ClientUnauthenticated) -> ClientAuthenticated {
match client {
ClientUnauthenticated::Encrypted(e) => ClientAuthenticated::Encrypted(e),
ClientUnauthenticated::Unencrypted(e) => ClientAuthenticated::Unencrypted(e),
}
}
}
pub struct Plain {
pub username: String,
pub password: String,
}
#[async_trait]
impl Auth for Plain {
async fn perform_auth(self, mut client: ClientUnauthenticated) -> Result<ClientAuthenticated> {
let command = Command::Login {
username: self.username,
password: self.password,
};
let result = client.execute(command).await?;
let done = result.done().await?;
assert!(done.is_some());
let done = done.unwrap();
if done.status != Status::Ok {
bail!("unable to login: {:?}", done);
}
// if !matches!(
// result,
// Response::Done(ResponseDone {
// status: Status::Ok,
// ..
// })
// ) {
// bail!("unable to login: {:?}", result);
// }
Ok(<Self as Auth>::convert_client(client))
}
}

284
imap/src/client.rs Normal file
View file

@ -0,0 +1,284 @@
use std::borrow::Cow;
use std::sync::Arc;
use anyhow::Result;
use futures::{
future::{self, FutureExt},
stream::{Stream, StreamExt},
};
use tokio::{net::TcpStream, sync::mpsc};
use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
};
pub use super::inner::{Client, ResponseStream};
/// Struct used to start building the config for a client.
///
/// Call [`.build`][1] to _build_ the config, then run [`.open`][2] to actually start opening
/// the connection to the server.
///
/// [1]: self::ClientConfigBuilder::build
/// [2]: self::ClientConfig::open
pub type ClientBuilder = ClientConfigBuilder;
/// An IMAP client that hasn't been connected yet.
#[derive(Builder, Clone, Debug)]
pub struct ClientConfig {
/// The hostname of the IMAP server. If using TLS, must be an address
hostname: String,
/// The port of the IMAP server.
port: u16,
/// Whether or not the client is using an encrypted stream.
///
/// To upgrade the connection later, use the upgrade method.
tls: bool,
}
impl ClientConfig {
pub async fn open(self) -> Result<ClientUnauthenticated> {
let hostname = self.hostname.as_ref();
let port = self.port;
let conn = TcpStream::connect((hostname, port)).await?;
if self.tls {
let mut tls_config = RustlsConfig::new();
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap();
let conn = tls_config.connect(dnsname, conn).await?;
let mut inner = Client::new(conn, self);
inner.wait_for_greeting().await?;
return Ok(ClientUnauthenticated::Encrypted(inner));
} else {
let mut inner = Client::new(conn, self);
inner.wait_for_greeting().await?;
return Ok(ClientUnauthenticated::Unencrypted(inner));
}
}
}
pub enum ClientUnauthenticated {
Encrypted(Client<TlsStream<TcpStream>>),
Unencrypted(Client<TcpStream>),
}
impl ClientUnauthenticated {
pub async fn upgrade(self) -> Result<ClientUnauthenticated> {
match self {
// this is a no-op, we don't need to upgrade
ClientUnauthenticated::Encrypted(_) => Ok(self),
ClientUnauthenticated::Unencrypted(e) => {
Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?))
}
}
}
/// Exposing low-level execute
async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
match self {
ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await,
ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await,
}
}
/// Checks if the server that the client is talking to has support for the given capability.
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
match self {
ClientUnauthenticated::Encrypted(e) => e.has_capability(cap).await,
ClientUnauthenticated::Unencrypted(e) => e.has_capability(cap).await,
}
}
}
pub enum ClientAuthenticated {
Encrypted(Client<TlsStream<TcpStream>>),
Unencrypted(Client<TcpStream>),
}
impl ClientAuthenticated {
/// Exposing low-level execute
async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
match self {
ClientAuthenticated::Encrypted(e) => e.execute(cmd).await,
ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await,
}
}
fn sender(&self) -> mpsc::UnboundedSender<String> {
match self {
ClientAuthenticated::Encrypted(e) => e.write_tx.clone(),
ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(),
}
}
/// Checks if the server that the client is talking to has support for the given capability.
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
match self {
ClientAuthenticated::Encrypted(e) => e.has_capability(cap).await,
ClientAuthenticated::Unencrypted(e) => e.has_capability(cap).await,
}
}
/// Runs the LIST command
pub async fn list(&mut self) -> Result<Vec<String>> {
let cmd = Command::List {
reference: "".to_owned(),
mailbox: "*".to_owned(),
};
let res = self.execute(cmd).await?;
let (_, data) = res.wait().await?;
let mut folders = Vec::new();
for resp in data {
if let Response::MailboxData(MailboxData::List { name, .. }) = resp {
folders.push(name.to_owned());
}
}
Ok(folders)
}
/// Runs the SELECT command
pub async fn select(&mut self, mailbox: impl AsRef<str>) -> Result<SelectResponse> {
let cmd = Command::Select {
mailbox: mailbox.as_ref().to_owned(),
};
let stream = self.execute(cmd).await?;
let (_, data) = stream.wait().await?;
let mut select = SelectResponse::default();
for resp in data {
match resp {
Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags,
Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists),
Response::MailboxData(MailboxData::Recent(recent)) => select.recent = Some(recent),
Response::Data(ResponseData {
status: Status::Ok,
code: Some(code),
..
}) => match code {
ResponseCode::Unseen(value) => select.unseen = Some(value),
ResponseCode::UidNext(value) => select.uid_next = Some(value),
ResponseCode::UidValidity(value) => select.uid_validity = Some(value),
_ => {}
},
_ => {}
}
}
Ok(select)
}
/// Runs the SEARCH command
pub async fn uid_search(&mut self) -> Result<Vec<u32>> {
let cmd = Command::UidSearch {
criteria: SearchCriteria::All,
};
let stream = self.execute(cmd).await?;
let (_, data) = stream.wait().await?;
for resp in data {
if let Response::MailboxData(MailboxData::Search(uids)) = resp {
return Ok(uids);
}
}
bail!("could not find the SEARCH response")
}
/// Runs the FETCH command
pub async fn fetch(
&mut self,
uids: &[u32],
items: FetchItems,
) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> {
let cmd = Command::Fetch {
uids: uids.to_vec(),
items,
};
debug!("fetch: {}", cmd);
let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(),
}))
}
/// Runs the UID FETCH command
pub async fn uid_fetch(
&mut self,
uids: &[u32],
items: FetchItems,
) -> Result<impl Stream<Item = (u32, Vec<AttributeValue>)>> {
let cmd = Command::UidFetch {
uids: uids.to_vec(),
items,
};
debug!("uid fetch: {}", cmd);
let stream = self.execute(cmd).await?;
// let (done, data) = stream.wait().await?;
Ok(stream.filter_map(|resp| match resp {
Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(),
Response::Done(_) => future::ready(None).boxed(),
_ => future::pending().boxed(),
}))
}
/// Runs the IDLE command
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
pub async fn idle(&mut self) -> Result<IdleToken> {
let cmd = Command::Idle;
let stream = self.execute(cmd).await?;
let sender = self.sender();
Ok(IdleToken { stream, sender })
}
}
#[derive(Debug, Default)]
pub struct SelectResponse<'a> {
pub flags: Vec<Cow<'a, str>>,
pub exists: Option<u32>,
pub recent: Option<u32>,
pub uid_next: Option<u32>,
pub uid_validity: Option<u32>,
pub unseen: Option<u32>,
}
/// A token that represents an idling connection.
///
/// Dropping this token indicates that the idling should be completed, and the DONE command will be
/// sent to the server as a result.
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
pub struct IdleToken {
pub stream: ResponseStream,
sender: mpsc::UnboundedSender<String>,
}
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
impl Drop for IdleToken {
fn drop(&mut self) {
// TODO: should ignore this?
self.sender.send(format!("DONE\r\n")).unwrap();
}
}
#[cfg(feature = "rfc2177-idle")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))]
impl Stream for IdleToken {
type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let stream = Pin::new(&mut self.stream);
Stream::poll_next(stream, cx)
}
}

90
imap/src/codec.rs Normal file
View file

@ -0,0 +1,90 @@
use std::io;
use std::mem;
use bytes::{BufMut, Bytes, BytesMut};
use nom::{self, Needed};
use tokio_util::codec::{Decoder, Encoder};
use imap_proto::types::{Request, RequestId, Response};
#[derive(Default)]
pub struct ImapCodec {
decode_need_message_bytes: usize,
}
impl<'a> Decoder for ImapCodec {
type Item = ResponseData;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, io::Error> {
if self.decode_need_message_bytes > buf.len() {
return Ok(None);
}
let (response, rsp_len) = match imap_proto::Response::from_bytes(buf) {
Ok((remaining, response)) => {
// This SHOULD be acceptable/safe: BytesMut storage memory is
// allocated on the heap and should not move. It will not be
// freed as long as we keep a reference alive, which we do
// by retaining a reference to the split buffer, below.
let response = unsafe { mem::transmute(response) };
(response, buf.len() - remaining.len())
}
Err(nom::Err::Incomplete(Needed::Size(min))) => {
self.decode_need_message_bytes = min.get();
return Ok(None);
}
Err(nom::Err::Incomplete(_)) => {
return Ok(None);
}
Err(nom::Err::Error(nom::error::Error { code, .. }))
| Err(nom::Err::Failure(nom::error::Error { code, .. })) => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("{:?} during parsing of {:?}", code, buf),
));
}
};
let raw = buf.split_to(rsp_len).freeze();
self.decode_need_message_bytes = 0;
Ok(Some(ResponseData { raw, response }))
}
}
impl<'a> Encoder<&'a Request<'a>> for ImapCodec {
type Error = io::Error;
fn encode(&mut self, msg: &Request, dst: &mut BytesMut) -> Result<(), io::Error> {
dst.put(&*msg.0);
dst.put_u8(b' ');
dst.put_slice(&*msg.1);
dst.put_slice(b"\r\n");
Ok(())
}
}
#[derive(Debug)]
pub struct ResponseData {
raw: Bytes,
// This reference is really scoped to the lifetime of the `raw`
// member, but unfortunately Rust does not allow that yet. It
// is transmuted to `'static` by the `Decoder`, instead, and
// references returned to callers of `ResponseData` are limited
// to the lifetime of the `ResponseData` struct.
//
// `raw` is never mutated during the lifetime of `ResponseData`,
// and `Response` does not not implement any specific drop glue.
response: Response<'static>,
}
impl ResponseData {
pub fn request_id(&self) -> Option<&RequestId> {
match self.response {
Response::Done { ref tag, .. } => Some(tag),
_ => None,
}
}
#[allow(clippy::needless_lifetimes)]
pub fn parsed<'a>(&'a self) -> &'a Response<'a> {
&self.response
}
}

312
imap/src/inner.rs Normal file
View file

@ -0,0 +1,312 @@
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Result;
use futures::{
future::{self, FutureExt, TryFutureExt},
stream::{Stream, StreamExt},
};
use imap_proto::{builders::command::Command, types::Response};
use tokio::{
io::{
self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf,
},
sync::{
mpsc,
oneshot::{self, error::TryRecvError},
},
task::JoinHandle,
};
use tokio_rustls::{
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
};
use tokio_util::codec::FramedRead;
use crate::codec::{ImapCodec, ResponseData};
pub const TAG_PREFIX: &str = "ptag";
type Command2 = (String, Command, mpsc::UnboundedSender<ResponseData>);
pub struct Client<C> {
ctr: usize,
config: ClientConfig,
// conn: WriteHalf<C>,
pub(crate) write_tx: mpsc::UnboundedSender<String>,
cmd_tx: mpsc::UnboundedSender<Command2>,
greeting_rx: Option<oneshot::Receiver<()>>,
writer_exit_tx: oneshot::Sender<()>,
writer_handle: JoinHandle<Result<WriteHalf<C>>>,
listener_exit_tx: oneshot::Sender<()>,
listener_handle: JoinHandle<Result<ReadHalf<C>>>,
}
impl<C> Client<C>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
pub fn new(conn: C, config: ClientConfig) -> Self {
let (read_half, mut write_half) = io::split(conn);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (greeting_tx, greeting_rx) = oneshot::channel();
let (writer_exit_tx, exit_rx) = oneshot::channel();
let (write_tx, mut write_rx) = mpsc::unbounded_channel::<String>();
let writer_handle = tokio::spawn(write(write_half, write_rx, exit_rx).map_err(|err| {
error!("Help, the writer loop died: {}", err);
err
}));
let (exit_tx, exit_rx) = oneshot::channel();
let listener_handle = tokio::spawn(
listen(read_half, cmd_rx, write_tx.clone(), greeting_tx, exit_rx).map_err(|err| {
error!("Help, the listener loop died: {:?} {}", err, err);
err
}),
);
Client {
ctr: 0,
// conn: write_half,
config,
cmd_tx,
write_tx,
greeting_rx: Some(greeting_rx),
writer_exit_tx,
listener_exit_tx: exit_tx,
writer_handle,
listener_handle,
}
}
pub async fn wait_for_greeting(&mut self) -> Result<()> {
if let Some(greeting_rx) = self.greeting_rx.take() {
greeting_rx.await?;
}
Ok(())
}
pub async fn execute(&mut self, cmd: Command) -> Result<ResponseStream> {
let id = self.ctr;
self.ctr += 1;
let tag = format!("{}{}", TAG_PREFIX, id);
// let cmd_str = format!("{} {}\r\n", tag, cmd);
// self.write_tx.send(cmd_str);
// self.conn.write_all(cmd_str.as_bytes()).await?;
// self.conn.flush().await?;
let (tx, rx) = mpsc::unbounded_channel();
self.cmd_tx.send((tag, cmd, tx))?;
let stream = ResponseStream { inner: rx };
Ok(stream)
}
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
// TODO: cache capabilities if needed?
let cap = cap.as_ref();
let cap = parse_capability(cap)?;
let resp = self.execute(Command::Capability).await?;
let (_, data) = resp.wait().await?;
for resp in data {
if let Response::Capabilities(caps) = resp {
return Ok(caps.contains(&cap));
}
// debug!("cap: {:?}", resp);
}
Ok(false)
}
pub async fn upgrade(mut self) -> Result<Client<TlsStream<C>>> {
// TODO: make sure STARTTLS is in the capability list
if !self.has_capability("STARTTLS").await? {
bail!("server doesn't support this capability");
}
// first, send the STARTTLS command
let mut resp = self.execute(Command::Starttls).await?;
let resp = resp.next().await.unwrap();
debug!("server response to starttls: {:?}", resp);
debug!("sending exit for upgrade");
// TODO: check that the channel is still open?
self.listener_exit_tx.send(()).unwrap();
self.writer_exit_tx.send(()).unwrap();
let (reader, writer) = future::join(self.listener_handle, self.writer_handle).await;
let reader = reader??;
let writer = writer??;
// let reader = self.listener_handle.await??;
// let writer = self.conn;
let conn = reader.unsplit(writer);
let server_name = &self.config.hostname;
let mut tls_config = RustlsConfig::new();
tls_config
.root_store
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
let tls_config = TlsConnector::from(Arc::new(tls_config));
let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap();
let stream = tls_config.connect(dnsname, conn).await?;
debug!("upgraded, stream is using TLS now");
Ok(Client::new(stream, self.config))
}
}
pub struct ResponseStream {
pub(crate) inner: mpsc::UnboundedReceiver<ResponseData>,
}
impl ResponseStream {
/// Retrieves just the DONE item in the stream, discarding the rest
pub async fn done(mut self) -> Result<Option<ResponseDone>> {
while let Some(resp) = self.inner.recv().await {
if let Response::Done(done) = resp {
return Ok(Some(done));
}
}
Ok(None)
}
/// Waits for the entire stream to finish, returning the DONE status and the stream
pub async fn wait(mut self) -> Result<(Option<ResponseDone>, Vec<ResponseData>)> {
let mut done = None;
let mut vec = Vec::new();
while let Some(resp) = self.inner.recv().await {
if let Response::Done(d) = resp {
done = Some(d);
break;
} else {
vec.push(resp);
}
}
Ok((done, vec))
}
}
impl Stream for ResponseStream {
type Item = ResponseData;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
self.inner.poll_recv(cx)
}
}
#[allow(unreachable_code)]
async fn write<C>(
mut conn: WriteHalf<C>,
mut write_rx: mpsc::UnboundedReceiver<String>,
exit_rx: oneshot::Receiver<()>,
) -> Result<WriteHalf<C>>
where
C: AsyncWrite + Unpin,
{
let mut exit_rx = exit_rx.map_err(|_| ()).shared();
loop {
let write_fut = write_rx.recv().fuse();
pin_mut!(write_fut);
select! {
_ = exit_rx => {
break;
}
line = write_fut => {
if let Some(line) = line {
conn.write_all(line.as_bytes()).await?;
conn.flush().await?;
trace!("C>>>S: {:?}", line);
}
}
}
}
Ok(conn)
}
#[allow(unreachable_code)]
async fn listen<C>(
conn: ReadHalf<C>,
mut cmd_rx: mpsc::UnboundedReceiver<Command2>,
mut write_tx: mpsc::UnboundedSender<String>,
greeting_tx: oneshot::Sender<()>,
exit_rx: oneshot::Receiver<()>,
) -> Result<ReadHalf<C>>
where
C: AsyncRead + Unpin,
{
let codec = ImapCodec::default();
let mut framed = FramedRead::new(conn, codec);
let mut greeting_tx = Some(greeting_tx);
let mut curr_cmd: Option<Command2> = None;
let mut exit_rx = exit_rx.map_err(|_| ()).shared();
loop {
// let mut next_line = String::new();
// let read_fut = reader.read_line(&mut next_line).fuse();
let read_fut = framed.next().fuse();
pin_mut!(read_fut);
// only listen for a new command if there isn't one already
let mut cmd_fut = if let Some(_) = curr_cmd {
// if there is one, just make a future that never resolves so it'll always pick the
// other options in the select.
future::pending().boxed().fuse()
} else {
cmd_rx.recv().boxed().fuse()
};
select! {
_ = exit_rx => {
debug!("exiting the loop");
break;
}
// read a command from the command list
cmd = cmd_fut => {
if curr_cmd.is_none() {
if let Some((ref tag, ref cmd, _)) = cmd {
let cmd_str = format!("{} {}\r\n", tag, cmd);
write_tx.send(cmd_str);
}
curr_cmd = cmd;
}
}
// got a response from the server connection
resp = read_fut => {
let resp = match resp {
Some(Ok(v)) => v,
a => { error!("failed: {:?}", a); bail!("fuck"); },
};
trace!("S>>>C: {:?}", resp);
// if this is the very first response, then it's a greeting
if let Some(greeting_tx) = greeting_tx.take() {
greeting_tx.send(()).unwrap();
}
if let Response::Done(_) = resp {
// since this is the DONE message, clear curr_cmd so another one can be sent
if let Some((_, _, cmd_tx)) = curr_cmd.take() {
let res = cmd_tx.send(resp);
// debug!("res0: {:?}", res);
}
} else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() {
// we got a response from the server for this command, so send it over the
// channel
// debug!("sending {:?} to tag {}", resp, tag);
let res = cmd_tx.send(resp);
// debug!("res1: {:?}", res);
}
}
}
}
let conn = framed.into_inner();
Ok(conn)
}

13
imap/src/lib.rs Normal file
View file

@ -0,0 +1,13 @@
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate derive_builder;
mod auth;
mod client;
mod codec;
mod inner;