add the daemon back

This commit is contained in:
Michael Zhang 2021-08-09 06:50:19 -05:00
parent e4bc3e4b98
commit f32dd58aac
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
10 changed files with 422 additions and 81 deletions

317
Cargo.lock generated
View file

@ -114,6 +114,38 @@ dependencies = [
"winapi",
]
[[package]]
name = "clap"
version = "3.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bd1061998a501ee7d4b6d449020df3266ca3124b941ec56cf2005c3779ca142"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"indexmap",
"lazy_static",
"os_str_bytes",
"strsim",
"termcolor",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap_derive"
version = "3.0.0-beta.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "370f715b81112975b1b69db93e0b56ea4cd4e5002ac43b2da8474106a54096a1"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "darling"
version = "0.12.4"
@ -308,6 +340,40 @@ dependencies = [
"slab",
]
[[package]]
name = "h2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "825343c4eef0b63f541f8903f395dc5beb362a979b5799a84062527ef1e37726"
dependencies = [
"bytes",
"fnv",
"futures-core",
"futures-sink",
"futures-util",
"http",
"indexmap",
"slab",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "hashbrown"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -317,12 +383,101 @@ dependencies = [
"libc",
]
[[package]]
name = "http"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
dependencies = [
"bytes",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c583b2979440c60be0821a6199eca73bc3c8dcd9d070d75ac726e2c6186e5"
dependencies = [
"bytes",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68"
[[package]]
name = "httpdate"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6456b8a6c8f33fee7d958fcd1b60d55b11940a79e63ae87013e6d22e26034440"
[[package]]
name = "hyper"
version = "0.14.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b61cf2d1aebcf6e6352c97b81dc2244ca29194be1b276f5d8ad5c6330fffb11"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "indexmap"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
dependencies = [
"autocfg",
"hashbrown",
]
[[package]]
name = "inotify"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b031475cb1b103ee221afb806a23d35e0570bf7271d7588762ceba8127ed43b3"
dependencies = [
"bitflags",
"futures-core",
"inotify-sys",
"libc",
"tokio",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "instant"
version = "0.1.10"
@ -332,6 +487,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
name = "js-sys"
version = "0.3.52"
@ -469,9 +630,35 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56"
[[package]]
name = "os_str_bytes"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afb2e1c3ee07430c2cf76151675e583e0f19985fa6efae47d6848a3e2c824f85"
[[package]]
name = "panorama-daemon"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"clap",
"futures",
"hyper",
"inotify",
"log",
"panorama-imap",
"serde",
"stderrlog",
"tokio",
"tokio-rustls",
"toml",
"xdg",
]
[[package]]
name = "panorama-imap"
version = "0.0.3"
version = "0.0.4"
dependencies = [
"anyhow",
"async-trait",
@ -528,6 +715,30 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@ -629,6 +840,26 @@ dependencies = [
"untrusted",
]
[[package]]
name = "serde"
version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f03b9878abf6d14e6779d3f24f07b2cfa90352cfec4acc5aab8f1ac7f146fae8"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.127"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a024926d3432516606328597e0f224a51355a493b49fdd67e9209187cbe55ecc"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -707,6 +938,15 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "textwrap"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "203008d98caf094106cfaba70acfed15e18ed3ddb7d94e49baec153a2b462789"
dependencies = [
"unicode-width",
]
[[package]]
name = "thread_local"
version = "1.0.1"
@ -783,6 +1023,59 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31142970826733df8241ef35dc040ef98c679ab14d7c3e54d827099b3acecaa"
dependencies = [
"serde",
]
[[package]]
name = "tower-service"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9ff14f98b1a4b289c6248a023c1c2fa1491062964e9fed67ab29c4e4da4a052"
dependencies = [
"lazy_static",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.2"
@ -795,12 +1088,28 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]]
name = "wasi"
version = "0.10.0+wasi-snapshot-preview1"
@ -926,3 +1235,9 @@ name = "wyz"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85e60b0d1b5f99db2556934e21937020776a5d31520bf169e851ac44e6420214"
[[package]]
name = "xdg"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d089681aa106a86fade1b0128fb5daf07d5867a509ab036d99988dec80429a57"

View file

@ -1,6 +1,6 @@
[workspace]
members = [
# "daemon",
"daemon",
"imap",
]

View file

@ -9,9 +9,9 @@ async-trait = "0.1.50"
clap = "3.0.0-beta.2"
futures = "0.3.16"
hyper = { version = "0.14.11", features = ["server", "http2", "stream"] }
imap = { path = "../imap" }
inotify = { version = "0.9.3", features = ["stream"] }
log = "0.4.14"
panorama-imap = { path = "../imap" }
serde = { version = "1.0.126", features = ["derive"] }
stderrlog = "0.5.1"
tokio = { version = "1.9.0", features = ["full"] }

View file

@ -11,8 +11,9 @@ use super::Config;
pub type ConfigWatcher = watch::Receiver<Config>;
/// Start the entire config watcher system, and return a [ConfigWatcher][self::ConfigWatcher],
/// which is a cloneable receiver of config update events.
/// Start the entire config watcher system, and return a
/// [ConfigWatcher][self::ConfigWatcher], which is a cloneable receiver of
/// config update events.
pub fn spawn_config_watcher_system() -> Result<(JoinHandle<()>, ConfigWatcher)> {
let mut inotify = Inotify::init()?;
let xdg = BaseDirectories::new()?;

View file

@ -1,101 +1,126 @@
#![allow(dead_code)]
mod store;
use anyhow::{Context, Result};
use futures::stream::{ StreamExt, };
use panorama_imap::{
client::{auth::Login, ConfigBuilder},
proto::{
command::FetchItems,
response::{MailboxData, Response},
},
};
use tokio::sync::mpsc::UnboundedSender;
use crate::config::{ImapAuth, MailAccountConfig, TlsMethod};
use self::store::MailStore;
pub enum MailEvent {}
/// The main function for the IMAP syncing thread
pub async fn sync_main(
acct_name: impl AsRef<str>,
acct: MailAccountConfig,
mail2ui_tx: UnboundedSender<MailEvent>,
mail_store: MailStore,
_mail2ui_tx: UnboundedSender<MailEvent>,
_mail_store: MailStore,
) -> Result<()> {
let acct_name = acct_name.as_ref().to_owned();
let _acct_name = acct_name.as_ref().to_owned();
// loop ensures that the connection is retried after it dies
loop {
let builder: ClientConfig = ClientBuilder::default()
let client = ConfigBuilder::default()
.hostname(acct.imap.server.clone())
.port(acct.imap.port)
.tls(matches!(acct.imap.tls, TlsMethod::On))
.build()
.open()
.await
.map_err(|err| anyhow!("err: {}", err))?;
debug!("connecting to {}:{}", &acct.imap.server, acct.imap.port);
let unauth = builder.open().await?;
debug!("connected to {}:{}", &acct.imap.server, acct.imap.port);
let unauth = if matches!(acct.imap.tls, TlsMethod::Starttls) {
debug!("attempting to upgrade");
let client = unauth.upgrade().await?;
let client = client.upgrade().await?;
debug!("upgrade successful");
client
} else {
unauth
client
};
debug!("preparing to auth");
// check if the authentication method is supported
let mut authed = match &acct.imap.auth {
ImapAuth::Plain { username, password } => {
let auth = auth::Plain {
let login = Login {
username: username.clone(),
password: password.clone(),
};
auth.perform_auth(unauth).await?
unauth.auth(login).await?
}
};
debug!("authentication successful!");
let folder_list = authed.list().await?;
let _ = mail2ui_tx.send(MailEvent::FolderList(
acct_name.clone(),
folder_list.clone(),
));
// let _ = mail2ui_tx.send(MailEvent::FolderList(
// acct_name.clone(),
// folder_list.clone(),
// ));
debug!("mailbox list: {:?}", folder_list);
for folder in folder_list.iter() {
debug!("folder: {}", folder);
let select = authed.select(folder).await?;
debug!("folder: {:?}", folder);
let select = authed.select("INBOX").await?;
debug!("select response: {:?}", select);
if let (Some(exists), Some(uidvalidity)) = (select.exists, select.uid_validity) {
if let (Some(_exists), Some(_uidvalidity)) = (select.exists, select.uid_validity) {
// figure out which uids don't exist locally yet
let new_uids = stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
mail_store.try_identify_email(&acct_name, &folder, uid, uidvalidity, None)
// invert the option to only select uids that haven't been downloaded
.map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
.map_err(|err| err.context("error checking if the email is already downloaded [try_identify_email]"))
}).try_collect::<Vec<_>>().await?;
let new_uids = vec![];
// let new_uids = stream::iter(1..exists).map(Ok).try_filter_map(|uid| {
// todo!()
// // mail_store.try_identify_email(&acct_name, &folder, uid, uidvalidity, None)
// // // invert the option to only select uids that haven't been downloaded
// // .map_ok(move |o| o.map_or_else(move || Some(uid), |v| None))
// // .map_err(|err| err.context("error checking if the email is already downloaded [try_identify_email]"))
// }).try_collect::<Vec<_>>().await?;
if !new_uids.is_empty() {
debug!("fetching uids {:?}", new_uids);
let fetched = authed
let _fetched = authed
.uid_fetch(&new_uids, FetchItems::PanoramaAll)
.await
.context("error fetching uids")?;
fetched
.map(Ok)
.try_for_each_concurrent(None, |(uid, attrs)| {
mail_store.store_email(&acct_name, &folder, uid, uidvalidity, attrs)
})
.await
.context("error during fetch-store")?;
// fetched
// .map(Ok)
// .try_for_each_concurrent(None, |(uid, attrs)| {
// mail_store.store_email(&acct_name, &folder, uid, uidvalidity, attrs)
// })
// .await
// .context("error during fetch-store")?;
}
}
}
tokio::time::sleep(std::time::Duration::from_secs(50)).await;
// TODO: remove this later
continue;
// let's just select INBOX for now, maybe have a config for default mailbox later?
// continue;
// let's just select INBOX for now, maybe have a config for default mailbox
// later?
debug!("selecting the INBOX mailbox");
let select = authed.select("INBOX").await?;
debug!("select result: {:?}", select);
loop {
let message_uids = authed.uid_search().await?;
let message_uids = message_uids.into_iter().take(30).collect::<Vec<_>>();
let _ = mail2ui_tx.send(MailEvent::MessageUids(
acct_name.clone(),
message_uids.clone(),
));
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(),
// message_uids.clone(),
// ));
// TODO: make this happen concurrently with the main loop?
let mut message_list = authed
.uid_fetch(&message_uids, FetchItems::All)
.await
.unwrap();
while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs);
// TODO: probably odn't care about this?
let _ = mail2ui_tx.send(evt);
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.clone(), uid,
// attrs); TODO: probably odn't care about this?
// let _ = mail2ui_tx.send(evt);
}
// check if IDLE is supported
let supports_idle = authed.has_capability("IDLE").await?;
let supports_idle = true; // authed.has_capability("IDLE").await?;
if supports_idle {
let mut idle_stream = authed.idle().await?;
loop {
@ -109,29 +134,31 @@ pub async fn sync_main(
debug!("NEW MESSAGE WITH UID {:?}, droping everything", uid);
// send DONE to stop the idle
std::mem::drop(idle_stream);
let handle = Notification::new()
.summary("New Email")
.body("holy Shit,")
.icon("firefox")
.timeout(Timeout::Milliseconds(6000))
.show()?;
// let handle = Notification::new()
// .summary("New Email")
// .body("TODO")
// .icon("firefox")
// .timeout(Timeout::Milliseconds(6000))
// .show()?;
let message_uids = authed.uid_search().await?;
let message_uids =
message_uids.into_iter().take(20).collect::<Vec<_>>();
let _ = mail2ui_tx.send(MailEvent::MessageUids(
acct_name.clone(),
message_uids.clone(),
));
// let _ = mail2ui_tx.send(MailEvent::MessageUids(
// acct_name.clone(),
// message_uids.clone(),
// ));
// TODO: make this happen concurrently with the main loop?
let mut message_list = authed
.uid_fetch(&message_uids, FetchItems::All)
.await
.unwrap();
while let Some((uid, attrs)) = message_list.next().await {
let evt = MailEvent::UpdateUid(acct_name.clone(), uid, attrs);
while let Some((_uid, _attrs)) = message_list.next().await {
// let evt = MailEvent::UpdateUid(acct_name.
// clone(), uid, attrs);
// debug!("sent {:?}", evt);
mail2ui_tx.send(evt);
// mail2ui_tx.send(evt);
}
idle_stream = authed.idle().await?;
}
_ => {}

1
daemon/src/mail/store.rs Normal file
View file

@ -0,0 +1 @@
pub struct MailStore;

View file

@ -1,6 +1,8 @@
#[macro_use]
extern crate serde;
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate log;
#[macro_use]
extern crate futures;
@ -15,13 +17,15 @@ use futures::future::{
Either::{Left, Right},
FutureExt,
};
use panorama_imap::client::ConfigBuilder;
use tokio::sync::oneshot;
use crate::config::{Config, MailAccountConfig, TlsMethod};
type ExitListener = oneshot::Receiver<()>;
/// The panorama daemon runs in the background and communicates with other panorama components over Unix sockets.
/// The panorama daemon runs in the background and communicates with other
/// panorama components over Unix sockets.
#[derive(Debug, Clap)]
struct Options {
// /// Config file path (defaults to XDG)
@ -80,11 +84,12 @@ async fn run_single_mail_account(
) -> Result<()> {
debug!("connecting to account {}", account_name);
let imap_cookie = ImapClient::open(
&account.imap.server,
account.imap.port,
matches!(account.imap.tls, TlsMethod::On),
);
// set up the connection
let mut builder = ConfigBuilder::default();
let imap_cookie = builder
.hostname(account.imap.server.clone())
.port(account.imap.port)
.tls(matches!(account.imap.tls, TlsMethod::On)).open();
pin_mut!(imap_cookie);
pin_mut!(exit);
@ -95,7 +100,7 @@ async fn run_single_mail_account(
};
debug!("connected to {}", account.imap.server);
let imap = imap?;
let _imap = imap?;
let mut exit = exit.fuse();
loop {

View file

@ -1,7 +1,7 @@
[package]
name = "panorama-imap"
description = "IMAP protocol implementation with high-level async client."
version = "0.0.3"
version = "0.0.4"
edition = "2018"
authors = ["Michael Zhang <mail@mzhang.io>"]
keywords = ["imap", "email", "parser"]
@ -12,11 +12,6 @@ documentation = "https://docs.rs/panorama-imap"
readme = "README.md"
workspace = ".."
[[bin]]
name = "mzhang-test"
path = "bin/mzhang_test.rs"
required-features = ["stderrlog"]
[features]
default = ["rfc2177", "rfc6154"]
low-level = []

1
imap/bin/.gitignore vendored
View file

@ -1 +0,0 @@
/mzhang_test.rs

View file

@ -236,12 +236,11 @@ impl ClientAuthenticated {
/// Runs the IDLE command
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
pub async fn idle<'a>(&'a mut self) -> Result<IdleToken<'a>> {
pub async fn idle(&mut self) -> Result<IdleToken> {
let cmd = Command::Idle;
let stream = self.execute(cmd).await?;
Ok(IdleToken {
stream,
_client: self,
})
}
}
@ -262,15 +261,14 @@ pub struct SelectResponse {
/// DONE command will be sent to the server as a result.
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
pub struct IdleToken<'a> {
pub struct IdleToken {
pub stream: ResponseStream,
// sender: mpsc::UnboundedSender<TaggedCommand>,
_client: &'a mut ClientAuthenticated,
}
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl<'a> Drop for IdleToken<'a> {
impl Drop for IdleToken {
fn drop(&mut self) {
// TODO: put this into a channel instead
// tokio::spawn(self.client.execute(Command::Done));
@ -279,7 +277,7 @@ impl<'a> Drop for IdleToken<'a> {
#[cfg(feature = "rfc2177")]
#[cfg_attr(docsrs, doc(cfg(feature = "rfc2177")))]
impl<'a> Stream for IdleToken<'a> {
impl Stream for IdleToken {
type Item = Response;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let stream = Pin::new(&mut self.stream);