State checks - wip
This commit is contained in:
parent
becfb42627
commit
a672155a92
4 changed files with 84 additions and 45 deletions
|
@ -26,11 +26,17 @@
|
|||
(let [deleted-grubs (:deleted grubs)
|
||||
updated-grubs (->> (:updated grubs)
|
||||
(seq)
|
||||
(map (fn [[k v]] (assoc v :_id v))))
|
||||
(map (fn [[k v]]
|
||||
(-> v
|
||||
(dissoc :id)
|
||||
(assoc :_id k)))))
|
||||
deleted-recipes (:deleted recipes)
|
||||
updated-recipes (->> (:updated recipes)
|
||||
(seq)
|
||||
(map (fn [[k v]] (assoc v :_id v))))]
|
||||
(map (fn [[k v]]
|
||||
(-> v
|
||||
(dissoc :id)
|
||||
(assoc :_id k)))))]
|
||||
(doseq [g deleted-grubs]
|
||||
(mc/remove-by-id @db grub-collection g))
|
||||
(doseq [g updated-grubs]
|
||||
|
|
|
@ -16,24 +16,45 @@
|
|||
{:grubs (util/map-by-key :id grubs)
|
||||
:recipes (util/map-by-key :id recipes)})
|
||||
|
||||
(defn sync-remote-changes [to-client state* server-shadow]
|
||||
(let [server-shadow* @server-shadow]
|
||||
(when (not= state* server-shadow*)
|
||||
(let [diff (sync/diff-states server-shadow* state*)
|
||||
msg {:diff diff
|
||||
:hash (hash state*)
|
||||
:shadow-hash (hash server-shadow*)}]
|
||||
(println "Sync because:")
|
||||
(println "Server = " state*)
|
||||
(println "Client = " server-shadow*)
|
||||
(println "Diff:" diff)
|
||||
(println "Send" (hash server-shadow*) "->" (hash state*))
|
||||
(a/put! to-client msg)
|
||||
;; TODO: only reset server shadow if send succeeds
|
||||
(reset! server-shadow state*)))))
|
||||
|
||||
(defn add-client! [to from]
|
||||
(let [client-id (java.util.UUID/randomUUID)]
|
||||
(println "New client id:" client-id)
|
||||
(let [client-id (java.util.UUID/randomUUID)
|
||||
server-shadow (atom empty-state)]
|
||||
(add-watch state client-id (fn [k ref old new]
|
||||
(sync-remote-changes to new server-shadow)))
|
||||
(a/go-loop []
|
||||
(when-let [diff (<! from)]
|
||||
(swap! state #(sync/patch-state % diff))
|
||||
(>! @to-db diff)
|
||||
(>! to-all {:diff diff :source-id client-id})
|
||||
(recur)))
|
||||
(let [all-diffs (chan)]
|
||||
(a/tap from-all all-diffs)
|
||||
(a/go-loop [] (if-let [{:keys [diff source-id] :as event} (<! all-diffs)]
|
||||
(do
|
||||
(when-not (= source-id client-id)
|
||||
(>! to diff))
|
||||
(recur))
|
||||
(a/untap from-all all-diffs))))
|
||||
(a/put! to (sync/diff-states empty-state @state))))
|
||||
(if-let [{:keys [diff hash shadow-hash]} (<! from)]
|
||||
(do
|
||||
(println "Received client diff:" shadow-hash "->" hash)
|
||||
(println "Before shadow:" (clojure.core/hash @server-shadow) @server-shadow)
|
||||
(if (= (clojure.core/hash @server-shadow) shadow-hash)
|
||||
(println "Before hash check: good")
|
||||
(println "Before hash check: FAIL"))
|
||||
(let [new-shadow (swap! server-shadow #(sync/patch-state % diff))
|
||||
new-state (swap! state #(sync/patch-state % diff))]
|
||||
;; TODO: check if hashes match
|
||||
(println "After shadow:" (clojure.core/hash new-shadow) new-shadow)
|
||||
(if (= (clojure.core/hash new-shadow) hash)
|
||||
(println "After hash check: good")
|
||||
(println "After hash check: FAIL"))
|
||||
(>! @to-db diff)
|
||||
(recur)))
|
||||
(remove-watch state client-id)))))
|
||||
|
||||
(defn init [_to-db grubs recipes]
|
||||
(reset! state (get-initial-state grubs recipes))
|
||||
|
|
|
@ -4,18 +4,22 @@
|
|||
(:require-macros [grub.macros :refer [log logs]]
|
||||
[cljs.core.async.macros :refer [go go-loop]]))
|
||||
|
||||
(def app-state (atom {:grubs {}
|
||||
:recipes {}}))
|
||||
(def empty-state {:grubs {} :recipes {}})
|
||||
(def app-state (atom empty-state))
|
||||
(def client-shadow (atom empty-state))
|
||||
|
||||
(defn update-state-on-event! [in]
|
||||
(let [out (chan)]
|
||||
(add-watch app-state :app-state
|
||||
(fn [key ref old new]
|
||||
(when-not (= old new)
|
||||
(let [diff (sync/diff-states old new)]
|
||||
(a/put! out diff)))))
|
||||
(go-loop []
|
||||
(when-let [diff (<! in)]
|
||||
(swap! app-state #(sync/patch-state % diff))
|
||||
(recur)))
|
||||
(when-let [{:keys [diff hash shadow-hash]} (<! in)]
|
||||
(logs "Received server diff:" shadow-hash "->" hash)
|
||||
(if (= (cljs.core/hash @client-shadow) shadow-hash)
|
||||
(log "Before hash check: good")
|
||||
(log "Before hash check: FAIL"))
|
||||
(let [new-shadow (swap! client-shadow #(sync/patch-state % diff))
|
||||
new-state (swap! app-state #(sync/patch-state % diff))]
|
||||
(if (= (cljs.core/hash new-shadow) hash)
|
||||
(log "After hash check: good")
|
||||
(log "After hash check: FAIL"))
|
||||
(recur))))
|
||||
out))
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
(ns grub.websocket
|
||||
(:require [cljs.core.async :as a :refer [<! >! chan]]
|
||||
(:require [grub.state :as state]
|
||||
[grub.sync :as sync]
|
||||
[cljs.core.async :as a :refer [<! >! chan]]
|
||||
[cljs.reader]
|
||||
goog.net.WebSocket
|
||||
goog.events.EventHandler
|
||||
|
@ -8,27 +10,33 @@
|
|||
[grub.macros :refer [log logs]]))
|
||||
|
||||
(def websocket* (atom nil))
|
||||
(def pending-events (atom []))
|
||||
|
||||
(defn sync-local-changes []
|
||||
(when (and (.isOpen @websocket*)
|
||||
(not= @state/app-state @state/client-shadow))
|
||||
(let [app-state @state/app-state
|
||||
client-shadow @state/client-shadow
|
||||
diff (sync/diff-states client-shadow app-state)
|
||||
msg {:diff diff
|
||||
:hash (hash app-state)
|
||||
:shadow-hash (hash client-shadow)}]
|
||||
(logs "Sync because:")
|
||||
(logs "Server = " client-shadow)
|
||||
(logs "Client = " app-state)
|
||||
(logs "Diff:" diff)
|
||||
(logs "Send" (hash client-shadow) "->" (hash app-state))
|
||||
;; TODO: reset client shadow only if send succeeds
|
||||
(.send @websocket* msg)
|
||||
(reset! state/client-shadow app-state))))
|
||||
|
||||
(defn on-connected [event]
|
||||
(log "Connected:" event)
|
||||
(when (> (count @pending-events))
|
||||
(doseq [event @pending-events] (.send @websocket* event))
|
||||
(reset! pending-events [])))
|
||||
|
||||
(defn send-outgoing-events [ch]
|
||||
(go-loop []
|
||||
(let [event (<! ch)]
|
||||
(if (.isOpen @websocket*)
|
||||
(.send @websocket* event)
|
||||
(swap! pending-events conj event))
|
||||
(recur))))
|
||||
(sync-local-changes))
|
||||
|
||||
(defn on-message-fn [out]
|
||||
(fn [event]
|
||||
(let [grub-event (cljs.reader/read-string (.-message event))]
|
||||
(go (>! out grub-event)))))
|
||||
|
||||
(let [msg (cljs.reader/read-string (.-message event))]
|
||||
(a/put! out msg))))
|
||||
|
||||
(defn get-remote-chan [to-remote]
|
||||
(let [server-url (str "ws://" (.-host (.-location js/document)))
|
||||
|
@ -39,6 +47,6 @@
|
|||
(.listen handler @websocket* goog.net.WebSocket.EventType.MESSAGE (on-message-fn remote-events) false)
|
||||
(.listen handler @websocket* goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %) false)
|
||||
(.listen handler @websocket* goog.net.WebSocket.EventType.ERROR #(log "Error:" %) false)
|
||||
(send-outgoing-events to-remote)
|
||||
(add-watch state/app-state :app-state #(sync-local-changes))
|
||||
(.open @websocket* server-url)
|
||||
remote-events))
|
||||
|
|
Loading…
Reference in a new issue