From f087309c0f08697d36e7b6f01d76f554af9c034b Mon Sep 17 00:00:00 2001 From: Nicholas Kariniemi Date: Sat, 30 Aug 2014 15:40:17 +0300 Subject: [PATCH] Possibly fully working differential sync --- src/clj/grub/state.clj | 74 ++++++++++++++++++--------------- src/cljs/grub/core.cljs | 2 +- src/cljs/grub/state.cljs | 52 ++++++++++++++++------- src/cljx/grub/common_state.cljx | 17 ++++---- 4 files changed, 88 insertions(+), 57 deletions(-) diff --git a/src/clj/grub/state.clj b/src/clj/grub/state.clj index d210363..d8ee3d3 100644 --- a/src/clj/grub/state.clj +++ b/src/clj/grub/state.clj @@ -13,9 +13,11 @@ (def state-history (atom [])) (defn save-history-state [history new-state] - (when-not (= (last history) new-state) + (when-not (= (hasch/uuid (last history)) (hasch/uuid new-state)) (println "Adding state to history: " (hasch/uuid new-state)) - (println "History size:" (inc (count history))) + (println "History:") + (doseq [s (conj history new-state)] + (println (hasch/uuid s))) (conj history new-state))) (defn get-history-state [hash] @@ -34,53 +36,57 @@ (defn sync-new-client! [to from] (let [client-id (java.util.UUID/randomUUID) - server-shadow (atom cs/empty-state)] + client-state (atom cs/empty-state) + log (fn [& args] + (apply println client-id args))] (add-watch state client-id (fn [_ _ _ current-state] - (when-let [msg (cs/diff-states @server-shadow current-state)] + (when-let [msg (cs/diff-states @client-state current-state)] (a/put! to msg) + ;; send ACK even if nothing changes ;; TODO: reset only if send succeeds? - (reset! server-shadow current-state)))) + (reset! client-state current-state)))) (a/go-loop [] (if-let [{:keys [type diff hash shadow-hash] :as msg} (! @to-db diff)) - (do (println "Applying diff failed --> full sync") + (do (log "Applying diff failed --> full sync") (let [sync-state @state] - (reset! server-shadow sync-state) + (reset! client-state sync-state) (a/put! to (cs/complete-sync-response sync-state)))))) - ;; we have something different than they thought - ;; check history - (if-let [history-state (get-history-state shadow-hash)] - ;; Found what they thought in history, - ;; reset client state to this - ;; and continue as normal - (do - (println "Hash check failed --> Reset from history") - (reset! server-shadow history-state) - (let [new-shadow (swap! server-shadow sync/patch-state diff)] - (if (= (hasch/uuid new-shadow) hash) - (let [new-state (swap! state sync/patch-state diff)] - (>! @to-db diff)) - (do (println "Applying diff failed --> full sync") - (let [sync-state @state] - (reset! server-shadow sync-state) - (a/put! to (cs/complete-sync-response sync-state))))))) - ;; No history found, do complete sync - (do (println "Hash check failed, not in history --> full sync") - (let [sync-state @state] - (reset! server-shadow sync-state) - (a/put! to (cs/complete-sync-response sync-state)))))) - :complete (let [new-state (reset! server-shadow @state)] + ;; We have something different than they thought + ;; Check history + (do + (log "Hash check failed --> Reset from history") + (if-let [history-state (get-history-state shadow-hash)] + ;; Found what they thought we had in history, + ;; reset client state to this and continue as normal + (do + (reset! client-state history-state) + (let [new-shadow (swap! client-state sync/patch-state diff)] + (if (= (hasch/uuid new-shadow) hash) + (let [new-state (swap! state sync/patch-state diff)] + (>! @to-db diff)) + (do (log "Applying diff failed --> full sync") + (let [sync-state @state] + (reset! client-state sync-state) + (a/put! to (cs/complete-sync-response sync-state))))))) + ;; Not found in history, do complete sync + (do (log "Hash check failed, not in history --> full sync") + (let [sync-state @state] + (reset! client-state sync-state) + (a/put! to (cs/complete-sync-response sync-state))))))) + :complete (let [new-state (reset! client-state @state)] + (log "full sync") (a/put! to (cs/complete-sync-response new-state))) - (println "Invalid msg:" msg)) + (log "Invalid msg:" msg)) (recur)) (remove-watch state client-id))))) diff --git a/src/cljs/grub/core.cljs b/src/cljs/grub/core.cljs index 329485d..f61b084 100644 --- a/src/cljs/grub/core.cljs +++ b/src/cljs/grub/core.cljs @@ -12,7 +12,7 @@ (state/sync-state! from-remote to-remote reset?))) (defn init-app [] - (view/render-app state/app-state) + (view/render-app state/state) (connect-to-server true)) (init-app) diff --git a/src/cljs/grub/state.cljs b/src/cljs/grub/state.cljs index 85b2825..a87db33 100644 --- a/src/cljs/grub/state.cljs +++ b/src/cljs/grub/state.cljs @@ -6,26 +6,48 @@ (:require-macros [grub.macros :refer [log logs]] [cljs.core.async.macros :refer [go go-loop]])) -(def app-state (atom cs/empty-state)) +(def state (atom cs/empty-state)) + +(def unacked-history (atom {})) + +(defn get-unacked-state [hash] + (logs "Look for history state:" hash) + (get @unacked-history hash)) (defn sync-state! [to from reset?] - (let [client-shadow (atom cs/empty-state)] - (add-watch app-state :app-state (fn [_ _ _ current-state] - (when-let [msg (cs/diff-states @client-shadow current-state)] - (a/put! from msg) - ;; TODO: reset only if send succeeds - (reset! client-shadow current-state)))) + (let [server-state (atom cs/empty-state)] + (add-watch state :state (fn [_ _ _ current-state] + (when-not (= @server-state current-state) + (let [msg (cs/diff-states @server-state current-state)] + (when-not (get @unacked-history (hasch/uuid current-state)) + (logs "state change! msg: " msg) + (swap! unacked-history assoc (hasch/uuid current-state) current-state) + (logs "History:" (keys @unacked-history)) + (a/put! from msg)) + )))) (go-loop [] (if-let [{:keys [type diff hash shadow-hash] :as msg} ( complete sync") - (a/put! from cs/complete-sync-request)))) - :complete (do (reset! client-shadow (:state msg)) - (reset! app-state (:state msg))) + :diff (do + (logs "Received diff:" msg) + (when (not (= (hasch/uuid @server-state) shadow-hash)) + (reset! server-state (get-unacked-state shadow-hash))) + (reset! unacked-history {}) + (let [ ;; what they now think we have (after updating) + new-shadow (swap! server-state #(sync/patch-state % diff))] + ;; should match hash + (if (= (hasch/uuid new-shadow) hash) + ;; apply same changes locally + ;; if there are differences, they will be sent back + (swap! state sync/patch-state diff) + (do (log "Hash check failed --> complete sync") + (a/put! from cs/complete-sync-request))))) + :complete (do + (logs "Complete sync:" (hasch/uuid (:state msg))) + (reset! unacked-history {}) + (reset! server-state (:state msg)) + (reset! state (:state msg))) (logs "Invalid msg:" msg)) (recur)) - (remove-watch app-state :app-state))) + (remove-watch state :state))) (when reset? (a/put! from cs/complete-sync-request)))) diff --git a/src/cljx/grub/common_state.cljx b/src/cljx/grub/common_state.cljx index 396459d..8a174af 100644 --- a/src/cljx/grub/common_state.cljx +++ b/src/cljx/grub/common_state.cljx @@ -16,15 +16,18 @@ :shadow-hash shadow-hash}) (defn diff-states [shadow state] - (when (not= state shadow) - (let [diff (sync/diff-states shadow state) - hash (hasch/uuid state) - shadow-hash (hasch/uuid shadow) - msg (diff-msg diff hash shadow-hash)] - msg + (let [diff (sync/diff-states shadow state) + ;; what we now have + hash (hasch/uuid state) + + ;; what we had/what you used to have + ;; should match what they think we have + shadow-hash (hasch/uuid shadow) + msg (diff-msg diff hash shadow-hash)] + msg ;(logs "Sync because:") ;(logs "Local = " state) ;(logs "Remote = " shadow) ;(logs "Diff:" diff) ;(logs "Send" shadow-hash "->" hash) - ))) + ))