panorama/imap/src/client/inner.rs

141 lines
4.1 KiB
Rust
Raw Normal View History

2021-02-20 05:03:33 +00:00
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use anyhow::Result;
use futures::future::{Future, FutureExt};
use panorama_strings::{StringEntry, StringStore};
2021-02-20 07:30:58 +00:00
use parking_lot::{Mutex, RwLock};
2021-02-20 05:03:33 +00:00
use tokio::{
io::{
self, AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader,
WriteHalf,
},
task::JoinHandle,
};
use crate::command::Command;
2021-02-20 07:30:58 +00:00
use crate::response::Response;
2021-02-20 05:03:33 +00:00
/// A heap-allocated closure that takes no arguments and returns nothing.
// NOTE(review): not referenced anywhere in the visible part of this file — confirm it is still used.
pub type BoxedFunc = Box<dyn Fn()>;
2021-02-21 01:13:10 +00:00
/// Prefix of every command tag: a tag is this prefix immediately followed by the numeric command
/// id. The listener task relies on the same prefix to recognise tagged response lines.
pub const TAG_PREFIX: &str = "panorama";
2021-02-20 05:03:33 +00:00
/// The private Client struct, that is shared by all of the exported structs in the state machine.
pub struct Client<C> {
conn: WriteHalf<C>,
symbols: StringStore,
id: usize,
2021-02-20 07:30:58 +00:00
results: ResultMap,
2021-02-20 05:03:33 +00:00
caps: Vec<StringEntry>,
handle: JoinHandle<Result<()>>,
}
impl<C> Client<C>
where
C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
/// Creates a new client that wraps a connection
pub fn new(conn: C) -> Self {
let (read_half, write_half) = io::split(conn);
2021-02-20 07:30:58 +00:00
let results = Arc::new(RwLock::new(HashMap::new()));
let listen_fut = tokio::spawn(listen(read_half, results.clone()));
2021-02-20 05:03:33 +00:00
Client {
conn: write_half,
symbols: StringStore::new(256),
id: 0,
2021-02-20 07:30:58 +00:00
results,
2021-02-20 05:03:33 +00:00
caps: Vec::new(),
handle: listen_fut,
}
}
/// Sends a command to the server and returns a handle to retrieve the result
2021-02-20 07:30:58 +00:00
pub async fn execute(&mut self, cmd: Command) -> Result<Response> {
debug!("executing command {:?}", cmd);
2021-02-20 05:03:33 +00:00
let id = self.id;
self.id += 1;
{
2021-02-20 07:30:58 +00:00
let mut handlers = self.results.write();
handlers.insert(id, (None, None));
2021-02-20 05:03:33 +00:00
}
2021-02-21 01:13:10 +00:00
let cmd_str = format!("{}{} {}\r\n", TAG_PREFIX, id, cmd);
2021-02-20 07:30:58 +00:00
debug!("[{}] writing to socket: {:?}", id, cmd_str);
2021-02-20 05:03:33 +00:00
self.conn.write_all(cmd_str.as_bytes()).await?;
2021-02-20 07:30:58 +00:00
debug!("[{}] written.", id);
2021-02-20 05:03:33 +00:00
ExecHandle(self, id).await;
2021-02-20 07:30:58 +00:00
let resp = {
let mut handlers = self.results.write();
handlers.remove(&id).unwrap().0.unwrap()
};
Ok(Response(resp))
2021-02-20 05:03:33 +00:00
}
/// Executes the CAPABILITY command
2021-02-20 07:30:58 +00:00
pub async fn supports(&mut self) -> Result<()> {
2021-02-20 05:03:33 +00:00
let cmd = Command::Capability;
2021-02-20 07:30:58 +00:00
debug!("sending: {:?} {:?}", cmd, cmd.to_string());
let result = self.execute(cmd).await?;
debug!("result from supports: {:?}", result);
Ok(())
2021-02-20 05:03:33 +00:00
}
}
/// Future that completes once a response has been stored for a command.
///
/// Field `0` is the client whose result map is consulted, field `1` is the id of the command
/// being waited on.
pub struct ExecHandle<'a, C>(&'a Client<C>, usize);
impl<'a, C> Future for ExecHandle<'a, C> {
type Output = ();
2021-02-20 07:30:58 +00:00
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut handlers = self.0.results.write();
2021-02-21 01:13:10 +00:00
let state = handlers.get_mut(&self.1);
2021-02-20 05:03:33 +00:00
// TODO: handle the None case here
2021-02-20 07:30:58 +00:00
debug!("f[{}] {:?}", self.1, state);
let (result, waker) = state.unwrap();
match result {
Some(_) => Poll::Ready(()),
None => {
*waker = Some(cx.waker().clone());
Poll::Pending
}
2021-02-20 05:03:33 +00:00
}
}
}
2021-02-20 07:30:58 +00:00
use std::task::Waker;
/// Shared table of in-flight commands, keyed by numeric command id.
///
/// Each entry is a pair: the response text (`None` until the listener stores it) and the waker
/// of the task waiting for that response (`None` until one has been registered).
pub type ResultMap = Arc<RwLock<HashMap<usize, (Option<String>, Option<Waker>)>>>;
async fn listen(conn: impl AsyncRead + Unpin, results: ResultMap) -> Result<()> {
2021-02-20 05:24:46 +00:00
debug!("amogus");
2021-02-20 05:03:33 +00:00
let mut reader = BufReader::new(conn);
loop {
let mut next_line = String::new();
reader.read_line(&mut next_line).await?;
2021-02-21 01:13:10 +00:00
let next_line = next_line.trim_end_matches('\r');
2021-02-20 07:30:58 +00:00
// debug!("line: {:?}", next_line);
2021-02-21 01:13:10 +00:00
let mut parts = next_line.split(" ");
let tag = parts.next().unwrap();
let rest = parts.collect::<Vec<_>>().join(" ");
2021-02-20 07:30:58 +00:00
if tag == "*" {
2021-02-21 01:13:10 +00:00
debug!("UNTAGGED {:?}", rest);
} else if tag.starts_with(TAG_PREFIX) {
let id = tag.trim_start_matches(TAG_PREFIX).parse::<usize>()?;
debug!("set {} to {:?}", id, rest);
2021-02-20 07:30:58 +00:00
let mut results = results.write();
if let Some((c, w)) = results.get_mut(&id) {
2021-02-21 01:13:10 +00:00
*c = Some(rest.to_string());
2021-02-20 07:30:58 +00:00
let w = w.take().unwrap();
w.wake();
}
}
2021-02-20 05:03:33 +00:00
}
}