This commit is contained in:
Michael Zhang 2023-05-09 11:37:52 -05:00
parent b1ef494759
commit d9f08b8deb
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
5 changed files with 37 additions and 91 deletions

15
Cargo.lock generated
View file

@ -609,6 +609,19 @@ dependencies = [
"syn 2.0.15", "syn 2.0.15",
] ]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.17" version = "0.99.17"
@ -1793,6 +1806,7 @@ name = "mraow-server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"dashmap",
"futures", "futures",
"hyper", "hyper",
"mraow-common", "mraow-common",
@ -1805,6 +1819,7 @@ dependencies = [
"tower", "tower",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid",
] ]
[[package]] [[package]]

View file

@ -34,7 +34,7 @@ message RoomAction {
service Chat { service Chat {
// Rooms // Rooms
rpc roomAction(RoomAction) returns (JoinResponse) {} rpc roomAction(RoomAction) returns (RoomActionResponse) {}
// Messages // Messages
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {} rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}

View file

@ -20,6 +20,8 @@ tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
mraow-common = { path = "../common" } mraow-common = { path = "../common" }
uuid = { version = "1.3.2", features = ["v4"] }
dashmap = "5.4.0"
[build-dependencies] [build-dependencies]
tonic-build = "0.9.2" tonic-build = "0.9.2"

View file

@ -2,6 +2,7 @@ mod mux;
use futures::Stream; use futures::Stream;
use hyper::Body; use hyper::Body;
use mux::SharedMux;
use std::{ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -19,7 +20,9 @@ type ResponseStream =
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>; Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
#[derive(Default)] #[derive(Default)]
pub struct ChatImpl {} pub struct ChatImpl {
mux: SharedMux,
}
#[tonic::async_trait] #[tonic::async_trait]
impl Chat for ChatImpl { impl Chat for ChatImpl {
@ -41,10 +44,10 @@ impl Chat for ChatImpl {
todo!() todo!()
} }
async fn receive_msg( async fn receive_msgs(
&self, &self,
request: Request<()>, request: Request<()>,
) -> Result<Response<Self::receiveMsgStream>, Status> { ) -> Result<Response<Self::receiveMsgsStream>, Status> {
println!("Receive message {request:?}"); println!("Receive message {request:?}");
todo!() todo!()
} }

View file

@ -1,106 +1,32 @@
use std::{ use std::{
collections::HashMap, collections::{HashMap, HashSet},
pin::Pin, sync::Arc,
task::{Context, Poll},
}; };
use anyhow::Result; use anyhow::Result;
use dashmap::DashMap;
use futures::{future, Future, FutureExt, Sink, Stream}; use futures::{future, Future, FutureExt, Sink, Stream};
use tokio::{ use tokio::{
sync::{ sync::{
broadcast::{Receiver, Sender}, broadcast::{Receiver, Sender},
mpsc::{self, UnboundedReceiver, UnboundedSender}, mpsc::{self, UnboundedReceiver, UnboundedSender},
Mutex,
}, },
task::JoinHandle, task::JoinHandle,
}; };
use uuid::Uuid;
#[derive(Clone)] pub struct UserId(Uuid);
pub struct SharedMux(Mutex<Mux>);
#[derive(Clone, Default)]
pub struct SharedMux(Arc<Mutex<Mux>>);
/// Shared server state /// Shared server state
#[derive(Default)]
pub struct Mux { pub struct Mux {
rooms: HashMap<String, Room>, rooms: DashMap<String, Room>,
} }
#[derive(Clone)]
pub enum RoomMessage {}
pub struct Room { pub struct Room {
name: String, members: DashSet<UserId>,
tx: Sender<RoomMessage>,
rx: Receiver<RoomMessage>,
}
impl Room {
pub fn get_handle(&self) -> RoomHandle {
RoomHandle {
name: self.name.clone(),
tx: self.tx.clone(),
rx: self.tx.subscribe(),
}
}
}
pub struct RoomHandle {
name: String,
tx: Sender<RoomMessage>,
rx: Receiver<RoomMessage>,
}
pub enum CompositeListenerMessage {
Room { name: String, message: RoomMessage },
JoinRoom(String),
DropRoom(String),
}
pub struct CompositeListener {
current_listening_rooms: HashMap<String, RoomHandle>,
}
impl CompositeListener {}
/// Create a "composite" listener that listens to multiple rooms / streams, and
/// is able to add more streams in the middle as well
pub fn create_listener() -> (
JoinHandle<()>,
UnboundedSender<CompositeListenerMessage>,
UnboundedReceiver<CompositeListenerMessage>,
) {
let (tx, rx) = mpsc::unbounded_channel();
/*
let join_handle = tokio::spawn(async {
let mut currently_listening_to: HashMap<String, RoomHandle> =
HashMap::new();
loop {
// Build select targets
let ouais = rx.recv().boxed();
let mut select_targets: Vec<
Pin<Box<dyn Future<Output = Option<CompositeListenerMessage>>>>,
> = vec![ouais];
for (_, room_handle) in currently_listening_to.iter() {
select_targets.push(
room_handle
.rx
.recv()
.map(|f| {
f.ok().map(|m| CompositeListenerMessage::Room {
name: room_handle.name.clone(),
message: m,
})
})
.boxed(),
);
}
// Select
let result = future::select_all(select_targets).await;
}
});
(join_handle, tx, rx)
*/
todo!()
} }