Separate state handle, sync algorithm
This commit is contained in:
parent
1b8339dec0
commit
dc355eb6ec
4 changed files with 142 additions and 124 deletions
12
src/clj/grub/message.clj
Normal file
12
src/clj/grub/message.clj
Normal file
|
@ -0,0 +1,12 @@
|
|||
(ns grub.message)
|
||||
|
||||
(def full-sync-request {:type :complete})
|
||||
|
||||
(defn full-sync [state]
|
||||
{:type :complete
|
||||
:state state})
|
||||
|
||||
(defn diff-msg [diff shadow-hash]
|
||||
{:type :diff
|
||||
:diff diff
|
||||
:shadow-hash shadow-hash})
|
|
@ -1,146 +1,57 @@
|
|||
(ns grub.state
|
||||
(:require [grub.diff :as diff]
|
||||
[grub.util :as util]
|
||||
[grub.common-state :as cs]
|
||||
[clojure.core.async :as a :refer [<! >! chan go]]
|
||||
[hasch.core :as hasch]))
|
||||
|
||||
(defn initial-state [grubs recipes]
|
||||
[{:grubs (util/map-by-key :id grubs)
|
||||
:recipes (util/map-by-key :id recipes)}])
|
||||
[grub.message :as message]
|
||||
[grub.sync :as sync]
|
||||
[clojure.core.async :as a :refer [<! >! chan go]]))
|
||||
|
||||
;; Server state
|
||||
(def states (ref []))
|
||||
|
||||
(defn get-history-state [states hash]
|
||||
(:state (first (filter #(= (:hash %) hash) states))))
|
||||
|
||||
(defn add-history-state [states new-state]
|
||||
(let [{:keys [state hash]} (first states)
|
||||
new-hash (hasch/uuid new-state)]
|
||||
(if (= hash new-hash)
|
||||
states
|
||||
(conj states {:hash hash :state state}))))
|
||||
|
||||
(defn receive-diff [states diff shadow-hash]
|
||||
(let [state (:state (first states))
|
||||
shadow (get-history-state states shadow-hash)]
|
||||
(if shadow
|
||||
{:new-state (diff/patch-state state diff)
|
||||
:new-shadow (diff/patch-state shadow diff)
|
||||
:full-sync? false}
|
||||
{:new-state state
|
||||
:new-shadow state
|
||||
:full-sync? true})))
|
||||
|
||||
(defn apply-diff? [states diff shadow-hash]
|
||||
(get-history-state states shadow-hash))
|
||||
|
||||
(def to-db (atom nil))
|
||||
|
||||
(defn make-client [in client-state states]
|
||||
(let [out (chan)
|
||||
full-sync! (fn [] (let [new-client (dosync (ref-set client-state @states))]
|
||||
(a/put! out (cs/complete-sync-response new-client))))]
|
||||
full-sync! (fn []
|
||||
(let [new-client (dosync (let [state (sync/get-current-state @states)]
|
||||
(ref-set client-state state)))]
|
||||
(println "full-sync!")
|
||||
(a/put! out (message/full-sync new-client))))]
|
||||
(a/go-loop
|
||||
[]
|
||||
(when-let [{:keys [type diff shadow-hash state]} (<! in)]
|
||||
(condp = type
|
||||
:new-state (a/put! out (diff/diff-states @client-state (first @states)))
|
||||
:diff (dosync
|
||||
(let [state (:state (first @states))
|
||||
shadow (get-history-state states shadow-hash)]
|
||||
(if shadow
|
||||
(do (alter states add-history-state (diff/patch-state state diff))
|
||||
(alter client-state diff/patch-state shadow diff))
|
||||
(full-sync!))))
|
||||
:full-sync (full-sync!)
|
||||
nil)
|
||||
(recur)))
|
||||
[]
|
||||
(when-let [msg (<! in)]
|
||||
(condp = (:type msg)
|
||||
:new-state (let [diff-result (sync/diff-states (:new-states msg) @client-state)
|
||||
{:keys [diff shadow-hash]} diff-result
|
||||
out-msg (message/diff-msg diff shadow-hash)]
|
||||
(println "new-state!")
|
||||
(a/put! out out-msg))
|
||||
:diff (dosync
|
||||
(println "diff!")
|
||||
(let [{:keys [diff shadow-hash]} msg
|
||||
apply-result (sync/apply-diff @states diff shadow-hash)
|
||||
{:keys [new-states new-shadow full-sync?]} apply-result]
|
||||
(ref-set states new-states)
|
||||
(ref-set client-state new-shadow)
|
||||
(when full-sync? (full-sync!))))
|
||||
:full-sync (full-sync!)
|
||||
nil)
|
||||
(recur)))
|
||||
out))
|
||||
|
||||
(defn sync-new-client! [>client <client]
|
||||
(let [client-id (java.util.UUID/randomUUID)
|
||||
state-changes (chan)
|
||||
state-change-events (a/map< (fn [s] {:type :new-state :new-states s}) state-changes)
|
||||
events (chan)
|
||||
client-state (ref cs/empty-state)]
|
||||
(add-watch states client-id (fn [_ _ _ [state _]] (a/put! state-changes state)))
|
||||
(a/pipe (a/merge <client
|
||||
(a/map< (fn [s] {:type :new-state :state s}) state-changes))
|
||||
events)
|
||||
(let [out (make-client events client-id client-state states)]
|
||||
client-state (ref sync/empty-state)]
|
||||
(add-watch states client-id (fn [_ _ _ new-states] (a/put! state-changes new-states)))
|
||||
(a/pipe (a/merge [<client state-change-events]) events)
|
||||
(let [out (make-client events client-state states)]
|
||||
(a/go-loop [] (if-let [v (<! out)]
|
||||
(do (>! >client v)
|
||||
(recur))
|
||||
(remove-watch states client-id))))
|
||||
|
||||
;; (let [full-sync! (fn [] (let [new-client (dosync (ref-set client-state @states))]
|
||||
;; (a/put! >client (cs/complete-sync-response new-client))))]
|
||||
;; (a/go-loop
|
||||
;; []
|
||||
;; (if-let [{:keys [type diff shadow-hash state]} (<! events)]
|
||||
;; (condp = type
|
||||
;; :new-state (a/put! >client (diff/diff-states @client-state (first @states)))
|
||||
;; :diff (dosync
|
||||
;; (let [state (:state (first @states))
|
||||
;; shadow (get-history-state states shadow-hash)]
|
||||
;; (if shadow
|
||||
;; (do (alter states add-history-state (diff/patch-state state diff))
|
||||
;; (alter client-state diff/patch-state shadow diff))
|
||||
;; (full-sync!))))
|
||||
;; :full-sync (full-sync!))
|
||||
;; (remove-watch states client-id))))
|
||||
))
|
||||
|
||||
;; (defn sync-new-client! [to from]
|
||||
;; (let [client-id (java.util.UUID/randomUUID)
|
||||
;; state-changes (chan)]
|
||||
;; (add-watch states client-id (fn [_ _ _ [current-state _]]
|
||||
;; (put! state-changes current-state)))
|
||||
;; (a/go-loop [client-state cs/empty-state]
|
||||
;; (if-let [[{:keys [type] :as msg} c] (<! from)]
|
||||
;; (do (condp = type
|
||||
;; :diff
|
||||
;; (if (= (hasch/uuid @client-state) shadow-hash)
|
||||
;; ;; We have what they thought we had
|
||||
;; ;; Apply changes normally
|
||||
;; (let [new-shadow (swap! client-state diff/patch-state diff)]
|
||||
;; (log "Hash matched state, apply changes")
|
||||
;; (if (= (hasch/uuid new-shadow) hash)
|
||||
;; (let [new-state (swap! state diff/patch-state diff)]
|
||||
;; (>! @to-db diff))
|
||||
;; (do (log "Applying diff failed --> full sync")
|
||||
;; (let [sync-state @state]
|
||||
;; (reset! client-state sync-state)
|
||||
;; (a/put! to (cs/complete-sync-response sync-state))))))
|
||||
;; ;; We have something different than they thought
|
||||
;; ;; Check history
|
||||
;; (do
|
||||
;; (log "Hash check failed --> Reset from history")
|
||||
;; (if-let [history-state (get-history-state shadow-hash)]
|
||||
;; ;; Found what they thought we had in history,
|
||||
;; ;; reset client state to this and continue as normal
|
||||
;; (do
|
||||
;; (reset! client-state history-state)
|
||||
;; (let [new-shadow (swap! client-state diff/patch-state diff)]
|
||||
;; (if (= (hasch/uuid new-shadow) hash)
|
||||
;; (let [new-state (swap! state diff/patch-state diff)]
|
||||
;; (>! @to-db diff))
|
||||
;; (do (log "Applying diff failed --> full sync")
|
||||
;; (let [sync-state @state]
|
||||
;; (reset! client-state sync-state)
|
||||
;; (a/put! to (cs/complete-sync-response sync-state)))))))
|
||||
;; ;; Not found in history, do complete sync
|
||||
;; (do (log "Hash check failed, not in history --> full sync")
|
||||
;; (let [sync-state @state]
|
||||
;; (reset! client-state sync-state)
|
||||
;; (a/put! to (cs/complete-sync-response sync-state)))))))
|
||||
;; :complete (let [new-state (reset! client-state @state)]
|
||||
;; (log "full sync")
|
||||
;; (a/put! to (cs/complete-sync-response new-state)))
|
||||
;; (log "Invalid msg:" msg))
|
||||
;; (recur))
|
||||
;; (remove-watch state client-id)))))
|
||||
(remove-watch states client-id))))))
|
||||
|
||||
(defn init [_to-db grubs recipes]
|
||||
(reset! states (initial-state grubs recipes))
|
||||
(dosync (ref-set states (sync/initial-state grubs recipes)))
|
||||
(reset! to-db _to-db))
|
||||
|
|
40
src/clj/grub/sync.clj
Normal file
40
src/clj/grub/sync.clj
Normal file
|
@ -0,0 +1,40 @@
|
|||
(ns grub.sync
|
||||
(:require [grub.diff :as diff]
|
||||
[grub.util :as util]
|
||||
[hasch.core :as hasch]))
|
||||
|
||||
(def empty-state {:grubs {} :recipes {}})
|
||||
|
||||
(defn initial-state [grubs recipes]
|
||||
(let [state {:grubs (util/map-by-key :id grubs)
|
||||
:recipes (util/map-by-key :id recipes)}]
|
||||
[{:state state :hash (hasch/uuid state)}]))
|
||||
|
||||
(defn get-current-state [states]
|
||||
(:state (last states)))
|
||||
|
||||
(defn get-history-state [states hash]
|
||||
(:state (first (filter #(= (:hash %) hash) states))))
|
||||
|
||||
(defn add-history-state [states new-state]
|
||||
(let [last-hash (:hash (last states))
|
||||
new-hash (hasch/uuid new-state)]
|
||||
(if (= last-hash new-hash)
|
||||
states
|
||||
(conj states {:hash new-hash :state new-state}))))
|
||||
|
||||
(defn diff-states [states shadow]
|
||||
(let [state (get-current-state states)]
|
||||
{:shadow-hash (hasch/uuid shadow)
|
||||
:diff (diff/diff-states shadow state)}))
|
||||
|
||||
(defn apply-diff [states diff shadow-hash]
|
||||
(let [state (:state (first states))
|
||||
shadow (get-history-state states shadow-hash)]
|
||||
(if shadow
|
||||
{:new-states (add-history-state states (diff/patch-state state diff))
|
||||
:new-shadow (diff/patch-state shadow diff)
|
||||
:full-sync? false}
|
||||
{:new-states states
|
||||
:new-shadow state
|
||||
:full-sync? true})))
|
55
src/test/grub/test/unit/sync.clj
Normal file
55
src/test/grub/test/unit/sync.clj
Normal file
|
@ -0,0 +1,55 @@
|
|||
(ns grub.test.unit.sync
|
||||
(:require [grub.sync :as s]
|
||||
[clojure.test :refer :all]
|
||||
[hasch.core :as hasch]))
|
||||
|
||||
(deftest apply-diff-normally
|
||||
;; Apply changes and return ACK for in sync client/server
|
||||
(let [state {:grubs {"1" {:text "2 apples" :completed false}}
|
||||
:recipes {}}
|
||||
hash (hasch/uuid state)
|
||||
states [{:hash hash :state state}]
|
||||
diff {:grubs {:updated {"1" {:completed true}} :deleted #{}}}
|
||||
shadow-hash hash
|
||||
{:keys [new-states new-shadow full-sync?]} (s/receive-diff states diff shadow-hash)]
|
||||
(do
|
||||
(is (= {:grubs {"1" {:text "2 apples" :completed true}}
|
||||
:recipes {}}
|
||||
(:state (last new-states))))
|
||||
(is (= {:grubs {"1" {:text "2 apples" :completed true}}
|
||||
:recipes {}}
|
||||
new-shadow))
|
||||
(is (not full-sync?)))))
|
||||
|
||||
(deftest server-state-changed
|
||||
;; Send differences back if server state changed
|
||||
(let [state {:grubs {"1" {:text "3 apples" :completed false}} :recipes {}}
|
||||
prev {:grubs {"1" {:text "2 apples" :completed false}} :recipes {}}
|
||||
states [{:hash (hasch/uuid state) :state state}
|
||||
{:hash (hasch/uuid prev) :state prev}]
|
||||
diff {:grubs {:updated {"1" {:completed true}} :deleted #{}}}
|
||||
shadow-hash (hasch/uuid prev)
|
||||
{:keys [new-states new-shadow full-sync?]} (s/receive-diff states diff shadow-hash)]
|
||||
(do
|
||||
(is (= {:grubs {"1" {:text "3 apples" :completed true}}
|
||||
:recipes {}}
|
||||
(:state (last new-states))))
|
||||
(is (= {:grubs {"1" {:text "2 apples" :completed true}}
|
||||
:recipes {}}
|
||||
new-shadow))
|
||||
(is (not full-sync?)))))
|
||||
|
||||
(deftest full-sync-if-client-too-far-out-of-sync
|
||||
;; Shadow hash not in history means client has fallen too far
|
||||
;; out of sync. Send a full sync
|
||||
(let [state {:grubs {"1" {:text "2 apples" :completed false}} :recipes {}}
|
||||
prev {:grubs {"1" {:text "2 apples" :completed false}} :recipes {}}
|
||||
states [{:hash (hasch/uuid state) :state state}
|
||||
{:hash (hasch/uuid prev) :state prev}]
|
||||
shadow-hash (hasch/uuid {:grubs {} :recipes {}})
|
||||
diff {:grubs {:updated {"1" {:completed true}} :deleted #{}}}
|
||||
{:keys [new-states new-shadow full-sync?]} (s/receive-diff states diff shadow-hash)]
|
||||
(do
|
||||
(is (= state (:state (last new-states))))
|
||||
(is (= state new-shadow))
|
||||
(is full-sync?))))
|
Loading…
Reference in a new issue