diff --git a/src/clj/grub/core.clj b/src/clj/grub/core.clj index f4f1313..d892f09 100644 --- a/src/clj/grub/core.clj +++ b/src/clj/grub/core.clj @@ -47,8 +47,8 @@ (httpkit/with-channel request ws-channel (let [to-client (chan) from-client (chan)] - (ws/add-client! ws-channel to-client from-client) - (state/add-client! to-client from-client))))) + (ws/add-connected-client! ws-channel to-client from-client) + (state/sync-new-client! to-client from-client))))) (defroutes routes (GET "/" [] websocket-handler) diff --git a/src/clj/grub/state.clj b/src/clj/grub/state.clj index a44eb94..1b4b2ef 100644 --- a/src/clj/grub/state.clj +++ b/src/clj/grub/state.clj @@ -1,19 +1,16 @@ (ns grub.state (:require [grub.sync :as sync] [grub.util :as util] + [grub.common-state :as cs] [clojure.core.async :as a :refer [! chan go]] [hasch.core :as hasch])) -(def empty-state - {:grubs {} - :recipes {}}) - -(def state (atom empty-state)) +(def state (atom cs/empty-state)) (def to-db (atom nil)) (def to-all (chan)) (def from-all (a/mult to-all)) -(defn get-initial-state [grubs recipes] +(defn initial-state [grubs recipes] {:grubs (util/map-by-key :id grubs) :recipes (util/map-by-key :id recipes)}) @@ -21,7 +18,8 @@ (let [server-shadow* @server-shadow] (when (not= state* server-shadow*) (let [diff (sync/diff-states server-shadow* state*) - msg {:diff diff + msg {:type :diff + :diff diff :hash (hasch/uuid state*) :shadow-hash (hasch/uuid server-shadow*)}] (println "Sync because:") @@ -33,30 +31,36 @@ ;; TODO: only reset server shadow if send succeeds (reset! server-shadow state*))))) -(defn add-client! [to from] +(defn sync-new-client! [to from] (let [client-id (java.util.UUID/randomUUID) - server-shadow (atom empty-state)] + server-shadow (atom cs/empty-state)] (add-watch state client-id (fn [k ref old new] (sync-remote-changes to new server-shadow))) (a/go-loop [] - (if-let [{:keys [diff hash shadow-hash]} (" hash) - (println "Before shadow:" (hasch/uuid @server-shadow) @server-shadow) - (if (= (hasch/uuid @server-shadow) shadow-hash) - (println "Before hash check: good") - (println "Before hash check: FAIL")) - (let [new-shadow (swap! server-shadow #(sync/patch-state % diff)) - new-state (swap! state #(sync/patch-state % diff))] - ;; TODO: check if hashes match - (println "After shadow:" (hasch/uuid new-shadow) new-shadow) - (if (= (hasch/uuid new-shadow) hash) - (println "After hash check: good") - (println "After hash check: FAIL")) - (>! @to-db diff) - (recur))) - (remove-watch state client-id))))) + (let [{:keys [type diff hash shadow-hash] :as msg} (" hash) + (println "Before shadow:" (hasch/uuid @server-shadow) @server-shadow) + (if (= (hasch/uuid @server-shadow) shadow-hash) + (println "Before hash check: good") + (println "Before hash check: FAIL")) + (let [new-shadow (swap! server-shadow #(sync/patch-state % diff)) + new-state (swap! state #(sync/patch-state % diff))] + ;; TODO: check if hashes match + (println "After shadow:" (hasch/uuid new-shadow) new-shadow) + (if (= (hasch/uuid new-shadow) hash) + (println "After hash check: good") + (println "After hash check: FAIL")) + (>! @to-db diff))) + :complete (let [new-state (reset! server-shadow @state)] + (a/put! to (cs/complete-sync-response new-state))) + (println "Invalid msg:" msg)) + (recur)) + (remove-watch state client-id)))))) (defn init [_to-db grubs recipes] - (reset! state (get-initial-state grubs recipes)) + (reset! state (initial-state grubs recipes)) (reset! to-db _to-db)) diff --git a/src/clj/grub/websocket.clj b/src/clj/grub/websocket.clj index fdc9538..57b4a0e 100644 --- a/src/clj/grub/websocket.clj +++ b/src/clj/grub/websocket.clj @@ -3,18 +3,16 @@ [org.httpkit.server :as httpkit] [clojure.core.async :as a :refer [! chan go]])) -(defn add-client! [ws-channel to from] +(defn disconnected [status ws-channel to from] + (println "Client disconnected:" (.toString ws-channel) "with status" status) + (a/close! to) + (a/close! from)) + +(defn add-connected-client! [ws-channel to from] (println "Client connected:" (.toString ws-channel)) - (httpkit/on-close ws-channel - (fn [status] - (println "Client disconnected:" (.toString ws-channel) - "with status" status) - (a/close! to) - (a/close! from))) + (a/go-loop [] (if-let [event (! chan]] [hasch.core :as hasch]) (:require-macros [grub.macros :refer [log logs]] [cljs.core.async.macros :refer [go go-loop]])) -(def empty-state {:grubs {} :recipes {}}) -(def app-state (atom empty-state)) -(def client-shadow (atom empty-state)) +(def app-state (atom cs/empty-state)) +(def client-shadow (atom cs/empty-state)) -(defn update-state-on-event! [in] - (let [out (chan)] - (go-loop [] - (when-let [{:keys [diff hash shadow-hash]} (" hash) - (logs "Before shadow:" (hasch/uuid @client-shadow) @client-shadow) - (if (= (hasch/uuid @client-shadow) shadow-hash) - (log "Before hash check: good") - (log "Before hash check: FAIL")) - (let [new-shadow (swap! client-shadow #(sync/patch-state % diff)) - new-state (swap! app-state #(sync/patch-state % diff))] - (logs "After shadow:" (hasch/uuid @client-shadow) @client-shadow) - (if (= (hasch/uuid new-shadow) hash) - (log "After hash check: good") - (log "After hash check: FAIL")) - (recur)))) - out)) +(defn sync-local-changes [to-remote state*] + (let [client-shadow* @client-shadow] + (when (not= state* client-shadow*) + (let [diff (sync/diff-states client-shadow* state*) + msg {:type :diff + :diff diff + :hash (hasch/uuid state*) + :shadow-hash (hasch/uuid client-shadow*)}] + (logs "Sync because:") + (logs "Server = " client-shadow*) + (logs "Client = " state*) + (logs "Diff:" diff) + (logs "Send" (hasch/uuid client-shadow*) "->" (hasch/uuid state*)) + ;; TODO: reset client shadow only if send succeeds + (a/put! to-remote msg) + (reset! client-shadow state*))))) + +(defn sync-state! [to from] + (go-loop [] + (when-let [{:keys [type diff hash shadow-hash] :as msg} (" hash) + (logs "Before shadow:" (hasch/uuid @client-shadow) @client-shadow) + (if (= (hasch/uuid @client-shadow) shadow-hash) + (log "Before hash check: good") + (log "Before hash check: FAIL")) + (let [new-shadow (swap! client-shadow #(sync/patch-state % diff)) + new-state (swap! app-state #(sync/patch-state % diff))] + (logs "After shadow:" (hasch/uuid @client-shadow) @client-shadow) + (if (= (hasch/uuid new-shadow) hash) + (log "After hash check: good") + (log "After hash check: FAIL")))) + :complete (do (log "Received complete sync, reset state") + (logs msg) + (reset! client-shadow (:state msg)) + (reset! app-state (:state msg))) + (logs "Invalid msg:" msg)) + (recur))) + (add-watch app-state :app-state (fn [k ref old new] (sync-local-changes from new))) + (a/put! from cs/complete-sync-request)) diff --git a/src/cljs/grub/websocket.cljs b/src/cljs/grub/websocket.cljs index 1df0699..56455bd 100644 --- a/src/cljs/grub/websocket.cljs +++ b/src/cljs/grub/websocket.cljs @@ -10,44 +10,34 @@ (:require-macros [cljs.core.async.macros :refer [go go-loop]] [grub.macros :refer [log logs]])) -(def websocket* (atom nil)) +(def server-url (str "ws://" (.-host (.-location js/document)))) +(def pending-msg (atom nil)) -(defn sync-local-changes [] - (when (and (.isOpen @websocket*) - (not= @state/app-state @state/client-shadow)) - (let [app-state @state/app-state - client-shadow @state/client-shadow - diff (sync/diff-states client-shadow app-state) - msg {:diff diff - :hash (hasch/uuid app-state) - :shadow-hash (hasch/uuid client-shadow)}] - (logs "Sync because:") - (logs "Server = " client-shadow) - (logs "Client = " app-state) - (logs "Diff:" diff) - (logs "Send" (hasch/uuid client-shadow) "->" (hasch/uuid app-state)) - ;; TODO: reset client shadow only if send succeeds - (.send @websocket* msg) - (reset! state/client-shadow app-state)))) +(defn send-pending-msg [websocket] + (when (and (.isOpen websocket) + (not (nil? @pending-msg))) + (.send websocket @pending-msg) + (reset! pending-msg nil))) -(defn on-connected [event] +(defn on-connected [websocket event] (log "Connected:" event) - (sync-local-changes)) + (send-pending-msg websocket)) -(defn on-message-fn [out] - (fn [event] - (let [msg (cljs.reader/read-string (.-message event))] - (a/put! out msg)))) +(defn on-message [from event] + (let [msg (cljs.reader/read-string (.-message event))] + (a/put! from msg))) -(defn get-remote-chan [to-remote] - (let [server-url (str "ws://" (.-host (.-location js/document))) - handler (goog.events.EventHandler.) - remote-events (chan)] - (reset! websocket* (goog.net.WebSocket.)) - (.listen handler @websocket* goog.net.WebSocket.EventType.OPENED on-connected false) - (.listen handler @websocket* goog.net.WebSocket.EventType.MESSAGE (on-message-fn remote-events) false) - (.listen handler @websocket* goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %) false) - (.listen handler @websocket* goog.net.WebSocket.EventType.ERROR #(log "Error:" %) false) - (add-watch state/app-state :app-state #(sync-local-changes)) - (.open @websocket* server-url) - remote-events)) +(defn connect-client! [to from] + (let [handler (goog.events.EventHandler.) + websocket (goog.net.WebSocket.) + listen (fn [type fun] (.listen handler websocket type fun false))] + (listen goog.net.WebSocket.EventType.OPENED (partial on-connected websocket)) + (listen goog.net.WebSocket.EventType.MESSAGE (partial on-message from)) + (listen goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %)) + (listen goog.net.WebSocket.EventType.ERROR #(log "Error:" %)) + (go (loop [] + (when-let [msg (