Compare commits
No commits in common. "e78d6ddf996156b9b2f5788cd827be93c3203b79" and "b1ef49475912b82928195e37e7966b839c112fff" have entirely different histories.
e78d6ddf99
...
b1ef494759
14 changed files with 151 additions and 296 deletions
16
Cargo.lock
generated
16
Cargo.lock
generated
|
@ -609,19 +609,6 @@ dependencies = [
|
||||||
"syn 2.0.15",
|
"syn 2.0.15",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "dashmap"
|
|
||||||
version = "5.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
|
|
||||||
dependencies = [
|
|
||||||
"cfg-if",
|
|
||||||
"hashbrown",
|
|
||||||
"lock_api",
|
|
||||||
"once_cell",
|
|
||||||
"parking_lot_core",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "derive_more"
|
name = "derive_more"
|
||||||
version = "0.99.17"
|
version = "0.99.17"
|
||||||
|
@ -1790,7 +1777,6 @@ dependencies = [
|
||||||
"tauri-build",
|
"tauri-build",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tonic",
|
"tonic",
|
||||||
"uuid",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1807,7 +1793,6 @@ name = "mraow-server"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"anyhow",
|
"anyhow",
|
||||||
"dashmap",
|
|
||||||
"futures",
|
"futures",
|
||||||
"hyper",
|
"hyper",
|
||||||
"mraow-common",
|
"mraow-common",
|
||||||
|
@ -1820,7 +1805,6 @@ dependencies = [
|
||||||
"tower",
|
"tower",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
"uuid",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
{
|
{
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"build": "vite build",
|
|
||||||
"dev": "vite"
|
"dev": "vite"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
|
|
@ -21,7 +21,6 @@ mraow-common = { path = "../../common" }
|
||||||
tokio = { version = "1.28.0", features = ["full"] }
|
tokio = { version = "1.28.0", features = ["full"] }
|
||||||
anyhow = "1.0.71"
|
anyhow = "1.0.71"
|
||||||
tonic = "0.9.2"
|
tonic = "0.9.2"
|
||||||
uuid = { version = "1.3.2", features = ["v4"] }
|
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["mraow-common/client"]
|
default = ["mraow-common/client"]
|
||||||
|
|
|
@ -1,47 +1,21 @@
|
||||||
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
|
// Prevents additional console window on Windows in release, DO NOT REMOVE!!
|
||||||
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
|
#![cfg_attr(not(debug_assertions), windows_subsystem = "windows")]
|
||||||
|
|
||||||
use std::{sync::Arc, thread};
|
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use mraow_common::chat_proto::{
|
use mraow_common::chat_proto::chat_client::ChatClient;
|
||||||
chat_client::ChatClient, ChatMessage, ReceiveMsgsRequest, RoomAction,
|
use tauri::async_runtime::{Mutex, TokioHandle};
|
||||||
};
|
use tonic::transport::channel::Channel;
|
||||||
use serde_json::json;
|
|
||||||
use tauri::{
|
|
||||||
async_runtime::{Mutex, TokioHandle},
|
|
||||||
Manager, State,
|
|
||||||
};
|
|
||||||
use tonic::{transport::channel::Channel, IntoRequest};
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
type MyChatClient = Arc<Mutex<ChatClient<Channel>>>;
|
type MyGreeterClient = ChatClient<Channel>;
|
||||||
|
|
||||||
pub struct UserId(Uuid);
|
|
||||||
|
|
||||||
#[tauri::command]
|
#[tauri::command]
|
||||||
async fn send_message(
|
async fn send_message(
|
||||||
state: State<'_, MyChatClient>,
|
state: tauri::State<'_, Mutex<MyGreeterClient>>,
|
||||||
user_id: State<'_, UserId>,
|
|
||||||
message: String,
|
message: String,
|
||||||
) -> Result<(), ()> {
|
) -> Result<(), ()> {
|
||||||
println!("SHIET {state:?}");
|
println!("SHIET {state:?}");
|
||||||
|
|
||||||
let mut client = state.lock().await;
|
// let mut client = state.lock().await;
|
||||||
|
|
||||||
let user_id = user_id.inner();
|
|
||||||
let resp = client
|
|
||||||
.send_msg(ChatMessage {
|
|
||||||
from_user_id: user_id.0.to_string(),
|
|
||||||
to_room_id: "general".to_string(),
|
|
||||||
content: message,
|
|
||||||
..Default::default()
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
println!("Sent message to server. {resp:?}");
|
|
||||||
|
|
||||||
/* client
|
/* client
|
||||||
.say_hello(HelloRequest {
|
.say_hello(HelloRequest {
|
||||||
message,
|
message,
|
||||||
|
@ -56,51 +30,15 @@ async fn send_message(
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
tauri::async_runtime::set(TokioHandle::current());
|
tauri::async_runtime::set(TokioHandle::current());
|
||||||
|
|
||||||
let uuid = Uuid::new_v4();
|
/*
|
||||||
let user_id = UserId(uuid.clone());
|
TODO: Make sure this doesn't necessarily have to connect before starting the app
|
||||||
|
let greeter_client = ChatClient::connect("http://[::1]:50051").await?;
|
||||||
let chat_client = ChatClient::connect("http://[::1]:50051").await?;
|
let greeter_client = Mutex::new(greeter_client);
|
||||||
let chat_client = Arc::new(Mutex::new(chat_client));
|
println!("Connected :)");
|
||||||
let chat_client2 = chat_client.clone();
|
*/
|
||||||
|
|
||||||
tauri::Builder::default()
|
tauri::Builder::default()
|
||||||
.setup(move |app| {
|
// .manage(greeter_client)
|
||||||
let main_window = app.get_window("main").unwrap();
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut client = chat_client2.lock().await;
|
|
||||||
client
|
|
||||||
.room_action(RoomAction {
|
|
||||||
room_id: "general".to_string(),
|
|
||||||
user_id: uuid.to_string(),
|
|
||||||
action: "join".to_string(),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let stream = client
|
|
||||||
.receive_msgs(ReceiveMsgsRequest {
|
|
||||||
user_id: uuid.to_string(),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
std::mem::drop(client);
|
|
||||||
|
|
||||||
let mut stream = stream.into_inner();
|
|
||||||
while let Ok(Some(message)) = stream.message().await {
|
|
||||||
println!("SHIET message {message:?}");
|
|
||||||
|
|
||||||
main_window
|
|
||||||
.emit_all("new-message", json!({ "content" : message.content }))
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.manage(user_id)
|
|
||||||
.manage(chat_client)
|
|
||||||
.invoke_handler(tauri::generate_handler![send_message])
|
.invoke_handler(tauri::generate_handler![send_message])
|
||||||
.run(tauri::generate_context!())
|
.run(tauri::generate_context!())
|
||||||
.expect("error while running tauri application");
|
.expect("error while running tauri application");
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
"icons/icon.icns",
|
"icons/icon.icns",
|
||||||
"icons/icon.ico"
|
"icons/icon.ico"
|
||||||
],
|
],
|
||||||
"identifier": "io.mzhang.mraow",
|
"identifier": "com.tauri.dev",
|
||||||
"longDescription": "",
|
"longDescription": "",
|
||||||
"macOS": {
|
"macOS": {
|
||||||
"entitlements": null,
|
"entitlements": null,
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import { Provider } from "react-redux";
|
import { Provider } from "react-redux";
|
||||||
import { store, useAppDispatch } from "./store";
|
import { store } from "./store";
|
||||||
|
import { useState } from "react";
|
||||||
|
|
||||||
import styles from "./App.module.scss";
|
import styles from "./App.module.scss";
|
||||||
import LeftSidebar from "./components/LeftSidebar";
|
import LeftSidebar from "./components/LeftSidebar";
|
||||||
|
|
|
@ -1,11 +1,9 @@
|
||||||
import { invoke } from "@tauri-apps/api/tauri";
|
import { invoke } from "@tauri-apps/api/tauri";
|
||||||
import styles from "./CenterPanel.module.scss";
|
import styles from "./CenterPanel.module.scss";
|
||||||
import { useEffect, useState } from "react";
|
import { useState } from "react";
|
||||||
import { useAppDispatch, useAppSelector } from "../store";
|
import { useAppDispatch, useAppSelector } from "../store";
|
||||||
import { messageSelectors, messageSlice } from "../store/messages";
|
import { messageSelectors, messageSlice } from "../store/messages";
|
||||||
import { v4 as uuidv4 } from "uuid";
|
import { v4 as uuidv4 } from "uuid";
|
||||||
import { emit, listen } from "@tauri-apps/api/event";
|
|
||||||
import { appWindow, WebviewWindow } from "@tauri-apps/api/window";
|
|
||||||
|
|
||||||
export default function CenterPanel() {
|
export default function CenterPanel() {
|
||||||
const [currentMessage, setCurrentMessage] = useState("");
|
const [currentMessage, setCurrentMessage] = useState("");
|
||||||
|
@ -14,32 +12,12 @@ export default function CenterPanel() {
|
||||||
messageSelectors.selectAll(state)
|
messageSelectors.selectAll(state)
|
||||||
);
|
);
|
||||||
|
|
||||||
useEffect(() => {
|
|
||||||
let unlisten;
|
|
||||||
|
|
||||||
(async () => {
|
|
||||||
unlisten = await appWindow.listen("new-message", (event) => {
|
|
||||||
console.log("NEW EVENT", event);
|
|
||||||
const id = "lol";
|
|
||||||
const time = Date.now();
|
|
||||||
const content = event.payload.content;
|
|
||||||
dispatch(messageSlice.actions.addMessage({ id, time, content }));
|
|
||||||
});
|
|
||||||
|
|
||||||
console.log("Listen handler active.");
|
|
||||||
})();
|
|
||||||
|
|
||||||
return () => {
|
|
||||||
if (unlisten) unlisten();
|
|
||||||
};
|
|
||||||
});
|
|
||||||
|
|
||||||
const onSubmit = (e) => {
|
const onSubmit = (e) => {
|
||||||
e.preventDefault();
|
e.preventDefault();
|
||||||
invoke("send_message", { message: currentMessage });
|
invoke("send_message", { message: currentMessage });
|
||||||
|
|
||||||
const id = uuidv4();
|
const id = uuidv4();
|
||||||
const time = Date.now();
|
const time = new Date();
|
||||||
dispatch(
|
dispatch(
|
||||||
messageSlice.actions.addMessage({ id, time, content: currentMessage })
|
messageSlice.actions.addMessage({ id, time, content: currentMessage })
|
||||||
);
|
);
|
||||||
|
@ -53,8 +31,7 @@ export default function CenterPanel() {
|
||||||
<div className={styles.middlePart}>
|
<div className={styles.middlePart}>
|
||||||
{allMessages.map((msg) => (
|
{allMessages.map((msg) => (
|
||||||
<div key={msg.id}>
|
<div key={msg.id}>
|
||||||
<small>{new Date(msg.time).toISOString()}</small>
|
<small>{msg.time.toISOString()}</small>
|
||||||
|
|
||||||
{msg.content}
|
{msg.content}
|
||||||
</div>
|
</div>
|
||||||
))}
|
))}
|
||||||
|
|
|
@ -3,7 +3,7 @@ import { RootState } from ".";
|
||||||
|
|
||||||
export type Message = {
|
export type Message = {
|
||||||
id: string;
|
id: string;
|
||||||
time: number;
|
time: Date;
|
||||||
content: string;
|
content: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
2
client/tauri.conf.json5
Normal file
2
client/tauri.conf.json5
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
{
|
||||||
|
}
|
|
@ -7,7 +7,7 @@ export default defineConfig({
|
||||||
|
|
||||||
// Tauri expects a fixed port, fail if that port is not available
|
// Tauri expects a fixed port, fail if that port is not available
|
||||||
server: {
|
server: {
|
||||||
strictPort: false,
|
strictPort: true,
|
||||||
},
|
},
|
||||||
|
|
||||||
// to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`,
|
// to make use of `TAURI_PLATFORM`, `TAURI_ARCH`, `TAURI_FAMILY`,
|
||||||
|
|
|
@ -12,21 +12,18 @@ message ChatMessage {
|
||||||
string content = 5;
|
string content = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Room { string name = 1; }
|
message Room {
|
||||||
|
string name = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message UserList { repeated User users = 1; }
|
message UserList { repeated User users = 1; }
|
||||||
|
|
||||||
message RoomList {repeated Room rooms = 1;}
|
message RoomList {repeated Room rooms = 1;}
|
||||||
|
|
||||||
message User {
|
message User { string username = 1; }
|
||||||
string userId = 1;
|
|
||||||
string username = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// RPC Messages
|
// RPC Messages
|
||||||
|
|
||||||
message ReceiveMsgsRequest { string userId = 1; }
|
|
||||||
|
|
||||||
message RoomActionResponse {}
|
message RoomActionResponse {}
|
||||||
|
|
||||||
message RoomAction {
|
message RoomAction {
|
||||||
|
@ -37,11 +34,11 @@ message RoomAction {
|
||||||
|
|
||||||
service Chat {
|
service Chat {
|
||||||
// Rooms
|
// Rooms
|
||||||
rpc roomAction(RoomAction) returns (google.protobuf.Empty) {}
|
rpc roomAction(RoomAction) returns (JoinResponse) {}
|
||||||
|
|
||||||
// Messages
|
// Messages
|
||||||
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}
|
rpc sendMsg(ChatMessage) returns (google.protobuf.Empty) {}
|
||||||
rpc receiveMsgs(ReceiveMsgsRequest) returns (stream ChatMessage) {}
|
rpc receiveMsgs(google.protobuf.Empty) returns (stream ChatMessage) {}
|
||||||
|
|
||||||
// List
|
// List
|
||||||
rpc listRooms(google.protobuf.Empty) returns (RoomList) {}
|
rpc listRooms(google.protobuf.Empty) returns (RoomList) {}
|
||||||
|
|
|
@ -20,8 +20,6 @@ tracing = "0.1.37"
|
||||||
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
|
||||||
|
|
||||||
mraow-common = { path = "../common" }
|
mraow-common = { path = "../common" }
|
||||||
uuid = { version = "1.3.2", features = ["v4"] }
|
|
||||||
dashmap = "5.4.0"
|
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-build = "0.9.2"
|
tonic-build = "0.9.2"
|
||||||
|
|
|
@ -2,12 +2,6 @@ mod mux;
|
||||||
|
|
||||||
use futures::Stream;
|
use futures::Stream;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use mraow_common::chat_proto::{
|
|
||||||
chat_server::{Chat, ChatServer},
|
|
||||||
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
|
|
||||||
UserList,
|
|
||||||
};
|
|
||||||
use mux::{SharedMux, TextMessage};
|
|
||||||
use std::{
|
use std::{
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
|
@ -15,17 +9,17 @@ use std::{
|
||||||
};
|
};
|
||||||
use tonic::{body::BoxBody, transport::Server, Request, Response, Status};
|
use tonic::{body::BoxBody, transport::Server, Request, Response, Status};
|
||||||
use tower::{Layer, Service};
|
use tower::{Layer, Service};
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::mux::{RoomId, RoomMessage, UserId};
|
use mraow_common::chat_proto::{
|
||||||
|
chat_server::{Chat, ChatServer},
|
||||||
|
ChatMessage, RoomAction, RoomActionResponse, RoomList, User, UserList,
|
||||||
|
};
|
||||||
|
|
||||||
pub type ResponseStream =
|
type ResponseStream =
|
||||||
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
|
Pin<Box<dyn Stream<Item = Result<ChatMessage, Status>> + Send>>;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
pub struct ChatImpl {
|
pub struct ChatImpl {}
|
||||||
mux: SharedMux,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
#[tonic::async_trait]
|
||||||
impl Chat for ChatImpl {
|
impl Chat for ChatImpl {
|
||||||
|
@ -34,49 +28,25 @@ impl Chat for ChatImpl {
|
||||||
async fn room_action(
|
async fn room_action(
|
||||||
&self,
|
&self,
|
||||||
request: Request<RoomAction>,
|
request: Request<RoomAction>,
|
||||||
) -> Result<Response<()>, Status> {
|
) -> Result<Response<RoomActionResponse>, Status> {
|
||||||
let request = request.into_inner();
|
|
||||||
println!("Join {request:?}");
|
println!("Join {request:?}");
|
||||||
|
todo!()
|
||||||
let user_id = Uuid::parse_str(&request.user_id).unwrap();
|
|
||||||
let user_id = UserId(user_id);
|
|
||||||
let room_id = RoomId(request.room_id);
|
|
||||||
self.mux.user_join_room(user_id, room_id);
|
|
||||||
|
|
||||||
Ok(Response::new(()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_msg(
|
async fn send_msg(
|
||||||
&self,
|
&self,
|
||||||
request: Request<ChatMessage>,
|
request: Request<ChatMessage>,
|
||||||
) -> Result<Response<()>, Status> {
|
) -> Result<Response<()>, Status> {
|
||||||
let request = request.into_inner();
|
|
||||||
println!("Send message {request:?}");
|
println!("Send message {request:?}");
|
||||||
|
todo!()
|
||||||
// TODO: Get room id
|
|
||||||
let room_id = RoomId(request.to_room_id);
|
|
||||||
let message = RoomMessage::Text(TextMessage {
|
|
||||||
content: request.content,
|
|
||||||
});
|
|
||||||
|
|
||||||
self.mux.send_message_to_room(room_id, message);
|
|
||||||
|
|
||||||
Ok(Response::new(()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn receive_msgs(
|
async fn receive_msg(
|
||||||
&self,
|
&self,
|
||||||
request: Request<ReceiveMsgsRequest>,
|
request: Request<()>,
|
||||||
) -> Result<Response<Self::receiveMsgsStream>, Status> {
|
) -> Result<Response<Self::receiveMsgStream>, Status> {
|
||||||
let request = request.into_inner();
|
|
||||||
println!("Receive message {request:?}");
|
println!("Receive message {request:?}");
|
||||||
|
todo!()
|
||||||
let user_id = Uuid::parse_str(&request.user_id).unwrap();
|
|
||||||
let user_id = UserId(user_id);
|
|
||||||
let stream = self.mux.create_listen_stream(user_id);
|
|
||||||
|
|
||||||
// TODO: Handle drop
|
|
||||||
Ok(Response::new(stream))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_rooms(
|
async fn list_rooms(
|
||||||
|
@ -94,6 +64,27 @@ impl Chat for ChatImpl {
|
||||||
println!("Get all users {request:?}");
|
println!("Get all users {request:?}");
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
async fn say_hello(
|
||||||
|
&self,
|
||||||
|
request: Request<HelloRequest>,
|
||||||
|
) -> Result<Response<HelloReply>, Status> {
|
||||||
|
let addr = request.remote_addr();
|
||||||
|
let request = request.into_inner();
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"Got a request from {:?}: {}",
|
||||||
|
addr,
|
||||||
|
request.message
|
||||||
|
);
|
||||||
|
|
||||||
|
let reply = HelloReply {
|
||||||
|
message: format!("Hello {}!", request.name),
|
||||||
|
};
|
||||||
|
Ok(Response::new(reply))
|
||||||
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
|
|
|
@ -1,137 +1,106 @@
|
||||||
use std::{
|
use std::{
|
||||||
ops::{Deref, DerefMut},
|
collections::HashMap,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::Arc,
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
|
|
||||||
use dashmap::{DashMap, DashSet};
|
use anyhow::Result;
|
||||||
|
use futures::{future, Future, FutureExt, Sink, Stream};
|
||||||
use futures::{stream, FutureExt, Stream, StreamExt};
|
use tokio::{
|
||||||
use mraow_common::chat_proto::{
|
sync::{
|
||||||
chat_server::{Chat, ChatServer},
|
broadcast::{Receiver, Sender},
|
||||||
ChatMessage, ReceiveMsgsRequest, RoomAction, RoomActionResponse, RoomList,
|
|
||||||
UserList,
|
|
||||||
};
|
|
||||||
use tokio::sync::{
|
|
||||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
||||||
Mutex,
|
},
|
||||||
|
task::JoinHandle,
|
||||||
};
|
};
|
||||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::ResponseStream;
|
#[derive(Clone)]
|
||||||
|
pub struct SharedMux(Mutex<Mux>);
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct TextMessage {
|
|
||||||
pub content: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub enum RoomMessage {
|
|
||||||
Text(TextMessage),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
|
||||||
pub struct UserId(pub Uuid);
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
|
||||||
pub struct RoomId(pub String);
|
|
||||||
|
|
||||||
#[derive(Clone, Default)]
|
|
||||||
pub struct SharedMux(Arc<Mux>);
|
|
||||||
|
|
||||||
impl Deref for SharedMux {
|
|
||||||
type Target = Mux;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Shared server state
|
/// Shared server state
|
||||||
#[derive(Default)]
|
|
||||||
pub struct Mux {
|
pub struct Mux {
|
||||||
rooms: DashMap<RoomId, Room>,
|
rooms: HashMap<String, Room>,
|
||||||
users: DashMap<UserId, ConnectedUser>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Mux {
|
#[derive(Clone)]
|
||||||
pub fn user_join_room(&self, user_id: UserId, room_id: RoomId) {
|
pub enum RoomMessage {}
|
||||||
self
|
|
||||||
.rooms
|
|
||||||
.entry(room_id)
|
|
||||||
.and_modify(|room| {
|
|
||||||
room.members.insert(user_id.clone());
|
|
||||||
})
|
|
||||||
.or_insert_with(|| {
|
|
||||||
let members = DashSet::new();
|
|
||||||
members.insert(user_id);
|
|
||||||
Room { members }
|
|
||||||
});
|
|
||||||
|
|
||||||
println!("Joined room. Rooms: {:?}", self.rooms);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn send_message_to_room(&self, room_id: RoomId, message: RoomMessage) {
|
|
||||||
let message = Arc::new(message);
|
|
||||||
|
|
||||||
let recipients =
|
|
||||||
match self.rooms.get(&room_id).map(|room| room.members.clone()) {
|
|
||||||
Some(v) => v,
|
|
||||||
None => return,
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("Sending message {message:?} to recipients:");
|
|
||||||
for user_id in recipients {
|
|
||||||
// TODO: This should technically never be None?
|
|
||||||
let user = self.users.get(&user_id).unwrap();
|
|
||||||
println!(" - user: {user:?}");
|
|
||||||
|
|
||||||
user.tx.send(message.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn ensure_user(&self, user_id: UserId) {
|
|
||||||
self.users.entry(user_id).or_insert_with(|| {
|
|
||||||
let (tx, rx) = mpsc::unbounded_channel();
|
|
||||||
ConnectedUser { tx, rx: Some(rx) }
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn create_listen_stream(&self, user_id: UserId) -> ResponseStream {
|
|
||||||
let rx = {
|
|
||||||
self.ensure_user(user_id.clone());
|
|
||||||
let mut user = self.users.get_mut(&user_id).unwrap();
|
|
||||||
user.rx.take().unwrap()
|
|
||||||
};
|
|
||||||
let stream = UnboundedReceiverStream::new(rx);
|
|
||||||
|
|
||||||
stream
|
|
||||||
.map(move |message| {
|
|
||||||
println!("Sending message to {user_id:?}: {message:?}");
|
|
||||||
|
|
||||||
let message = match message.as_ref() {
|
|
||||||
RoomMessage::Text(v) => v,
|
|
||||||
};
|
|
||||||
|
|
||||||
let chat_message = ChatMessage {
|
|
||||||
content: message.content.clone(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(chat_message)
|
|
||||||
})
|
|
||||||
.boxed()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Room {
|
pub struct Room {
|
||||||
members: DashSet<UserId>,
|
name: String,
|
||||||
|
tx: Sender<RoomMessage>,
|
||||||
|
rx: Receiver<RoomMessage>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
impl Room {
|
||||||
pub struct ConnectedUser {
|
pub fn get_handle(&self) -> RoomHandle {
|
||||||
tx: UnboundedSender<Arc<RoomMessage>>,
|
RoomHandle {
|
||||||
rx: Option<UnboundedReceiver<Arc<RoomMessage>>>,
|
name: self.name.clone(),
|
||||||
|
tx: self.tx.clone(),
|
||||||
|
rx: self.tx.subscribe(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct RoomHandle {
|
||||||
|
name: String,
|
||||||
|
tx: Sender<RoomMessage>,
|
||||||
|
rx: Receiver<RoomMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum CompositeListenerMessage {
|
||||||
|
Room { name: String, message: RoomMessage },
|
||||||
|
JoinRoom(String),
|
||||||
|
DropRoom(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CompositeListener {
|
||||||
|
current_listening_rooms: HashMap<String, RoomHandle>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CompositeListener {}
|
||||||
|
|
||||||
|
/// Create a "composite" listener that listens to multiple rooms / streams, and
|
||||||
|
/// is able to add more streams in the middle as well
|
||||||
|
pub fn create_listener() -> (
|
||||||
|
JoinHandle<()>,
|
||||||
|
UnboundedSender<CompositeListenerMessage>,
|
||||||
|
UnboundedReceiver<CompositeListenerMessage>,
|
||||||
|
) {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
/*
|
||||||
|
let join_handle = tokio::spawn(async {
|
||||||
|
let mut currently_listening_to: HashMap<String, RoomHandle> =
|
||||||
|
HashMap::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Build select targets
|
||||||
|
let ouais = rx.recv().boxed();
|
||||||
|
let mut select_targets: Vec<
|
||||||
|
Pin<Box<dyn Future<Output = Option<CompositeListenerMessage>>>>,
|
||||||
|
> = vec![ouais];
|
||||||
|
for (_, room_handle) in currently_listening_to.iter() {
|
||||||
|
select_targets.push(
|
||||||
|
room_handle
|
||||||
|
.rx
|
||||||
|
.recv()
|
||||||
|
.map(|f| {
|
||||||
|
f.ok().map(|m| CompositeListenerMessage::Room {
|
||||||
|
name: room_handle.name.clone(),
|
||||||
|
message: m,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.boxed(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Select
|
||||||
|
let result = future::select_all(select_targets).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
(join_handle, tx, rx)
|
||||||
|
*/
|
||||||
|
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue