Refactor wip
This commit is contained in:
parent
4ac42b0cf3
commit
b7b094b253
6 changed files with 117 additions and 103 deletions
|
@ -47,8 +47,8 @@
|
|||
(httpkit/with-channel request ws-channel
|
||||
(let [to-client (chan)
|
||||
from-client (chan)]
|
||||
(ws/add-client! ws-channel to-client from-client)
|
||||
(state/add-client! to-client from-client)))))
|
||||
(ws/add-connected-client! ws-channel to-client from-client)
|
||||
(state/sync-new-client! to-client from-client)))))
|
||||
|
||||
(defroutes routes
|
||||
(GET "/" [] websocket-handler)
|
||||
|
|
|
@ -1,19 +1,16 @@
|
|||
(ns grub.state
|
||||
(:require [grub.sync :as sync]
|
||||
[grub.util :as util]
|
||||
[grub.common-state :as cs]
|
||||
[clojure.core.async :as a :refer [<! >! chan go]]
|
||||
[hasch.core :as hasch]))
|
||||
|
||||
(def empty-state
|
||||
{:grubs {}
|
||||
:recipes {}})
|
||||
|
||||
(def state (atom empty-state))
|
||||
(def state (atom cs/empty-state))
|
||||
(def to-db (atom nil))
|
||||
(def to-all (chan))
|
||||
(def from-all (a/mult to-all))
|
||||
|
||||
(defn get-initial-state [grubs recipes]
|
||||
(defn initial-state [grubs recipes]
|
||||
{:grubs (util/map-by-key :id grubs)
|
||||
:recipes (util/map-by-key :id recipes)})
|
||||
|
||||
|
@ -21,7 +18,8 @@
|
|||
(let [server-shadow* @server-shadow]
|
||||
(when (not= state* server-shadow*)
|
||||
(let [diff (sync/diff-states server-shadow* state*)
|
||||
msg {:diff diff
|
||||
msg {:type :diff
|
||||
:diff diff
|
||||
:hash (hasch/uuid state*)
|
||||
:shadow-hash (hasch/uuid server-shadow*)}]
|
||||
(println "Sync because:")
|
||||
|
@ -33,30 +31,36 @@
|
|||
;; TODO: only reset server shadow if send succeeds
|
||||
(reset! server-shadow state*)))))
|
||||
|
||||
(defn add-client! [to from]
|
||||
(defn sync-new-client! [to from]
|
||||
(let [client-id (java.util.UUID/randomUUID)
|
||||
server-shadow (atom empty-state)]
|
||||
server-shadow (atom cs/empty-state)]
|
||||
(add-watch state client-id (fn [k ref old new]
|
||||
(sync-remote-changes to new server-shadow)))
|
||||
(a/go-loop []
|
||||
(if-let [{:keys [diff hash shadow-hash]} (<! from)]
|
||||
(do
|
||||
(println "Received client diff:" shadow-hash "->" hash)
|
||||
(println "Before shadow:" (hasch/uuid @server-shadow) @server-shadow)
|
||||
(if (= (hasch/uuid @server-shadow) shadow-hash)
|
||||
(println "Before hash check: good")
|
||||
(println "Before hash check: FAIL"))
|
||||
(let [new-shadow (swap! server-shadow #(sync/patch-state % diff))
|
||||
new-state (swap! state #(sync/patch-state % diff))]
|
||||
;; TODO: check if hashes match
|
||||
(println "After shadow:" (hasch/uuid new-shadow) new-shadow)
|
||||
(if (= (hasch/uuid new-shadow) hash)
|
||||
(println "After hash check: good")
|
||||
(println "After hash check: FAIL"))
|
||||
(>! @to-db diff)
|
||||
(recur)))
|
||||
(remove-watch state client-id)))))
|
||||
(let [{:keys [type diff hash shadow-hash] :as msg} (<! from)]
|
||||
(if (not (nil? msg))
|
||||
(do
|
||||
(condp = type
|
||||
:diff (do
|
||||
(println "Received client diff:" shadow-hash "->" hash)
|
||||
(println "Before shadow:" (hasch/uuid @server-shadow) @server-shadow)
|
||||
(if (= (hasch/uuid @server-shadow) shadow-hash)
|
||||
(println "Before hash check: good")
|
||||
(println "Before hash check: FAIL"))
|
||||
(let [new-shadow (swap! server-shadow #(sync/patch-state % diff))
|
||||
new-state (swap! state #(sync/patch-state % diff))]
|
||||
;; TODO: check if hashes match
|
||||
(println "After shadow:" (hasch/uuid new-shadow) new-shadow)
|
||||
(if (= (hasch/uuid new-shadow) hash)
|
||||
(println "After hash check: good")
|
||||
(println "After hash check: FAIL"))
|
||||
(>! @to-db diff)))
|
||||
:complete (let [new-state (reset! server-shadow @state)]
|
||||
(a/put! to (cs/complete-sync-response new-state)))
|
||||
(println "Invalid msg:" msg))
|
||||
(recur))
|
||||
(remove-watch state client-id))))))
|
||||
|
||||
(defn init [_to-db grubs recipes]
|
||||
(reset! state (get-initial-state grubs recipes))
|
||||
(reset! state (initial-state grubs recipes))
|
||||
(reset! to-db _to-db))
|
||||
|
|
|
@ -3,18 +3,16 @@
|
|||
[org.httpkit.server :as httpkit]
|
||||
[clojure.core.async :as a :refer [<! >! chan go]]))
|
||||
|
||||
(defn add-client! [ws-channel to from]
|
||||
(defn disconnected [status ws-channel to from]
|
||||
(println "Client disconnected:" (.toString ws-channel) "with status" status)
|
||||
(a/close! to)
|
||||
(a/close! from))
|
||||
|
||||
(defn add-connected-client! [ws-channel to from]
|
||||
(println "Client connected:" (.toString ws-channel))
|
||||
(httpkit/on-close ws-channel
|
||||
(fn [status]
|
||||
(println "Client disconnected:" (.toString ws-channel)
|
||||
"with status" status)
|
||||
(a/close! to)
|
||||
(a/close! from)))
|
||||
(a/go-loop [] (if-let [event (<! to)]
|
||||
(do (httpkit/send! ws-channel (pr-str event))
|
||||
(recur))
|
||||
(httpkit/close ws-channel)))
|
||||
(httpkit/on-receive ws-channel #(a/put! from (read-string %)))
|
||||
(a/go-loop []
|
||||
(if-let [event (<! to)]
|
||||
(do
|
||||
(httpkit/send! ws-channel (str event))
|
||||
(recur))
|
||||
(httpkit/close ws-channel))))
|
||||
(httpkit/on-close ws-channel #(disconnected % ws-channel to from)))
|
||||
|
|
|
@ -8,10 +8,8 @@
|
|||
(defn init-app []
|
||||
(view/render-app state/app-state)
|
||||
(let [to-remote (chan)
|
||||
to-state (chan)
|
||||
from-remote (ws/get-remote-chan to-remote)
|
||||
from-state (state/update-state-on-event! to-state)]
|
||||
(a/pipe from-remote to-state)
|
||||
(a/pipe from-state to-remote)))
|
||||
from-remote (chan)]
|
||||
(ws/connect-client! to-remote from-remote)
|
||||
(state/sync-state! from-remote to-remote)))
|
||||
|
||||
(init-app)
|
||||
|
|
|
@ -1,28 +1,52 @@
|
|||
(ns grub.state
|
||||
(:require [grub.sync :as sync]
|
||||
[grub.common-state :as cs]
|
||||
[cljs.core.async :as a :refer [<! >! chan]]
|
||||
[hasch.core :as hasch])
|
||||
(:require-macros [grub.macros :refer [log logs]]
|
||||
[cljs.core.async.macros :refer [go go-loop]]))
|
||||
|
||||
(def empty-state {:grubs {} :recipes {}})
|
||||
(def app-state (atom empty-state))
|
||||
(def client-shadow (atom empty-state))
|
||||
(def app-state (atom cs/empty-state))
|
||||
(def client-shadow (atom cs/empty-state))
|
||||
|
||||
(defn update-state-on-event! [in]
|
||||
(let [out (chan)]
|
||||
(go-loop []
|
||||
(when-let [{:keys [diff hash shadow-hash]} (<! in)]
|
||||
(logs "Received server diff:" shadow-hash "->" hash)
|
||||
(logs "Before shadow:" (hasch/uuid @client-shadow) @client-shadow)
|
||||
(if (= (hasch/uuid @client-shadow) shadow-hash)
|
||||
(log "Before hash check: good")
|
||||
(log "Before hash check: FAIL"))
|
||||
(let [new-shadow (swap! client-shadow #(sync/patch-state % diff))
|
||||
new-state (swap! app-state #(sync/patch-state % diff))]
|
||||
(logs "After shadow:" (hasch/uuid @client-shadow) @client-shadow)
|
||||
(if (= (hasch/uuid new-shadow) hash)
|
||||
(log "After hash check: good")
|
||||
(log "After hash check: FAIL"))
|
||||
(recur))))
|
||||
out))
|
||||
(defn sync-local-changes [to-remote state*]
|
||||
(let [client-shadow* @client-shadow]
|
||||
(when (not= state* client-shadow*)
|
||||
(let [diff (sync/diff-states client-shadow* state*)
|
||||
msg {:type :diff
|
||||
:diff diff
|
||||
:hash (hasch/uuid state*)
|
||||
:shadow-hash (hasch/uuid client-shadow*)}]
|
||||
(logs "Sync because:")
|
||||
(logs "Server = " client-shadow*)
|
||||
(logs "Client = " state*)
|
||||
(logs "Diff:" diff)
|
||||
(logs "Send" (hasch/uuid client-shadow*) "->" (hasch/uuid state*))
|
||||
;; TODO: reset client shadow only if send succeeds
|
||||
(a/put! to-remote msg)
|
||||
(reset! client-shadow state*)))))
|
||||
|
||||
(defn sync-state! [to from]
|
||||
(go-loop []
|
||||
(when-let [{:keys [type diff hash shadow-hash] :as msg} (<! to)]
|
||||
(condp = type
|
||||
:diff (do
|
||||
(logs "Received server diff:" shadow-hash "->" hash)
|
||||
(logs "Before shadow:" (hasch/uuid @client-shadow) @client-shadow)
|
||||
(if (= (hasch/uuid @client-shadow) shadow-hash)
|
||||
(log "Before hash check: good")
|
||||
(log "Before hash check: FAIL"))
|
||||
(let [new-shadow (swap! client-shadow #(sync/patch-state % diff))
|
||||
new-state (swap! app-state #(sync/patch-state % diff))]
|
||||
(logs "After shadow:" (hasch/uuid @client-shadow) @client-shadow)
|
||||
(if (= (hasch/uuid new-shadow) hash)
|
||||
(log "After hash check: good")
|
||||
(log "After hash check: FAIL"))))
|
||||
:complete (do (log "Received complete sync, reset state")
|
||||
(logs msg)
|
||||
(reset! client-shadow (:state msg))
|
||||
(reset! app-state (:state msg)))
|
||||
(logs "Invalid msg:" msg))
|
||||
(recur)))
|
||||
(add-watch app-state :app-state (fn [k ref old new] (sync-local-changes from new)))
|
||||
(a/put! from cs/complete-sync-request))
|
||||
|
|
|
@ -10,44 +10,34 @@
|
|||
(:require-macros [cljs.core.async.macros :refer [go go-loop]]
|
||||
[grub.macros :refer [log logs]]))
|
||||
|
||||
(def websocket* (atom nil))
|
||||
(def server-url (str "ws://" (.-host (.-location js/document))))
|
||||
(def pending-msg (atom nil))
|
||||
|
||||
(defn sync-local-changes []
|
||||
(when (and (.isOpen @websocket*)
|
||||
(not= @state/app-state @state/client-shadow))
|
||||
(let [app-state @state/app-state
|
||||
client-shadow @state/client-shadow
|
||||
diff (sync/diff-states client-shadow app-state)
|
||||
msg {:diff diff
|
||||
:hash (hasch/uuid app-state)
|
||||
:shadow-hash (hasch/uuid client-shadow)}]
|
||||
(logs "Sync because:")
|
||||
(logs "Server = " client-shadow)
|
||||
(logs "Client = " app-state)
|
||||
(logs "Diff:" diff)
|
||||
(logs "Send" (hasch/uuid client-shadow) "->" (hasch/uuid app-state))
|
||||
;; TODO: reset client shadow only if send succeeds
|
||||
(.send @websocket* msg)
|
||||
(reset! state/client-shadow app-state))))
|
||||
(defn send-pending-msg [websocket]
|
||||
(when (and (.isOpen websocket)
|
||||
(not (nil? @pending-msg)))
|
||||
(.send websocket @pending-msg)
|
||||
(reset! pending-msg nil)))
|
||||
|
||||
(defn on-connected [event]
|
||||
(defn on-connected [websocket event]
|
||||
(log "Connected:" event)
|
||||
(sync-local-changes))
|
||||
(send-pending-msg websocket))
|
||||
|
||||
(defn on-message-fn [out]
|
||||
(fn [event]
|
||||
(let [msg (cljs.reader/read-string (.-message event))]
|
||||
(a/put! out msg))))
|
||||
(defn on-message [from event]
|
||||
(let [msg (cljs.reader/read-string (.-message event))]
|
||||
(a/put! from msg)))
|
||||
|
||||
(defn get-remote-chan [to-remote]
|
||||
(let [server-url (str "ws://" (.-host (.-location js/document)))
|
||||
handler (goog.events.EventHandler.)
|
||||
remote-events (chan)]
|
||||
(reset! websocket* (goog.net.WebSocket.))
|
||||
(.listen handler @websocket* goog.net.WebSocket.EventType.OPENED on-connected false)
|
||||
(.listen handler @websocket* goog.net.WebSocket.EventType.MESSAGE (on-message-fn remote-events) false)
|
||||
(.listen handler @websocket* goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %) false)
|
||||
(.listen handler @websocket* goog.net.WebSocket.EventType.ERROR #(log "Error:" %) false)
|
||||
(add-watch state/app-state :app-state #(sync-local-changes))
|
||||
(.open @websocket* server-url)
|
||||
remote-events))
|
||||
(defn connect-client! [to from]
|
||||
(let [handler (goog.events.EventHandler.)
|
||||
websocket (goog.net.WebSocket.)
|
||||
listen (fn [type fun] (.listen handler websocket type fun false))]
|
||||
(listen goog.net.WebSocket.EventType.OPENED (partial on-connected websocket))
|
||||
(listen goog.net.WebSocket.EventType.MESSAGE (partial on-message from))
|
||||
(listen goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %))
|
||||
(listen goog.net.WebSocket.EventType.ERROR #(log "Error:" %))
|
||||
(go (loop []
|
||||
(when-let [msg (<! to)]
|
||||
(reset! pending-msg msg)
|
||||
(send-pending-msg websocket)
|
||||
(recur))))
|
||||
(.open websocket server-url)))
|
||||
|
|
Loading…
Reference in a new issue