Working n-way client sync with Datomic

- Based on latest CSP specification
- Also added end-to-end test for eventual consistency
This commit is contained in:
Nicholas Kariniemi 2015-12-10 08:59:32 +02:00
parent 4058d17102
commit b31489b2b8
21 changed files with 360 additions and 352 deletions

View file

@ -15,10 +15,14 @@
[org.clojure/tools.cli "0.3.1"] [org.clojure/tools.cli "0.3.1"]
[sablono "0.3.4"] [sablono "0.3.4"]
[cljs-uuid "0.0.4"] [cljs-uuid "0.0.4"]
[midje "1.6.3"] [midje "1.8.2"]
[com.cognitect/transit-clj "0.8.275"] [com.cognitect/transit-clj "0.8.275"]
[com.cognitect/transit-cljs "0.8.220"] [com.cognitect/transit-cljs "0.8.220"]
[com.datomic/datomic-pro "0.9.5173" :exclusions [com.fasterxml.jackson.core/jackson-annotations]]] [com.datomic/datomic-pro "0.9.5344"
:exclusions [com.fasterxml.jackson.core/jackson-annotations
org.apache.httpcomponents/httpclient]]
[clj-webdriver "0.7.2"]
[org.seleniumhq.selenium/selenium-java "2.47.0"]]
:repositories {"my.datomic.com" {:url "https://my.datomic.com/repo" :repositories {"my.datomic.com" {:url "https://my.datomic.com/repo"
:creds :gpg}} :creds :gpg}}
:profiles {:uberjar {:aot :all} :profiles {:uberjar {:aot :all}

View file

@ -3,6 +3,7 @@
(:require [grub.websocket :as ws] (:require [grub.websocket :as ws]
[grub.db :as db] [grub.db :as db]
[grub.server-sync :as sync] [grub.server-sync :as sync]
[grub.test.e2e.sync :as e2e]
[ring.middleware.resource :as resource] [ring.middleware.resource :as resource]
[ring.middleware.content-type :as content-type] [ring.middleware.content-type :as content-type]
[ring.util.response :as resp] [ring.util.response :as resp]
@ -49,18 +50,21 @@
:port 3000 :port 3000
:stop-server nil}) :stop-server nil})
(defn sync-client-with-db! [ws-channel db-conn] (defn sync-client-with-db! [ws-channel db-conn db-reports]
(let [from-client (chan) (let [from-client (chan)
to-client (chan) to-client (chan)
diffs (chan) diffs (chan)
full-sync-reqs (chan) full-sync-reqs (chan)
{:keys [report-queue tap]} (db/report-queue-subscribe db-reports)
on-close (fn [] on-close (fn []
(db/report-queue-unsubscribe db-reports tap)
(a/close! from-client) (a/close! from-client)
(a/close! to-client) (a/close! to-client)
(a/close! diffs) (a/close! diffs)
(a/close! full-sync-reqs))] (a/close! full-sync-reqs)
)]
(ws/add-connected-client! ws-channel to-client from-client on-close) (ws/add-connected-client! ws-channel to-client from-client on-close)
(sync/sync-server! to-client diffs full-sync-reqs db-conn) (sync/start-sync! to-client diffs full-sync-reqs db-conn report-queue)
(go (loop [] (let [event (<! from-client)] (go (loop [] (let [event (<! from-client)]
(cond (cond
(nil? event) nil ;; drop out of loop (nil? event) nil ;; drop out of loop
@ -68,10 +72,10 @@
(= (:type event) :full-sync-request) (do (>! full-sync-reqs event) (recur)) (= (:type event) :full-sync-request) (do (>! full-sync-reqs event) (recur))
:else (do (println "Unknown event:" event) (recur)))))))) :else (do (println "Unknown event:" event) (recur))))))))
(defn handle-websocket [handler db-conn] (defn handle-websocket [handler db-conn db-reports]
(fn [{:keys [websocket?] :as request}] (fn [{:keys [websocket?] :as request}]
(if websocket? (if websocket?
(httpkit/with-channel request ws-channel (sync-client-with-db! ws-channel db-conn)) (httpkit/with-channel request ws-channel (sync-client-with-db! ws-channel db-conn db-reports))
(handler request)))) (handler request))))
(defn handle-root [handler index] (defn handle-root [handler index]
@ -86,20 +90,22 @@
(resp/not-found "") (resp/not-found "")
(handler req)))) (handler req))))
(defn make-handler [{:keys [index]} db-conn] (defn make-handler [{:keys [index]} db-conn db-reports]
(-> (fn [req] (resp/not-found "Not found")) (-> (fn [req] (resp/not-found "Not found"))
(resource/wrap-resource "public") (resource/wrap-resource "public")
(content-type/wrap-content-type) (content-type/wrap-content-type)
(handle-root index) (handle-root index)
(handle-websocket db-conn) (handle-websocket db-conn db-reports)
(wrap-bounce-favicon))) (wrap-bounce-favicon)))
(defn start [{:keys [port database-uri] :as system}] (defn start [{:keys [port database-uri] :as system}]
(let [db-conn (db/connect database-uri) (let [db-conn (db/connect database-uri)
stop-server (httpkit/run-server (make-handler system db-conn) {:port port})] db-reports (db/report-queue-channel db-conn)
stop-server (httpkit/run-server (make-handler system db-conn db-reports) {:port port})]
(println "Started server on localhost:" port) (println "Started server on localhost:" port)
(assoc system (assoc system
:db-conn db-conn :db-conn db-conn
:db-reports db-reports
:stop-server stop-server))) :stop-server stop-server)))
(defn stop [{:keys [db-conn stop-server] :as system}] (defn stop [{:keys [db-conn stop-server] :as system}]
@ -143,6 +149,9 @@
(case (first arguments) (case (first arguments)
"development" (start (merge dev-system options)) "development" (start (merge dev-system options))
"dev" (start (merge dev-system options)) "dev" (start (merge dev-system options))
"e2e" (let [system (start (merge dev-system options))]
(e2e/run-e2e-tests system)
(stop system))
"production" (start (merge prod-system options)) "production" (start (merge prod-system options))
"prod" (start (merge prod-system options)) "prod" (start (merge prod-system options))
(exit 1 (usage summary))))) (exit 1 (usage summary)))))

View file

@ -132,9 +132,6 @@
(defn disconnect [conn] (defn disconnect [conn]
(d/release conn)) (d/release conn))
(defn get-history-state [db-conn tag]
(get-current-state db-conn))
(defn diff-tx [diff] (defn diff-tx [diff]
(let [grubs-upsert-tx (->> diff (let [grubs-upsert-tx (->> diff
:grubs :grubs
@ -164,3 +161,24 @@
(defn patch-state! [conn diff] (defn patch-state! [conn diff]
@(d/transact conn (diff-tx diff))) @(d/transact conn (diff-tx diff)))
(defn report-queue-channel [conn]
(let [queue (d/tx-report-queue conn)
changes (chan)
pub (a/mult changes)]
(go (loop []
(let [report (.. queue take)]
(>! changes report)
(recur))))
pub))
(defn report-queue-subscribe [report-ch]
(let [reports (chan)
report-buffer (chan (a/sliding-buffer 1))]
(a/tap report-ch reports)
(a/pipe reports report-buffer)
{:report-queue report-buffer
:tap report-ch}))
(defn report-queue-unsubscribe [report-ch tap]
(a/untap report-ch tap))

View file

@ -1,53 +1,71 @@
(ns grub.server-sync (ns grub.server-sync
(:require [grub.diff :as diff] (:require [grub.db :as db]
[grub.state :as state] [grub.diff :as diff]
[grub.event :as event]
[grub.util :as util]
[datomic.api :as d] [datomic.api :as d]
[clojure.core.async :as a :refer [<! >! chan go]] [clojure.core.async :as a :refer [<! >! chan go]]))
[grub.db :as db]
[clojure.pprint :refer [pprint]]))
(defn full-sync [state tag] (def DEBUG false)
{:type :full-sync
:full-state state
:tag tag})
(def empty-state state/empty-state) (defn make-printer []
(let [print-chan (chan)]
(go (loop []
(println (<! print-chan))
(recur)))
print-chan))
(defn diff-msg [shadow state] (def debug-print
(println "diff-msg") (let [printer (make-printer)]
(let [diff (diff/diff-states shadow state)] (fn [msg] (when DEBUG (a/put! printer msg)))))
{:type :diff
:diff diff
:tag (:tag state)
:shadow-tag (:tag shadow)}))
(defn sync-server! [to-client diffs full-sync-reqs db-conn] (defn rand-id [] (util/rand-str 10))
(go (loop []
(let [[event ch] (a/alts! [full-sync-reqs diffs])]
(when-not (nil? event)
(condp = ch
diffs
(let [{:keys [diff shadow-tag tag]} event
client-shadow-db (d/as-of (d/db db-conn) shadow-tag)
client-shadow-state (db/get-current-db-state client-shadow-db)
{:keys [db-after]} (db/patch-state! db-conn diff)
new-tag (d/basis-t db-after)
new-state (assoc (db/get-current-db-state db-after) :tag new-tag)
new-shadow (assoc (diff/patch-state client-shadow-state diff) :tag tag)
return-diff (diff-msg new-shadow new-state)]
(println "************************* as-of:" new-tag)
(println "client-shadow:" (pprint (dissoc client-shadow-state :recipes)))
(println "new-state:" (pprint (dissoc new-state :recipes)))
(println "new-shadow" (pprint (dissoc new-shadow :recipes)))
;(println "**************************history-state:" history-state)
;(println "**************************new-state:" new-state)
;(println "**************************new-shadow:" new-shadow)
;(println "return diff:" return-diff)
(>! to-client return-diff)
(recur))
full-sync-reqs (defn start-sync! [to-client diffs full-sync-reqs db-conn report-queue]
(do (>! to-client (full-sync (db/get-current-state db-conn) (d/basis-t (d/db db-conn)))) (let [id (rand-id)]
(recur)) (go (loop [client-tag nil
(do (println "Unhandled event:" event) awaiting-state? true]
(recur)))))))) (let [channels (if awaiting-state? [full-sync-reqs diffs] [full-sync-reqs diffs report-queue])
[event ch] (a/alts! channels)]
(when-not (nil? event)
(condp = ch
diffs
(let [{:keys [diff shadow-tag tag]} event
client-shadow-db (d/as-of (d/db db-conn) shadow-tag)
client-shadow-state (db/get-current-db-state client-shadow-db)
a (debug-print (str id " " "Got diff from client: " shadow-tag " -> " tag))
{:keys [db-after]} (db/patch-state! db-conn diff)
new-tag (d/basis-t db-after)
new-state (assoc (db/get-current-db-state db-after) :tag new-tag)
new-shadow (assoc (diff/patch-state client-shadow-state diff) :tag tag)
return-diff (event/diff-msg new-shadow new-state)]
(debug-print (str id " " "Send diff to client : " tag " -> " new-tag))
(>! to-client return-diff)
(recur new-tag false))
full-sync-reqs
(let [current-db (d/db db-conn)
current-tag (d/basis-t current-db)
current-state (assoc (db/get-current-db-state current-db) :tag current-tag)]
(debug-print (str id " " "Full sync client to : " current-tag))
(>! to-client (event/full-sync current-state))
(recur current-tag false))
report-queue
(let [tx-report event
new-db-state (:db-after tx-report)
new-tag (d/basis-t new-db-state)]
(if (>= client-tag new-tag)
;; Already up to date, do nothing
(do (debug-print (str id " " "Got report " new-tag " but client already up-to-date at " new-tag))
(recur client-tag false))
;; Changes, send them down
(let [new-state (assoc (db/get-current-db-state new-db-state) :tag new-tag)
client-db (d/as-of (d/db db-conn) client-tag)
client-state (assoc (db/get-current-db-state client-db) :tag client-tag)]
(debug-print (str id " " "Got report, send diff to client: " client-tag " -> " new-tag))
(>! to-client (event/diff-msg client-state new-state))
(recur new-tag false))))
(throw (Throwable. "Bug: Received an event on unknown channel")))))))))

View file

@ -1,57 +1,65 @@
(ns grub.client-sync (ns grub.client-sync
(:require [grub.diff :as diff] (:require [grub.diff :as diff]
[grub.state :as state] [grub.state :as state]
[grub.event :as event]
#?(:cljs [cljs.core.async :as a :refer [<! >! chan]] #?(:cljs [cljs.core.async :as a :refer [<! >! chan]]
:clj [clojure.core.async :as a :refer [<! >! chan go]])) :clj [clojure.core.async :as a :refer [<! >! chan go]]))
#?(:cljs (:require-macros [cljs.core.async.macros :refer [go]]))) #?(:cljs (:require-macros [cljs.core.async.macros :refer [go]])))
(def DEBUG true) (def DEBUG false)
(def full-sync-request {:type :full-sync-request}) (defn sync-client! [initial-state to-server ui-state-buffer diffs full-syncs connected ui-state]
(go (loop [client-state initial-state
server-state initial-state
awaiting-ack? false]
(let [channels (if awaiting-ack?
[diffs full-syncs connected]
[diffs full-syncs connected ui-state-buffer])
[event ch] (a/alts! channels)]
(when DEBUG (println event))
(when-not (nil? event)
(condp = ch
full-syncs (let [{:keys [full-state]} event]
(reset! ui-state full-state)
(when DEBUG (println "Full sync, new ui state tag:" (:tag @ui-state)))
(recur full-state full-state false))
ui-state-buffer (let [new-ui-state @ui-state]
(if (state/state= server-state new-ui-state)
(recur server-state server-state false)
(do
(when DEBUG (println "Changes, current ui state tag:" (:tag new-ui-state)))
(>! to-server (event/diff-msg server-state new-ui-state))
(recur new-ui-state server-state true))))
diffs (let [{:keys [diff]} event]
(if (= (:shadow-tag diff) (:tag server-state))
;; Our state is based on what they think it's based on
(let [;; Update server state we are based on
new-server-state (diff/patch-state client-state diff)
;; Apply changes directly to UI
new-client-state (swap! ui-state diff/patch-state diff)]
(when DEBUG (println "Applied diff, new ui tag:" (:tag new-client-state)))
(when DEBUG (println "Applied diff, new server tag:" (:tag new-server-state)))
;; If there are any diffs to reconcile, they will come back through input buffer
(recur new-client-state new-server-state false))
(defn diff-msg [shadow state] ;; State mismatch, do full sync
(let [diff (diff/diff-states shadow state)] (do (>! to-server (event/full-sync-request))
{:type :diff (recur client-state server-state true))))
:diff diff connected
:tag (:tag state) ;; Need to make sure we are in sync, send diff
:shadow-tag (:tag shadow)})) (do
(when DEBUG (println "Reconnected, sending diff"))
(>! to-server (event/diff-msg server-state @ui-state))
(recur client-state server-state true))
(defn update-states [states diff] (throw "Bug: Received a sync event on an unknown channel")))))))
(let [state (state/get-latest states)
new-state (diff/patch-state state diff)]
(state/add states new-state)))
(defn sync-client! [to-server new-ui-states diffs full-syncs ui-state] (defn start-sync! [to-server new-ui-states diffs full-syncs connected ui-state]
(let [ui-state-buffer (chan (a/sliding-buffer 1))] (let [ui-state-buffer (chan (a/sliding-buffer 1))]
(a/pipe new-ui-states ui-state-buffer) (a/pipe new-ui-states ui-state-buffer)
(reset! ui-state state/empty-state) (go (<! connected)
(go (loop [state (assoc @ui-state :tag 0) (>! to-server (event/full-sync-request))
shadow state (let [full-sync-event (<! full-syncs)
awaiting-ack? false] initial-state (:full-state full-sync-event)]
(let [channels (if awaiting-ack? [diffs full-syncs] [diffs full-syncs ui-state-buffer])] (reset! ui-state initial-state)
(let [[event ch] (a/alts! channels)] (sync-client! initial-state to-server ui-state-buffer diffs full-syncs connected ui-state)))))
(when DEBUG (println event))
(when-not (nil? event)
(condp = ch
ui-state-buffer (let [new-state (assoc event :tag (inc (:tag state)))]
(println "new-state:\n" new-state)
(>! to-server (diff-msg shadow new-state))
(recur new-state shadow true))
full-syncs (let [{:keys [full-state tag]} event
new-tag (inc (:tag state))
new-state (assoc full-state :tag new-tag)]
(reset! ui-state full-state)
(recur new-state (assoc full-state :tag tag) false))
diffs (let [{:keys [diff shadow-tag tag]} event]
(cond (< shadow-tag (:tag state)) (recur state shadow false)
(= shadow-tag (:tag state))
(let [new-shadow (assoc (diff/patch-state state diff) :tag tag)
new-state (assoc (swap! ui-state diff/patch-state diff) :tag (inc (:tag state)))]
(if (state/state= new-shadow new-state)
(recur new-state new-shadow false)
(do (>! to-server (diff-msg new-shadow new-state))
(recur new-state new-shadow true))))
:else (do (>! to-server (full-sync-request (:tag shadow)))
(recur state shadow true))))
(println "An error occurred, received value on unknown channel")))))))
(a/put! to-server full-sync-request)))

View file

@ -6,42 +6,22 @@
(set/difference (into #{} (keys a)) (into #{} (keys b)))) (set/difference (into #{} (keys a)) (into #{} (keys b))))
(defn updated [a b] (defn updated [a b]
(println "*******************updated")
(println "********************a:" a "\n\n\n")
(println "********************b:" b "\n\n\n")
(println "diff:" (second (data/diff a b)) "\n\n\n")
(second (data/diff a b))) (second (data/diff a b)))
(def a {:grub-e1ff4b5a-05eb-4364-8884-fc124ac1091c {:id :grub-e1ff4b5a-05eb-4364-8884-fc124ac1091c, :text "a", :completed false}})
(def b {:grub-e1ff4b5a-05eb-4364-8884-fc124ac1091c {:id :grub-e1ff4b5a-05eb-4364-8884-fc124ac1091c, :text "a", :completed false}})
(def d (second (data/diff a b)))
(defn diff-maps [a b] (defn diff-maps [a b]
(when (and (map? a) (map? b)) (when (and (map? a) (map? b))
{:- (deleted a b) {:- (deleted a b)
:+ (updated a b)})) :+ (updated a b)}))
(defn diff-keys [prev next]
(->> prev
(keys)
(map (fn [k] [k (diff-maps (k prev) (k next))]))
(filter #(not (nil? (second %))))
(into {})))
(defn diff-states [prev next] (defn diff-states [prev next]
(println "diff states")
(println "prev:" (dissoc prev :recipes))
(println "next:" (dissoc next :recipes))
(let [prev* (dissoc prev :tag) (let [prev* (dissoc prev :tag)
next* (dissoc next :tag)] next* (dissoc next :tag)]
(->> prev* (->> prev*
(keys) (keys)
(map (fn [k] [k (diff-maps (k prev*) (k next*))])) (map (fn [k] [k (diff-maps (k prev*) (k next*))]))
(filter #(not (nil? (second %)))) (filter #(not (nil? (second %))))
(into {})))) (into {})
(#(assoc % :shadow-tag (:tag prev) :tag (:tag next))))))
(defn patch-map [state diff] (defn patch-map [state diff]
(-> state (-> state
@ -52,4 +32,5 @@
(->> state (->> state
(keys) (keys)
(map (fn [k] [k (patch-map (k state) (k diff))])) (map (fn [k] [k (patch-map (k state) (k diff))]))
(into {}))) (into {})
(#(assoc % :tag (:tag diff)))))

19
src/cljc/grub/event.cljc Normal file
View file

@ -0,0 +1,19 @@
(ns grub.event
(:require [grub.diff :as diff]))
(defn full-sync-request []
{:type :full-sync-request})
(defn diff-msg [shadow state]
(let [diff (diff/diff-states shadow state)]
{:type :diff
:diff diff
:tag (:tag state)
:shadow-tag (:tag shadow)}))
(defn connected []
{:type :connected})
(defn full-sync [state]
{:type :full-sync
:full-state state})

View file

@ -1,28 +1,6 @@
(ns grub.state) (ns grub.state)
(def num-history-states 20)
(def empty-state {:tag 0 :grubs {} :recipes {}}) (def empty-state {:tag 0 :grubs {} :recipes {}})
(defn new-states [state]
[(assoc state :tag 0)])
(defn get-latest [states]
(last states))
(defn get-tagged [states tag]
(->> states
(filter #(= (:tag %) tag))
(first)))
(defn add [states new-state]
(let [last-state (last states)]
(if (= last-state new-state)
states
(let [new-states (conj states (assoc new-state :tag (inc (:tag last-state))))]
(if (>= (count states) num-history-states)
(into [] (rest new-states))
new-states)))))
(defn state= [a b] (defn state= [a b]
(= (dissoc a :tag) (dissoc b :tag))) (= (dissoc a :tag) (dissoc b :tag)))

View file

@ -1,21 +1,13 @@
(ns grub.util (ns grub.util)
(:require #?(:clj [clojure.core.async :as a :refer [<! >! chan go]]
:cljs [cljs.core.async :as a :refer [<! >! chan]]))
#?(:cljs (:require-macros [grub.macros :refer [log logs]]
[cljs.core.async.macros :refer [go]])))
(defn map-by-key [key coll] (defn map-by-key [key coll]
(->> coll (->> coll
(map (fn [a] [(keyword (get a key)) a])) (map (fn [a] [(keyword (get a key)) a]))
(into {}))) (into {})))
(defn printer [] (defn rand-str [n]
(let [in (chan)] (let [chars "0123456789abcdefghijklmnopqrstuvwxyz"
(go (loop [] rand-index #(rand-int (count chars))]
(when-let [msg (<! printer)] (->> (repeatedly n rand-index)
#?(:clj (do (clojure.pprint/pprint msg) (map #(.charAt chars %))
(println "-------")) (clojure.string/join))))
:cljs (do (logs msg)
(log "-------")) )
(recur))))))

View file

@ -4,7 +4,7 @@
[grub.websocket :as websocket] [grub.websocket :as websocket]
[grub.view.app :as view] [grub.view.app :as view]
[cljs.core.async :as a :refer [<! >! chan]]) [cljs.core.async :as a :refer [<! >! chan]])
(:require-macros [cljs.core.async.macros :refer [go go-loop]] )) (:require-macros [cljs.core.async.macros :refer [go-loop]] ))
(defn start-app [] (defn start-app []
(let [ui-state (atom state/empty-state) (let [ui-state (atom state/empty-state)
@ -12,15 +12,17 @@
to-server (chan) to-server (chan)
new-ui-states (chan) new-ui-states (chan)
diffs (chan) diffs (chan)
full-syncs (chan)] full-syncs (chan)
(sync/sync-client! to-server new-ui-states diffs full-syncs ui-state) connected (chan)]
(sync/start-sync! to-server new-ui-states diffs full-syncs connected ui-state)
(websocket/connect to-server from-server) (websocket/connect to-server from-server)
(view/render-app ui-state new-ui-states) (view/render-app ui-state new-ui-states)
(go-loop [] (let [event (<! from-server)] (go-loop [](let [event (<! from-server)]
(cond (cond
(nil? event) nil ;; drop out of loop
(= (:type event) :diff) (do (>! diffs event) (recur)) (= (:type event) :diff) (do (>! diffs event) (recur))
(= (:type event) :full-sync) (do (>! full-syncs event) (recur)) (= (:type event) :full-sync) (do (>! full-syncs event) (recur))
(= (:type event) :connected) (do (>! connected event) (recur))
(nil? event) nil ;; drop out of loop
:else (do (println "Unknown event:" event) (recur))))))) :else (do (println "Unknown event:" event) (recur)))))))
(enable-console-print!) (enable-console-print!)

View file

@ -1,34 +0,0 @@
(ns grub.macros)
(defmacro log [& args]
`(.log js/console ~@args))
(defmacro logs [& args]
(let [strings (map (fn [a] `(pr-str ~a)) args)]
`(.log js/console ~@strings)))
;; Maybe monad
(defmacro and-let* [bindings & body]
(when (not= (count bindings) 2)
(throw (IllegalArgumentException.
"and-let* requires an even number of forms in binding vector")))
(let [form (bindings 0)
tst (bindings 1)]
`(let [temp# ~tst]
(when temp#
(let [~form temp#]
~@body)))))
(defmacro and-let [bindings & body]
(when (not (even? (count bindings)))
(throw (IllegalArgumentException.
"and-let requires an even number of forms in binding vector")))
(let [whenlets (reduce (fn [sexpr bind]
(let [form (first bind)
tst (second bind)]
(conj sexpr `(and-let* [~form ~tst]))))
()
(partition 2 bindings))
body (cons 'do body)]
`(->> ~body ~@whenlets)))

View file

@ -36,8 +36,6 @@
:<events <events :<events <events
:add-grubs-ch add-grubs-ch} :add-grubs-ch add-grubs-ch}
:tx-listen (fn [{:keys [new-state tag]} _] :tx-listen (fn [{:keys [new-state tag]} _]
(println "new ui state?" tag)
(when (= tag :local) (when (= tag :local)
(println "new ui state")
(put! new-ui-states new-state)))}) (put! new-ui-states new-state)))})
nil)) nil))

View file

@ -4,8 +4,7 @@
[sablono.core :refer-macros [html]] [sablono.core :refer-macros [html]]
[cljs.core.async :as a :refer [<! put! chan]] [cljs.core.async :as a :refer [<! put! chan]]
[cljs-uuid.core :as uuid]) [cljs-uuid.core :as uuid])
(:require-macros [grub.macros :refer [log logs]] (:require-macros [cljs.core.async.macros :refer [go go-loop]]))
[cljs.core.async.macros :refer [go go-loop]]))
(defn new-grub [text] (defn new-grub [text]
{:id (keyword (str "grub-" (uuid/make-random))) {:id (keyword (str "grub-" (uuid/make-random)))

View file

@ -4,8 +4,7 @@
[om.core :as om :include-macros true] [om.core :as om :include-macros true]
[sablono.core :refer-macros [html]] [sablono.core :refer-macros [html]]
[cljs.core.async :as a :refer [<! chan]]) [cljs.core.async :as a :refer [<! chan]])
(:require-macros [grub.macros :refer [log logs]] (:require-macros [cljs.core.async.macros :refer [go go-loop]]))
[cljs.core.async.macros :refer [go go-loop]]))
(defn get-grub-ingredient [grub] (defn get-grub-ingredient [grub]
(when-not (nil? (:text grub)) (when-not (nil? (:text grub))

View file

@ -6,8 +6,7 @@
[sablono.core :refer-macros [html]] [sablono.core :refer-macros [html]]
[cljs.core.async :as a :refer [<! put! chan]] [cljs.core.async :as a :refer [<! put! chan]]
[cljs-uuid.core :as uuid]) [cljs-uuid.core :as uuid])
(:require-macros [grub.macros :refer [log logs]] (:require-macros [cljs.core.async.macros :refer [go go-loop]]))
[cljs.core.async.macros :refer [go go-loop]]))
(defn new-recipe [name grubs directions] (defn new-recipe [name grubs directions]
{:id (str "recipe-" (uuid/make-random)) {:id (str "recipe-" (uuid/make-random))

View file

@ -4,8 +4,7 @@
[om.core :as om :include-macros true] [om.core :as om :include-macros true]
[sablono.core :refer-macros [html]] [sablono.core :refer-macros [html]]
[cljs.core.async :refer [<! chan]]) [cljs.core.async :refer [<! chan]])
(:require-macros [grub.macros :refer [log logs]] (:require-macros [cljs.core.async.macros :refer [go go-loop]]))
[cljs.core.async.macros :refer [go go-loop]]))
(defn view [recipes owner] (defn view [recipes owner]
(reify (reify

View file

@ -4,8 +4,7 @@
[om.core :as om :include-macros true] [om.core :as om :include-macros true]
[sablono.core :refer-macros [html]] [sablono.core :refer-macros [html]]
[cljs.core.async :as a :refer [<! chan]]) [cljs.core.async :as a :refer [<! chan]])
(:require-macros [grub.macros :refer [log logs]] (:require-macros [cljs.core.async.macros :refer [go go-loop]]))
[cljs.core.async.macros :refer [go go-loop]]))
(defn add-recipe [owner name grubs directions] (defn add-recipe [owner name grubs directions]
(when (and (not (empty? name)) (when (and (not (empty? name))

View file

@ -3,9 +3,9 @@
goog.net.WebSocket goog.net.WebSocket
goog.events.EventHandler goog.events.EventHandler
goog.events.EventTarget goog.events.EventTarget
[cognitect.transit :as t]) [cognitect.transit :as t]
(:require-macros [cljs.core.async.macros :refer [go go-loop]] [grub.event :as event])
[grub.macros :refer [log logs]])) (:require-macros [cljs.core.async.macros :refer [go]]))
(def DEBUG true) (def DEBUG true)
@ -18,16 +18,10 @@
(def reader (t/reader :json)) (def reader (t/reader :json))
(def writer (t/writer :json)) (def writer (t/writer :json))
(defn send-pending-msg [websocket pending-msg] (defn send-message [websocket msg]
(when (and (.isOpen websocket) (when (.isOpen websocket)
(not (nil? @pending-msg))) (when DEBUG (println "UP" msg))
(.send websocket (t/write writer @pending-msg)) (.send websocket (t/write writer msg))))
(when DEBUG (println "UP" @pending-msg))
(reset! pending-msg nil)))
(defn on-connected [websocket pending-msg event]
(log "Connected:" event)
(send-pending-msg websocket pending-msg))
(defn read-msg [msg] (defn read-msg [msg]
(let [received (t/read reader (.-message msg))] (let [received (t/read reader (.-message msg))]
@ -35,18 +29,16 @@
received)) received))
(defn connect [from-client to-client] (defn connect [from-client to-client]
(let [pending-msg (atom nil) (let [ws (goog.net.WebSocket.)
ws (goog.net.WebSocket.)
handler (goog.events.EventHandler.) handler (goog.events.EventHandler.)
listen (fn [type fun] (.listen handler ws type fun false))] listen (fn [type fun] (.listen handler ws type fun false))]
(listen goog.net.WebSocket.EventType.OPENED (partial on-connected ws pending-msg)) (listen goog.net.WebSocket.EventType.OPENED #(do (println "ws connected") (a/put! to-client (event/connected))))
(listen goog.net.WebSocket.EventType.MESSAGE #(a/put! to-client (read-msg %))) (listen goog.net.WebSocket.EventType.MESSAGE #(a/put! to-client (read-msg %)))
(listen goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %)) (listen goog.net.WebSocket.EventType.CLOSED #(println "ws disconnected"))
(listen goog.net.WebSocket.EventType.ERROR #(log "Error:" %)) (listen goog.net.WebSocket.EventType.ERROR #(println "ws error:" %))
(go (loop [] (go (loop []
(when-let [msg (<! from-client)] (when-let [msg (<! from-client)]
(reset! pending-msg msg) (send-message ws msg)
(send-pending-msg ws pending-msg)
(recur)))) (recur))))
(.open ws server-url) (.open ws server-url)
ws)) ws))

View file

@ -0,0 +1,98 @@
(ns grub.test.e2e.sync
(:require [grub.db :as db]
[grub.util :as util]
[clj-webdriver.taxi :as taxi]
[clj-webdriver.core :refer [new-driver]]))
(defn set-chromedriver-path! []
(System/setProperty "webdriver.chrome.driver" "bin/chromedriver"))
(defn start-client []
(let [driver (new-driver {:browser :chrome})]
(taxi/to driver "http://localhost:3000")
driver))
(defn grub-text [driver elem]
(let [input (taxi/find-element-under driver elem {:tag :input})]
(taxi/value driver input)))
(defn grub-completed? [driver elem]
(let [classes-str (taxi/attribute driver elem "class")
classes (clojure.string/split classes-str #" ")]
(not (nil? (first (filter #(= "completed" %) classes))))))
(defn grub-elem-data [driver elem]
{:text (grub-text driver elem)
:completed (grub-completed? driver elem)})
(defn get-grub-elems [driver]
(taxi/elements driver ".grub-item"))
(defn get-grubs [driver]
(let [elems (get-grub-elems driver)]
(set (map (partial grub-elem-data driver) elems))))
(defn click-random-grub [driver]
(let [elems (get-grub-elems driver)
random-elem (nth elems (rand-int (count elems)))]
(try (taxi/click driver random-elem)
(catch Exception e (println "Click failed")))))
(defn add-grub-btn [driver]
(taxi/element driver "#add-grub-btn"))
(defn add-grub-input [driver]
(taxi/element driver "#add-grub-input"))
(defn add-grub [driver text]
(taxi/input-text driver (add-grub-input driver) text)
(taxi/click driver (add-grub-btn driver)))
(defn make-random-change-on-client [driver]
(let [add-grub? (> 0.6 (rand))]
(if add-grub?
(add-grub driver (util/rand-str 10))
(click-random-grub driver))))
(defn assert-all-clients-in-sync [clients db-grubs]
(doseq [client clients]
(let [client-grubs (get-grubs client)]
(if (= client-grubs db-grubs)
(println "Client is in sync")
(println "Error: client is not in sync" "\nexpected:\n" db-grubs "\n actual:\n" client-grubs)))))
(defn stop-client [driver]
(taxi/close driver))
(defn get-db-state [uri]
(db/get-current-state (db/connect uri)))
(defn get-db-grubs [uri]
(->> (get-db-state uri)
(:grubs)
(vals)
(map #(dissoc % :id))
(set)))
(defn make-random-changes-on-clients [clients]
(dotimes [_ 100]
(let [client (nth clients (rand-int (count clients)))]
(make-random-change-on-client client))))
(defn eventual-sync-test [db-uri]
(let [num-clients 4
num-changes 100]
(println "Starting" num-clients "clients")
(let [clients (repeatedly 4 start-client)]
(println "Making" num-changes "random changes")
(make-random-changes-on-clients clients)
(println "Sleeping for a moment")
(Thread/sleep 2000)
(println "Verifying clients are in sync")
(assert-all-clients-in-sync clients (get-db-grubs db-uri))
(println "Closing clients")
(doseq [client clients] (stop-client client)))))
(defn run-e2e-tests [system]
(set-chromedriver-path!)
(eventual-sync-test (:database-uri system)))

View file

@ -1,90 +0,0 @@
(ns grub.test.integration.synchronization
(:require [grub.client-sync :as client-sync]
[grub.server-sync :as server-sync]
[clojure.test :refer :all]
[midje.sweet :refer :all]
[clojure.core.async :as a :refer [<!! >!! chan go]]))
;(defn client-server [client-states server-states]
; (let [server-shadow (last @server-states)
; client-shadow (last @client-states)
; new-client-states (chan)
; >client (chan)
; new-server-states (chan)
; >server (chan)]
; (client-sync/make-client-agent >server >client new-client-states client-states server-shadow)
; (server-sync/sync-server! >client >server new-server-states server-states client-shadow)
; {:new-client-states new-client-states
; :new-server-states new-server-states}))
;
;(defn states-in-sync? [a b]
; (let [last-a (dissoc (last a) :tag)
; last-b (dissoc (last b) :tag)]
; last-a => last-b))
;
;(defn last-state [states]
; (-> states
; (last)
; (dissoc :tag)))
;
;(defn short-delay []
; (<!! (a/timeout 300)))
;
;(fact "Client-only changes sync with server"
; (let [client (atom [{:tag 1
; :grubs {"1" {:text "2 apples" :completed false}}
; :recipes {}}])
; server (atom [{:tag 44 :grubs {"1" {:text "2 apples" :completed false}}
; :recipes {}}])
; {:keys [new-client-states]} (client-server client server)
; client-change {:tag 2
; :grubs {"1" {:text "2 apples" :completed true}}
; :recipes {}}]
; (swap! client conj client-change)
; (>!! new-client-states client-change)
; (short-delay)
; (states-in-sync? @client @server)
; (last-state @client) => {:grubs {"1" {:text "2 apples" :completed true}}
; :recipes {}}))
;
;(fact "Other client changes synced with client"
; (let [client (atom [{:tag 1
; :grubs {"1" {:text "2 apples" :completed false}}
; :recipes {}}])
; server (atom [{:tag 44 :grubs {"1" {:text "2 apples" :completed false}}
; :recipes {}}])
; {:keys [new-server-states]} (client-server client server)
; server-change {:tag 2
; :grubs {"1" {:text "2 apples" :completed true}}
; :recipes {}}]
; (swap! server conj server-change)
; (>!! new-server-states server-change)
; (short-delay)
; (states-in-sync? @client @server)
; (last-state @client) => {:grubs {"1" {:text "2 apples" :completed true}}
; :recipes {}}))
;
;(fact "Client changes and simultaneous server changes synced"
; (let [client (atom [{:tag 1
; :grubs {"1" {:text "2 apples" :completed false}}
; :recipes {}}])
; server (atom [{:tag 44 :grubs {"1" {:text "2 apples" :completed false}}
; :recipes {}}])
; {:keys [new-client-states new-server-states]} (client-server client server)
; client-change {:tag 2
; :grubs {"1" {:text "2 apples" :completed true}}
; :recipes {}}
; server-change {:tag 45
; :grubs {"1" {:text "2 apples" :completed false}
; "2" {:text "milk" :completed false}}
; :recipes {}}]
; (swap! client conj client-change)
; (swap! server conj server-change)
; (>!! new-client-states client-change)
; (short-delay)
; (>!! new-server-states (last @server))
; (short-delay)
; (states-in-sync? @client @server)
; (last-state @client) => {:grubs {"1" {:text "2 apples" :completed true}
; "2" {:text "milk" :completed false}}
; :recipes {}}))

View file

@ -3,77 +3,94 @@
[midje.sweet :refer :all])) [midje.sweet :refer :all]))
(def empty-diff {:grubs {:- #{} :+ nil} (def empty-diff {:tag 0
:shadow-tag 0
:grubs {:- #{} :+ nil}
:recipes {:- #{} :+ nil}}) :recipes {:- #{} :+ nil}})
(fact "Diff of empty states is empty diff" (fact "Diff of empty states is empty diff"
(let [empty-state {:grubs {} :recipes {}}] (let [empty-state {:grubs {} :recipes {} :tag 0}]
(diff/diff-states empty-state empty-state) => empty-diff)) (diff/diff-states empty-state empty-state) => empty-diff))
(fact "Diff of equal states is empty diff" (fact "Diff of equal states is empty diff"
(diff/diff-states {:grubs {"id" {:text "asdf" :completed false}} :recipes {}} (diff/diff-states {:tag 0 :grubs {"id" {:text "asdf" :completed false}} :recipes {}}
{:grubs {"id" {:text "asdf" :completed false}} :recipes {}}) {:tag 0 :grubs {"id" {:text "asdf" :completed false}} :recipes {}})
=> empty-diff) => empty-diff)
(fact "Diff of one added grub has one updated grub" (fact "Diff of one added grub has one updated grub"
(diff/diff-states {:grubs {} :recipes {}} (diff/diff-states {:tag 0 :grubs {} :recipes {}}
{:grubs {"id" {:text "asdf" :completed false}} :recipes {}}) {:tag 1 :grubs {"id" {:text "asdf" :completed false}} :recipes {}})
=> {:grubs {:- #{} => {:shadow-tag 0
:tag 1
:grubs {:- #{}
:+ {"id" {:completed false, :text "asdf"}}} :+ {"id" {:completed false, :text "asdf"}}}
:recipes {:- #{} :+ nil}}) :recipes {:- #{} :+ nil}})
(fact "Diff of one removed grub has one deleted grub" (fact "Diff of one removed grub has one deleted grub"
(diff/diff-states {:grubs {"id" {:text "asdf" :completed false}} :recipes {}} (diff/diff-states {:tag 0 :grubs {"id" {:text "asdf" :completed false}} :recipes {}}
{:grubs {} :recipes {}}) {:tag 1 :grubs {} :recipes {}})
=> =>
{:grubs {:- #{"id"} {:shadow-tag 0
:tag 1
:grubs {:- #{"id"}
:+ nil} :+ nil}
:recipes {:- #{} :+ nil}}) :recipes {:- #{} :+ nil}})
(fact "Diff of one changed grub has updated grub" (fact "Diff of one changed grub has updated grub"
(diff/diff-states {:grubs {"id" {:text "asdf" :completed false}} :recipes {}} (diff/diff-states {:tag 0 :grubs {"id" {:text "asdf" :completed false}} :recipes {}}
{:grubs {"id" {:text "asdf2" :completed false}} :recipes {}}) {:tag 1 :grubs {"id" {:text "asdf2" :completed false}} :recipes {}})
=> =>
{:grubs {:- #{} {:shadow-tag 0
:tag 1
:grubs {:- #{}
:+ {"id" {:text "asdf2"}}} :+ {"id" {:text "asdf2"}}}
:recipes {:- #{} :+ nil}}) :recipes {:- #{} :+ nil}})
(fact "Diff of one completed grub has updated grub" (fact "Diff of one completed grub has updated grub"
(diff/diff-states {:grubs {"id" {:text "asdf" :completed false}} :recipes {}} (diff/diff-states {:tag 0 :grubs {"id" {:text "asdf" :completed false}} :recipes {}}
{:grubs {"id" {:text "asdf" :completed true}} :recipes {}}) {:tag 1 :grubs {"id" {:text "asdf" :completed true}} :recipes {}})
=> {:grubs {:- #{} => {:shadow-tag 0
:tag 1
:grubs {:- #{}
:+ {"id" {:completed true}}} :+ {"id" {:completed true}}}
:recipes {:- #{} :+ nil}}) :recipes {:- #{} :+ nil}})
(fact "Diff of one added recipe has updated recipe" (fact "Diff of one added recipe has updated recipe"
(diff/diff-states {:grubs {} :recipes {}} (diff/diff-states {:tag 0 :grubs {} :recipes {}}
{:grubs {} :recipes {"id" {:name "Blue Cheese Soup" {:tag 1 :grubs {} :recipes {"id" {:name "Blue Cheese Soup"
:grubs "Some grubs"}}}) :grubs "Some grubs"}}})
=> =>
{:grubs {:- #{} {:shadow-tag 0
:tag 1
:grubs {:- #{}
:+ nil} :+ nil}
:recipes {:- #{} :+ {"id" {:name "Blue Cheese Soup" :recipes {:- #{} :+ {"id" {:name "Blue Cheese Soup"
:grubs "Some grubs"}}}}) :grubs "Some grubs"}}}})
(fact "Diff of one changed recipe has one updated recipe" (fact "Diff of one changed recipe has one updated recipe"
(diff/diff-states {:grubs {} :recipes {"id" {:name "Blue Cheese Soup" (diff/diff-states {:tag 0 :grubs {} :recipes {"id" {:name "Blue Cheese Soup"
:grubs "Some grubs"}}} :grubs "Some grubs"}}}
{:grubs {} :recipes {"id" {:name "Bleu Cheese Soup" {:tag 1 :grubs {} :recipes {"id" {:name "Bleu Cheese Soup"
:grubs "Some grubs"}}}) :grubs "Some grubs"}}})
=> {:grubs {:- #{} => {:shadow-tag 0
:tag 1
:grubs {:- #{}
:+ nil} :+ nil}
:recipes {:- #{} :+ {"id" {:name "Bleu Cheese Soup" }}}}) :recipes {:- #{} :+ {"id" {:name "Bleu Cheese Soup" }}}})
(fact "Diff of one removed recipe has one deleted recipe" (fact "Diff of one removed recipe has one deleted recipe"
(diff/diff-states {:grubs {} :recipes {"id" {:name "Blue Cheese Soup" (diff/diff-states {:tag 0 :grubs {} :recipes {"id" {:name "Blue Cheese Soup"
:grubs "Some grubs"}}} :grubs "Some grubs"}}}
{:grubs {} :recipes {}}) {:tag 1 :grubs {} :recipes {}})
=> =>
{:grubs {:- #{} :+ nil} {:shadow-tag 0
:tag 1
:grubs {:- #{} :+ nil}
:recipes {:- #{"id"} :+ nil}}) :recipes {:- #{"id"} :+ nil}})
(def before-state (def before-state
{:grubs {:tag 0
:grubs
{"grub-same" {:completed false {"grub-same" {:completed false
:text "3 garlic cloves"} :text "3 garlic cloves"}
"grub-completed" {:completed false "grub-completed" {:completed false
@ -91,7 +108,8 @@
:name "Chickenburgers"}}}) :name "Chickenburgers"}}})
(def after-state (def after-state
{:grubs {:tag 1
:grubs
{"grub-same" {:completed false, {"grub-same" {:completed false,
:text "3 garlic cloves"} :text "3 garlic cloves"}
"grub-completed" {:completed true, "grub-completed" {:completed true,
@ -109,7 +127,9 @@
:name "Burgers"}}}) :name "Burgers"}}})
(def expected-diff (def expected-diff
{:recipes {:shadow-tag 0
:tag 1
:recipes
{:- #{"recipe-deleted"} {:- #{"recipe-deleted"}
:+ :+
{"recipe-added" {"recipe-added"