diff --git a/Cargo.lock b/Cargo.lock index 7ca9de0..e3b5694 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -609,6 +609,19 @@ dependencies = [ "syn 2.0.15", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -1793,6 +1806,7 @@ name = "mraow-server" version = "0.1.0" dependencies = [ "anyhow", + "dashmap", "futures", "hyper", "mraow-common", @@ -1805,6 +1819,7 @@ dependencies = [ "tower", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/proto/chat.proto b/proto/chat.proto index c56b1bd..c3cb475 100644 --- a/proto/chat.proto +++ b/proto/chat.proto @@ -34,7 +34,7 @@ message RoomAction { service Chat { // Rooms - rpc roomAction(RoomAction) returns (JoinResponse) {} + rpc roomAction(RoomAction) returns (RoomActionResponse) {} // Messages rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {} @@ -43,4 +43,4 @@ service Chat { // List rpc listRooms(google.protobuf.Empty) returns (RoomList) {} rpc getAllUsers(google.protobuf.Empty) returns (UserList) {} -} \ No newline at end of file +} diff --git a/server/Cargo.toml b/server/Cargo.toml index 321c9e3..af3c16b 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -20,6 +20,8 @@ tracing = "0.1.37" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } mraow-common = { path = "../common" } +uuid = { version = "1.3.2", features = ["v4"] } +dashmap = "5.4.0" [build-dependencies] tonic-build = "0.9.2" diff --git a/server/src/main.rs b/server/src/main.rs index 1337057..fda6f5c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,6 +2,7 @@ mod mux; use futures::Stream; use hyper::Body; +use mux::SharedMux; use std::{ pin::Pin, task::{Context, Poll}, @@ -19,7 +20,9 @@ type ResponseStream = Pin> + Send>>; #[derive(Default)] -pub struct ChatImpl {} +pub struct ChatImpl { + mux: SharedMux, +} #[tonic::async_trait] impl Chat for ChatImpl { @@ -41,10 +44,10 @@ impl Chat for ChatImpl { todo!() } - async fn receive_msg( + async fn receive_msgs( &self, request: Request<()>, - ) -> Result, Status> { + ) -> Result, Status> { println!("Receive message {request:?}"); todo!() } diff --git a/server/src/mux.rs b/server/src/mux.rs index a117078..a100dc7 100644 --- a/server/src/mux.rs +++ b/server/src/mux.rs @@ -1,106 +1,32 @@ use std::{ - collections::HashMap, - pin::Pin, - task::{Context, Poll}, + collections::{HashMap, HashSet}, + sync::Arc, }; use anyhow::Result; +use dashmap::DashMap; use futures::{future, Future, FutureExt, Sink, Stream}; use tokio::{ sync::{ broadcast::{Receiver, Sender}, mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, }, task::JoinHandle, }; +use uuid::Uuid; -#[derive(Clone)] -pub struct SharedMux(Mutex); +pub struct UserId(Uuid); + +#[derive(Clone, Default)] +pub struct SharedMux(Arc>); /// Shared server state +#[derive(Default)] pub struct Mux { - rooms: HashMap, + rooms: DashMap, } -#[derive(Clone)] -pub enum RoomMessage {} - pub struct Room { - name: String, - tx: Sender, - rx: Receiver, -} - -impl Room { - pub fn get_handle(&self) -> RoomHandle { - RoomHandle { - name: self.name.clone(), - tx: self.tx.clone(), - rx: self.tx.subscribe(), - } - } -} - -pub struct RoomHandle { - name: String, - tx: Sender, - rx: Receiver, -} - -pub enum CompositeListenerMessage { - Room { name: String, message: RoomMessage }, - JoinRoom(String), - DropRoom(String), -} - -pub struct CompositeListener { - current_listening_rooms: HashMap, -} - -impl CompositeListener {} - -/// Create a "composite" listener that listens to multiple rooms / streams, and -/// is able to add more streams in the middle as well -pub fn create_listener() -> ( - JoinHandle<()>, - UnboundedSender, - UnboundedReceiver, -) { - let (tx, rx) = mpsc::unbounded_channel(); - - /* - let join_handle = tokio::spawn(async { - let mut currently_listening_to: HashMap = - HashMap::new(); - - loop { - // Build select targets - let ouais = rx.recv().boxed(); - let mut select_targets: Vec< - Pin>>>, - > = vec![ouais]; - for (_, room_handle) in currently_listening_to.iter() { - select_targets.push( - room_handle - .rx - .recv() - .map(|f| { - f.ok().map(|m| CompositeListenerMessage::Room { - name: room_handle.name.clone(), - message: m, - }) - }) - .boxed(), - ); - } - - // Select - let result = future::select_all(select_targets).await; - } - }); - - (join_handle, tx, rx) - */ - - todo!() + members: DashSet, }