diff --git a/Cargo.lock b/Cargo.lock index 6f72b74..bf1fdf2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,23 @@ version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "595d3cfa7a60d4555cb5067b99f07142a08ea778de5cf993f7b75c7d8fabc486" +[[package]] +name = "arrayvec" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" + +[[package]] +name = "async-trait" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atty" version = "0.2.14" @@ -25,18 +42,48 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "base64" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" + [[package]] name = "bitflags" version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" +[[package]] +name = "bitvec" +version = "0.19.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8942c8d352ae1838c9dda0b0ca2ab657696ef2232a20147cf1b30ae1a9cb4321" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + +[[package]] +name = "bumpalo" +version = "3.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" + [[package]] name = "bytes" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +[[package]] +name = "cc" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70cc2f62c6ce1868963827bd677764c62d07c3d9a3e1fb1177ee1a9ab199eb2" + [[package]] name = "cfg-if" version = "1.0.0" @@ -88,6 +135,84 @@ dependencies = [ "syn", ] +[[package]] +name = "darling" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f2c43f534ea4b0b049015d00269734195e6d3f0f6635cb692251aca6f9f8b3c" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e91455b86830a1c21799d94524df0845183fa55bafd9aa137b01c7d1065fa36" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29b5acf0dea37a7f66f7b25d2c5e93fd46f8f6968b1a5d7a3e02e97768afc95a" +dependencies = [ + "darling_core", + "quote", + "syn", +] + +[[package]] +name = "derive_builder" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d13202debe11181040ae9063d739fa32cfcaaebe2275fe387703460ae2365b30" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66e616858f6187ed828df7c64a6d71720d83767a7f19740b2d1b6fe6327b36e5" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "derive_builder_macro" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58a94ace95092c5acb1e97a7e846b310cfbd499652f72297da7493f618a98d73" +dependencies = [ + "derive_builder_core", + "syn", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "funty" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fed34cd105917e91daa4da6b3728c47b068749d6a62c59811f06ed2ac71d9da7" + [[package]] name = "futures" version = "0.3.16" @@ -182,6 +307,25 @@ dependencies = [ "slab", ] +[[package]] +name = "h2" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.11.2" @@ -206,6 +350,95 @@ dependencies = [ "libc", ] +[[package]] +name = "http" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60daa14be0e0786db0f03a9e57cb404c9d756eed2b6c62b9ea98ec5743ec75a9" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" + +[[package]] +name = "httpdate" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440" + +[[package]] +name = "hyper" +version = "0.14.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b61cf2d1aebcf6e6352c97b81dc2244ca29194be1b276f5d8ad5c6330fffb11" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + +[[package]] +name = "imap" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "derive_builder", + "futures", + "imap-proto", + "log", + "nom", + "tokio", + "tokio-rustls", + "tokio-util", + "webpki-roots", +] + +[[package]] +name = "imap-proto" +version = "0.14.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ad9b46a79efb6078e578ae04e51463d7c3e8767864687f7e63095b3cbefafbb" +dependencies = [ + "nom", +] + [[package]] name = "indexmap" version = "1.7.0" @@ -247,12 +480,40 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" + +[[package]] +name = "js-sys" +version = "0.3.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83bdfbace3a0e81a4253f73b49e960b053e396a11012cbd49b9b74d6a2b67062" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" +[[package]] +name = "lexical-core" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6607c62aa161d23d17a9072cc5da0be67cdfc89d3afb1e8d9c842bebc2525ffe" +dependencies = [ + "arrayvec", + "bitflags", + "cfg-if", + "ryu", + "static_assertions", +] + [[package]] name = "libc" version = "0.2.98" @@ -279,9 +540,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.4.0" +version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16bd47d9e329435e309c58469fe0791c2d0d1ba96ec0954152a5ae2b04387dc" +checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" [[package]] name = "mio" @@ -305,6 +566,19 @@ dependencies = [ "winapi", ] +[[package]] +name = "nom" +version = "6.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c5c51b9083a3c620fa67a2a635d1ce7d95b897e957d6b28ff9a5da960a103a6" +dependencies = [ + "bitvec", + "funty", + "lexical-core", + "memchr", + "version_check", +] + [[package]] name = "ntapi" version = "0.3.6" @@ -360,13 +634,17 @@ name = "panorama-daemon" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "clap", "futures", + "hyper", + "imap", "inotify", "log", "serde", "stderrlog", "tokio", + "tokio-rustls", "toml", "xdg", ] @@ -462,6 +740,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radium" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "941ba9d78d8e2f7ce474c015eea4d9c6d25b6a3327f9832ee29a4de27f91bbb8" + [[package]] name = "redox_syscall" version = "0.2.9" @@ -471,12 +755,56 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7" +dependencies = [ + "base64", + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + [[package]] name = "scopeguard" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "serde" version = "1.0.126" @@ -518,6 +846,18 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e" +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stderrlog" version = "0.5.1" @@ -548,6 +888,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "termcolor" version = "1.1.2" @@ -617,6 +963,31 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + +[[package]] +name = "tokio-util" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "toml" version = "0.5.8" @@ -626,6 +997,38 @@ dependencies = [ "serde", ] +[[package]] +name = "tower-service" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" + +[[package]] +name = "tracing" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" +dependencies = [ + "cfg-if", + "pin-project-lite", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "unicode-segmentation" version = "1.8.0" @@ -644,6 +1047,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ccb82d61f80a663efe1f787a51b16b5a51e3314d6ac365b08639f52387b33f3" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "vec_map" version = "0.8.2" @@ -656,12 +1065,105 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d54ee1d4ed486f78874278e63e4069fc1ab9f6a18ca492076ffb90c5eb2997fd" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b33f6a0694ccfea53d94db8b2ed1c3a8a4c86dd936b13b9f0a15ec4a451b900" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "088169ca61430fe1e58b8096c24975251700e7b1f6fd91cc9d59b04fb9b18bd4" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be2241542ff3d9f241f5e2cb6dd09b37efe786df8851c54957683a49f0987a97" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7cff876b8f18eed75a66cf49b65e7f967cb354a7aa16003fb55dbfd25b44b4f" + +[[package]] +name = "web-sys" +version = "0.3.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e828417b379f3df7111d3a2a9e5753706cae29c41f7c4029ee9fd77f3e09e582" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.21.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aabe153544e473b775453675851ecc86863d2a81d786d741f6b76778f2a48940" +dependencies = [ + "webpki", +] + [[package]] name = "winapi" version = "0.3.9" @@ -693,6 +1195,12 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "wyz" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214" + [[package]] name = "xdg" version = "2.2.0" diff --git a/Cargo.toml b/Cargo.toml index 7f470b0..3ef24c3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] members = [ "daemon", + "imap", ] diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index 32ad41f..09c4bbb 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -5,12 +5,16 @@ edition = "2018" [dependencies] anyhow = "1.0.42" -serde = { version = "1.0.126", features = ["derive"] } -tokio = { version = "1.9.0", features = ["full"] } +async-trait = "0.1.50" clap = "3.0.0-beta.2" futures = "0.3.16" +hyper = { version = "0.14.11", features = ["server", "http2", "stream"] } +imap = { path = "../imap" } inotify = { version = "0.9.3", features = ["stream"] } -xdg = "2.2.0" log = "0.4.14" -toml = "0.5.8" +serde = { version = "1.0.126", features = ["derive"] } stderrlog = "0.5.1" +tokio = { version = "1.9.0", features = ["full"] } +tokio-rustls = "0.22.0" +toml = "0.5.8" +xdg = "2.2.0" \ No newline at end of file diff --git a/daemon/src/imap.rs b/daemon/src/imap.rs deleted file mode 100644 index 0d284f8..0000000 --- a/daemon/src/imap.rs +++ /dev/null @@ -1 +0,0 @@ -pub struct ImapClient {} diff --git a/daemon/src/mail/mod.rs b/daemon/src/mail/mod.rs new file mode 100644 index 0000000..1de94c1 --- /dev/null +++ b/daemon/src/mail/mod.rs @@ -0,0 +1,158 @@ +/// The main function for the IMAP syncing thread +pub async fn sync_main( + acct_name: impl AsRef, + acct: MailAccountConfig, + mail2ui_tx: UnboundedSender, + mail_store: MailStore, +) -> Result<()> { + let acct_name = acct_name.as_ref().to_owned(); + // loop ensures that the connection is retried after it dies + loop { + let builder: ClientConfig = ClientBuilder::default() + .hostname(acct.imap.server.clone()) + .port(acct.imap.port) + .tls(matches!(acct.imap.tls, TlsMethod::On)) + .build() + .map_err(|err| anyhow!("err: {}", err))?; + debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port); + let unauth = builder.open().await?; + let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) { + debug!("attempting to upgrade"); + let client = unauth.upgrade().await?; + debug!("upgrade successful"); + client + } else { + unauth + }; + debug!("preparing to auth"); + // check if the authentication method is supported + let mut authed = match &acct.imap.auth { + ImapAuth::Plain { username, password } => { + let auth = auth::Plain { + username: username.clone(), + password: password.clone(), + }; + auth.perform_auth(unauth).await? + } + }; + debug!("authentication successful!"); + let folder_list = authed.list().await?; + let _ = mail2ui_tx.send(MailEvent::FolderList( + acct_name.clone(), + folder_list.clone(), + )); + debug!("mailbox list: {:?}", folder_list); + for folder in folder_list.iter() { + debug!("folder: {}", folder); + let select = authed.select(folder).await?; + debug!("select response: {:?}", select); + if let (Some(exists), Some(uidvalidity)) = (select.exists, select.uid_validity) { + // figure out which uids don't exist locally yet + let new_uids = stream::iter(1..exists).map(Ok).try_filter_map(|uid| { + mail_store.try_identify_email(&acct_name, &folder, uid, uidvalidity, None) + // invert the option to only select uids that haven't been downloaded + .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None)) + .map_err(|err| err.context("error checking if the email is already downloaded [try_identify_email]")) + }).try_collect::>().await?; + if !new_uids.is_empty() { + debug!("fetching uids {:?}", new_uids); + let fetched = authed + .uid_fetch(&new_uids, FetchItems::PanoramaAll) + .await + .context("error fetching uids")?; + fetched + .map(Ok) + .try_for_each_concurrent(None, |(uid, attrs)| { + mail_store.store_email(&acct_name, &folder, uid, uidvalidity, attrs) + }) + .await + .context("error during fetch-store")?; + } + } + } + tokio::time::sleep(std::time::Duration::from_secs(50)).await; + // TODO: remove this later + continue; + // let's just select INBOX for now, maybe have a config for default mailbox later? + debug!("selecting the INBOX mailbox"); + let select = authed.select("INBOX").await?; + debug!("select result: {:?}", select); + loop { + let message_uids = authed.uid_search().await?; + let message_uids = message_uids.into_iter().take(30).collect::>(); + let _ = mail2ui_tx.send(MailEvent::MessageUids( + acct_name.clone(), + message_uids.clone(), + )); + // TODO: make this happen concurrently with the main loop? + let mut message_list = authed + .uid_fetch(&message_uids, FetchItems::All) + .await + .unwrap(); + while let Some((uid, attrs)) = message_list.next().await { + let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs); + // TODO: probably odn't care about this? + let _ = mail2ui_tx.send(evt); + } + // check if IDLE is supported + let supports_idle = authed.has_capability("IDLE").await?; + if supports_idle { + let mut idle_stream = authed.idle().await?; + loop { + let evt = match idle_stream.next().await { + Some(v) => v, + None => break, + }; + debug!("got an event: {:?}", evt); + match evt { + Response::MailboxData(MailboxData::Exists(uid)) => { + debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid); + // send DONE to stop the idle + std::mem::drop(idle_stream); + let handle = Notification::new() + .summary("New Email") + .body("holy Shit,") + .icon("firefox") + .timeout(Timeout::Milliseconds(6000)) + .show()?; + let message_uids = authed.uid_search().await?; + let message_uids = + message_uids.into_iter().take(20).collect::>(); + let _ = mail2ui_tx.send(MailEvent::MessageUids( + acct_name.clone(), + message_uids.clone(), + )); + // TODO: make this happen concurrently with the main loop? + let mut message_list = authed + .uid_fetch(&message_uids, FetchItems::All) + .await + .unwrap(); + while let Some((uid, attrs)) = message_list.next().await { + let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs); + // debug!("sent {:?}", evt); + mail2ui_tx.send(evt); + } + idle_stream = authed.idle().await?; + } + _ => {} + } + } + } else { + loop { + tokio::time::sleep(std::time::Duration::from_secs(20)).await; + debug!("heartbeat"); + } + } + if false { + break; + } + } + + // wait a bit so we're not hitting the server really fast if the fail happens + // early on + // + // TODO: some kind of smart exponential backoff that considers some time + // threshold to be a failing case? + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + } +} diff --git a/daemon/src/main.rs b/daemon/src/main.rs index ab4043f..1e5ccdd 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -6,14 +6,18 @@ extern crate log; extern crate futures; mod config; -mod imap; +mod mail; use anyhow::Result; use clap::Clap; -use futures::future::FutureExt; +use futures::future::{ + select, + Either::{Left, Right}, + FutureExt, +}; use tokio::sync::oneshot; -use crate::config::Config; +use crate::config::{Config, MailAccountConfig, TlsMethod}; type ExitListener = oneshot::Receiver<()>; @@ -31,7 +35,6 @@ struct Options { #[tokio::main] async fn main() -> Result<()> { let opt = Options::parse(); - println!("{:?}", opt); stderrlog::new() .module(module_path!()) @@ -53,14 +56,54 @@ async fn main() -> Result<()> { } async fn run_with_config(config: Config, exit: ExitListener) -> Result<()> { - println!("run with config: {:?}", config); + debug!("new config"); + let mut notify_mail_threads = Vec::new(); + for (account_name, account) in config.mail_accounts { + let (exit_tx, exit_rx) = oneshot::channel(); + tokio::spawn(run_single_mail_account(account_name, account, exit_rx)); + notify_mail_threads.push(exit_tx); + } + + exit.await?; + for exit_tx in notify_mail_threads { + let _ = exit_tx.send(()); + } + + Ok(()) +} + +async fn run_single_mail_account( + account_name: String, + account: MailAccountConfig, + exit: ExitListener, +) -> Result<()> { + debug!("connecting to account {}", account_name); + + let imap_cookie = ImapClient::open( + &account.imap.server, + account.imap.port, + matches!(account.imap.tls, TlsMethod::On), + ); + + pin_mut!(imap_cookie); + pin_mut!(exit); + + let (imap, exit) = match select(imap_cookie, exit).await { + Left(res) => res, + Right(_) => return Ok(()), + }; + + debug!("connected to {}", account.imap.server); + let imap = imap?; let mut exit = exit.fuse(); + loop { select! { _ = exit => break, } } + debug!("disconnecting from account {}", account_name); Ok(()) } diff --git a/imap/Cargo.toml b/imap/Cargo.toml new file mode 100644 index 0000000..65ff46c --- /dev/null +++ b/imap/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "imap" +version = "0.1.0" +edition = "2018" + +[features] +rfc2177-idle = [] + +[dependencies] +anyhow = "1.0.42" +bytes = "1.0.1" +derive_builder = "0.10.2" +futures = "0.3.16" +imap-proto = "0.14.3" +log = "0.4.14" +nom = "6.2.1" +tokio = { version = "1.9.0", features = ["full"] } +tokio-rustls = "0.22.0" +tokio-util = { version = "0.6.7", features = ["codec"] } +webpki-roots = "0.21.1" diff --git a/imap/src/auth.rs b/imap/src/auth.rs new file mode 100644 index 0000000..4647b42 --- /dev/null +++ b/imap/src/auth.rs @@ -0,0 +1,59 @@ +use anyhow::Result; + +use crate::command::Command; +use crate::response::{Response, ResponseDone, Status}; + +use super::{ClientAuthenticated, ClientUnauthenticated}; + +#[async_trait] +pub trait Auth { + /// Performs authentication, consuming the client + // TODO: return the unauthed client if failed? + async fn perform_auth(self, client: ClientUnauthenticated) -> Result; + + /// Converts the wrappers around the client once the authentication has happened. Should only + /// be called by the `perform_auth` function. + fn convert_client(client: ClientUnauthenticated) -> ClientAuthenticated { + match client { + ClientUnauthenticated::Encrypted(e) => ClientAuthenticated::Encrypted(e), + ClientUnauthenticated::Unencrypted(e) => ClientAuthenticated::Unencrypted(e), + } + } +} + +pub struct Plain { + pub username: String, + pub password: String, +} + +#[async_trait] +impl Auth for Plain { + async fn perform_auth(self, mut client: ClientUnauthenticated) -> Result { + let command = Command::Login { + username: self.username, + password: self.password, + }; + + let result = client.execute(command).await?; + let done = result.done().await?; + + assert!(done.is_some()); + let done = done.unwrap(); + + if done.status != Status::Ok { + bail!("unable to login: {:?}", done); + } + + // if !matches!( + // result, + // Response::Done(ResponseDone { + // status: Status::Ok, + // .. + // }) + // ) { + // bail!("unable to login: {:?}", result); + // } + + Ok(::convert_client(client)) + } +} diff --git a/imap/src/client.rs b/imap/src/client.rs new file mode 100644 index 0000000..9cbc9a9 --- /dev/null +++ b/imap/src/client.rs @@ -0,0 +1,284 @@ +use std::borrow::Cow; +use std::sync::Arc; + +use anyhow::Result; +use futures::{ + future::{self, FutureExt}, + stream::{Stream, StreamExt}, +}; +use tokio::{net::TcpStream, sync::mpsc}; +use tokio_rustls::{ + client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, +}; + +pub use super::inner::{Client, ResponseStream}; + +/// Struct used to start building the config for a client. +/// +/// Call [`.build`][1] to _build_ the config, then run [`.open`][2] to actually start opening +/// the connection to the server. +/// +/// [1]: self::ClientConfigBuilder::build +/// [2]: self::ClientConfig::open +pub type ClientBuilder = ClientConfigBuilder; + +/// An IMAP client that hasn't been connected yet. +#[derive(Builder, Clone, Debug)] +pub struct ClientConfig { + /// The hostname of the IMAP server. If using TLS, must be an address + hostname: String, + + /// The port of the IMAP server. + port: u16, + + /// Whether or not the client is using an encrypted stream. + /// + /// To upgrade the connection later, use the upgrade method. + tls: bool, +} + +impl ClientConfig { + pub async fn open(self) -> Result { + let hostname = self.hostname.as_ref(); + let port = self.port; + let conn = TcpStream::connect((hostname, port)).await?; + + if self.tls { + let mut tls_config = RustlsConfig::new(); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let tls_config = TlsConnector::from(Arc::new(tls_config)); + let dnsname = DNSNameRef::try_from_ascii_str(hostname).unwrap(); + let conn = tls_config.connect(dnsname, conn).await?; + + let mut inner = Client::new(conn, self); + inner.wait_for_greeting().await?; + return Ok(ClientUnauthenticated::Encrypted(inner)); + } else { + let mut inner = Client::new(conn, self); + inner.wait_for_greeting().await?; + return Ok(ClientUnauthenticated::Unencrypted(inner)); + } + } +} + +pub enum ClientUnauthenticated { + Encrypted(Client>), + Unencrypted(Client), +} + +impl ClientUnauthenticated { + pub async fn upgrade(self) -> Result { + match self { + // this is a no-op, we don't need to upgrade + ClientUnauthenticated::Encrypted(_) => Ok(self), + ClientUnauthenticated::Unencrypted(e) => { + Ok(ClientUnauthenticated::Encrypted(e.upgrade().await?)) + } + } + } + + /// Exposing low-level execute + async fn execute(&mut self, cmd: Command) -> Result { + match self { + ClientUnauthenticated::Encrypted(e) => e.execute(cmd).await, + ClientUnauthenticated::Unencrypted(e) => e.execute(cmd).await, + } + } + + /// Checks if the server that the client is talking to has support for the given capability. + pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { + match self { + ClientUnauthenticated::Encrypted(e) => e.has_capability(cap).await, + ClientUnauthenticated::Unencrypted(e) => e.has_capability(cap).await, + } + } +} + +pub enum ClientAuthenticated { + Encrypted(Client>), + Unencrypted(Client), +} + +impl ClientAuthenticated { + /// Exposing low-level execute + async fn execute(&mut self, cmd: Command) -> Result { + match self { + ClientAuthenticated::Encrypted(e) => e.execute(cmd).await, + ClientAuthenticated::Unencrypted(e) => e.execute(cmd).await, + } + } + + fn sender(&self) -> mpsc::UnboundedSender { + match self { + ClientAuthenticated::Encrypted(e) => e.write_tx.clone(), + ClientAuthenticated::Unencrypted(e) => e.write_tx.clone(), + } + } + + /// Checks if the server that the client is talking to has support for the given capability. + pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { + match self { + ClientAuthenticated::Encrypted(e) => e.has_capability(cap).await, + ClientAuthenticated::Unencrypted(e) => e.has_capability(cap).await, + } + } + + /// Runs the LIST command + pub async fn list(&mut self) -> Result> { + let cmd = Command::List { + reference: "".to_owned(), + mailbox: "*".to_owned(), + }; + + let res = self.execute(cmd).await?; + let (_, data) = res.wait().await?; + + let mut folders = Vec::new(); + for resp in data { + if let Response::MailboxData(MailboxData::List { name, .. }) = resp { + folders.push(name.to_owned()); + } + } + + Ok(folders) + } + + /// Runs the SELECT command + pub async fn select(&mut self, mailbox: impl AsRef) -> Result { + let cmd = Command::Select { + mailbox: mailbox.as_ref().to_owned(), + }; + + let stream = self.execute(cmd).await?; + let (_, data) = stream.wait().await?; + + let mut select = SelectResponse::default(); + for resp in data { + match resp { + Response::MailboxData(MailboxData::Flags(flags)) => select.flags = flags, + Response::MailboxData(MailboxData::Exists(exists)) => select.exists = Some(exists), + Response::MailboxData(MailboxData::Recent(recent)) => select.recent = Some(recent), + Response::Data(ResponseData { + status: Status::Ok, + code: Some(code), + .. + }) => match code { + ResponseCode::Unseen(value) => select.unseen = Some(value), + ResponseCode::UidNext(value) => select.uid_next = Some(value), + ResponseCode::UidValidity(value) => select.uid_validity = Some(value), + _ => {} + }, + _ => {} + } + } + + Ok(select) + } + + /// Runs the SEARCH command + pub async fn uid_search(&mut self) -> Result> { + let cmd = Command::UidSearch { + criteria: SearchCriteria::All, + }; + let stream = self.execute(cmd).await?; + let (_, data) = stream.wait().await?; + for resp in data { + if let Response::MailboxData(MailboxData::Search(uids)) = resp { + return Ok(uids); + } + } + bail!("could not find the SEARCH response") + } + + /// Runs the FETCH command + pub async fn fetch( + &mut self, + uids: &[u32], + items: FetchItems, + ) -> Result)>> { + let cmd = Command::Fetch { + uids: uids.to_vec(), + items, + }; + debug!("fetch: {}", cmd); + let stream = self.execute(cmd).await?; + // let (done, data) = stream.wait().await?; + Ok(stream.filter_map(|resp| match resp { + Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), + Response::Done(_) => future::ready(None).boxed(), + _ => future::pending().boxed(), + })) + } + + /// Runs the UID FETCH command + pub async fn uid_fetch( + &mut self, + uids: &[u32], + items: FetchItems, + ) -> Result)>> { + let cmd = Command::UidFetch { + uids: uids.to_vec(), + items, + }; + debug!("uid fetch: {}", cmd); + let stream = self.execute(cmd).await?; + // let (done, data) = stream.wait().await?; + Ok(stream.filter_map(|resp| match resp { + Response::Fetch(n, attrs) => future::ready(Some((n, attrs))).boxed(), + Response::Done(_) => future::ready(None).boxed(), + _ => future::pending().boxed(), + })) + } + + /// Runs the IDLE command + #[cfg(feature = "rfc2177-idle")] + #[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] + pub async fn idle(&mut self) -> Result { + let cmd = Command::Idle; + let stream = self.execute(cmd).await?; + let sender = self.sender(); + Ok(IdleToken { stream, sender }) + } +} + +#[derive(Debug, Default)] +pub struct SelectResponse<'a> { + pub flags: Vec>, + pub exists: Option, + pub recent: Option, + pub uid_next: Option, + pub uid_validity: Option, + pub unseen: Option, +} + +/// A token that represents an idling connection. +/// +/// Dropping this token indicates that the idling should be completed, and the DONE command will be +/// sent to the server as a result. +#[cfg(feature = "rfc2177-idle")] +#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] +pub struct IdleToken { + pub stream: ResponseStream, + sender: mpsc::UnboundedSender, +} + +#[cfg(feature = "rfc2177-idle")] +#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] +impl Drop for IdleToken { + fn drop(&mut self) { + // TODO: should ignore this? + self.sender.send(format!("DONE\r\n")).unwrap(); + } +} + +#[cfg(feature = "rfc2177-idle")] +#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177-idle")))] +impl Stream for IdleToken { + type Item = Response; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let stream = Pin::new(&mut self.stream); + Stream::poll_next(stream, cx) + } +} diff --git a/imap/src/codec.rs b/imap/src/codec.rs new file mode 100644 index 0000000..85f87f9 --- /dev/null +++ b/imap/src/codec.rs @@ -0,0 +1,90 @@ +use std::io; +use std::mem; + +use bytes::{BufMut, Bytes, BytesMut}; +use nom::{self, Needed}; +use tokio_util::codec::{Decoder, Encoder}; + +use imap_proto::types::{Request, RequestId, Response}; + +#[derive(Default)] +pub struct ImapCodec { + decode_need_message_bytes: usize, +} + +impl<'a> Decoder for ImapCodec { + type Item = ResponseData; + type Error = io::Error; + fn decode(&mut self, buf: &mut BytesMut) -> Result, io::Error> { + if self.decode_need_message_bytes > buf.len() { + return Ok(None); + } + + let (response, rsp_len) = match imap_proto::Response::from_bytes(buf) { + Ok((remaining, response)) => { + // This SHOULD be acceptable/safe: BytesMut storage memory is + // allocated on the heap and should not move. It will not be + // freed as long as we keep a reference alive, which we do + // by retaining a reference to the split buffer, below. + let response = unsafe { mem::transmute(response) }; + (response, buf.len() - remaining.len()) + } + Err(nom::Err::Incomplete(Needed::Size(min))) => { + self.decode_need_message_bytes = min.get(); + return Ok(None); + } + Err(nom::Err::Incomplete(_)) => { + return Ok(None); + } + Err(nom::Err::Error(nom::error::Error { code, .. })) + | Err(nom::Err::Failure(nom::error::Error { code, .. })) => { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("{:?} during parsing of {:?}", code, buf), + )); + } + }; + let raw = buf.split_to(rsp_len).freeze(); + self.decode_need_message_bytes = 0; + Ok(Some(ResponseData { raw, response })) + } +} + +impl<'a> Encoder<&'a Request<'a>> for ImapCodec { + type Error = io::Error; + fn encode(&mut self, msg: &Request, dst: &mut BytesMut) -> Result<(), io::Error> { + dst.put(&*msg.0); + dst.put_u8(b' '); + dst.put_slice(&*msg.1); + dst.put_slice(b"\r\n"); + Ok(()) + } +} + +#[derive(Debug)] +pub struct ResponseData { + raw: Bytes, + // This reference is really scoped to the lifetime of the `raw` + // member, but unfortunately Rust does not allow that yet. It + // is transmuted to `'static` by the `Decoder`, instead, and + // references returned to callers of `ResponseData` are limited + // to the lifetime of the `ResponseData` struct. + // + // `raw` is never mutated during the lifetime of `ResponseData`, + // and `Response` does not not implement any specific drop glue. + response: Response<'static>, +} + +impl ResponseData { + pub fn request_id(&self) -> Option<&RequestId> { + match self.response { + Response::Done { ref tag, .. } => Some(tag), + _ => None, + } + } + + #[allow(clippy::needless_lifetimes)] + pub fn parsed<'a>(&'a self) -> &'a Response<'a> { + &self.response + } +} diff --git a/imap/src/inner.rs b/imap/src/inner.rs new file mode 100644 index 0000000..880a61e --- /dev/null +++ b/imap/src/inner.rs @@ -0,0 +1,312 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use anyhow::Result; +use futures::{ + future::{self, FutureExt, TryFutureExt}, + stream::{Stream, StreamExt}, +}; +use imap_proto::{builders::command::Command, types::Response}; +use tokio::{ + io::{ + self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf, + }, + sync::{ + mpsc, + oneshot::{self, error::TryRecvError}, + }, + task::JoinHandle, +}; +use tokio_rustls::{ + client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector, +}; +use tokio_util::codec::FramedRead; + +use crate::codec::{ImapCodec, ResponseData}; + +pub const TAG_PREFIX: &str = "ptag"; +type Command2 = (String, Command, mpsc::UnboundedSender); + +pub struct Client { + ctr: usize, + config: ClientConfig, + // conn: WriteHalf, + pub(crate) write_tx: mpsc::UnboundedSender, + cmd_tx: mpsc::UnboundedSender, + greeting_rx: Option>, + writer_exit_tx: oneshot::Sender<()>, + writer_handle: JoinHandle>>, + listener_exit_tx: oneshot::Sender<()>, + listener_handle: JoinHandle>>, +} + +impl Client +where + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, +{ + pub fn new(conn: C, config: ClientConfig) -> Self { + let (read_half, mut write_half) = io::split(conn); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + let (greeting_tx, greeting_rx) = oneshot::channel(); + + let (writer_exit_tx, exit_rx) = oneshot::channel(); + let (write_tx, mut write_rx) = mpsc::unbounded_channel::(); + let writer_handle = tokio::spawn(write(write_half, write_rx, exit_rx).map_err(|err| { + error!("Help, the writer loop died: {}", err); + err + })); + + let (exit_tx, exit_rx) = oneshot::channel(); + let listener_handle = tokio::spawn( + listen(read_half, cmd_rx, write_tx.clone(), greeting_tx, exit_rx).map_err(|err| { + error!("Help, the listener loop died: {:?} {}", err, err); + err + }), + ); + + Client { + ctr: 0, + // conn: write_half, + config, + cmd_tx, + write_tx, + greeting_rx: Some(greeting_rx), + writer_exit_tx, + listener_exit_tx: exit_tx, + writer_handle, + listener_handle, + } + } + + pub async fn wait_for_greeting(&mut self) -> Result<()> { + if let Some(greeting_rx) = self.greeting_rx.take() { + greeting_rx.await?; + } + Ok(()) + } + + pub async fn execute(&mut self, cmd: Command) -> Result { + let id = self.ctr; + self.ctr += 1; + + let tag = format!("{}{}", TAG_PREFIX, id); + // let cmd_str = format!("{} {}\r\n", tag, cmd); + // self.write_tx.send(cmd_str); + // self.conn.write_all(cmd_str.as_bytes()).await?; + // self.conn.flush().await?; + + let (tx, rx) = mpsc::unbounded_channel(); + self.cmd_tx.send((tag, cmd, tx))?; + + let stream = ResponseStream { inner: rx }; + Ok(stream) + } + + pub async fn has_capability(&mut self, cap: impl AsRef) -> Result { + // TODO: cache capabilities if needed? + let cap = cap.as_ref(); + let cap = parse_capability(cap)?; + + let resp = self.execute(Command::Capability).await?; + let (_, data) = resp.wait().await?; + + for resp in data { + if let Response::Capabilities(caps) = resp { + return Ok(caps.contains(&cap)); + } + // debug!("cap: {:?}", resp); + } + + Ok(false) + } + + pub async fn upgrade(mut self) -> Result>> { + // TODO: make sure STARTTLS is in the capability list + if !self.has_capability("STARTTLS").await? { + bail!("server doesn't support this capability"); + } + + // first, send the STARTTLS command + let mut resp = self.execute(Command::Starttls).await?; + let resp = resp.next().await.unwrap(); + debug!("server response to starttls: {:?}", resp); + + debug!("sending exit for upgrade"); + // TODO: check that the channel is still open? + self.listener_exit_tx.send(()).unwrap(); + self.writer_exit_tx.send(()).unwrap(); + let (reader, writer) = future::join(self.listener_handle, self.writer_handle).await; + let reader = reader??; + let writer = writer??; + // let reader = self.listener_handle.await??; + // let writer = self.conn; + + let conn = reader.unsplit(writer); + let server_name = &self.config.hostname; + + let mut tls_config = RustlsConfig::new(); + tls_config + .root_store + .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); + let tls_config = TlsConnector::from(Arc::new(tls_config)); + let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap(); + let stream = tls_config.connect(dnsname, conn).await?; + debug!("upgraded, stream is using TLS now"); + + Ok(Client::new(stream, self.config)) + } +} + +pub struct ResponseStream { + pub(crate) inner: mpsc::UnboundedReceiver, +} + +impl ResponseStream { + /// Retrieves just the DONE item in the stream, discarding the rest + pub async fn done(mut self) -> Result> { + while let Some(resp) = self.inner.recv().await { + if let Response::Done(done) = resp { + return Ok(Some(done)); + } + } + Ok(None) + } + + /// Waits for the entire stream to finish, returning the DONE status and the stream + pub async fn wait(mut self) -> Result<(Option, Vec)> { + let mut done = None; + let mut vec = Vec::new(); + while let Some(resp) = self.inner.recv().await { + if let Response::Done(d) = resp { + done = Some(d); + break; + } else { + vec.push(resp); + } + } + Ok((done, vec)) + } +} + +impl Stream for ResponseStream { + type Item = ResponseData; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.inner.poll_recv(cx) + } +} + +#[allow(unreachable_code)] +async fn write( + mut conn: WriteHalf, + mut write_rx: mpsc::UnboundedReceiver, + exit_rx: oneshot::Receiver<()>, +) -> Result> +where + C: AsyncWrite + Unpin, +{ + let mut exit_rx = exit_rx.map_err(|_| ()).shared(); + loop { + let write_fut = write_rx.recv().fuse(); + pin_mut!(write_fut); + + select! { + _ = exit_rx => { + break; + } + + line = write_fut => { + if let Some(line) = line { + conn.write_all(line.as_bytes()).await?; + conn.flush().await?; + trace!("C>>>S: {:?}", line); + } + } + } + } + + Ok(conn) +} + +#[allow(unreachable_code)] +async fn listen( + conn: ReadHalf, + mut cmd_rx: mpsc::UnboundedReceiver, + mut write_tx: mpsc::UnboundedSender, + greeting_tx: oneshot::Sender<()>, + exit_rx: oneshot::Receiver<()>, +) -> Result> +where + C: AsyncRead + Unpin, +{ + let codec = ImapCodec::default(); + let mut framed = FramedRead::new(conn, codec); + let mut greeting_tx = Some(greeting_tx); + let mut curr_cmd: Option = None; + let mut exit_rx = exit_rx.map_err(|_| ()).shared(); + + loop { + // let mut next_line = String::new(); + // let read_fut = reader.read_line(&mut next_line).fuse(); + let read_fut = framed.next().fuse(); + pin_mut!(read_fut); + + // only listen for a new command if there isn't one already + let mut cmd_fut = if let Some(_) = curr_cmd { + // if there is one, just make a future that never resolves so it'll always pick the + // other options in the select. + future::pending().boxed().fuse() + } else { + cmd_rx.recv().boxed().fuse() + }; + + select! { + _ = exit_rx => { + debug!("exiting the loop"); + break; + } + + // read a command from the command list + cmd = cmd_fut => { + if curr_cmd.is_none() { + if let Some((ref tag, ref cmd, _)) = cmd { + let cmd_str = format!("{} {}\r\n", tag, cmd); + write_tx.send(cmd_str); + } + curr_cmd = cmd; + } + } + + // got a response from the server connection + resp = read_fut => { + let resp = match resp { + Some(Ok(v)) => v, + a => { error!("failed: {:?}", a); bail!("fuck"); }, + }; + trace!("S>>>C: {:?}", resp); + + // if this is the very first response, then it's a greeting + if let Some(greeting_tx) = greeting_tx.take() { + greeting_tx.send(()).unwrap(); + } + + if let Response::Done(_) = resp { + // since this is the DONE message, clear curr_cmd so another one can be sent + if let Some((_, _, cmd_tx)) = curr_cmd.take() { + let res = cmd_tx.send(resp); + // debug!("res0: {:?}", res); + } + } else if let Some((tag, cmd, cmd_tx)) = curr_cmd.as_mut() { + // we got a response from the server for this command, so send it over the + // channel + // debug!("sending {:?} to tag {}", resp, tag); + let res = cmd_tx.send(resp); + // debug!("res1: {:?}", res); + } + } + } + } + + let conn = framed.into_inner(); + Ok(conn) +} diff --git a/imap/src/lib.rs b/imap/src/lib.rs new file mode 100644 index 0000000..358f4a2 --- /dev/null +++ b/imap/src/lib.rs @@ -0,0 +1,13 @@ +#[macro_use] +extern crate anyhow; +#[macro_use] +extern crate log; +#[macro_use] +extern crate futures; +#[macro_use] +extern crate derive_builder; + +mod auth; +mod client; +mod codec; +mod inner;