diff --git a/src/clj/grub/db.clj b/src/clj/grub/db.clj index 7e2765b..90cdac5 100644 --- a/src/clj/grub/db.clj +++ b/src/clj/grub/db.clj @@ -26,11 +26,17 @@ (let [deleted-grubs (:deleted grubs) updated-grubs (->> (:updated grubs) (seq) - (map (fn [[k v]] (assoc v :_id v)))) + (map (fn [[k v]] + (-> v + (dissoc :id) + (assoc :_id k))))) deleted-recipes (:deleted recipes) updated-recipes (->> (:updated recipes) (seq) - (map (fn [[k v]] (assoc v :_id v))))] + (map (fn [[k v]] + (-> v + (dissoc :id) + (assoc :_id k)))))] (doseq [g deleted-grubs] (mc/remove-by-id @db grub-collection g)) (doseq [g updated-grubs] diff --git a/src/clj/grub/state.clj b/src/clj/grub/state.clj index 0e7be18..0ed189b 100644 --- a/src/clj/grub/state.clj +++ b/src/clj/grub/state.clj @@ -16,24 +16,45 @@ {:grubs (util/map-by-key :id grubs) :recipes (util/map-by-key :id recipes)}) +(defn sync-remote-changes [to-client state* server-shadow] + (let [server-shadow* @server-shadow] + (when (not= state* server-shadow*) + (let [diff (sync/diff-states server-shadow* state*) + msg {:diff diff + :hash (hash state*) + :shadow-hash (hash server-shadow*)}] + (println "Sync because:") + (println "Server = " state*) + (println "Client = " server-shadow*) + (println "Diff:" diff) + (println "Send" (hash server-shadow*) "->" (hash state*)) + (a/put! to-client msg) + ;; TODO: only reset server shadow if send succeeds + (reset! server-shadow state*))))) + (defn add-client! [to from] - (let [client-id (java.util.UUID/randomUUID)] - (println "New client id:" client-id) + (let [client-id (java.util.UUID/randomUUID) + server-shadow (atom empty-state)] + (add-watch state client-id (fn [k ref old new] + (sync-remote-changes to new server-shadow))) (a/go-loop [] - (when-let [diff (! @to-db diff) - (>! to-all {:diff diff :source-id client-id}) - (recur))) - (let [all-diffs (chan)] - (a/tap from-all all-diffs) - (a/go-loop [] (if-let [{:keys [diff source-id] :as event} (! to diff)) - (recur)) - (a/untap from-all all-diffs)))) - (a/put! to (sync/diff-states empty-state @state)))) + (if-let [{:keys [diff hash shadow-hash]} (" hash) + (println "Before shadow:" (clojure.core/hash @server-shadow) @server-shadow) + (if (= (clojure.core/hash @server-shadow) shadow-hash) + (println "Before hash check: good") + (println "Before hash check: FAIL")) + (let [new-shadow (swap! server-shadow #(sync/patch-state % diff)) + new-state (swap! state #(sync/patch-state % diff))] + ;; TODO: check if hashes match + (println "After shadow:" (clojure.core/hash new-shadow) new-shadow) + (if (= (clojure.core/hash new-shadow) hash) + (println "After hash check: good") + (println "After hash check: FAIL")) + (>! @to-db diff) + (recur))) + (remove-watch state client-id))))) (defn init [_to-db grubs recipes] (reset! state (get-initial-state grubs recipes)) diff --git a/src/cljs/grub/state.cljs b/src/cljs/grub/state.cljs index b107bbe..99b76db 100644 --- a/src/cljs/grub/state.cljs +++ b/src/cljs/grub/state.cljs @@ -4,18 +4,22 @@ (:require-macros [grub.macros :refer [log logs]] [cljs.core.async.macros :refer [go go-loop]])) -(def app-state (atom {:grubs {} - :recipes {}})) +(def empty-state {:grubs {} :recipes {}}) +(def app-state (atom empty-state)) +(def client-shadow (atom empty-state)) (defn update-state-on-event! [in] (let [out (chan)] - (add-watch app-state :app-state - (fn [key ref old new] - (when-not (= old new) - (let [diff (sync/diff-states old new)] - (a/put! out diff))))) (go-loop [] - (when-let [diff (" hash) + (if (= (cljs.core/hash @client-shadow) shadow-hash) + (log "Before hash check: good") + (log "Before hash check: FAIL")) + (let [new-shadow (swap! client-shadow #(sync/patch-state % diff)) + new-state (swap! app-state #(sync/patch-state % diff))] + (if (= (cljs.core/hash new-shadow) hash) + (log "After hash check: good") + (log "After hash check: FAIL")) + (recur)))) out)) diff --git a/src/cljs/grub/websocket.cljs b/src/cljs/grub/websocket.cljs index f1d805f..788567b 100644 --- a/src/cljs/grub/websocket.cljs +++ b/src/cljs/grub/websocket.cljs @@ -1,5 +1,7 @@ (ns grub.websocket - (:require [cljs.core.async :as a :refer [! chan]] + (:require [grub.state :as state] + [grub.sync :as sync] + [cljs.core.async :as a :refer [! chan]] [cljs.reader] goog.net.WebSocket goog.events.EventHandler @@ -8,27 +10,33 @@ [grub.macros :refer [log logs]])) (def websocket* (atom nil)) -(def pending-events (atom [])) + +(defn sync-local-changes [] + (when (and (.isOpen @websocket*) + (not= @state/app-state @state/client-shadow)) + (let [app-state @state/app-state + client-shadow @state/client-shadow + diff (sync/diff-states client-shadow app-state) + msg {:diff diff + :hash (hash app-state) + :shadow-hash (hash client-shadow)}] + (logs "Sync because:") + (logs "Server = " client-shadow) + (logs "Client = " app-state) + (logs "Diff:" diff) + (logs "Send" (hash client-shadow) "->" (hash app-state)) + ;; TODO: reset client shadow only if send succeeds + (.send @websocket* msg) + (reset! state/client-shadow app-state)))) (defn on-connected [event] (log "Connected:" event) - (when (> (count @pending-events)) - (doseq [event @pending-events] (.send @websocket* event)) - (reset! pending-events []))) - -(defn send-outgoing-events [ch] - (go-loop [] - (let [event (! out grub-event))))) - + (let [msg (cljs.reader/read-string (.-message event))] + (a/put! out msg)))) (defn get-remote-chan [to-remote] (let [server-url (str "ws://" (.-host (.-location js/document))) @@ -39,6 +47,6 @@ (.listen handler @websocket* goog.net.WebSocket.EventType.MESSAGE (on-message-fn remote-events) false) (.listen handler @websocket* goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %) false) (.listen handler @websocket* goog.net.WebSocket.EventType.ERROR #(log "Error:" %) false) - (send-outgoing-events to-remote) + (add-watch state/app-state :app-state #(sync-local-changes)) (.open @websocket* server-url) remote-events))