diff --git a/project.clj b/project.clj index e835290..4bf079f 100644 --- a/project.clj +++ b/project.clj @@ -26,7 +26,7 @@ :dependencies [[org.clojure/tools.namespace "0.2.10"] [org.clojure/java.classpath "0.2.2"]]}} :min-lein-version "2.1.2" - :plugins [[lein-cljsbuild "1.0.3"] + :plugins [[lein-cljsbuild "1.1.1"] [lein-ring "0.8.6"]] :cljsbuild {:builds {:dev {:source-paths ["src/cljs" "src/cljc"] :compiler {:output-dir "resources/public/js/out" diff --git a/src/clj/grub/server_sync.clj b/src/clj/grub/server_sync.clj index d8d8cb5..036ddf2 100644 --- a/src/clj/grub/server_sync.clj +++ b/src/clj/grub/server_sync.clj @@ -60,27 +60,26 @@ (println "Unhandled message:" msg) {}) -(defn make-server-agent - ([up down saved db-conn] - (go (loop [shadow (db/get-current-state db-conn)] - (let [[event c] (a/alts! [up saved] :priority true)] - (println "Handling event:") - (pprint event) - (when-not (nil? event) - (case (:type event) +(defn make-server-agent [up down saved db-conn] + (go (loop [shadow (db/get-current-state db-conn)] + (let [[event c] (a/alts! [up saved] :priority true)] + (when-not (nil? event) + (case (:type event) - :diff - (let [history-state (db/get-history-state db-conn (:shadow-tag event)) - new-state (db/patch-state! db-conn (:diff event)) - new-shadow (diff/patch-state history-state (:diff event)) - return-diff (diff/diff-states new-shadow new-state)] - (>! down return-diff) - (recur new-shadow)) + :diff + (let [history-state (db/get-history-state db-conn (:shadow-tag event)) + new-state (db/patch-state! db-conn (:diff event)) + new-shadow (diff/patch-state history-state (:diff event)) + return-diff (diff/diff-states new-shadow new-state)] + (println "**************************history-state:" history-state) + (println "**************************new-state:" new-state) + (println "**************************new-shadow:" new-shadow) + (println "return diff:" return-diff) + (>! down return-diff) + (recur new-shadow)) - :full-sync-request - (do (println "full sync!") - (>! down (full-sync (db/get-current-state db-conn))) - (recur shadow)) - (do (println "Unhandled event") - (println event) - (recur shadow))))))))) + :full-sync-request + (do (>! down (full-sync (db/get-current-state db-conn))) + (recur shadow)) + (do (println "Unhandled event:" event) + (recur shadow)))))))) diff --git a/src/clj/grub/websocket.clj b/src/clj/grub/websocket.clj index 1281e4d..e3439da 100644 --- a/src/clj/grub/websocket.clj +++ b/src/clj/grub/websocket.clj @@ -4,6 +4,8 @@ [cognitect.transit :as t]) (:import [java.io ByteArrayInputStream ByteArrayOutputStream])) +(def DEBUG true) + (defn write-msg [msg] (let [out (ByteArrayOutputStream. 4096) writer (t/writer out :json)] @@ -18,13 +20,16 @@ (defn add-connected-client! [ws-channel to from on-close] (println "Client connected:" (.toString ws-channel)) - (a/go-loop [] (if-let [event (! chan]] - :clj [clojure.core.async :as a :refer [! chan go]])) - #?(:cljs (:require-macros [grub.macros :refer [log logs]] - [cljs.core.async.macros :refer [go]]))) + #?(:cljs [cljs.core.async :as a :refer [! chan]] + :clj [clojure.core.async :as a :refer [! chan go]])) + #?(:cljs (:require-macros [cljs.core.async.macros :refer [go]]))) + +(def DEBUG true) (def full-sync-request {:type :full-sync-request}) -(def empty-state state/empty-state) - -(defn update-states [states diff] - (let [state (state/get-latest states) - new-state (diff/patch-state state diff)] - (state/add states new-state))) - (defn diff-msg [shadow state] (let [diff (diff/diff-states shadow state)] {:type :diff @@ -22,68 +16,39 @@ :tag (:tag state) :shadow-tag (:tag shadow)})) -(defmulti handle-event (fn [event] (:type event))) +(defn update-states [states diff] + (let [state (state/get-latest states) + new-state (diff/patch-state state diff)] + (state/add states new-state))) -(defn apply-diff [states diff shadow new-shadow-tag] - (swap! states update-states diff) - (let [new-shadow (assoc (diff/patch-state shadow diff) :tag new-shadow-tag)] - {:new-shadow new-shadow :out-event nil})) - -(defmethod handle-event :diff [{:keys [diff states shadow shadow-tag tag]}] - (let [history-shadow (state/get-tagged @states shadow-tag)] - (if history-shadow - (apply-diff states diff history-shadow tag) - {:out-event full-sync-request - :new-shadow shadow} ))) - -(defmethod handle-event :full-sync [{:keys [full-state states]}] - (reset! states (state/new-states full-state)) - {:new-shadow full-state}) - -(defmethod handle-event :new-state [{:keys [shadow states new-state]}] - (let [new-states (swap! states state/add new-state) - latest-state (state/get-latest new-states)] - {:out-event (when-not (state/state= shadow latest-state) - (diff-msg shadow latest-state)) - :new-shadow nil})) - -(defmethod handle-event :default [msg] - #?(:cljs (logs "Unhandled message:" msg)) - {}) - -(defn make-client-agent - ([>remote events new-states states] - (make-client-agent >remote events new-states states state/empty-state)) - ([>remote events new-states states initial-shadow] - (go (loop [shadow initial-shadow - out-event nil] - (when out-event (>! >remote out-event)) - (let [timeout (a/timeout 1000) - [v c] (if out-event - (a/alts! [events timeout]) - (a/alts! [new-states events] :priority true))] - (cond (= c timeout) (recur shadow out-event) - (nil? v) nil ;; drop out of loop - (= c new-states) - (let [event {:type :new-state - :new-state v - :shadow shadow - :states states} - {:keys [out-event]} (handle-event event)] - (recur shadow out-event)) - (= c events) - (let [event (assoc v - :states states - :shadow shadow) - {:keys [new-shadow out-event]} (handle-event event)] - (recur (if new-shadow new-shadow shadow) out-event)))))))) - -#?(:cljs - (defn sync-client! [>remote events new-states states] - (let [new-states* (chan (a/sliding-buffer 1))] - (go (loop [] - (let [v (! new-states* v) - (recur)))) - (make-client-agent >remote events new-states* states) - (a/put! >remote full-sync-request)))) +(defn sync-client! [to-server new-ui-states diffs full-syncs ui-state] + (let [ui-state-buffer (chan (a/sliding-buffer 1))] + (a/pipe new-ui-states ui-state-buffer) + (reset! ui-state state/empty-state) + (go (loop [states (state/new-states @ui-state) + shadow (state/get-latest states) + awaiting-ack? false] + (let [channels (if awaiting-ack? [diffs full-syncs] [diffs full-syncs ui-state-buffer])] + (let [[val ch] (a/alts! channels)] + (when DEBUG (println val)) + (condp = ch + ui-state-buffer (let [new-state val + new-states (state/add states new-state) + latest-state (state/get-latest new-states)] + (>! to-server (diff-msg shadow latest-state)) + (recur new-states shadow true)) + full-syncs (let [full-state (:full-state val) + new-states (state/new-states full-state) + latest-state (state/get-latest new-states)] + (reset! ui-state full-state) + (recur new-states latest-state false)) + diffs (let [{:keys [diff shadow-tag tag]} val + history-shadow (state/get-tagged states shadow-tag)] + (if history-shadow + (let [new-states (update-states states diff) + new-shadow (assoc (diff/patch-state shadow diff) :tag tag)] + (recur new-states new-shadow false)) + (do (>! to-server full-sync-request) + (recur states shadow true)))) + (println "An error occurred, received value on unknown channel")))))) + (a/put! to-server full-sync-request))) diff --git a/src/cljs/grub/core.cljs b/src/cljs/grub/core.cljs index 260abb5..92b8681 100644 --- a/src/cljs/grub/core.cljs +++ b/src/cljs/grub/core.cljs @@ -4,40 +4,24 @@ [grub.websocket :as websocket] [grub.view.app :as view] [cljs.core.async :as a :refer [! chan]]) - (:require-macros [grub.macros :refer [log logs]])) + (:require-macros [cljs.core.async.macros :refer [go go-loop]] )) -(def system - {:pending-msg (atom nil) - :ws (atom nil) - :channels {:local-states (chan) - :remote-states (chan) - :to-remote (chan) - :from-remote (chan)} - :states (atom nil) - :view-state nil}) +(defn start-app [] + (let [ui-state (atom state/empty-state) + from-server (chan) + to-server (chan) + new-ui-states (chan) + diffs (chan) + full-syncs (chan)] + (sync/sync-client! to-server new-ui-states diffs full-syncs ui-state) + (websocket/connect to-server from-server) + (view/render-app ui-state new-ui-states) + (go-loop [] (let [event (! diffs event) (recur)) + (= (:type event) :full-sync) (do (>! full-syncs event) (recur)) + :else (do (println "Unknown event:" event) (recur))))))) -(defn start [{:keys [states pending-msg] :as system}] - (reset! states sync/empty-state) - (let [new-states (chan) - render-states (chan) - >remote (chan) - events (chan) - view-state (view/render-app state/empty-state render-states new-states) - ws (websocket/connect pending-msg >remote events)] - (sync/sync-client! >remote events new-states states) - (add-watch states :render (fn [_ _ old new] - (when-not (= old new) - (a/put! render-states (state/get-latest new))))) - (assoc system - :ws ws - :channels {:new-states new-states - :>remote >remote - :events events} - :states states - :view-state view-state))) - -(defn stop [{:keys [channels ws]} system] - (doseq [c (vals channels)] (a/close! c)) - (websocket/disconnect ws)) - -(start system) +(enable-console-print!) +(start-app) diff --git a/src/cljs/grub/view/app.cljs b/src/cljs/grub/view/app.cljs index 0a1b643..7855a6a 100644 --- a/src/cljs/grub/view/app.cljs +++ b/src/cljs/grub/view/app.cljs @@ -25,21 +25,19 @@ (dom/on-document-mousedown #(put! >events {:type :body-mousedown :event %})) (dom/on-window-scroll #(put! >events {:type :body-scroll :event %})))))) -(defn render-app [initial-state new-states] - (let [state (atom initial-state) - >events (chan) +(defn render-app [ui-state new-ui-states] + (let [>events (chan) events :type) add-grubs-ch (chan)] (om/root app-view - state + ui-state {:target (.getElementById js/document "container") :shared {:>events >events :new-states new-state)))}) - (go (loop [] - (let [new-state (