Compare commits
3 commits
b1ef494759
...
e78d6ddf99
Author | SHA1 | Date | |
---|---|---|---|
e78d6ddf99 | |||
080b4e0e09 | |||
d9f08b8deb |
14 changed files with 292 additions and 147 deletions
16
Cargo.lock
generated
16
Cargo.lock
generated
|
@ -609,6 +609,19 @@ dependencies = [
|
||||||
"syn 2.0.15",
|
"syn 2.0.15",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "dashmap"
|
||||||
|
version = "5.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"hashbrown",
|
||||||
|
"lock_api",
|
||||||
|
"once_cell",
|
||||||
|
"parking_lot_core",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "derive_more"
|
name = "derive_more"
|
||||||
version = "0.99.17"
|
version = "0.99.17"
|
||||||
|
@ -1777,6 +1790,7 @@ dependencies = [
|
||||||
"tauri-build",
|
"tauri-build",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tonic",
|
"tonic",
|
||||||
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1793,6 +1807,7 @@ name = "mraow-server"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
|
"dashmap",
|
||||||
"futures",
|
"futures",
|
||||||
"hyper",
|
"hyper",
|
||||||
"mraow-common",
|
"mraow-common",
|
||||||
|
@ -1805,6 +1820,7 @@ dependencies = [
|
||||||
"tower",
|
"tower",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
{
|
{
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
"build": "vite build",
|
||||||
"dev": "vite"
|
"dev": "vite"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
|
|
@ -21,6 +21,7 @@ mraow-common = { path = "../../common" }
|
||||||
tokio = { version = "1.28.0", features = ["full"] }
|
tokio = { version = "1.28.0", features = ["full"] }
|
||||||
anyhow = "1.0.71"
|
anyhow = "1.0.71"
|
||||||
tonic = "0.9.2"
|
tonic = "0.9.2"
|
||||||
|
uuid = { version = "1.3.2", features = ["v4"] }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["mraow-common/client"]
|
default = ["mraow-common/client"]
|
||||||
|
|
|
@ -1,21 +1,47 @@
|
||||||
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
|
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
|
||||||
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
|
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
|
||||||
|
|
||||||
use anyhow::Result;
|
use std::{sync::Arc, thread};
|
||||||
use mraow_common::chat_proto::chat_client::ChatClient;
|
|
||||||
use tauri::async_runtime::{Mutex, TokioHandle};
|
|
||||||
use tonic::transport::channel::Channel;
|
|
||||||
|
|
||||||
type MyGreeterClient = ChatClient<Channel>;
|
use anyhow::Result;
|
||||||
|
use mraow_common::chat_proto::{
|
||||||
|
chat_client::ChatClient, ChatMessage, ReceiveMsgsRequest, RoomAction,
|
||||||
|
};
|
||||||
|
use serde_json::json;
|
||||||
|
use tauri::{
|
||||||
|
async_runtime::{Mutex, TokioHandle},
|
||||||
|
Manager, State,
|
||||||
|
};
|
||||||
|
use tonic::{transport::channel::Channel, IntoRequest};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
type MyChatClient = Arc<Mutex<ChatClient<Channel>>>;
|
||||||
|
|
||||||
|
pub struct UserId(Uuid);
|
||||||
|
|
||||||
#[tauri::command]
|
#[tauri::command]
|
||||||
async fn send_message(
|
async fn send_message(
|
||||||
state: tauri::State<'_, Mutex<MyGreeterClient>>,
|
state: State<'_, MyChatClient>,
|
||||||
|
user_id: State<'_, UserId>,
|
||||||
message: String,
|
message: String,
|
||||||
) -> Result<(), ()> {
|
) -> Result<(), ()> {
|
||||||
println!("SHIET {state:?}");
|
println!("SHIET {state:?}");
|
||||||
|
|
||||||
// let mut client = state.lock().await;
|
let mut client = state.lock().await;
|
||||||
|
|
||||||
|
let user_id = user_id.inner();
|
||||||
|
let resp = client
|
||||||
|
.send_msg(ChatMessage {
|
||||||
|
from_user_id: user_id.0.to_string(),
|
||||||
|
to_room_id: "general".to_string(),
|
||||||
|
content: message,
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
println!("Sent message to server. {resp:?}");
|
||||||
|
|
||||||
/* client
|
/* client
|
||||||
.say_hello(HelloRequest {
|
.say_hello(HelloRequest {
|
||||||
message,
|
message,
|
||||||
|
@ -30,15 +56,51 @@ async fn send_message(
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
tauri::async_runtime::set(TokioHandle::current());
|
tauri::async_runtime::set(TokioHandle::current());
|
||||||
|
|
||||||
/*
|
let uuid = Uuid::new_v4();
|
||||||
TODO: Make sure this doesn't necessarily have to connect before starting the app
|
let user_id = UserId(uuid.clone());
|
||||||
let greeter_client = ChatClient::connect("http://[::1]:50051").await?;
|
|
||||||
let greeter_client = Mutex::new(greeter_client);
|
let chat_client = ChatClient::connect("http://[::1]:50051").await?;
|
||||||
println!("Connected :)");
|
let chat_client = Arc::new(Mutex::new(chat_client));
|
||||||
*/
|
let chat_client2 = chat_client.clone();
|
||||||
|
|
||||||
tauri::Builder::default()
|
tauri::Builder::default()
|
||||||
// .manage(greeter_client)
|
.setup(move |app| {
|
||||||
|
let main_window = app.get_window("main").unwrap();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut client = chat_client2.lock().await;
|
||||||
|
client
|
||||||
|
.room_action(RoomAction {
|
||||||
|
room_id: "general".to_string(),
|
||||||
|
user_id: uuid.to_string(),
|
||||||
|
action: "join".to_string(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let stream = client
|
||||||
|
.receive_msgs(ReceiveMsgsRequest {
|
||||||
|
user_id: uuid.to_string(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
std::mem::drop(client);
|
||||||
|
|
||||||
|
let mut stream = stream.into_inner();
|
||||||
|
while let Ok(Some(message)) = stream.message().await {
|
||||||
|
println!("SHIET message {message:?}");
|
||||||
|
|
||||||
|
main_window
|
||||||
|
.emit_all("new-message", json!({ "content" : message.content }))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.manage(user_id)
|
||||||
|
.manage(chat_client)
|
||||||
.invoke_handler(tauri::generate_handler![send_message])
|
.invoke_handler(tauri::generate_handler![send_message])
|
||||||
.run(tauri::generate_context!())
|
.run(tauri::generate_context!())
|
||||||
.expect("error while running tauri application");
|
.expect("error while running tauri application");
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
"icons/icon.icns",
|
"icons/icon.icns",
|
||||||
"icons/icon.ico"
|
"icons/icon.ico"
|
||||||
],
|
],
|
||||||
"identifier": "com.tauri.dev",
|
"identifier": "io.mzhang.mraow",
|
||||||
"longDescription": "",
|
"longDescription": "",
|
||||||
"macOS": {
|
"macOS": {
|
||||||
"entitlements": null,
|
"entitlements": null,
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import { Provider } from "react-redux";
|
import { Provider } from "react-redux";
|
||||||
import { store } from "./store";
|
import { store, useAppDispatch } from "./store";
|
||||||
import { useState } from "react";
|
|
||||||
|
|
||||||
import styles from "./App.module.scss";
|
import styles from "./App.module.scss";
|
||||||
import LeftSidebar from "./components/LeftSidebar";
|
import LeftSidebar from "./components/LeftSidebar";
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
import { invoke } from "@tauri-apps/api/tauri";
|
import { invoke } from "@tauri-apps/api/tauri";
|
||||||
import styles from "./CenterPanel.module.scss";
|
import styles from "./CenterPanel.module.scss";
|
||||||
import { useState } from "react";
|
import { useEffect, useState } from "react";
|
||||||
import { useAppDispatch, useAppSelector } from "../store";
|
import { useAppDispatch, useAppSelector } from "../store";
|
||||||
import { messageSelectors, messageSlice } from "../store/messages";
|
import { messageSelectors, messageSlice } from "../store/messages";
|
||||||
import { v4 as uuidv4 } from "uuid";
|
import { v4 as uuidv4 } from "uuid";
|
||||||
|
import { emit, listen } from "@tauri-apps/api/event";
|
||||||
|
import { appWindow, WebviewWindow } from "@tauri-apps/api/window";
|
||||||
|
|
||||||
export default function CenterPanel() {
|
export default function CenterPanel() {
|
||||||
const [currentMessage, setCurrentMessage] = useState("");
|
const [currentMessage, setCurrentMessage] = useState("");
|
||||||
|
@ -12,12 +14,32 @@ export default function CenterPanel() {
|
||||||
messageSelectors.selectAll(state)
|
messageSelectors.selectAll(state)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
let unlisten;
|
||||||
|
|
||||||
|
(async () => {
|
||||||
|
unlisten = await appWindow.listen("new-message", (event) => {
|
||||||
|
console.log("NEW EVENT", event);
|
||||||
|
const id = "lol";
|
||||||
|
const time = Date.now();
|
||||||
|
const content = event.payload.content;
|
||||||
|
dispatch(messageSlice.actions.addMessage({ id, time, content }));
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log("Listen handler active.");
|
||||||
|
})();
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
if (unlisten) unlisten();
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
const onSubmit = (e) => {
|
const onSubmit = (e) => {
|
||||||
e.preventDefault();
|
e.preventDefault();
|
||||||
invoke("send_message", { message: currentMessage });
|
invoke("send_message", { message: currentMessage });
|
||||||
|
|
||||||
const id = uuidv4();
|
const id = uuidv4();
|
||||||
const time = new Date();
|
const time = Date.now();
|
||||||
dispatch(
|
dispatch(
|
||||||
messageSlice.actions.addMessage({ id, time, content: currentMessage })
|
messageSlice.actions.addMessage({ id, time, content: currentMessage })
|
||||||
);
|
);
|
||||||
|
@ -31,7 +53,8 @@ export default function CenterPanel() {
|
||||||
<div className={styles.middlePart}>
|
<div className={styles.middlePart}>
|
||||||
{allMessages.map((msg) => (
|
{allMessages.map((msg) => (
|
||||||
<div key={msg.id}>
|
<div key={msg.id}>
|
||||||
<small>{msg.time.toISOString()}</small>
|
<small>{new Date(msg.time).toISOString()}</small>
|
||||||
|
|
||||||
{msg.content}
|
{msg.content}
|
||||||
</div>
|
</div>
|
||||||
))}
|
))}
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { RootState } from ".";
|
||||||
|
|
||||||
export type Message = {
|
export type Message = {
|
||||||
id: string;
|
id: string;
|
||||||
time: Date;
|
time: number;
|
||||||
content: string;
|
content: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1,2 +0,0 @@
|
||||||
{
|
|
||||||
}
|
|
|
@ -7,7 +7,7 @@ export default defineConfig({
|
||||||
|
|
||||||
// Tauri expects a fixed port, fail if that port is not available
|
// Tauri expects a fixed port, fail if that port is not available
|
||||||
server: {
|
server: {
|
||||||
strictPort: true,
|
strictPort: false,
|
||||||
},
|
},
|
||||||
|
|
||||||
// to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`,
|
// to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`,
|
||||||
|
|
|
@ -12,18 +12,21 @@ message ChatMessage {
|
||||||
string content = 5;
|
string content = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Room {
|
message Room { string name = 1; }
|
||||||
string name = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message UserList { repeated User users = 1; }
|
message UserList { repeated User users = 1; }
|
||||||
|
|
||||||
message RoomList {repeated Room rooms = 1;}
|
message RoomList { repeated Room rooms = 1; }
|
||||||
|
|
||||||
message User { string username = 1; }
|
message User {
|
||||||
|
string userId = 1;
|
||||||
|
string username = 2;
|
||||||
|
}
|
||||||
|
|
||||||
// RPC Messages
|
// RPC Messages
|
||||||
|
|
||||||
|
message ReceiveMsgsRequest { string userId = 1; }
|
||||||
|
|
||||||
message RoomActionResponse {}
|
message RoomActionResponse {}
|
||||||
|
|
||||||
message RoomAction {
|
message RoomAction {
|
||||||
|
@ -34,13 +37,13 @@ message RoomAction {
|
||||||
|
|
||||||
service Chat {
|
service Chat {
|
||||||
// Rooms
|
// Rooms
|
||||||
rpc roomAction(RoomAction) returns (JoinResponse) {}
|
rpc roomAction(RoomAction) returns (google.protobuf.Empty) {}
|
||||||
|
|
||||||
// Messages
|
// Messages
|
||||||
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}
|
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}
|
||||||
rpc receiveMsgs(google.protobuf.Empty) returns (stream ChatMessage) {}
|
rpc receiveMsgs(ReceiveMsgsRequest) returns (stream ChatMessage) {}
|
||||||
|
|
||||||
// List
|
// List
|
||||||
rpc listRooms(google.protobuf.Empty) returns (RoomList) {}
|
rpc listRooms(google.protobuf.Empty) returns (RoomList) {}
|
||||||
rpc getAllUsers(google.protobuf.Empty) returns (UserList) {}
|
rpc getAllUsers(google.protobuf.Empty) returns (UserList) {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@ tracing = "0.1.37"
|
||||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||||
|
|
||||||
mraow-common = { path = "../common" }
|
mraow-common = { path = "../common" }
|
||||||
|
uuid = { version = "1.3.2", features = ["v4"] }
|
||||||
|
dashmap = "5.4.0"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-build = "0.9.2"
|
tonic-build = "0.9.2"
|
||||||
|
|
|
@ -2,6 +2,12 @@ mod mux;
|
||||||
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
|
use mraow_common::chat_proto::{
|
||||||
|
chat_server::{Chat, ChatServer},
|
||||||
|
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
|
||||||
|
UserList,
|
||||||
|
};
|
||||||
|
use mux::{SharedMux, TextMessage};
|
||||||
use std::{
|
use std::{
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
@ -9,17 +15,17 @@ use std::{
|
||||||
};
|
};
|
||||||
use tonic::{body::BoxBody, transport::Server, Request, Response, Status};
|
use tonic::{body::BoxBody, transport::Server, Request, Response, Status};
|
||||||
use tower::{Layer, Service};
|
use tower::{Layer, Service};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
use mraow_common::chat_proto::{
|
use crate::mux::{RoomId, RoomMessage, UserId};
|
||||||
chat_server::{Chat, ChatServer},
|
|
||||||
ChatMessage, RoomAction, RoomActionResponse, RoomList, User, UserList,
|
|
||||||
};
|
|
||||||
|
|
||||||
type ResponseStream =
|
pub type ResponseStream =
|
||||||
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
|
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ChatImpl {}
|
pub struct ChatImpl {
|
||||||
|
mux: SharedMux,
|
||||||
|
}
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl Chat for ChatImpl {
|
impl Chat for ChatImpl {
|
||||||
|
@ -28,25 +34,49 @@ impl Chat for ChatImpl {
|
||||||
async fn room_action(
|
async fn room_action(
|
||||||
&self,
|
&self,
|
||||||
request: Request<RoomAction>,
|
request: Request<RoomAction>,
|
||||||
) -> Result<Response<RoomActionResponse>, Status> {
|
) -> Result<Response<()>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
println!("Join {request:?}");
|
println!("Join {request:?}");
|
||||||
todo!()
|
|
||||||
|
let user_id = Uuid::parse_str(&request.user_id).unwrap();
|
||||||
|
let user_id = UserId(user_id);
|
||||||
|
let room_id = RoomId(request.room_id);
|
||||||
|
self.mux.user_join_room(user_id, room_id);
|
||||||
|
|
||||||
|
Ok(Response::new(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_msg(
|
async fn send_msg(
|
||||||
&self,
|
&self,
|
||||||
request: Request<ChatMessage>,
|
request: Request<ChatMessage>,
|
||||||
) -> Result<Response<()>, Status> {
|
) -> Result<Response<()>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
println!("Send message {request:?}");
|
println!("Send message {request:?}");
|
||||||
todo!()
|
|
||||||
|
// TODO: Get room id
|
||||||
|
let room_id = RoomId(request.to_room_id);
|
||||||
|
let message = RoomMessage::Text(TextMessage {
|
||||||
|
content: request.content,
|
||||||
|
});
|
||||||
|
|
||||||
|
self.mux.send_message_to_room(room_id, message);
|
||||||
|
|
||||||
|
Ok(Response::new(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_msg(
|
async fn receive_msgs(
|
||||||
&self,
|
&self,
|
||||||
request: Request<()>,
|
request: Request<ReceiveMsgsRequest>,
|
||||||
) -> Result<Response<Self::receiveMsgStream>, Status> {
|
) -> Result<Response<Self::receiveMsgsStream>, Status> {
|
||||||
|
let request = request.into_inner();
|
||||||
println!("Receive message {request:?}");
|
println!("Receive message {request:?}");
|
||||||
todo!()
|
|
||||||
|
let user_id = Uuid::parse_str(&request.user_id).unwrap();
|
||||||
|
let user_id = UserId(user_id);
|
||||||
|
let stream = self.mux.create_listen_stream(user_id);
|
||||||
|
|
||||||
|
// TODO: Handle drop
|
||||||
|
Ok(Response::new(stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_rooms(
|
async fn list_rooms(
|
||||||
|
@ -64,27 +94,6 @@ impl Chat for ChatImpl {
|
||||||
println!("Get all users {request:?}");
|
println!("Get all users {request:?}");
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
async fn say_hello(
|
|
||||||
&self,
|
|
||||||
request: Request<HelloRequest>,
|
|
||||||
) -> Result<Response<HelloReply>, Status> {
|
|
||||||
let addr = request.remote_addr();
|
|
||||||
let request = request.into_inner();
|
|
||||||
|
|
||||||
println!(
|
|
||||||
"Got a request from {:?}: {}",
|
|
||||||
addr,
|
|
||||||
request.message
|
|
||||||
);
|
|
||||||
|
|
||||||
let reply = HelloReply {
|
|
||||||
message: format!("Hello {}!", request.name),
|
|
||||||
};
|
|
||||||
Ok(Response::new(reply))
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
|
@ -1,106 +1,137 @@
|
||||||
use std::{
|
use std::{
|
||||||
collections::HashMap,
|
ops::{Deref, DerefMut},
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use anyhow::Result;
|
use dashmap::{DashMap, DashSet};
|
||||||
use futures::{future, Future, FutureExt, Sink, Stream};
|
|
||||||
use tokio::{
|
use futures::{stream, FutureExt, Stream, StreamExt};
|
||||||
sync::{
|
use mraow_common::chat_proto::{
|
||||||
broadcast::{Receiver, Sender},
|
chat_server::{Chat, ChatServer},
|
||||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
|
||||||
},
|
UserList,
|
||||||
task::JoinHandle,
|
|
||||||
};
|
};
|
||||||
|
use tokio::sync::{
|
||||||
|
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||||
|
Mutex,
|
||||||
|
};
|
||||||
|
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
#[derive(Clone)]
|
use crate::ResponseStream;
|
||||||
pub struct SharedMux(Mutex<Mux>);
|
|
||||||
|
|
||||||
/// Shared server state
|
#[derive(Debug)]
|
||||||
pub struct Mux {
|
pub struct TextMessage {
|
||||||
rooms: HashMap<String, Room>,
|
pub content: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Debug)]
|
||||||
pub enum RoomMessage {}
|
pub enum RoomMessage {
|
||||||
|
Text(TextMessage),
|
||||||
pub struct Room {
|
|
||||||
name: String,
|
|
||||||
tx: Sender<RoomMessage>,
|
|
||||||
rx: Receiver<RoomMessage>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Room {
|
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||||
pub fn get_handle(&self) -> RoomHandle {
|
pub struct UserId(pub Uuid);
|
||||||
RoomHandle {
|
|
||||||
name: self.name.clone(),
|
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
||||||
tx: self.tx.clone(),
|
pub struct RoomId(pub String);
|
||||||
rx: self.tx.subscribe(),
|
|
||||||
}
|
#[derive(Clone, Default)]
|
||||||
|
pub struct SharedMux(Arc<Mux>);
|
||||||
|
|
||||||
|
impl Deref for SharedMux {
|
||||||
|
type Target = Mux;
|
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target {
|
||||||
|
&self.0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct RoomHandle {
|
/// Shared server state
|
||||||
name: String,
|
#[derive(Default)]
|
||||||
tx: Sender<RoomMessage>,
|
pub struct Mux {
|
||||||
rx: Receiver<RoomMessage>,
|
rooms: DashMap<RoomId, Room>,
|
||||||
|
users: DashMap<UserId, ConnectedUser>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum CompositeListenerMessage {
|
impl Mux {
|
||||||
Room { name: String, message: RoomMessage },
|
pub fn user_join_room(&self, user_id: UserId, room_id: RoomId) {
|
||||||
JoinRoom(String),
|
self
|
||||||
DropRoom(String),
|
.rooms
|
||||||
}
|
.entry(room_id)
|
||||||
|
.and_modify(|room| {
|
||||||
|
room.members.insert(user_id.clone());
|
||||||
|
})
|
||||||
|
.or_insert_with(|| {
|
||||||
|
let members = DashSet::new();
|
||||||
|
members.insert(user_id);
|
||||||
|
Room { members }
|
||||||
|
});
|
||||||
|
|
||||||
pub struct CompositeListener {
|
println!("Joined room. Rooms: {:?}", self.rooms);
|
||||||
current_listening_rooms: HashMap<String, RoomHandle>,
|
}
|
||||||
}
|
|
||||||
|
|
||||||
impl CompositeListener {}
|
pub fn send_message_to_room(&self, room_id: RoomId, message: RoomMessage) {
|
||||||
|
let message = Arc::new(message);
|
||||||
|
|
||||||
/// Create a "composite" listener that listens to multiple rooms / streams, and
|
let recipients =
|
||||||
/// is able to add more streams in the middle as well
|
match self.rooms.get(&room_id).map(|room| room.members.clone()) {
|
||||||
pub fn create_listener() -> (
|
Some(v) => v,
|
||||||
JoinHandle<()>,
|
None => return,
|
||||||
UnboundedSender<CompositeListenerMessage>,
|
};
|
||||||
UnboundedReceiver<CompositeListenerMessage>,
|
|
||||||
) {
|
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
/*
|
println!("Sending message {message:?} to recipients:");
|
||||||
let join_handle = tokio::spawn(async {
|
for user_id in recipients {
|
||||||
let mut currently_listening_to: HashMap<String, RoomHandle> =
|
// TODO: This should technically never be None?
|
||||||
HashMap::new();
|
let user = self.users.get(&user_id).unwrap();
|
||||||
|
println!(" - user: {user:?}");
|
||||||
|
|
||||||
loop {
|
user.tx.send(message.clone());
|
||||||
// Build select targets
|
|
||||||
let ouais = rx.recv().boxed();
|
|
||||||
let mut select_targets: Vec<
|
|
||||||
Pin<Box<dyn Future<Output = Option<CompositeListenerMessage>>>>,
|
|
||||||
> = vec![ouais];
|
|
||||||
for (_, room_handle) in currently_listening_to.iter() {
|
|
||||||
select_targets.push(
|
|
||||||
room_handle
|
|
||||||
.rx
|
|
||||||
.recv()
|
|
||||||
.map(|f| {
|
|
||||||
f.ok().map(|m| CompositeListenerMessage::Room {
|
|
||||||
name: room_handle.name.clone(),
|
|
||||||
message: m,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.boxed(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Select
|
|
||||||
let result = future::select_all(select_targets).await;
|
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
|
||||||
(join_handle, tx, rx)
|
pub fn ensure_user(&self, user_id: UserId) {
|
||||||
*/
|
self.users.entry(user_id).or_insert_with(|| {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
ConnectedUser { tx, rx: Some(rx) }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
todo!()
|
pub fn create_listen_stream(&self, user_id: UserId) -> ResponseStream {
|
||||||
|
let rx = {
|
||||||
|
self.ensure_user(user_id.clone());
|
||||||
|
let mut user = self.users.get_mut(&user_id).unwrap();
|
||||||
|
user.rx.take().unwrap()
|
||||||
|
};
|
||||||
|
let stream = UnboundedReceiverStream::new(rx);
|
||||||
|
|
||||||
|
stream
|
||||||
|
.map(move |message| {
|
||||||
|
println!("Sending message to {user_id:?}: {message:?}");
|
||||||
|
|
||||||
|
let message = match message.as_ref() {
|
||||||
|
RoomMessage::Text(v) => v,
|
||||||
|
};
|
||||||
|
|
||||||
|
let chat_message = ChatMessage {
|
||||||
|
content: message.content.clone(),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(chat_message)
|
||||||
|
})
|
||||||
|
.boxed()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Room {
|
||||||
|
members: DashSet<UserId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectedUser {
|
||||||
|
tx: UnboundedSender<Arc<RoomMessage>>,
|
||||||
|
rx: Option<UnboundedReceiver<Arc<RoomMessage>>>,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue