Get send message working
This commit is contained in:
parent
d9f08b8deb
commit
080b4e0e09
10 changed files with 200 additions and 68 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1790,6 +1790,7 @@ dependencies = [
|
|||
"tauri-build",
|
||||
"tokio",
|
||||
"tonic",
|
||||
"uuid",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
{
|
||||
"scripts": {
|
||||
"build": "vite build",
|
||||
"dev": "vite"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
|
|
@ -21,6 +21,7 @@ mraow-common = { path = "../../common" }
|
|||
tokio = { version = "1.28.0", features = ["full"] }
|
||||
anyhow = "1.0.71"
|
||||
tonic = "0.9.2"
|
||||
uuid = { version = "1.3.2", features = ["v4"] }
|
||||
|
||||
[features]
|
||||
default = ["mraow-common/client"]
|
||||
|
|
|
@ -1,21 +1,39 @@
|
|||
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
|
||||
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
|
||||
|
||||
use anyhow::Result;
|
||||
use mraow_common::chat_proto::chat_client::ChatClient;
|
||||
use tauri::async_runtime::{Mutex, TokioHandle};
|
||||
use tonic::transport::channel::Channel;
|
||||
use std::sync::Arc;
|
||||
|
||||
type MyGreeterClient = ChatClient<Channel>;
|
||||
use anyhow::Result;
|
||||
use mraow_common::chat_proto::{
|
||||
chat_client::ChatClient, ChatMessage, ReceiveMsgsRequest,
|
||||
};
|
||||
use tauri::{
|
||||
async_runtime::{Mutex, TokioHandle},
|
||||
State,
|
||||
};
|
||||
use tonic::{transport::channel::Channel, IntoRequest};
|
||||
use uuid::Uuid;
|
||||
|
||||
type MyChatClient = Arc<Mutex<ChatClient<Channel>>>;
|
||||
|
||||
pub struct UserId(Uuid);
|
||||
|
||||
#[tauri::command]
|
||||
async fn send_message(
|
||||
state: tauri::State<'_, Mutex<MyGreeterClient>>,
|
||||
state: State<'_, MyChatClient>,
|
||||
user_id: State<'_, UserId>,
|
||||
message: String,
|
||||
) -> Result<(), ()> {
|
||||
println!("SHIET {state:?}");
|
||||
|
||||
// let mut client = state.lock().await;
|
||||
let mut client = state.lock().await;
|
||||
|
||||
let user_id = user_id.inner();
|
||||
client.send_msg(ChatMessage {
|
||||
from_user_id: user_id.0.to_string(),
|
||||
content: message,
|
||||
..Default::default()
|
||||
}).await.unwrap();
|
||||
/* client
|
||||
.say_hello(HelloRequest {
|
||||
message,
|
||||
|
@ -30,15 +48,36 @@ async fn send_message(
|
|||
async fn main() -> Result<()> {
|
||||
tauri::async_runtime::set(TokioHandle::current());
|
||||
|
||||
/*
|
||||
TODO: Make sure this doesn't necessarily have to connect before starting the app
|
||||
let greeter_client = ChatClient::connect("http://[::1]:50051").await?;
|
||||
let greeter_client = Mutex::new(greeter_client);
|
||||
println!("Connected :)");
|
||||
*/
|
||||
let uuid = Uuid::new_v4();
|
||||
let user_id = UserId(uuid.clone());
|
||||
|
||||
let chat_client = ChatClient::connect("http://[::1]:50051").await?;
|
||||
let chat_client = Arc::new(Mutex::new(chat_client));
|
||||
let chat_client2 = chat_client.clone();
|
||||
|
||||
tauri::Builder::default()
|
||||
// .manage(greeter_client)
|
||||
.setup(move |app| {
|
||||
tokio::spawn(async move {
|
||||
let stream = {
|
||||
let mut client = chat_client2.lock().await;
|
||||
client
|
||||
.receive_msgs(ReceiveMsgsRequest {
|
||||
user_id: uuid.to_string(),
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
let mut stream = stream.into_inner();
|
||||
while let Ok(Some(message)) = stream.message().await {
|
||||
println!("SHIET message {message:?}");
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.manage(user_id)
|
||||
.manage(chat_client)
|
||||
.invoke_handler(tauri::generate_handler![send_message])
|
||||
.run(tauri::generate_context!())
|
||||
.expect("error while running tauri application");
|
||||
|
|
|
@ -29,7 +29,7 @@
|
|||
"icons/icon.icns",
|
||||
"icons/icon.ico"
|
||||
],
|
||||
"identifier": "com.tauri.dev",
|
||||
"identifier": "io.mzhang.mraow",
|
||||
"longDescription": "",
|
||||
"macOS": {
|
||||
"entitlements": null,
|
||||
|
|
|
@ -1,2 +0,0 @@
|
|||
{
|
||||
}
|
|
@ -7,7 +7,7 @@ export default defineConfig({
|
|||
|
||||
// Tauri expects a fixed port, fail if that port is not available
|
||||
server: {
|
||||
strictPort: true,
|
||||
strictPort: false,
|
||||
},
|
||||
|
||||
// to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`,
|
||||
|
|
|
@ -12,18 +12,21 @@ message ChatMessage {
|
|||
string content = 5;
|
||||
}
|
||||
|
||||
message Room {
|
||||
string name = 1;
|
||||
}
|
||||
message Room { string name = 1; }
|
||||
|
||||
message UserList { repeated User users = 1; }
|
||||
|
||||
message RoomList {repeated Room rooms = 1;}
|
||||
message RoomList { repeated Room rooms = 1; }
|
||||
|
||||
message User { string username = 1; }
|
||||
message User {
|
||||
string userId = 1;
|
||||
string username = 2;
|
||||
}
|
||||
|
||||
// RPC Messages
|
||||
|
||||
message ReceiveMsgsRequest { string userId = 1; }
|
||||
|
||||
message RoomActionResponse {}
|
||||
|
||||
message RoomAction {
|
||||
|
@ -38,7 +41,7 @@ service Chat {
|
|||
|
||||
// Messages
|
||||
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}
|
||||
rpc receiveMsgs(google.protobuf.Empty) returns (stream ChatMessage) {}
|
||||
rpc receiveMsgs(ReceiveMsgsRequest) returns (stream ChatMessage) {}
|
||||
|
||||
// List
|
||||
rpc listRooms(google.protobuf.Empty) returns (RoomList) {}
|
||||
|
|
|
@ -2,7 +2,12 @@ mod mux;
|
|||
|
||||
use futures::Stream;
|
||||
use hyper::Body;
|
||||
use mux::SharedMux;
|
||||
use mraow_common::chat_proto::{
|
||||
chat_server::{Chat, ChatServer},
|
||||
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
|
||||
UserList,
|
||||
};
|
||||
use mux::{SharedMux, TextMessage};
|
||||
use std::{
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
|
@ -10,13 +15,11 @@ use std::{
|
|||
};
|
||||
use tonic::{body::BoxBody, transport::Server, Request, Response, Status};
|
||||
use tower::{Layer, Service};
|
||||
use uuid::Uuid;
|
||||
|
||||
use mraow_common::chat_proto::{
|
||||
chat_server::{Chat, ChatServer},
|
||||
ChatMessage, RoomAction, RoomActionResponse, RoomList, User, UserList,
|
||||
};
|
||||
use crate::mux::{RoomId, RoomMessage, UserId};
|
||||
|
||||
type ResponseStream =
|
||||
pub type ResponseStream =
|
||||
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -40,16 +43,33 @@ impl Chat for ChatImpl {
|
|||
&self,
|
||||
request: Request<ChatMessage>,
|
||||
) -> Result<Response<()>, Status> {
|
||||
let request = request.into_inner();
|
||||
println!("Send message {request:?}");
|
||||
todo!()
|
||||
|
||||
// TODO: Get room id
|
||||
let room_id = RoomId(request.to_room_id);
|
||||
let message = RoomMessage::Text(TextMessage {
|
||||
content: request.content,
|
||||
});
|
||||
|
||||
self.mux.send_message_to_room(room_id, message);
|
||||
|
||||
Ok(Response::new(()))
|
||||
}
|
||||
|
||||
async fn receive_msgs(
|
||||
&self,
|
||||
request: Request<()>,
|
||||
request: Request<ReceiveMsgsRequest>,
|
||||
) -> Result<Response<Self::receiveMsgsStream>, Status> {
|
||||
let request = request.into_inner();
|
||||
println!("Receive message {request:?}");
|
||||
todo!()
|
||||
|
||||
let user_id = Uuid::parse_str(&request.user_id).unwrap();
|
||||
let user_id = UserId(user_id);
|
||||
let stream = self.mux.create_listen_stream(user_id);
|
||||
|
||||
// TODO: Handle drop
|
||||
Ok(Response::new(stream))
|
||||
}
|
||||
|
||||
async fn list_rooms(
|
||||
|
@ -67,27 +87,6 @@ impl Chat for ChatImpl {
|
|||
println!("Get all users {request:?}");
|
||||
todo!()
|
||||
}
|
||||
|
||||
/*
|
||||
async fn say_hello(
|
||||
&self,
|
||||
request: Request<HelloRequest>,
|
||||
) -> Result<Response<HelloReply>, Status> {
|
||||
let addr = request.remote_addr();
|
||||
let request = request.into_inner();
|
||||
|
||||
println!(
|
||||
"Got a request from {:?}: {}",
|
||||
addr,
|
||||
request.message
|
||||
);
|
||||
|
||||
let reply = HelloReply {
|
||||
message: format!("Hello {}!", request.name),
|
||||
};
|
||||
Ok(Response::new(reply))
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
|
|
@ -1,32 +1,122 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
ops::{Deref, DerefMut},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use anyhow::Result;
|
||||
use dashmap::DashMap;
|
||||
use futures::{future, Future, FutureExt, Sink, Stream};
|
||||
use tokio::{
|
||||
sync::{
|
||||
broadcast::{Receiver, Sender},
|
||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||
Mutex,
|
||||
},
|
||||
task::JoinHandle,
|
||||
use dashmap::{DashMap, DashSet};
|
||||
|
||||
use futures::{stream, FutureExt, Stream, StreamExt};
|
||||
use mraow_common::chat_proto::{
|
||||
chat_server::{Chat, ChatServer},
|
||||
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
|
||||
UserList,
|
||||
};
|
||||
use tokio::sync::{
|
||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||
Mutex,
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub struct UserId(Uuid);
|
||||
use crate::ResponseStream;
|
||||
|
||||
pub struct TextMessage {
|
||||
pub content: String,
|
||||
}
|
||||
|
||||
pub enum RoomMessage {
|
||||
Text(TextMessage),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||
pub struct UserId(pub Uuid);
|
||||
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||
pub struct RoomId(pub String);
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
pub struct SharedMux(Arc<Mutex<Mux>>);
|
||||
pub struct SharedMux(Arc<Mux>);
|
||||
|
||||
impl Deref for SharedMux {
|
||||
type Target = Mux;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared server state
|
||||
#[derive(Default)]
|
||||
pub struct Mux {
|
||||
rooms: DashMap<String, Room>,
|
||||
rooms: DashMap<RoomId, Room>,
|
||||
users: DashMap<UserId, ConnectedUser>,
|
||||
}
|
||||
|
||||
impl Mux {
|
||||
pub fn user_join_room(&self, user_id: UserId, room_id: RoomId) {
|
||||
self
|
||||
.rooms
|
||||
.entry(room_id)
|
||||
.and_modify(|room| {
|
||||
room.members.insert(user_id.clone());
|
||||
})
|
||||
.or_insert_with(|| {
|
||||
let members = DashSet::new();
|
||||
members.insert(user_id);
|
||||
Room { members }
|
||||
});
|
||||
}
|
||||
|
||||
pub fn send_message_to_room(&self, room_id: RoomId, message: RoomMessage) {
|
||||
let message = Arc::new(message);
|
||||
|
||||
let recipients =
|
||||
match self.rooms.get(&room_id).map(|room| room.members.clone()) {
|
||||
Some(v) => v,
|
||||
None => return,
|
||||
};
|
||||
|
||||
for user_id in recipients {
|
||||
// TODO: This should technically never be None?
|
||||
let user = self.users.get(&user_id).unwrap();
|
||||
|
||||
user.tx.send(message.clone());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ensure_user(&self, user_id: UserId) {
|
||||
self.users.entry(user_id).or_insert_with(|| {
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
ConnectedUser { tx, rx: Some(rx) }
|
||||
});
|
||||
}
|
||||
|
||||
pub fn create_listen_stream(&self, user_id: UserId) -> ResponseStream {
|
||||
let rx = {
|
||||
self.ensure_user(user_id.clone());
|
||||
let mut user = self.users.get_mut(&user_id).unwrap();
|
||||
user.rx.take().unwrap()
|
||||
};
|
||||
let stream = UnboundedReceiverStream::new(rx);
|
||||
|
||||
stream
|
||||
.map(|message| {
|
||||
let chat_message = ChatMessage {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Ok(chat_message)
|
||||
})
|
||||
.boxed()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Room {
|
||||
members: DashSet<UserId>,
|
||||
}
|
||||
|
||||
pub struct ConnectedUser {
|
||||
tx: UnboundedSender<Arc<RoomMessage>>,
|
||||
rx: Option<UnboundedReceiver<Arc<RoomMessage>>>,
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue