From 080b4e0e09a7cbf74fa229d4643b57817fcbd61a Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Tue, 9 May 2023 12:49:30 -0500 Subject: [PATCH] Get send message working --- Cargo.lock | 1 + client/package.json | 1 + client/src-tauri/Cargo.toml | 1 + client/src-tauri/src/main.rs | 67 ++++++++++++++---- client/src-tauri/tauri.conf.json | 2 +- client/tauri.conf.json5 | 2 - client/vite.config.js | 2 +- proto/chat.proto | 15 ++-- server/src/main.rs | 59 ++++++++-------- server/src/mux.rs | 118 +++++++++++++++++++++++++++---- 10 files changed, 200 insertions(+), 68 deletions(-) delete mode 100644 client/tauri.conf.json5 diff --git a/Cargo.lock b/Cargo.lock index e3b5694..2fba709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1790,6 +1790,7 @@ dependencies = [ "tauri-build", "tokio", "tonic", + "uuid", ] [[package]] diff --git a/client/package.json b/client/package.json index be13309..e6440a6 100644 --- a/client/package.json +++ b/client/package.json @@ -1,5 +1,6 @@ { "scripts": { + "build": "vite build", "dev": "vite" }, "devDependencies": { diff --git a/client/src-tauri/Cargo.toml b/client/src-tauri/Cargo.toml index 7455212..51a13ff 100644 --- a/client/src-tauri/Cargo.toml +++ b/client/src-tauri/Cargo.toml @@ -21,6 +21,7 @@ mraow-common = { path = "../../common" } tokio = { version = "1.28.0", features = ["full"] } anyhow = "1.0.71" tonic = "0.9.2" +uuid = { version = "1.3.2", features = ["v4"] } [features] default = ["mraow-common/client"] diff --git a/client/src-tauri/src/main.rs b/client/src-tauri/src/main.rs index 73c0401..91198a3 100644 --- a/client/src-tauri/src/main.rs +++ b/client/src-tauri/src/main.rs @@ -1,21 +1,39 @@ // Prevents additional console window on Windows in release, DO NOT REMOVE!! #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] -use anyhow::Result; -use mraow_common::chat_proto::chat_client::ChatClient; -use tauri::async_runtime::{Mutex, TokioHandle}; -use tonic::transport::channel::Channel; +use std::sync::Arc; -type MyGreeterClient = ChatClient; +use anyhow::Result; +use mraow_common::chat_proto::{ + chat_client::ChatClient, ChatMessage, ReceiveMsgsRequest, +}; +use tauri::{ + async_runtime::{Mutex, TokioHandle}, + State, +}; +use tonic::{transport::channel::Channel, IntoRequest}; +use uuid::Uuid; + +type MyChatClient = Arc>>; + +pub struct UserId(Uuid); #[tauri::command] async fn send_message( - state: tauri::State<'_, Mutex>, + state: State<'_, MyChatClient>, + user_id: State<'_, UserId>, message: String, ) -> Result<(), ()> { println!("SHIET {state:?}"); - // let mut client = state.lock().await; + let mut client = state.lock().await; + + let user_id = user_id.inner(); + client.send_msg(ChatMessage { + from_user_id: user_id.0.to_string(), + content: message, + ..Default::default() + }).await.unwrap(); /* client .say_hello(HelloRequest { message, @@ -30,15 +48,36 @@ async fn send_message( async fn main() -> Result<()> { tauri::async_runtime::set(TokioHandle::current()); - /* - TODO: Make sure this doesn't necessarily have to connect before starting the app - let greeter_client = ChatClient::connect("http://[::1]:50051").await?; - let greeter_client = Mutex::new(greeter_client); - println!("Connected :)"); - */ + let uuid = Uuid::new_v4(); + let user_id = UserId(uuid.clone()); + + let chat_client = ChatClient::connect("http://[::1]:50051").await?; + let chat_client = Arc::new(Mutex::new(chat_client)); + let chat_client2 = chat_client.clone(); tauri::Builder::default() - // .manage(greeter_client) + .setup(move |app| { + tokio::spawn(async move { + let stream = { + let mut client = chat_client2.lock().await; + client + .receive_msgs(ReceiveMsgsRequest { + user_id: uuid.to_string(), + }) + .await + .unwrap() + }; + + let mut stream = stream.into_inner(); + while let Ok(Some(message)) = stream.message().await { + println!("SHIET message {message:?}"); + } + }); + + Ok(()) + }) + .manage(user_id) + .manage(chat_client) .invoke_handler(tauri::generate_handler![send_message]) .run(tauri::generate_context!()) .expect("error while running tauri application"); diff --git a/client/src-tauri/tauri.conf.json b/client/src-tauri/tauri.conf.json index eca8ca4..d2f6e9f 100644 --- a/client/src-tauri/tauri.conf.json +++ b/client/src-tauri/tauri.conf.json @@ -29,7 +29,7 @@ "icons/icon.icns", "icons/icon.ico" ], - "identifier": "com.tauri.dev", + "identifier": "io.mzhang.mraow", "longDescription": "", "macOS": { "entitlements": null, diff --git a/client/tauri.conf.json5 b/client/tauri.conf.json5 deleted file mode 100644 index 2c63c08..0000000 --- a/client/tauri.conf.json5 +++ /dev/null @@ -1,2 +0,0 @@ -{ -} diff --git a/client/vite.config.js b/client/vite.config.js index 1e69324..3b4ee38 100644 --- a/client/vite.config.js +++ b/client/vite.config.js @@ -7,7 +7,7 @@ export default defineConfig({ // Tauri expects a fixed port, fail if that port is not available server: { - strictPort: true, + strictPort: false, }, // to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`, diff --git a/proto/chat.proto b/proto/chat.proto index c3cb475..d7b06a3 100644 --- a/proto/chat.proto +++ b/proto/chat.proto @@ -12,18 +12,21 @@ message ChatMessage { string content = 5; } -message Room { - string name = 1; -} +message Room { string name = 1; } message UserList { repeated User users = 1; } -message RoomList {repeated Room rooms = 1;} +message RoomList { repeated Room rooms = 1; } -message User { string username = 1; } +message User { + string userId = 1; + string username = 2; +} // RPC Messages +message ReceiveMsgsRequest { string userId = 1; } + message RoomActionResponse {} message RoomAction { @@ -38,7 +41,7 @@ service Chat { // Messages rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {} - rpc receiveMsgs(google.protobuf.Empty) returns (stream ChatMessage) {} + rpc receiveMsgs(ReceiveMsgsRequest) returns (stream ChatMessage) {} // List rpc listRooms(google.protobuf.Empty) returns (RoomList) {} diff --git a/server/src/main.rs b/server/src/main.rs index fda6f5c..b75734f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,7 +2,12 @@ mod mux; use futures::Stream; use hyper::Body; -use mux::SharedMux; +use mraow_common::chat_proto::{ + chat_server::{Chat, ChatServer}, + ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList, + UserList, +}; +use mux::{SharedMux, TextMessage}; use std::{ pin::Pin, task::{Context, Poll}, @@ -10,13 +15,11 @@ use std::{ }; use tonic::{body::BoxBody, transport::Server, Request, Response, Status}; use tower::{Layer, Service}; +use uuid::Uuid; -use mraow_common::chat_proto::{ - chat_server::{Chat, ChatServer}, - ChatMessage, RoomAction, RoomActionResponse, RoomList, User, UserList, -}; +use crate::mux::{RoomId, RoomMessage, UserId}; -type ResponseStream = +pub type ResponseStream = Pin> + Send>>; #[derive(Default)] @@ -40,16 +43,33 @@ impl Chat for ChatImpl { &self, request: Request, ) -> Result, Status> { + let request = request.into_inner(); println!("Send message {request:?}"); - todo!() + + // TODO: Get room id + let room_id = RoomId(request.to_room_id); + let message = RoomMessage::Text(TextMessage { + content: request.content, + }); + + self.mux.send_message_to_room(room_id, message); + + Ok(Response::new(())) } async fn receive_msgs( &self, - request: Request<()>, + request: Request, ) -> Result, Status> { + let request = request.into_inner(); println!("Receive message {request:?}"); - todo!() + + let user_id = Uuid::parse_str(&request.user_id).unwrap(); + let user_id = UserId(user_id); + let stream = self.mux.create_listen_stream(user_id); + + // TODO: Handle drop + Ok(Response::new(stream)) } async fn list_rooms( @@ -67,27 +87,6 @@ impl Chat for ChatImpl { println!("Get all users {request:?}"); todo!() } - - /* - async fn say_hello( - &self, - request: Request, - ) -> Result, Status> { - let addr = request.remote_addr(); - let request = request.into_inner(); - - println!( - "Got a request from {:?}: {}", - addr, - request.message - ); - - let reply = HelloReply { - message: format!("Hello {}!", request.name), - }; - Ok(Response::new(reply)) - } - */ } #[tokio::main] diff --git a/server/src/mux.rs b/server/src/mux.rs index a100dc7..cb7659b 100644 --- a/server/src/mux.rs +++ b/server/src/mux.rs @@ -1,32 +1,122 @@ use std::{ - collections::{HashMap, HashSet}, + ops::{Deref, DerefMut}, + pin::Pin, sync::Arc, }; -use anyhow::Result; -use dashmap::DashMap; -use futures::{future, Future, FutureExt, Sink, Stream}; -use tokio::{ - sync::{ - broadcast::{Receiver, Sender}, - mpsc::{self, UnboundedReceiver, UnboundedSender}, - Mutex, - }, - task::JoinHandle, +use dashmap::{DashMap, DashSet}; + +use futures::{stream, FutureExt, Stream, StreamExt}; +use mraow_common::chat_proto::{ + chat_server::{Chat, ChatServer}, + ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList, + UserList, }; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; use uuid::Uuid; -pub struct UserId(Uuid); +use crate::ResponseStream; + +pub struct TextMessage { + pub content: String, +} + +pub enum RoomMessage { + Text(TextMessage), +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct UserId(pub Uuid); + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct RoomId(pub String); #[derive(Clone, Default)] -pub struct SharedMux(Arc>); +pub struct SharedMux(Arc); + +impl Deref for SharedMux { + type Target = Mux; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} /// Shared server state #[derive(Default)] pub struct Mux { - rooms: DashMap, + rooms: DashMap, + users: DashMap, +} + +impl Mux { + pub fn user_join_room(&self, user_id: UserId, room_id: RoomId) { + self + .rooms + .entry(room_id) + .and_modify(|room| { + room.members.insert(user_id.clone()); + }) + .or_insert_with(|| { + let members = DashSet::new(); + members.insert(user_id); + Room { members } + }); + } + + pub fn send_message_to_room(&self, room_id: RoomId, message: RoomMessage) { + let message = Arc::new(message); + + let recipients = + match self.rooms.get(&room_id).map(|room| room.members.clone()) { + Some(v) => v, + None => return, + }; + + for user_id in recipients { + // TODO: This should technically never be None? + let user = self.users.get(&user_id).unwrap(); + + user.tx.send(message.clone()); + } + } + + pub fn ensure_user(&self, user_id: UserId) { + self.users.entry(user_id).or_insert_with(|| { + let (tx, rx) = mpsc::unbounded_channel(); + ConnectedUser { tx, rx: Some(rx) } + }); + } + + pub fn create_listen_stream(&self, user_id: UserId) -> ResponseStream { + let rx = { + self.ensure_user(user_id.clone()); + let mut user = self.users.get_mut(&user_id).unwrap(); + user.rx.take().unwrap() + }; + let stream = UnboundedReceiverStream::new(rx); + + stream + .map(|message| { + let chat_message = ChatMessage { + ..Default::default() + }; + + Ok(chat_message) + }) + .boxed() + } } pub struct Room { members: DashSet, } + +pub struct ConnectedUser { + tx: UnboundedSender>, + rx: Option>>, +}