Compare commits

...

3 commits

Author SHA1 Message Date
Michael Zhang e78d6ddf99
Chat ...kinda works? 2023-05-09 15:19:19 -05:00
Michael Zhang 080b4e0e09
Get send message working 2023-05-09 12:49:30 -05:00
Michael Zhang d9f08b8deb
fix 2023-05-09 11:37:52 -05:00
14 changed files with 292 additions and 147 deletions

16
Cargo.lock generated
View file

@ -609,6 +609,19 @@ dependencies = [
"syn 2.0.15", "syn 2.0.15",
] ]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown",
"lock_api",
"once_cell",
"parking_lot_core",
]
[[package]] [[package]]
name = "derive_more" name = "derive_more"
version = "0.99.17" version = "0.99.17"
@ -1777,6 +1790,7 @@ dependencies = [
"tauri-build", "tauri-build",
"tokio", "tokio",
"tonic", "tonic",
"uuid",
] ]
[[package]] [[package]]
@ -1793,6 +1807,7 @@ name = "mraow-server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"dashmap",
"futures", "futures",
"hyper", "hyper",
"mraow-common", "mraow-common",
@ -1805,6 +1820,7 @@ dependencies = [
"tower", "tower",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"uuid",
] ]
[[package]] [[package]]

View file

@ -1,5 +1,6 @@
{ {
"scripts": { "scripts": {
"build": "vite build",
"dev": "vite" "dev": "vite"
}, },
"devDependencies": { "devDependencies": {

View file

@ -21,6 +21,7 @@ mraow-common = { path = "../../common" }
tokio = { version = "1.28.0", features = ["full"] } tokio = { version = "1.28.0", features = ["full"] }
anyhow = "1.0.71" anyhow = "1.0.71"
tonic = "0.9.2" tonic = "0.9.2"
uuid = { version = "1.3.2", features = ["v4"] }
[features] [features]
default = ["mraow-common/client"] default = ["mraow-common/client"]

View file

@ -1,21 +1,47 @@
// Prevents additional console window on Windows in release, DO NOT REMOVE!! // Prevents additional console window on Windows in release, DO NOT REMOVE!!
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
use anyhow::Result; use std::{sync::Arc, thread};
use mraow_common::chat_proto::chat_client::ChatClient;
use tauri::async_runtime::{Mutex, TokioHandle};
use tonic::transport::channel::Channel;
type MyGreeterClient = ChatClient<Channel>; use anyhow::Result;
use mraow_common::chat_proto::{
chat_client::ChatClient, ChatMessage, ReceiveMsgsRequest, RoomAction,
};
use serde_json::json;
use tauri::{
async_runtime::{Mutex, TokioHandle},
Manager, State,
};
use tonic::{transport::channel::Channel, IntoRequest};
use uuid::Uuid;
type MyChatClient = Arc<Mutex<ChatClient<Channel>>>;
pub struct UserId(Uuid);
#[tauri::command] #[tauri::command]
async fn send_message( async fn send_message(
state: tauri::State<'_, Mutex<MyGreeterClient>>, state: State<'_, MyChatClient>,
user_id: State<'_, UserId>,
message: String, message: String,
) -> Result<(), ()> { ) -> Result<(), ()> {
println!("SHIET {state:?}"); println!("SHIET {state:?}");
// let mut client = state.lock().await; let mut client = state.lock().await;
let user_id = user_id.inner();
let resp = client
.send_msg(ChatMessage {
from_user_id: user_id.0.to_string(),
to_room_id: "general".to_string(),
content: message,
..Default::default()
})
.await
.unwrap();
println!("Sent message to server. {resp:?}");
/* client /* client
.say_hello(HelloRequest { .say_hello(HelloRequest {
message, message,
@ -30,15 +56,51 @@ async fn send_message(
async fn main() -> Result<()> { async fn main() -> Result<()> {
tauri::async_runtime::set(TokioHandle::current()); tauri::async_runtime::set(TokioHandle::current());
/* let uuid = Uuid::new_v4();
TODO: Make sure this doesn't necessarily have to connect before starting the app let user_id = UserId(uuid.clone());
let greeter_client = ChatClient::connect("http://[::1]:50051").await?;
let greeter_client = Mutex::new(greeter_client); let chat_client = ChatClient::connect("http://[::1]:50051").await?;
println!("Connected :)"); let chat_client = Arc::new(Mutex::new(chat_client));
*/ let chat_client2 = chat_client.clone();
tauri::Builder::default() tauri::Builder::default()
// .manage(greeter_client) .setup(move |app| {
let main_window = app.get_window("main").unwrap();
tokio::spawn(async move {
let mut client = chat_client2.lock().await;
client
.room_action(RoomAction {
room_id: "general".to_string(),
user_id: uuid.to_string(),
action: "join".to_string(),
})
.await
.unwrap();
let stream = client
.receive_msgs(ReceiveMsgsRequest {
user_id: uuid.to_string(),
})
.await
.unwrap();
std::mem::drop(client);
let mut stream = stream.into_inner();
while let Ok(Some(message)) = stream.message().await {
println!("SHIET message {message:?}");
main_window
.emit_all("new-message", json!({ "content" : message.content }))
.unwrap();
}
});
Ok(())
})
.manage(user_id)
.manage(chat_client)
.invoke_handler(tauri::generate_handler![send_message]) .invoke_handler(tauri::generate_handler![send_message])
.run(tauri::generate_context!()) .run(tauri::generate_context!())
.expect("error while running tauri application"); .expect("error while running tauri application");

View file

@ -29,7 +29,7 @@
"icons/icon.icns", "icons/icon.icns",
"icons/icon.ico" "icons/icon.ico"
], ],
"identifier": "com.tauri.dev", "identifier": "io.mzhang.mraow",
"longDescription": "", "longDescription": "",
"macOS": { "macOS": {
"entitlements": null, "entitlements": null,

View file

@ -1,6 +1,5 @@
import { Provider } from "react-redux"; import { Provider } from "react-redux";
import { store } from "./store"; import { store, useAppDispatch } from "./store";
import { useState } from "react";
import styles from "./App.module.scss"; import styles from "./App.module.scss";
import LeftSidebar from "./components/LeftSidebar"; import LeftSidebar from "./components/LeftSidebar";

View file

@ -1,9 +1,11 @@
import { invoke } from "@tauri-apps/api/tauri"; import { invoke } from "@tauri-apps/api/tauri";
import styles from "./CenterPanel.module.scss"; import styles from "./CenterPanel.module.scss";
import { useState } from "react"; import { useEffect, useState } from "react";
import { useAppDispatch, useAppSelector } from "../store"; import { useAppDispatch, useAppSelector } from "../store";
import { messageSelectors, messageSlice } from "../store/messages"; import { messageSelectors, messageSlice } from "../store/messages";
import { v4 as uuidv4 } from "uuid"; import { v4 as uuidv4 } from "uuid";
import { emit, listen } from "@tauri-apps/api/event";
import { appWindow, WebviewWindow } from "@tauri-apps/api/window";
export default function CenterPanel() { export default function CenterPanel() {
const [currentMessage, setCurrentMessage] = useState(""); const [currentMessage, setCurrentMessage] = useState("");
@ -12,12 +14,32 @@ export default function CenterPanel() {
messageSelectors.selectAll(state) messageSelectors.selectAll(state)
); );
useEffect(() => {
let unlisten;
(async () => {
unlisten = await appWindow.listen("new-message", (event) => {
console.log("NEW EVENT", event);
const id = "lol";
const time = Date.now();
const content = event.payload.content;
dispatch(messageSlice.actions.addMessage({ id, time, content }));
});
console.log("Listen handler active.");
})();
return () => {
if (unlisten) unlisten();
};
});
const onSubmit = (e) => { const onSubmit = (e) => {
e.preventDefault(); e.preventDefault();
invoke("send_message", { message: currentMessage }); invoke("send_message", { message: currentMessage });
const id = uuidv4(); const id = uuidv4();
const time = new Date(); const time = Date.now();
dispatch( dispatch(
messageSlice.actions.addMessage({ id, time, content: currentMessage }) messageSlice.actions.addMessage({ id, time, content: currentMessage })
); );
@ -31,7 +53,8 @@ export default function CenterPanel() {
<div className={styles.middlePart}> <div className={styles.middlePart}>
{allMessages.map((msg) => ( {allMessages.map((msg) => (
<div key={msg.id}> <div key={msg.id}>
<small>{msg.time.toISOString()}</small> <small>{new Date(msg.time).toISOString()}</small>
&nbsp;
{msg.content} {msg.content}
</div> </div>
))} ))}

View file

@ -3,7 +3,7 @@ import { RootState } from ".";
export type Message = { export type Message = {
id: string; id: string;
time: Date; time: number;
content: string; content: string;
}; };

View file

@ -1,2 +0,0 @@
{
}

View file

@ -7,7 +7,7 @@ export default defineConfig({
// Tauri expects a fixed port, fail if that port is not available // Tauri expects a fixed port, fail if that port is not available
server: { server: {
strictPort: true, strictPort: false,
}, },
// to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`, // to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`,

View file

@ -12,18 +12,21 @@ message ChatMessage {
string content = 5; string content = 5;
} }
message Room { message Room { string name = 1; }
string name = 1;
}
message UserList { repeated User users = 1; } message UserList { repeated User users = 1; }
message RoomList {repeated Room rooms = 1;} message RoomList { repeated Room rooms = 1; }
message User { string username = 1; } message User {
string userId = 1;
string username = 2;
}
// RPC Messages // RPC Messages
message ReceiveMsgsRequest { string userId = 1; }
message RoomActionResponse {} message RoomActionResponse {}
message RoomAction { message RoomAction {
@ -34,11 +37,11 @@ message RoomAction {
service Chat { service Chat {
// Rooms // Rooms
rpc roomAction(RoomAction) returns (JoinResponse) {} rpc roomAction(RoomAction) returns (google.protobuf.Empty) {}
// Messages // Messages
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {} rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}
rpc receiveMsgs(google.protobuf.Empty) returns (stream ChatMessage) {} rpc receiveMsgs(ReceiveMsgsRequest) returns (stream ChatMessage) {}
// List // List
rpc listRooms(google.protobuf.Empty) returns (RoomList) {} rpc listRooms(google.protobuf.Empty) returns (RoomList) {}

View file

@ -20,6 +20,8 @@ tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
mraow-common = { path = "../common" } mraow-common = { path = "../common" }
uuid = { version = "1.3.2", features = ["v4"] }
dashmap = "5.4.0"
[build-dependencies] [build-dependencies]
tonic-build = "0.9.2" tonic-build = "0.9.2"

View file

@ -2,6 +2,12 @@ mod mux;
use futures::Stream; use futures::Stream;
use hyper::Body; use hyper::Body;
use mraow_common::chat_proto::{
chat_server::{Chat, ChatServer},
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
UserList,
};
use mux::{SharedMux, TextMessage};
use std::{ use std::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
@ -9,17 +15,17 @@ use std::{
}; };
use tonic::{body::BoxBody, transport::Server, Request, Response, Status}; use tonic::{body::BoxBody, transport::Server, Request, Response, Status};
use tower::{Layer, Service}; use tower::{Layer, Service};
use uuid::Uuid;
use mraow_common::chat_proto::{ use crate::mux::{RoomId, RoomMessage, UserId};
chat_server::{Chat, ChatServer},
ChatMessage, RoomAction, RoomActionResponse, RoomList, User, UserList,
};
type ResponseStream = pub type ResponseStream =
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>; Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
#[derive(Default)] #[derive(Default)]
pub struct ChatImpl {} pub struct ChatImpl {
mux: SharedMux,
}
#[tonic::async_trait] #[tonic::async_trait]
impl Chat for ChatImpl { impl Chat for ChatImpl {
@ -28,25 +34,49 @@ impl Chat for ChatImpl {
async fn room_action( async fn room_action(
&self, &self,
request: Request<RoomAction>, request: Request<RoomAction>,
) -> Result<Response<RoomActionResponse>, Status> { ) -> Result<Response<()>, Status> {
let request = request.into_inner();
println!("Join {request:?}"); println!("Join {request:?}");
todo!()
let user_id = Uuid::parse_str(&request.user_id).unwrap();
let user_id = UserId(user_id);
let room_id = RoomId(request.room_id);
self.mux.user_join_room(user_id, room_id);
Ok(Response::new(()))
} }
async fn send_msg( async fn send_msg(
&self, &self,
request: Request<ChatMessage>, request: Request<ChatMessage>,
) -> Result<Response<()>, Status> { ) -> Result<Response<()>, Status> {
let request = request.into_inner();
println!("Send message {request:?}"); println!("Send message {request:?}");
todo!()
// TODO: Get room id
let room_id = RoomId(request.to_room_id);
let message = RoomMessage::Text(TextMessage {
content: request.content,
});
self.mux.send_message_to_room(room_id, message);
Ok(Response::new(()))
} }
async fn receive_msg( async fn receive_msgs(
&self, &self,
request: Request<()>, request: Request<ReceiveMsgsRequest>,
) -> Result<Response<Self::receiveMsgStream>, Status> { ) -> Result<Response<Self::receiveMsgsStream>, Status> {
let request = request.into_inner();
println!("Receive message {request:?}"); println!("Receive message {request:?}");
todo!()
let user_id = Uuid::parse_str(&request.user_id).unwrap();
let user_id = UserId(user_id);
let stream = self.mux.create_listen_stream(user_id);
// TODO: Handle drop
Ok(Response::new(stream))
} }
async fn list_rooms( async fn list_rooms(
@ -64,27 +94,6 @@ impl Chat for ChatImpl {
println!("Get all users {request:?}"); println!("Get all users {request:?}");
todo!() todo!()
} }
/*
async fn say_hello(
&self,
request: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
let addr = request.remote_addr();
let request = request.into_inner();
println!(
"Got a request from {:?}: {}",
addr,
request.message
);
let reply = HelloReply {
message: format!("Hello {}!", request.name),
};
Ok(Response::new(reply))
}
*/
} }
#[tokio::main] #[tokio::main]

View file

@ -1,106 +1,137 @@
use std::{ use std::{
collections::HashMap, ops::{Deref, DerefMut},
pin::Pin, pin::Pin,
task::{Context, Poll}, sync::Arc,
}; };
use anyhow::Result; use dashmap::{DashMap, DashSet};
use futures::{future, Future, FutureExt, Sink, Stream};
use tokio::{ use futures::{stream, FutureExt, Stream, StreamExt};
sync::{ use mraow_common::chat_proto::{
broadcast::{Receiver, Sender}, chat_server::{Chat, ChatServer},
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
UserList,
};
use tokio::sync::{
mpsc::{self, UnboundedReceiver, UnboundedSender}, mpsc::{self, UnboundedReceiver, UnboundedSender},
}, Mutex,
task::JoinHandle,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream;
use uuid::Uuid;
#[derive(Clone)] use crate::ResponseStream;
pub struct SharedMux(Mutex<Mux>);
#[derive(Debug)]
pub struct TextMessage {
pub content: String,
}
#[derive(Debug)]
pub enum RoomMessage {
Text(TextMessage),
}
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct UserId(pub Uuid);
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct RoomId(pub String);
#[derive(Clone, Default)]
pub struct SharedMux(Arc<Mux>);
impl Deref for SharedMux {
type Target = Mux;
fn deref(&self) -> &Self::Target {
&self.0
}
}
/// Shared server state /// Shared server state
#[derive(Default)]
pub struct Mux { pub struct Mux {
rooms: HashMap<String, Room>, rooms: DashMap<RoomId, Room>,
users: DashMap<UserId, ConnectedUser>,
} }
#[derive(Clone)] impl Mux {
pub enum RoomMessage {} pub fn user_join_room(&self, user_id: UserId, room_id: RoomId) {
self
pub struct Room { .rooms
name: String, .entry(room_id)
tx: Sender<RoomMessage>, .and_modify(|room| {
rx: Receiver<RoomMessage>, room.members.insert(user_id.clone());
}
impl Room {
pub fn get_handle(&self) -> RoomHandle {
RoomHandle {
name: self.name.clone(),
tx: self.tx.clone(),
rx: self.tx.subscribe(),
}
}
}
pub struct RoomHandle {
name: String,
tx: Sender<RoomMessage>,
rx: Receiver<RoomMessage>,
}
pub enum CompositeListenerMessage {
Room { name: String, message: RoomMessage },
JoinRoom(String),
DropRoom(String),
}
pub struct CompositeListener {
current_listening_rooms: HashMap<String, RoomHandle>,
}
impl CompositeListener {}
/// Create a "composite" listener that listens to multiple rooms / streams, and
/// is able to add more streams in the middle as well
pub fn create_listener() -> (
JoinHandle<()>,
UnboundedSender<CompositeListenerMessage>,
UnboundedReceiver<CompositeListenerMessage>,
) {
let (tx, rx) = mpsc::unbounded_channel();
/*
let join_handle = tokio::spawn(async {
let mut currently_listening_to: HashMap<String, RoomHandle> =
HashMap::new();
loop {
// Build select targets
let ouais = rx.recv().boxed();
let mut select_targets: Vec<
Pin<Box<dyn Future<Output = Option<CompositeListenerMessage>>>>,
> = vec![ouais];
for (_, room_handle) in currently_listening_to.iter() {
select_targets.push(
room_handle
.rx
.recv()
.map(|f| {
f.ok().map(|m| CompositeListenerMessage::Room {
name: room_handle.name.clone(),
message: m,
}) })
}) .or_insert_with(|| {
.boxed(), let members = DashSet::new();
); members.insert(user_id);
} Room { members }
// Select
let result = future::select_all(select_targets).await;
}
}); });
(join_handle, tx, rx) println!("Joined room. Rooms: {:?}", self.rooms);
*/ }
todo!() pub fn send_message_to_room(&self, room_id: RoomId, message: RoomMessage) {
let message = Arc::new(message);
let recipients =
match self.rooms.get(&room_id).map(|room| room.members.clone()) {
Some(v) => v,
None => return,
};
println!("Sending message {message:?} to recipients:");
for user_id in recipients {
// TODO: This should technically never be None?
let user = self.users.get(&user_id).unwrap();
println!(" - user: {user:?}");
user.tx.send(message.clone());
}
}
pub fn ensure_user(&self, user_id: UserId) {
self.users.entry(user_id).or_insert_with(|| {
let (tx, rx) = mpsc::unbounded_channel();
ConnectedUser { tx, rx: Some(rx) }
});
}
pub fn create_listen_stream(&self, user_id: UserId) -> ResponseStream {
let rx = {
self.ensure_user(user_id.clone());
let mut user = self.users.get_mut(&user_id).unwrap();
user.rx.take().unwrap()
};
let stream = UnboundedReceiverStream::new(rx);
stream
.map(move |message| {
println!("Sending message to {user_id:?}: {message:?}");
let message = match message.as_ref() {
RoomMessage::Text(v) => v,
};
let chat_message = ChatMessage {
content: message.content.clone(),
..Default::default()
};
Ok(chat_message)
})
.boxed()
}
}
#[derive(Debug)]
pub struct Room {
members: DashSet<UserId>,
}
#[derive(Debug)]
pub struct ConnectedUser {
tx: UnboundedSender<Arc<RoomMessage>>,
rx: Option<UnboundedReceiver<Arc<RoomMessage>>>,
} }