This commit is contained in:
Michael Zhang 2023-08-10 16:10:48 -05:00
commit 70acdb39b4
14 changed files with 3947 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/target
/node_modules

1549
Cargo.lock generated Normal file

File diff suppressed because it is too large Load diff

23
Cargo.toml Normal file
View file

@ -0,0 +1,23 @@
[package]
name = "grub"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.72"
automerge = "0.5.1"
axum = { version = "0.6.20", features = ["http2", "macros", "ws"] }
chrono = { version = "0.4.26", features = ["serde"] }
ciborium = "0.2.1"
dashmap = "5.5.0"
derivative = "2.2.0"
flume = "0.10.14"
futures = "0.3.28"
serde = { version = "1.0.183", features = ["derive"] }
serde_json = "1.0.104"
tokio = { version = "1.30.0", features = ["full"] }
uuid = "1.4.1"
[dependencies.mzlib]
git = "https://git.mzhang.io/michael/mzlib"
features = ["axum"]

17
client/App.tsx Normal file
View file

@ -0,0 +1,17 @@
import { v4 } from "uuid";
import Room from "./Room";
export default function App() {
const x = new URL(window.location.href);
const roomId = x.pathname.replace(/^\//, "");
if (!roomId) {
location.href = `/${v4()}`;
}
return (
<>
Hellosu, <Room roomId={roomId} />
</>
);
}

200
client/Room.tsx Normal file
View file

@ -0,0 +1,200 @@
import CBOR from "cbor";
import { useEffect, useState } from "react";
import useWebSocket, { ReadyState } from "react-use-websocket";
import * as Automerge from "@automerge/automerge";
import * as uuid from "uuid";
const connectionStatusMap = {
[ReadyState.CONNECTING]: "Connecting",
[ReadyState.OPEN]: "Open",
[ReadyState.CLOSING]: "Closing",
[ReadyState.CLOSED]: "Closed",
[ReadyState.UNINSTANTIATED]: "Uninstantiated",
};
export default function Room({ roomId }) {
const [readyState, setReadyState] = useState(ReadyState.CLOSED);
const [connectedClients, setConnectedClients] = useState([]);
const [clientId, setClientId] = useState(null);
const [chats, setChats] = useState([]);
const [doc, setDoc] = useState(Automerge.init());
const [syncState, setSyncState] = useState(Automerge.initSyncState());
function updateDoc(newDoc) {
setDoc(newDoc);
}
const {
sendMessage,
lastJsonMessage,
readyState: newReadyState,
} = useWebSocket("ws://localhost:3100/ws", {
onOpen: ({}) => {
console.log("Shiet, connected.");
},
onMessage: async (event) => {
const data = CBOR.decode(await event.data.arrayBuffer());
console.log("received", data);
if (data.type === "ServerHello") {
setClientId(uuid.stringify(data.client_id));
}
if (data.type === "RoomClientList") {
setConnectedClients(data.clients.map((x) => uuid.stringify(x)));
}
if (data.type === "ChatMessage") {
setChats([...chats, data]);
}
if (data.type === "Automerge") {
const [nextDoc, nextSyncState, patch] = Automerge.receiveSyncMessage(
doc,
syncState,
data.message[1]
);
setDoc(nextDoc);
setSyncState(nextSyncState);
console.log("patch", patch);
}
},
});
function sendWtfMessage(data) {
let cbor = CBOR.encode(data);
console.log(
"cbor-encoded",
[...new Uint8Array(cbor)]
.map((x) => x.toString(16).padStart(2, "0"))
.join(" ")
);
sendMessage(cbor);
}
useEffect(() => {
console.log("hellosu", readyState, newReadyState);
if (
readyState === ReadyState.CONNECTING &&
newReadyState === ReadyState.OPEN
) {
// On Open
sendWtfMessage({ type: "JoinRoom", room_id: roomId });
console.log("Sent connection message");
}
setReadyState(newReadyState);
}, [newReadyState]);
const connectionStatus = connectionStatusMap[readyState];
return (
<>
<p>Room Id: {roomId}</p>
<p>Connection status: {connectionStatus}</p>
{newReadyState === ReadyState.OPEN && (
<ReadyPart
doc={doc}
updateDoc={updateDoc}
syncState={syncState}
chats={chats}
roomId={roomId}
clientId={clientId}
connectedClients={connectedClients}
sendWtfMessage={sendWtfMessage}
/>
)}
</>
);
}
function ReadyPart({
doc,
syncState,
roomId,
clientId,
chats,
connectedClients,
sendWtfMessage,
updateDoc,
}) {
const [message, setMessage] = useState("");
const [addItemName, setAddItemName] = useState("");
function onSubmit(e) {
e.preventDefault();
sendWtfMessage({
type: "ChatMessage",
timestamp: new Date().toISOString(),
message_id: uuid.v4(),
room_id: roomId,
author: clientId,
content: message,
});
setMessage("");
}
const items = doc.items || [];
function addItem(e) {
e.preventDefault();
const newDoc = Automerge.change(doc, (doc) => {
if (!doc.items) doc.items = [];
doc.items.push({
id: uuid.v4(),
content: addItemName,
});
});
updateDoc(newDoc);
const [syncMessage, binary] = Automerge.generateSyncMessage(doc, syncState);
if (syncMessage)
sendWtfMessage({
type: "Automerge",
client_id: clientId,
room_id: roomId,
message: binary,
});
setAddItemName("");
}
return (
<>
Connected:
<ul>
{connectedClients.map((x) => (
<li key={x}>{x}</li>
))}
</ul>
Messages:
<ul>
{chats.map((x) => (
<li key={x.message_id}>
[{x.timestamp}] {uuid.stringify(x.author)}: {x.content}
</li>
))}
</ul>
<form onSubmit={onSubmit}>
<input
type="text"
value={message}
onChange={(e) => setMessage(e.target.value)}
placeholder="Type a message..."
/>
</form>
Grubs:
<ul>
{items.map((x) => (
<li key={x.id}>{x.content}</li>
))}
</ul>
<form onSubmit={addItem}>
<input
type="text"
value={addItemName}
onChange={(e) => setAddItemName(e.target.value)}
placeholder="Type a message..."
/>
<button type="submit">add item</button>
</form>
</>
);
}

6
client/index.ts Normal file
View file

@ -0,0 +1,6 @@
import { createRoot } from "react-dom/client";
import App from "./App";
const el = document.getElementById("app")!;
const root = createRoot(el);
root.render(App());

7
index.html Normal file
View file

@ -0,0 +1,7 @@
<!DOCTYPE html>
<html lang="en">
<body>
<div id="app"></div>
<script type="module" src="client/index.ts"></script>
</body>
</html>

1739
package-lock.json generated Normal file

File diff suppressed because it is too large Load diff

27
package.json Normal file
View file

@ -0,0 +1,27 @@
{
"scripts": {
"dev": "vite"
},
"devDependencies": {
"@types/react": "^18.2.20",
"@types/react-dom": "^18.2.7",
"@types/uuid": "^9.0.2",
"@vitejs/plugin-react": "^4.0.4",
"typescript": "^5.1.6",
"vite": "^4.4.9",
"vite-plugin-top-level-await": "^1.3.1",
"vite-plugin-wasm": "^3.2.2"
},
"dependencies": {
"@automerge/automerge": "^2.0.3",
"cbor": "npm:@jprochazk/cbor@^0.5.0",
"localforage": "^1.10.0",
"match-sorter": "^6.3.1",
"react": "^18.2.0",
"react-dom": "^18.2.0",
"react-router-dom": "^6.15.0",
"react-use-websocket": "^4.3.1",
"sort-by": "^1.2.0",
"uuid": "^9.0.0"
}
}

4
rustfmt.toml Normal file
View file

@ -0,0 +1,4 @@
max_width = 80
tab_spaces = 2
wrap_comments = true
fn_single_line = true

271
src/main.rs Normal file
View file

@ -0,0 +1,271 @@
pub mod message;
#[macro_use]
extern crate derivative;
use std::{io::Cursor, mem, sync::Arc};
use automerge::{
sync::{State as SyncState, SyncDoc},
Automerge, Value,
};
use axum::{
extract::{
ws::{Message as WsMessage, WebSocket, WebSocketUpgrade},
State,
},
response::Response,
routing::get,
Router,
};
use chrono::{DateTime, Utc};
use dashmap::{DashMap, DashSet};
use flume::r#async::SendSink;
use futures::{stream, FutureExt, SinkExt, StreamExt};
use message::Message;
use mzlib::axum_error::Result;
use uuid::Uuid;
#[derive(Default, Clone)]
struct AppState {
clients: Clients,
rooms: Rooms,
}
#[derive(Default, Clone)]
struct Clients(Arc<DashMap<Uuid, Client>>);
struct Client {
writer: SendSink<'static, Result<axum::extract::ws::Message, axum::Error>>,
rooms: DashSet<Uuid>,
}
#[derive(Default, Clone)]
struct Rooms(Arc<DashMap<Uuid, Room>>);
#[derive(Default)]
struct Room {
document: Automerge,
sync_state: SyncState,
connected_clients: DashSet<Uuid>,
}
impl Room {
pub fn split_borrow(&mut self) -> (&mut Automerge, &mut SyncState) {
(&mut self.document, &mut self.sync_state)
}
}
#[tokio::main]
async fn main() -> Result<()> {
let state = AppState::default();
let app = Router::new().route("/ws", get(handler)).with_state(state);
axum::Server::bind(&"0.0.0.0:3100".parse().unwrap())
.serve(app.into_make_service())
.await?;
Ok(())
}
async fn handler(
ws: WebSocketUpgrade,
State(state): State<AppState>,
) -> Response {
ws.on_upgrade(|socket| handle_socket(state, socket).map(|res| res.unwrap()))
}
async fn handle_socket(state: AppState, socket: WebSocket) -> Result<()> {
let (mut socket_tx, socket_rx) = socket.split();
// Generate an ID for this connection
let client_id = Uuid::new_v4();
println!("Connected client {client_id:?}");
// First, let's create multiplexed versions of these
let mut socket_tx = {
let (tx, rx) = flume::unbounded();
tokio::spawn(rx.into_stream().forward(socket_tx));
tx.into_sink()
};
let mut socket_rx = {
let (tx, rx) = flume::unbounded();
tokio::spawn(
socket_rx
.forward(tx.into_sink().sink_map_err(|err| axum::Error::new(err))),
);
rx.into_stream()
};
// Now register this client into the set of clients
let rooms = DashSet::default();
let client = Client {
writer: socket_tx.clone(),
rooms: rooms.clone(),
};
state.clients.0.insert(client_id, client);
// Send a hello message
socket_tx
.send(Ok(
Message::ServerHello {
timestamp: Utc::now(),
client_id: client_id.clone(),
}
.into_ws_message()?,
))
.await?;
// Wait for messages
while let Some(msg) = socket_rx.next().await {
match msg {
WsMessage::Binary(ref bin) => {
let cursor = Cursor::new(bin);
let message = ciborium::from_reader::<Message, _>(cursor)?;
println!("received message: {message:?}");
match &message {
Message::Automerge {
client_id,
room_id,
message: inner_message,
} => {
let mut room = state
.rooms
.0
.entry(room_id.clone())
.or_insert_with(|| Room::default());
rooms.insert(room_id.clone());
// Update local state
{
let (document, sync_state) = room.split_borrow();
document
.receive_sync_message(sync_state, inner_message.clone().0)?;
}
println!("inner doc: {:?}", room.document);
// Remove current client, so send to everyone else
let mut connected_clients = room.connected_clients.clone();
connected_clients.remove(client_id);
// Send to everyone else
let message = message.clone();
broadcast(&state, connected_clients, message).await;
}
Message::JoinRoom { room_id } => {
let room = state
.rooms
.0
.entry(room_id.clone())
.or_insert_with(|| Room::default());
rooms.insert(room_id.clone());
// Add to the list of connected clients
room.connected_clients.insert(client_id);
let connected_clients = room.connected_clients.clone();
mem::drop(room);
println!("Added to room");
// Tell each clients which clients are connected
broadcast(
&state,
connected_clients.clone(),
Message::RoomClientList {
room_id: room_id.clone(),
clients: connected_clients.into_iter().collect(),
},
)
.await;
}
Message::ChatMessage {
timestamp,
message_id,
room_id,
author,
content,
} => {
let room = state
.rooms
.0
.entry(room_id.clone())
.or_insert_with(|| Room::default());
rooms.insert(room_id.clone());
// Add to the list of connected clients
let connected_clients = room.connected_clients.clone();
mem::drop(room);
println!("Added to room");
// Broadcast to the room
broadcast(&state, connected_clients.clone(), message).await;
}
_ => {}
};
}
_ => {}
}
}
state.clients.0.remove(&client_id);
println!("Disconnecting client {client_id:?}");
// Tell other clients about the disconnection
stream::iter(rooms)
.for_each(|room_id| {
let room = state
.rooms
.0
.entry(room_id.clone())
.or_insert_with(|| Room::default());
room.connected_clients.remove(&client_id);
let connected_clients = room.connected_clients.clone();
mem::drop(room);
let room_id = room_id.clone();
{
let state_ref = &state;
async move {
broadcast(
state_ref,
connected_clients.clone(),
Message::RoomClientList {
room_id: room_id,
clients: connected_clients.into_iter().collect(),
},
)
.await;
}
}
})
.await;
Ok(())
}
async fn broadcast<I>(state: &AppState, client_ids: I, message: Message)
where
I: IntoIterator<Item = Uuid>,
{
let client_ids = client_ids.into_iter().collect::<Vec<_>>();
println!("Broadcasting message to {} clients:", client_ids.len());
println!(" - message: {message:?}");
stream::iter(
client_ids
.into_iter()
.filter_map(|client_id| state.clients.0.get(&client_id))
.map(|client| client.writer.clone()),
)
.for_each_concurrent(None, move |mut writer| {
let message = message.clone();
async move {
writer.send(Ok(message.into_ws_message().unwrap())).await;
}
})
.await;
}

91
src/message.rs Normal file
View file

@ -0,0 +1,91 @@
use std::collections::HashSet;
use std::fmt::{write, Formatter};
use std::io::Cursor;
use axum::extract::ws;
use chrono::{DateTime, Utc};
use ciborium::Value;
use mzlib::axum_error::Result;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use uuid::Uuid;
#[derive(Derivative, Clone, Serialize, Deserialize)]
#[derivative(Debug)]
#[serde(tag = "type")]
pub enum Message {
ServerHello {
timestamp: DateTime<Utc>,
client_id: Uuid,
},
JoinRoom {
room_id: Uuid,
},
RoomClientList {
room_id: Uuid,
clients: HashSet<Uuid>,
},
ChatMessage {
timestamp: DateTime<Utc>,
message_id: Uuid,
room_id: Uuid,
author: Uuid,
content: String,
},
Automerge {
client_id: Uuid,
room_id: Uuid,
#[derivative(Debug = "ignore")]
message: AutomergeMessage,
},
}
impl Message {
pub fn into_ws_message(self) -> Result<ws::Message> {
let vec = Vec::new();
let mut cursor = Cursor::new(vec);
ciborium::into_writer(&self, &mut cursor)?;
let vec = cursor.into_inner();
Ok(ws::Message::Binary(vec))
}
}
#[derive(Clone)]
pub struct AutomergeMessage(pub automerge::sync::Message);
impl Serialize for AutomergeMessage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let bytes = self.0.clone().encode();
serializer.serialize_bytes(&bytes)
}
}
impl<'de> Deserialize<'de> for AutomergeMessage {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct ThisVisitor;
impl<'v> Visitor<'v> for ThisVisitor {
type Value = AutomergeMessage;
fn expecting(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "an automerge sync message")
}
fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
automerge::sync::Message::decode(v)
.map(AutomergeMessage)
.map_err(|_err| E::custom("invalid automerge sync message"))
}
}
let visitor = ThisVisitor;
deserializer.deserialize_bytes(visitor)
}
}

3
tsconfig.json Normal file
View file

@ -0,0 +1,3 @@
{
"compilerOptions": { "lib": ["ESNext", "DOM"], "jsx": "react-jsx" }
}

8
vite.config.ts Normal file
View file

@ -0,0 +1,8 @@
import { defineConfig } from "vite";
import wasm from "vite-plugin-wasm";
import topLevelAwait from "vite-plugin-top-level-await";
import react from "@vitejs/plugin-react";
export default defineConfig({
plugins: [react(), wasm(), topLevelAwait()],
});