Rework sync - client reworked, wip

This commit is contained in:
Nicholas Kariniemi 2015-11-19 23:33:30 -05:00
parent 838e6497d4
commit a8d8ae56c5
7 changed files with 108 additions and 152 deletions

View file

@ -26,7 +26,7 @@
:dependencies [[org.clojure/tools.namespace "0.2.10"] :dependencies [[org.clojure/tools.namespace "0.2.10"]
[org.clojure/java.classpath "0.2.2"]]}} [org.clojure/java.classpath "0.2.2"]]}}
:min-lein-version "2.1.2" :min-lein-version "2.1.2"
:plugins [[lein-cljsbuild "1.0.3"] :plugins [[lein-cljsbuild "1.1.1"]
[lein-ring "0.8.6"]] [lein-ring "0.8.6"]]
:cljsbuild {:builds {:dev {:source-paths ["src/cljs" "src/cljc"] :cljsbuild {:builds {:dev {:source-paths ["src/cljs" "src/cljc"]
:compiler {:output-dir "resources/public/js/out" :compiler {:output-dir "resources/public/js/out"

View file

@ -60,27 +60,26 @@
(println "Unhandled message:" msg) (println "Unhandled message:" msg)
{}) {})
(defn make-server-agent (defn make-server-agent [up down saved db-conn]
([up down saved db-conn] (go (loop [shadow (db/get-current-state db-conn)]
(go (loop [shadow (db/get-current-state db-conn)] (let [[event c] (a/alts! [up saved] :priority true)]
(let [[event c] (a/alts! [up saved] :priority true)] (when-not (nil? event)
(println "Handling event:") (case (:type event)
(pprint event)
(when-not (nil? event)
(case (:type event)
:diff :diff
(let [history-state (db/get-history-state db-conn (:shadow-tag event)) (let [history-state (db/get-history-state db-conn (:shadow-tag event))
new-state (db/patch-state! db-conn (:diff event)) new-state (db/patch-state! db-conn (:diff event))
new-shadow (diff/patch-state history-state (:diff event)) new-shadow (diff/patch-state history-state (:diff event))
return-diff (diff/diff-states new-shadow new-state)] return-diff (diff/diff-states new-shadow new-state)]
(>! down return-diff) (println "**************************history-state:" history-state)
(recur new-shadow)) (println "**************************new-state:" new-state)
(println "**************************new-shadow:" new-shadow)
(println "return diff:" return-diff)
(>! down return-diff)
(recur new-shadow))
:full-sync-request :full-sync-request
(do (println "full sync!") (do (>! down (full-sync (db/get-current-state db-conn)))
(>! down (full-sync (db/get-current-state db-conn))) (recur shadow))
(recur shadow)) (do (println "Unhandled event:" event)
(do (println "Unhandled event") (recur shadow))))))))
(println event)
(recur shadow)))))))))

View file

@ -4,6 +4,8 @@
[cognitect.transit :as t]) [cognitect.transit :as t])
(:import [java.io ByteArrayInputStream ByteArrayOutputStream])) (:import [java.io ByteArrayInputStream ByteArrayOutputStream]))
(def DEBUG true)
(defn write-msg [msg] (defn write-msg [msg]
(let [out (ByteArrayOutputStream. 4096) (let [out (ByteArrayOutputStream. 4096)
writer (t/writer out :json)] writer (t/writer out :json)]
@ -19,10 +21,13 @@
(defn add-connected-client! [ws-channel to from on-close] (defn add-connected-client! [ws-channel to from on-close]
(println "Client connected:" (.toString ws-channel)) (println "Client connected:" (.toString ws-channel))
(a/go-loop [] (if-let [event (<! to)] (a/go-loop [] (if-let [event (<! to)]
(do (httpkit/send! ws-channel (write-msg event)) (do (when DEBUG (println "DOWN" event "\n"))
(httpkit/send! ws-channel (write-msg event))
(recur)) (recur))
(httpkit/close ws-channel))) (httpkit/close ws-channel)))
(httpkit/on-receive ws-channel #(a/put! from (read-msg %))) (httpkit/on-receive ws-channel #(let [msg (read-msg %)]
(when DEBUG (println "UP" msg "\n"))
(a/put! from msg)))
(httpkit/on-close ws-channel (fn [status] (httpkit/on-close ws-channel (fn [status]
(println "Client disconnected:" (println "Client disconnected:"
(.toString ws-channel) (.toString ws-channel)

View file

@ -1,20 +1,14 @@
(ns grub.client-sync (ns grub.client-sync
(:require [grub.diff :as diff] (:require [grub.diff :as diff]
[grub.state :as state] [grub.state :as state]
#?(:cljs [cljs.core.async :as a :refer [<! >! chan]] #?(:cljs [cljs.core.async :as a :refer [<! >! chan]]
:clj [clojure.core.async :as a :refer [<! >! chan go]])) :clj [clojure.core.async :as a :refer [<! >! chan go]]))
#?(:cljs (:require-macros [grub.macros :refer [log logs]] #?(:cljs (:require-macros [cljs.core.async.macros :refer [go]])))
[cljs.core.async.macros :refer [go]])))
(def DEBUG true)
(def full-sync-request {:type :full-sync-request}) (def full-sync-request {:type :full-sync-request})
(def empty-state state/empty-state)
(defn update-states [states diff]
(let [state (state/get-latest states)
new-state (diff/patch-state state diff)]
(state/add states new-state)))
(defn diff-msg [shadow state] (defn diff-msg [shadow state]
(let [diff (diff/diff-states shadow state)] (let [diff (diff/diff-states shadow state)]
{:type :diff {:type :diff
@ -22,68 +16,39 @@
:tag (:tag state) :tag (:tag state)
:shadow-tag (:tag shadow)})) :shadow-tag (:tag shadow)}))
(defmulti handle-event (fn [event] (:type event))) (defn update-states [states diff]
(let [state (state/get-latest states)
new-state (diff/patch-state state diff)]
(state/add states new-state)))
(defn apply-diff [states diff shadow new-shadow-tag] (defn sync-client! [to-server new-ui-states diffs full-syncs ui-state]
(swap! states update-states diff) (let [ui-state-buffer (chan (a/sliding-buffer 1))]
(let [new-shadow (assoc (diff/patch-state shadow diff) :tag new-shadow-tag)] (a/pipe new-ui-states ui-state-buffer)
{:new-shadow new-shadow :out-event nil})) (reset! ui-state state/empty-state)
(go (loop [states (state/new-states @ui-state)
(defmethod handle-event :diff [{:keys [diff states shadow shadow-tag tag]}] shadow (state/get-latest states)
(let [history-shadow (state/get-tagged @states shadow-tag)] awaiting-ack? false]
(if history-shadow (let [channels (if awaiting-ack? [diffs full-syncs] [diffs full-syncs ui-state-buffer])]
(apply-diff states diff history-shadow tag) (let [[val ch] (a/alts! channels)]
{:out-event full-sync-request (when DEBUG (println val))
:new-shadow shadow} ))) (condp = ch
ui-state-buffer (let [new-state val
(defmethod handle-event :full-sync [{:keys [full-state states]}] new-states (state/add states new-state)
(reset! states (state/new-states full-state)) latest-state (state/get-latest new-states)]
{:new-shadow full-state}) (>! to-server (diff-msg shadow latest-state))
(recur new-states shadow true))
(defmethod handle-event :new-state [{:keys [shadow states new-state]}] full-syncs (let [full-state (:full-state val)
(let [new-states (swap! states state/add new-state) new-states (state/new-states full-state)
latest-state (state/get-latest new-states)] latest-state (state/get-latest new-states)]
{:out-event (when-not (state/state= shadow latest-state) (reset! ui-state full-state)
(diff-msg shadow latest-state)) (recur new-states latest-state false))
:new-shadow nil})) diffs (let [{:keys [diff shadow-tag tag]} val
history-shadow (state/get-tagged states shadow-tag)]
(defmethod handle-event :default [msg] (if history-shadow
#?(:cljs (logs "Unhandled message:" msg)) (let [new-states (update-states states diff)
{}) new-shadow (assoc (diff/patch-state shadow diff) :tag tag)]
(recur new-states new-shadow false))
(defn make-client-agent (do (>! to-server full-sync-request)
([>remote events new-states states] (recur states shadow true))))
(make-client-agent >remote events new-states states state/empty-state)) (println "An error occurred, received value on unknown channel"))))))
([>remote events new-states states initial-shadow] (a/put! to-server full-sync-request)))
(go (loop [shadow initial-shadow
out-event nil]
(when out-event (>! >remote out-event))
(let [timeout (a/timeout 1000)
[v c] (if out-event
(a/alts! [events timeout])
(a/alts! [new-states events] :priority true))]
(cond (= c timeout) (recur shadow out-event)
(nil? v) nil ;; drop out of loop
(= c new-states)
(let [event {:type :new-state
:new-state v
:shadow shadow
:states states}
{:keys [out-event]} (handle-event event)]
(recur shadow out-event))
(= c events)
(let [event (assoc v
:states states
:shadow shadow)
{:keys [new-shadow out-event]} (handle-event event)]
(recur (if new-shadow new-shadow shadow) out-event))))))))
#?(:cljs
(defn sync-client! [>remote events new-states states]
(let [new-states* (chan (a/sliding-buffer 1))]
(go (loop []
(let [v (<! new-states)]
(>! new-states* v)
(recur))))
(make-client-agent >remote events new-states* states)
(a/put! >remote full-sync-request))))

View file

@ -4,40 +4,24 @@
[grub.websocket :as websocket] [grub.websocket :as websocket]
[grub.view.app :as view] [grub.view.app :as view]
[cljs.core.async :as a :refer [<! >! chan]]) [cljs.core.async :as a :refer [<! >! chan]])
(:require-macros [grub.macros :refer [log logs]])) (:require-macros [cljs.core.async.macros :refer [go go-loop]] ))
(def system (defn start-app []
{:pending-msg (atom nil) (let [ui-state (atom state/empty-state)
:ws (atom nil) from-server (chan)
:channels {:local-states (chan) to-server (chan)
:remote-states (chan) new-ui-states (chan)
:to-remote (chan) diffs (chan)
:from-remote (chan)} full-syncs (chan)]
:states (atom nil) (sync/sync-client! to-server new-ui-states diffs full-syncs ui-state)
:view-state nil}) (websocket/connect to-server from-server)
(view/render-app ui-state new-ui-states)
(go-loop [] (let [event (<! from-server)]
(cond
(nil? event) nil ;; drop out of loop
(= (:type event) :diff) (do (>! diffs event) (recur))
(= (:type event) :full-sync) (do (>! full-syncs event) (recur))
:else (do (println "Unknown event:" event) (recur)))))))
(defn start [{:keys [states pending-msg] :as system}] (enable-console-print!)
(reset! states sync/empty-state) (start-app)
(let [new-states (chan)
render-states (chan)
>remote (chan)
events (chan)
view-state (view/render-app state/empty-state render-states new-states)
ws (websocket/connect pending-msg >remote events)]
(sync/sync-client! >remote events new-states states)
(add-watch states :render (fn [_ _ old new]
(when-not (= old new)
(a/put! render-states (state/get-latest new)))))
(assoc system
:ws ws
:channels {:new-states new-states
:>remote >remote
:events events}
:states states
:view-state view-state)))
(defn stop [{:keys [channels ws]} system]
(doseq [c (vals channels)] (a/close! c))
(websocket/disconnect ws))
(start system)

View file

@ -25,21 +25,19 @@
(dom/on-document-mousedown #(put! >events {:type :body-mousedown :event %})) (dom/on-document-mousedown #(put! >events {:type :body-mousedown :event %}))
(dom/on-window-scroll #(put! >events {:type :body-scroll :event %})))))) (dom/on-window-scroll #(put! >events {:type :body-scroll :event %}))))))
(defn render-app [initial-state <new-states >new-states] (defn render-app [ui-state new-ui-states]
(let [state (atom initial-state) (let [>events (chan)
>events (chan)
<events (a/pub >events :type) <events (a/pub >events :type)
add-grubs-ch (chan)] add-grubs-ch (chan)]
(om/root app-view (om/root app-view
state ui-state
{:target (.getElementById js/document "container") {:target (.getElementById js/document "container")
:shared {:>events >events :shared {:>events >events
:<events <events :<events <events
:add-grubs-ch add-grubs-ch} :add-grubs-ch add-grubs-ch}
:tx-listen (fn [{:keys [new-state tag]} _] :tx-listen (fn [{:keys [new-state tag]} _]
(when (= tag :local) (put! >new-states new-state)))}) (println "new ui state?" tag)
(go (loop [] (when (= tag :local)
(let [new-state (<! <new-states)] (println "new ui state")
(reset! state new-state) (put! new-ui-states new-state)))})
(recur)))) nil))
state))

View file

@ -7,6 +7,8 @@
(:require-macros [cljs.core.async.macros :refer [go go-loop]] (:require-macros [cljs.core.async.macros :refer [go go-loop]]
[grub.macros :refer [log logs]])) [grub.macros :refer [log logs]]))
(def DEBUG true)
(def location (.-location js/document)) (def location (.-location js/document))
(def protocol (.-protocol location)) (def protocol (.-protocol location))
(def ws-protocol (if (= protocol "http:") "ws://" "wss://")) (def ws-protocol (if (= protocol "http:") "ws://" "wss://"))
@ -20,6 +22,7 @@
(when (and (.isOpen websocket) (when (and (.isOpen websocket)
(not (nil? @pending-msg))) (not (nil? @pending-msg)))
(.send websocket (t/write writer @pending-msg)) (.send websocket (t/write writer @pending-msg))
(when DEBUG (println "UP" @pending-msg))
(reset! pending-msg nil))) (reset! pending-msg nil)))
(defn on-connected [websocket pending-msg event] (defn on-connected [websocket pending-msg event]
@ -28,18 +31,20 @@
(defn read-msg [msg] (defn read-msg [msg]
(let [received (t/read reader (.-message msg))] (let [received (t/read reader (.-message msg))]
(when DEBUG (println "DOWN" received))
received)) received))
(defn connect [pending-msg in out] (defn connect [from-client to-client]
(let [ws (goog.net.WebSocket.) (let [pending-msg (atom nil)
ws (goog.net.WebSocket.)
handler (goog.events.EventHandler.) handler (goog.events.EventHandler.)
listen (fn [type fun] (.listen handler ws type fun false))] listen (fn [type fun] (.listen handler ws type fun false))]
(listen goog.net.WebSocket.EventType.OPENED (partial on-connected ws pending-msg)) (listen goog.net.WebSocket.EventType.OPENED (partial on-connected ws pending-msg))
(listen goog.net.WebSocket.EventType.MESSAGE #(a/put! out (read-msg %))) (listen goog.net.WebSocket.EventType.MESSAGE #(a/put! to-client (read-msg %)))
(listen goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %)) (listen goog.net.WebSocket.EventType.CLOSED #(log "Closed:" %))
(listen goog.net.WebSocket.EventType.ERROR #(log "Error:" %)) (listen goog.net.WebSocket.EventType.ERROR #(log "Error:" %))
(go (loop [] (go (loop []
(when-let [msg (<! in)] (when-let [msg (<! from-client)]
(reset! pending-msg msg) (reset! pending-msg msg)
(send-pending-msg ws pending-msg) (send-pending-msg ws pending-msg)
(recur)))) (recur))))