Possibly fully working differential sync
This commit is contained in:
parent
a572b9e1eb
commit
f087309c0f
4 changed files with 88 additions and 57 deletions
|
@ -13,9 +13,11 @@
|
||||||
(def state-history (atom []))
|
(def state-history (atom []))
|
||||||
|
|
||||||
(defn save-history-state [history new-state]
|
(defn save-history-state [history new-state]
|
||||||
(when-not (= (last history) new-state)
|
(when-not (= (hasch/uuid (last history)) (hasch/uuid new-state))
|
||||||
(println "Adding state to history: " (hasch/uuid new-state))
|
(println "Adding state to history: " (hasch/uuid new-state))
|
||||||
(println "History size:" (inc (count history)))
|
(println "History:")
|
||||||
|
(doseq [s (conj history new-state)]
|
||||||
|
(println (hasch/uuid s)))
|
||||||
(conj history new-state)))
|
(conj history new-state)))
|
||||||
|
|
||||||
(defn get-history-state [hash]
|
(defn get-history-state [hash]
|
||||||
|
@ -34,53 +36,57 @@
|
||||||
|
|
||||||
(defn sync-new-client! [to from]
|
(defn sync-new-client! [to from]
|
||||||
(let [client-id (java.util.UUID/randomUUID)
|
(let [client-id (java.util.UUID/randomUUID)
|
||||||
server-shadow (atom cs/empty-state)]
|
client-state (atom cs/empty-state)
|
||||||
|
log (fn [& args]
|
||||||
|
(apply println client-id args))]
|
||||||
(add-watch state client-id (fn [_ _ _ current-state]
|
(add-watch state client-id (fn [_ _ _ current-state]
|
||||||
(when-let [msg (cs/diff-states @server-shadow current-state)]
|
(when-let [msg (cs/diff-states @client-state current-state)]
|
||||||
(a/put! to msg)
|
(a/put! to msg)
|
||||||
|
;; send ACK even if nothing changes
|
||||||
;; TODO: reset only if send succeeds?
|
;; TODO: reset only if send succeeds?
|
||||||
(reset! server-shadow current-state))))
|
(reset! client-state current-state))))
|
||||||
(a/go-loop []
|
(a/go-loop []
|
||||||
(if-let [{:keys [type diff hash shadow-hash] :as msg} (<! from)]
|
(if-let [{:keys [type diff hash shadow-hash] :as msg} (<! from)]
|
||||||
(do (condp = type
|
(do (condp = type
|
||||||
:diff
|
:diff
|
||||||
(if (= (hasch/uuid @server-shadow) shadow-hash)
|
(if (= (hasch/uuid @client-state) shadow-hash)
|
||||||
;; we have what they thought we had
|
;; we have what they thought we had
|
||||||
;; apply changes normally
|
;; apply changes normally
|
||||||
(let [new-shadow (swap! server-shadow sync/patch-state diff)]
|
(let [new-shadow (swap! client-state sync/patch-state diff)]
|
||||||
(println "Hash matched state, apply changes")
|
(log "Hash matched state, apply changes")
|
||||||
(if (= (hasch/uuid new-shadow) hash)
|
(if (= (hasch/uuid new-shadow) hash)
|
||||||
(let [new-state (swap! state sync/patch-state diff)]
|
(let [new-state (swap! state sync/patch-state diff)]
|
||||||
(>! @to-db diff))
|
(>! @to-db diff))
|
||||||
(do (println "Applying diff failed --> full sync")
|
(do (log "Applying diff failed --> full sync")
|
||||||
(let [sync-state @state]
|
(let [sync-state @state]
|
||||||
(reset! server-shadow sync-state)
|
(reset! client-state sync-state)
|
||||||
(a/put! to (cs/complete-sync-response sync-state))))))
|
(a/put! to (cs/complete-sync-response sync-state))))))
|
||||||
;; we have something different than they thought
|
;; We have something different than they thought
|
||||||
;; check history
|
;; Check history
|
||||||
(if-let [history-state (get-history-state shadow-hash)]
|
|
||||||
;; Found what they thought in history,
|
|
||||||
;; reset client state to this
|
|
||||||
;; and continue as normal
|
|
||||||
(do
|
(do
|
||||||
(println "Hash check failed --> Reset from history")
|
(log "Hash check failed --> Reset from history")
|
||||||
(reset! server-shadow history-state)
|
(if-let [history-state (get-history-state shadow-hash)]
|
||||||
(let [new-shadow (swap! server-shadow sync/patch-state diff)]
|
;; Found what they thought we had in history,
|
||||||
|
;; reset client state to this and continue as normal
|
||||||
|
(do
|
||||||
|
(reset! client-state history-state)
|
||||||
|
(let [new-shadow (swap! client-state sync/patch-state diff)]
|
||||||
(if (= (hasch/uuid new-shadow) hash)
|
(if (= (hasch/uuid new-shadow) hash)
|
||||||
(let [new-state (swap! state sync/patch-state diff)]
|
(let [new-state (swap! state sync/patch-state diff)]
|
||||||
(>! @to-db diff))
|
(>! @to-db diff))
|
||||||
(do (println "Applying diff failed --> full sync")
|
(do (log "Applying diff failed --> full sync")
|
||||||
(let [sync-state @state]
|
(let [sync-state @state]
|
||||||
(reset! server-shadow sync-state)
|
(reset! client-state sync-state)
|
||||||
(a/put! to (cs/complete-sync-response sync-state)))))))
|
(a/put! to (cs/complete-sync-response sync-state)))))))
|
||||||
;; No history found, do complete sync
|
;; Not found in history, do complete sync
|
||||||
(do (println "Hash check failed, not in history --> full sync")
|
(do (log "Hash check failed, not in history --> full sync")
|
||||||
(let [sync-state @state]
|
(let [sync-state @state]
|
||||||
(reset! server-shadow sync-state)
|
(reset! client-state sync-state)
|
||||||
(a/put! to (cs/complete-sync-response sync-state))))))
|
(a/put! to (cs/complete-sync-response sync-state)))))))
|
||||||
:complete (let [new-state (reset! server-shadow @state)]
|
:complete (let [new-state (reset! client-state @state)]
|
||||||
|
(log "full sync")
|
||||||
(a/put! to (cs/complete-sync-response new-state)))
|
(a/put! to (cs/complete-sync-response new-state)))
|
||||||
(println "Invalid msg:" msg))
|
(log "Invalid msg:" msg))
|
||||||
(recur))
|
(recur))
|
||||||
(remove-watch state client-id)))))
|
(remove-watch state client-id)))))
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
(state/sync-state! from-remote to-remote reset?)))
|
(state/sync-state! from-remote to-remote reset?)))
|
||||||
|
|
||||||
(defn init-app []
|
(defn init-app []
|
||||||
(view/render-app state/app-state)
|
(view/render-app state/state)
|
||||||
(connect-to-server true))
|
(connect-to-server true))
|
||||||
|
|
||||||
(init-app)
|
(init-app)
|
||||||
|
|
|
@ -6,26 +6,48 @@
|
||||||
(:require-macros [grub.macros :refer [log logs]]
|
(:require-macros [grub.macros :refer [log logs]]
|
||||||
[cljs.core.async.macros :refer [go go-loop]]))
|
[cljs.core.async.macros :refer [go go-loop]]))
|
||||||
|
|
||||||
(def app-state (atom cs/empty-state))
|
(def state (atom cs/empty-state))
|
||||||
|
|
||||||
|
(def unacked-history (atom {}))
|
||||||
|
|
||||||
|
(defn get-unacked-state [hash]
|
||||||
|
(logs "Look for history state:" hash)
|
||||||
|
(get @unacked-history hash))
|
||||||
|
|
||||||
(defn sync-state! [to from reset?]
|
(defn sync-state! [to from reset?]
|
||||||
(let [client-shadow (atom cs/empty-state)]
|
(let [server-state (atom cs/empty-state)]
|
||||||
(add-watch app-state :app-state (fn [_ _ _ current-state]
|
(add-watch state :state (fn [_ _ _ current-state]
|
||||||
(when-let [msg (cs/diff-states @client-shadow current-state)]
|
(when-not (= @server-state current-state)
|
||||||
(a/put! from msg)
|
(let [msg (cs/diff-states @server-state current-state)]
|
||||||
;; TODO: reset only if send succeeds
|
(when-not (get @unacked-history (hasch/uuid current-state))
|
||||||
(reset! client-shadow current-state))))
|
(logs "state change! msg: " msg)
|
||||||
|
(swap! unacked-history assoc (hasch/uuid current-state) current-state)
|
||||||
|
(logs "History:" (keys @unacked-history))
|
||||||
|
(a/put! from msg))
|
||||||
|
))))
|
||||||
(go-loop []
|
(go-loop []
|
||||||
(if-let [{:keys [type diff hash shadow-hash] :as msg} (<! to)]
|
(if-let [{:keys [type diff hash shadow-hash] :as msg} (<! to)]
|
||||||
(do (condp = type
|
(do (condp = type
|
||||||
:diff (let [new-shadow (swap! client-shadow #(sync/patch-state % diff))]
|
:diff (do
|
||||||
|
(logs "Received diff:" msg)
|
||||||
|
(when (not (= (hasch/uuid @server-state) shadow-hash))
|
||||||
|
(reset! server-state (get-unacked-state shadow-hash)))
|
||||||
|
(reset! unacked-history {})
|
||||||
|
(let [ ;; what they now think we have (after updating)
|
||||||
|
new-shadow (swap! server-state #(sync/patch-state % diff))]
|
||||||
|
;; should match hash
|
||||||
(if (= (hasch/uuid new-shadow) hash)
|
(if (= (hasch/uuid new-shadow) hash)
|
||||||
(swap! app-state #(sync/patch-state % diff))
|
;; apply same changes locally
|
||||||
|
;; if there are differences, they will be sent back
|
||||||
|
(swap! state sync/patch-state diff)
|
||||||
(do (log "Hash check failed --> complete sync")
|
(do (log "Hash check failed --> complete sync")
|
||||||
(a/put! from cs/complete-sync-request))))
|
(a/put! from cs/complete-sync-request)))))
|
||||||
:complete (do (reset! client-shadow (:state msg))
|
:complete (do
|
||||||
(reset! app-state (:state msg)))
|
(logs "Complete sync:" (hasch/uuid (:state msg)))
|
||||||
|
(reset! unacked-history {})
|
||||||
|
(reset! server-state (:state msg))
|
||||||
|
(reset! state (:state msg)))
|
||||||
(logs "Invalid msg:" msg))
|
(logs "Invalid msg:" msg))
|
||||||
(recur))
|
(recur))
|
||||||
(remove-watch app-state :app-state)))
|
(remove-watch state :state)))
|
||||||
(when reset? (a/put! from cs/complete-sync-request))))
|
(when reset? (a/put! from cs/complete-sync-request))))
|
||||||
|
|
|
@ -16,9 +16,12 @@
|
||||||
:shadow-hash shadow-hash})
|
:shadow-hash shadow-hash})
|
||||||
|
|
||||||
(defn diff-states [shadow state]
|
(defn diff-states [shadow state]
|
||||||
(when (not= state shadow)
|
|
||||||
(let [diff (sync/diff-states shadow state)
|
(let [diff (sync/diff-states shadow state)
|
||||||
|
;; what we now have
|
||||||
hash (hasch/uuid state)
|
hash (hasch/uuid state)
|
||||||
|
|
||||||
|
;; what we had/what you used to have
|
||||||
|
;; should match what they think we have
|
||||||
shadow-hash (hasch/uuid shadow)
|
shadow-hash (hasch/uuid shadow)
|
||||||
msg (diff-msg diff hash shadow-hash)]
|
msg (diff-msg diff hash shadow-hash)]
|
||||||
msg
|
msg
|
||||||
|
@ -27,4 +30,4 @@
|
||||||
;(logs "Remote = " shadow)
|
;(logs "Remote = " shadow)
|
||||||
;(logs "Diff:" diff)
|
;(logs "Diff:" diff)
|
||||||
;(logs "Send" shadow-hash "->" hash)
|
;(logs "Send" shadow-hash "->" hash)
|
||||||
)))
|
))
|
||||||
|
|
Loading…
Reference in a new issue