2021-02-23 02:47:00 +00:00
|
|
|
use std::collections::{HashMap, HashSet, VecDeque};
|
|
|
|
use std::mem;
|
2021-02-20 05:03:33 +00:00
|
|
|
use std::pin::Pin;
|
|
|
|
use std::sync::Arc;
|
2021-02-21 13:42:40 +00:00
|
|
|
use std::task::{Context, Poll, Waker};
|
2021-02-20 05:03:33 +00:00
|
|
|
|
2021-02-26 06:03:23 +00:00
|
|
|
use anyhow::{Context as AnyhowContext, Error, Result};
|
|
|
|
use futures::{
|
|
|
|
future::{self, BoxFuture, Either, Future, FutureExt, TryFutureExt},
|
|
|
|
stream::{BoxStream, Stream, StreamExt, TryStream},
|
|
|
|
};
|
|
|
|
use genawaiter::{
|
|
|
|
sync::{gen, Gen},
|
|
|
|
yield_,
|
|
|
|
};
|
2021-02-23 02:47:00 +00:00
|
|
|
use parking_lot::{RwLock, RwLockWriteGuard};
|
2021-02-20 05:03:33 +00:00
|
|
|
use tokio::{
|
|
|
|
io::{
|
2021-02-22 20:43:09 +00:00
|
|
|
self, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader, ReadHalf, WriteHalf,
|
2021-02-20 05:03:33 +00:00
|
|
|
},
|
2021-02-26 06:03:23 +00:00
|
|
|
sync::{mpsc, oneshot},
|
2021-02-20 05:03:33 +00:00
|
|
|
task::JoinHandle,
|
|
|
|
};
|
2021-02-21 13:54:46 +00:00
|
|
|
use tokio_rustls::{
|
|
|
|
client::TlsStream, rustls::ClientConfig as RustlsConfig, webpki::DNSNameRef, TlsConnector,
|
|
|
|
};
|
2021-02-26 06:03:23 +00:00
|
|
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
2021-02-20 05:03:33 +00:00
|
|
|
|
|
|
|
use crate::command::Command;
|
2021-02-23 02:47:00 +00:00
|
|
|
use crate::parser::{parse_capability, parse_response};
|
2021-02-22 20:43:09 +00:00
|
|
|
use crate::response::{Capability, Response, ResponseCode, Status};
|
2021-02-21 13:42:40 +00:00
|
|
|
|
2021-02-21 13:54:46 +00:00
|
|
|
use super::ClientConfig;
|
2021-02-20 05:03:33 +00:00
|
|
|
|
2021-02-22 20:36:08 +00:00
|
|
|
pub type CapsLock = Arc<RwLock<Option<HashSet<Capability>>>>;
|
2021-02-26 06:03:23 +00:00
|
|
|
pub type ResponseFuture = Box<dyn Future<Output = Result<Response>> + Send + Unpin>;
|
|
|
|
pub type ResponseSender = mpsc::UnboundedSender<Response>;
|
|
|
|
pub type ResponseStream = mpsc::UnboundedReceiver<Response>;
|
|
|
|
type ResultQueue = Arc<RwLock<VecDeque<HandlerResult>>>;
|
2021-02-23 02:47:00 +00:00
|
|
|
pub type GreetingState = Arc<RwLock<(Option<Response>, Option<Waker>)>>;
|
2021-02-21 01:13:10 +00:00
|
|
|
pub const TAG_PREFIX: &str = "panorama";
|
2021-02-20 05:03:33 +00:00
|
|
|
|
2021-02-26 06:03:23 +00:00
|
|
|
struct HandlerResult {
|
|
|
|
id: usize,
|
|
|
|
end: Option<oneshot::Sender<Response>>,
|
|
|
|
sender: ResponseSender,
|
|
|
|
waker: Option<Waker>,
|
|
|
|
}
|
|
|
|
|
2021-02-21 13:42:40 +00:00
|
|
|
/// The lower-level Client struct, that is shared by all of the exported structs in the state machine.
|
2021-02-20 05:03:33 +00:00
|
|
|
pub struct Client<C> {
|
2021-02-21 13:54:46 +00:00
|
|
|
config: ClientConfig,
|
2021-02-26 06:03:23 +00:00
|
|
|
|
|
|
|
/// write half of the connection
|
2021-02-20 05:03:33 +00:00
|
|
|
conn: WriteHalf<C>,
|
|
|
|
|
2021-02-26 06:03:23 +00:00
|
|
|
/// counter for monotonically incrementing unique ids
|
2021-02-20 05:03:33 +00:00
|
|
|
id: usize,
|
2021-02-26 06:03:23 +00:00
|
|
|
|
|
|
|
results: ResultQueue,
|
2021-02-20 05:03:33 +00:00
|
|
|
|
2021-02-21 13:42:40 +00:00
|
|
|
/// cached set of capabilities
|
2021-02-22 09:06:40 +00:00
|
|
|
caps: CapsLock,
|
2021-02-21 13:42:40 +00:00
|
|
|
|
|
|
|
/// join handle for the listener thread
|
|
|
|
listener_handle: JoinHandle<Result<ReadHalf<C>>>,
|
|
|
|
|
|
|
|
/// used for telling the listener thread to stop and return the read half
|
|
|
|
exit_tx: mpsc::Sender<()>,
|
|
|
|
|
|
|
|
/// used for receiving the greeting
|
2021-02-21 13:54:46 +00:00
|
|
|
greeting: GreetingState,
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<C> Client<C>
|
|
|
|
where
|
|
|
|
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
|
|
|
{
|
|
|
|
/// Creates a new client that wraps a connection
|
2021-02-21 13:54:46 +00:00
|
|
|
pub fn new(conn: C, config: ClientConfig) -> Self {
|
2021-02-20 05:03:33 +00:00
|
|
|
let (read_half, write_half) = io::split(conn);
|
2021-02-23 02:47:00 +00:00
|
|
|
let results = Arc::new(RwLock::new(VecDeque::new()));
|
2021-02-21 13:42:40 +00:00
|
|
|
let (exit_tx, exit_rx) = mpsc::channel(1);
|
2021-02-23 02:47:00 +00:00
|
|
|
let greeting = Arc::new(RwLock::new((None, None)));
|
2021-02-22 09:06:40 +00:00
|
|
|
let caps: CapsLock = Arc::new(RwLock::new(None));
|
|
|
|
|
|
|
|
let listener_handle = tokio::spawn(listen(
|
2021-02-21 13:42:40 +00:00
|
|
|
read_half,
|
2021-02-22 09:06:40 +00:00
|
|
|
caps.clone(),
|
2021-02-21 13:42:40 +00:00
|
|
|
results.clone(),
|
|
|
|
exit_rx,
|
|
|
|
greeting.clone(),
|
|
|
|
));
|
2021-02-20 05:03:33 +00:00
|
|
|
|
|
|
|
Client {
|
2021-02-21 13:42:40 +00:00
|
|
|
config,
|
2021-02-20 05:03:33 +00:00
|
|
|
conn: write_half,
|
|
|
|
id: 0,
|
2021-02-20 07:30:58 +00:00
|
|
|
results,
|
2021-02-22 09:06:40 +00:00
|
|
|
listener_handle,
|
|
|
|
caps,
|
2021-02-21 13:42:40 +00:00
|
|
|
exit_tx,
|
|
|
|
greeting,
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-21 13:54:46 +00:00
|
|
|
/// Returns a future that doesn't resolve until we receive a greeting from the server.
|
|
|
|
pub fn wait_for_greeting(&self) -> GreetingWaiter {
|
2021-02-21 13:42:40 +00:00
|
|
|
debug!("waiting for greeting");
|
2021-02-21 13:54:46 +00:00
|
|
|
GreetingWaiter(self.greeting.clone())
|
2021-02-21 13:42:40 +00:00
|
|
|
}
|
|
|
|
|
2021-02-20 05:03:33 +00:00
|
|
|
/// Sends a command to the server and returns a handle to retrieve the result
|
2021-02-26 06:03:23 +00:00
|
|
|
pub async fn execute(&mut self, cmd: Command) -> Result<(ResponseFuture, ResponseStream)> {
|
|
|
|
// debug!("executing command {:?}", cmd);
|
2021-02-20 05:03:33 +00:00
|
|
|
let id = self.id;
|
|
|
|
self.id += 1;
|
2021-02-26 06:03:23 +00:00
|
|
|
|
|
|
|
// create a channel for sending the final response
|
|
|
|
let (end_tx, end_rx) = oneshot::channel();
|
|
|
|
|
|
|
|
// create a channel for sending responses for this particular client call
|
|
|
|
// this should queue up responses
|
|
|
|
let (tx, rx) = mpsc::unbounded_channel();
|
|
|
|
|
2021-02-20 05:03:33 +00:00
|
|
|
{
|
2021-02-20 07:30:58 +00:00
|
|
|
let mut handlers = self.results.write();
|
2021-02-26 06:03:23 +00:00
|
|
|
handlers.push_back(HandlerResult {
|
|
|
|
id,
|
|
|
|
end: Some(end_tx),
|
|
|
|
sender: tx,
|
|
|
|
waker: None,
|
|
|
|
});
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
|
|
|
|
2021-02-21 01:13:10 +00:00
|
|
|
let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd);
|
2021-02-24 12:43:50 +00:00
|
|
|
// debug!("[{}] writing to socket: {:?}", id, cmd_str);
|
2021-02-20 05:03:33 +00:00
|
|
|
self.conn.write_all(cmd_str.as_bytes()).await?;
|
2021-02-21 13:48:11 +00:00
|
|
|
self.conn.flush().await?;
|
2021-02-24 12:43:50 +00:00
|
|
|
// debug!("[{}] written.", id);
|
2021-02-26 06:03:23 +00:00
|
|
|
// let resp = ExecWaiter(self, id, false).await;
|
2021-02-23 02:47:00 +00:00
|
|
|
// let resp = {
|
|
|
|
// let mut handlers = self.results.write();
|
|
|
|
// handlers.remove(&id).unwrap().0.unwrap()
|
|
|
|
// };
|
2021-02-26 06:03:23 +00:00
|
|
|
|
|
|
|
// let resp = end_rx.await?;
|
|
|
|
|
|
|
|
let q = self.results.clone();
|
|
|
|
// let end = Box::new(end_rx.map_err(|err| Error::from).map(move |resp| resp));
|
2021-02-26 06:26:37 +00:00
|
|
|
let end = Box::new(end_rx.map_err(Error::from).map(move |resp| {
|
2021-02-26 06:03:23 +00:00
|
|
|
// pop the first entry from the list
|
|
|
|
let mut results = q.write();
|
|
|
|
results.pop_front();
|
|
|
|
resp
|
|
|
|
}));
|
|
|
|
|
|
|
|
Ok((end, rx))
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Executes the CAPABILITY command
|
2021-02-23 02:47:00 +00:00
|
|
|
pub async fn capabilities(&mut self, force: bool) -> Result<()> {
|
|
|
|
{
|
2021-02-23 04:01:39 +00:00
|
|
|
let caps = self.caps.read();
|
2021-02-23 02:47:00 +00:00
|
|
|
if caps.is_some() && !force {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-20 05:03:33 +00:00
|
|
|
let cmd = Command::Capability;
|
2021-02-26 06:03:23 +00:00
|
|
|
// debug!("sending: {:?} {:?}", cmd, cmd.to_string());
|
2021-02-23 02:47:00 +00:00
|
|
|
let (result, intermediate) = self
|
2021-02-21 13:42:40 +00:00
|
|
|
.execute(cmd)
|
|
|
|
.await
|
|
|
|
.context("error executing CAPABILITY command")?;
|
2021-02-26 06:03:23 +00:00
|
|
|
let result = result.await?;
|
2021-02-21 14:59:27 +00:00
|
|
|
debug!("cap resp: {:?}", result);
|
2021-02-23 02:47:00 +00:00
|
|
|
|
2021-02-26 06:03:23 +00:00
|
|
|
if let Some(Response::Capabilities(new_caps)) = UnboundedReceiverStream::new(intermediate)
|
|
|
|
.filter(|resp| future::ready(matches!(resp, Response::Capabilities(_))))
|
|
|
|
.next()
|
|
|
|
.await
|
2021-02-23 02:47:00 +00:00
|
|
|
{
|
2021-02-23 04:01:39 +00:00
|
|
|
let mut caps = self.caps.write();
|
2021-02-23 02:47:00 +00:00
|
|
|
*caps = Some(new_caps.iter().cloned().collect());
|
|
|
|
}
|
|
|
|
|
2021-02-20 07:30:58 +00:00
|
|
|
Ok(())
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
2021-02-21 13:42:40 +00:00
|
|
|
|
|
|
|
/// Attempts to upgrade this connection using STARTTLS
|
|
|
|
pub async fn upgrade(mut self) -> Result<Client<TlsStream<C>>> {
|
|
|
|
// TODO: make sure STARTTLS is in the capability list
|
2021-02-22 20:36:08 +00:00
|
|
|
if !self.has_capability("STARTTLS").await? {
|
|
|
|
bail!("server doesn't support this capability");
|
|
|
|
}
|
|
|
|
|
2021-02-21 13:42:40 +00:00
|
|
|
// first, send the STARTTLS command
|
2021-02-26 06:03:23 +00:00
|
|
|
let (resp, _) = self.execute(Command::Starttls).await?;
|
|
|
|
let resp = resp.await?;
|
2021-02-21 13:42:40 +00:00
|
|
|
debug!("server response to starttls: {:?}", resp);
|
|
|
|
|
2021-02-22 09:06:40 +00:00
|
|
|
debug!("sending exit for upgrade");
|
2021-02-21 13:42:40 +00:00
|
|
|
self.exit_tx.send(()).await?;
|
|
|
|
let reader = self.listener_handle.await??;
|
|
|
|
let writer = self.conn;
|
|
|
|
|
|
|
|
let conn = reader.unsplit(writer);
|
|
|
|
|
|
|
|
let server_name = &self.config.hostname;
|
|
|
|
|
2021-02-21 13:54:46 +00:00
|
|
|
let mut tls_config = RustlsConfig::new();
|
2021-02-21 13:42:40 +00:00
|
|
|
tls_config
|
|
|
|
.root_store
|
|
|
|
.add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS);
|
|
|
|
let tls_config = TlsConnector::from(Arc::new(tls_config));
|
|
|
|
let dnsname = DNSNameRef::try_from_ascii_str(server_name).unwrap();
|
|
|
|
let stream = tls_config.connect(dnsname, conn).await?;
|
2021-02-22 09:06:40 +00:00
|
|
|
debug!("upgraded, stream is using TLS now");
|
2021-02-21 13:42:40 +00:00
|
|
|
|
|
|
|
Ok(Client::new(stream, self.config))
|
|
|
|
}
|
2021-02-22 20:36:08 +00:00
|
|
|
|
|
|
|
/// Check if this client has a particular capability
|
2021-02-23 02:47:00 +00:00
|
|
|
pub async fn has_capability(&mut self, cap: impl AsRef<str>) -> Result<bool> {
|
2021-02-22 20:36:08 +00:00
|
|
|
let cap = cap.as_ref().to_owned();
|
|
|
|
debug!("checking for the capability: {:?}", cap);
|
2021-02-23 02:47:00 +00:00
|
|
|
let cap = parse_capability(cap)?;
|
2021-02-22 20:36:08 +00:00
|
|
|
|
2021-02-23 02:47:00 +00:00
|
|
|
self.capabilities(false).await?;
|
2021-02-23 04:01:39 +00:00
|
|
|
let caps = self.caps.read();
|
2021-02-22 20:36:08 +00:00
|
|
|
// TODO: refresh caps
|
|
|
|
|
|
|
|
let caps = caps.as_ref().unwrap();
|
|
|
|
let result = caps.contains(&cap);
|
|
|
|
debug!("cap result: {:?}", result);
|
|
|
|
Ok(result)
|
|
|
|
}
|
2021-02-21 13:42:40 +00:00
|
|
|
}
|
|
|
|
|
2021-02-21 13:54:46 +00:00
|
|
|
pub struct GreetingWaiter(GreetingState);
|
2021-02-21 13:42:40 +00:00
|
|
|
|
2021-02-21 13:54:46 +00:00
|
|
|
impl Future for GreetingWaiter {
|
2021-02-23 02:47:00 +00:00
|
|
|
type Output = Response;
|
2021-02-21 13:42:40 +00:00
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
|
|
|
let (state, waker) = &mut *self.0.write();
|
2021-02-22 09:06:40 +00:00
|
|
|
debug!("g {:?}", state);
|
2021-02-21 13:42:40 +00:00
|
|
|
if waker.is_none() {
|
|
|
|
*waker = Some(cx.waker().clone());
|
|
|
|
}
|
|
|
|
|
2021-02-23 02:47:00 +00:00
|
|
|
match state.take() {
|
|
|
|
Some(v) => Poll::Ready(v),
|
|
|
|
None => Poll::Pending,
|
2021-02-21 13:42:40 +00:00
|
|
|
}
|
|
|
|
}
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
|
|
|
|
2021-02-26 06:03:23 +00:00
|
|
|
// pub struct ExecWaiter<'a, C>(&'a Client<C>, usize, bool);
|
|
|
|
//
|
|
|
|
// impl<'a, C> Future for ExecWaiter<'a, C> {
|
|
|
|
// type Output = (Response, ResponseStream);
|
|
|
|
// fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
|
|
|
// // add the waker
|
|
|
|
// let mut results = self.0.results.write();
|
|
|
|
// if !self.2 {
|
|
|
|
// if let Some(HandlerResult {
|
|
|
|
// waker: waker_ref, ..
|
|
|
|
// }) = results
|
|
|
|
// .iter_mut()
|
|
|
|
// .find(|HandlerResult { id, .. }| *id == self.1)
|
|
|
|
// {
|
|
|
|
// let waker = cx.waker().clone();
|
|
|
|
// *waker_ref = Some(waker);
|
|
|
|
// self.2 = true;
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// // if this struct exists then there's definitely at least one entry
|
|
|
|
// let HandlerResult {
|
|
|
|
// id,
|
|
|
|
// end: last_response,
|
|
|
|
// ..
|
|
|
|
// } = &results[0];
|
|
|
|
// if *id != self.1 || last_response.is_none() {
|
|
|
|
// return Poll::Pending;
|
|
|
|
// }
|
|
|
|
//
|
|
|
|
// let HandlerResult {
|
|
|
|
// end: last_response,
|
|
|
|
// stream: intermediate_responses,
|
|
|
|
// ..
|
|
|
|
// } = results.pop_front().unwrap();
|
|
|
|
// mem::drop(results);
|
|
|
|
//
|
|
|
|
// Poll::Ready((
|
|
|
|
// last_response.expect("already checked"),
|
|
|
|
// intermediate_responses,
|
|
|
|
// ))
|
|
|
|
// }
|
|
|
|
// }
|
2021-02-20 05:03:33 +00:00
|
|
|
|
2021-02-21 13:42:40 +00:00
|
|
|
/// Main listen loop for the application
|
|
|
|
async fn listen<C>(
|
|
|
|
conn: C,
|
2021-02-22 09:06:40 +00:00
|
|
|
caps: CapsLock,
|
2021-02-26 06:03:23 +00:00
|
|
|
results: ResultQueue,
|
2021-02-21 13:42:40 +00:00
|
|
|
mut exit: mpsc::Receiver<()>,
|
2021-02-21 13:54:46 +00:00
|
|
|
greeting: GreetingState,
|
2021-02-21 13:42:40 +00:00
|
|
|
) -> Result<C>
|
|
|
|
where
|
|
|
|
C: AsyncRead + Unpin,
|
|
|
|
{
|
2021-02-24 12:43:50 +00:00
|
|
|
// debug!("amogus");
|
2021-02-20 05:03:33 +00:00
|
|
|
let mut reader = BufReader::new(conn);
|
2021-02-21 13:42:40 +00:00
|
|
|
let mut greeting = Some(greeting);
|
|
|
|
|
2021-02-20 05:03:33 +00:00
|
|
|
loop {
|
|
|
|
let mut next_line = String::new();
|
2021-02-21 13:42:40 +00:00
|
|
|
let fut = reader.read_line(&mut next_line).fuse();
|
|
|
|
pin_mut!(fut);
|
|
|
|
let fut2 = exit.recv().fuse();
|
|
|
|
pin_mut!(fut2);
|
|
|
|
|
|
|
|
match future::select(fut, fut2).await {
|
2021-02-22 07:37:19 +00:00
|
|
|
Either::Left((res, _)) => {
|
2021-02-22 09:06:40 +00:00
|
|
|
let bytes = res.context("read failed")?;
|
|
|
|
if bytes == 0 {
|
|
|
|
bail!("connection probably died");
|
|
|
|
}
|
|
|
|
|
2021-02-22 07:37:19 +00:00
|
|
|
debug!("got a new line {:?}", next_line);
|
2021-02-23 02:47:00 +00:00
|
|
|
let resp = parse_response(next_line)?;
|
2021-02-22 09:06:40 +00:00
|
|
|
|
2021-02-23 04:01:39 +00:00
|
|
|
// if this is the very first message, treat it as a greeting
|
2021-02-22 09:06:40 +00:00
|
|
|
if let Some(greeting) = greeting.take() {
|
|
|
|
let (greeting, waker) = &mut *greeting.write();
|
|
|
|
debug!("received greeting!");
|
2021-02-23 02:47:00 +00:00
|
|
|
*greeting = Some(resp.clone());
|
2021-02-22 09:06:40 +00:00
|
|
|
if let Some(waker) = waker.take() {
|
|
|
|
waker.wake();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-23 04:01:39 +00:00
|
|
|
// update capabilities list
|
|
|
|
// TODO: probably not really necessary here (done somewhere else)?
|
|
|
|
if let Response::Capabilities(new_caps)
|
|
|
|
| Response::Data {
|
|
|
|
status: Status::Ok,
|
|
|
|
code: Some(ResponseCode::Capabilities(new_caps)),
|
|
|
|
..
|
|
|
|
} = &resp
|
|
|
|
{
|
|
|
|
let caps = &mut *caps.write();
|
|
|
|
*caps = Some(new_caps.iter().cloned().collect());
|
|
|
|
debug!("new caps: {:?}", caps);
|
|
|
|
}
|
|
|
|
|
2021-02-22 09:06:40 +00:00
|
|
|
match &resp {
|
2021-02-23 04:01:39 +00:00
|
|
|
Response::Data {
|
|
|
|
status: Status::Ok, ..
|
2021-02-22 09:06:40 +00:00
|
|
|
} => {
|
2021-02-23 04:01:39 +00:00
|
|
|
let mut results = results.write();
|
2021-02-26 06:03:23 +00:00
|
|
|
if let Some(HandlerResult { id, sender, .. }) = results.iter_mut().next() {
|
|
|
|
debug!("pushed to intermediate for id {}: {:?}", id, resp);
|
|
|
|
sender.send(resp)?;
|
2021-02-23 04:01:39 +00:00
|
|
|
}
|
2021-02-21 13:42:40 +00:00
|
|
|
}
|
2021-02-22 09:06:40 +00:00
|
|
|
|
2021-02-22 20:36:08 +00:00
|
|
|
// bye
|
|
|
|
Response::Data {
|
|
|
|
status: Status::Bye,
|
|
|
|
..
|
|
|
|
} => {
|
|
|
|
bail!("disconnected");
|
|
|
|
}
|
|
|
|
|
2021-02-22 09:06:40 +00:00
|
|
|
Response::Done { tag, .. } => {
|
2021-02-22 20:43:09 +00:00
|
|
|
if tag.starts_with(TAG_PREFIX) {
|
2021-02-23 04:01:39 +00:00
|
|
|
// let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
|
2021-02-22 09:06:40 +00:00
|
|
|
let mut results = results.write();
|
2021-02-26 06:03:23 +00:00
|
|
|
if let Some(HandlerResult {
|
|
|
|
end: ref mut opt,
|
|
|
|
waker,
|
|
|
|
..
|
|
|
|
}) = results.iter_mut().next()
|
|
|
|
{
|
|
|
|
if let Some(opt) = opt.take() {
|
|
|
|
opt.send(resp).unwrap();
|
|
|
|
}
|
|
|
|
// *opt = Some(resp);
|
2021-02-22 09:06:40 +00:00
|
|
|
if let Some(waker) = waker.take() {
|
|
|
|
waker.wake();
|
|
|
|
}
|
|
|
|
}
|
2021-02-21 13:42:40 +00:00
|
|
|
}
|
|
|
|
}
|
2021-02-22 09:06:40 +00:00
|
|
|
|
2021-02-23 02:47:00 +00:00
|
|
|
_ => {}
|
2021-02-21 13:42:40 +00:00
|
|
|
}
|
|
|
|
}
|
2021-02-22 09:06:40 +00:00
|
|
|
|
2021-02-21 13:42:40 +00:00
|
|
|
Either::Right((_, _)) => {
|
|
|
|
debug!("exiting read loop");
|
|
|
|
break;
|
2021-02-20 07:30:58 +00:00
|
|
|
}
|
|
|
|
}
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|
2021-02-21 13:42:40 +00:00
|
|
|
|
|
|
|
let conn = reader.into_inner();
|
|
|
|
Ok(conn)
|
2021-02-20 05:03:33 +00:00
|
|
|
}
|