Pull state history into state handler loop

- This is incidental complexity and other code doesn't need to know
  about it
This commit is contained in:
Nicholas Kariniemi 2014-10-14 22:38:26 +03:00
parent 7ada33bdfc
commit f496fb51e5
6 changed files with 61 additions and 66 deletions

View file

@ -7,6 +7,8 @@
[clojure.test :as test]
[clojure.tools.namespace.repl :refer (refresh refresh-all)]))
(clojure.tools.namespace.repl/set-refresh-dirs "src/clj" "target/classes")
(def system nil)
(defn start

View file

@ -46,7 +46,7 @@
{:source-paths ["src/cljx"]
:output-path "target/generated/cljs"
:rules :cljs}]}
:source-paths ["src/clj" "src/test" "target/classes"]
:source-paths ["src/clj" "target/classes"]
:test-paths ["src/test"]
:ring {:handler grub.core/app}
:uberjar-name "grub-standalone.jar"

View file

@ -1,7 +1,6 @@
(ns grub.core
(:require [grub.websocket :as ws]
[grub.db :as db]
[grub.test.integration.core :as integration-test]
[grub.state :as state]
[ring.middleware.file :as file]
[ring.middleware.content-type :as content-type]
@ -63,7 +62,7 @@
(let [to-client (chan)
from-client (chan)]
(ws/add-connected-client! ws-channel to-client from-client)
(state/sync-new-client! from-client to-client state)))
(state/sync-new-client! to-client from-client state)))
(handler request))))
(defn handle-root [handler index]
@ -79,7 +78,7 @@
(handle-root index)
(handle-websocket state)))
(defn start [current {:keys [port db-name state] :as system}]
(defn start [{:keys [port db-name state] :as system}]
(let [{:keys [db conn]} (db/connect db-name)
_ (reset! state (db/get-current-state db))
stop-server (httpkit/run-server (make-handler system) {:port port})]

View file

@ -15,20 +15,18 @@
:view-state nil})
(defn start [system]
(let [local-states (chan)
to-remote (chan)
from-remote (chan)
view-state (view/render-app state/empty-state local-states)
ws (websocket/connect (:pending-msg system) to-remote from-remote)
agent-states (state/sync-client! from-remote to-remote local-states view-state)]
(let [new-states (chan)
>remote (chan)
events (chan)
state (view/render-app state/empty-state new-states)
ws (websocket/connect (:pending-msg system) >remote events)
agent-states (state/sync-client! >remote events new-states state)]
(assoc system
:ws ws
:channels {:local-states local-states
:remote-states remote-states
:to-remote to-remote
:from-remote from-remote}
:view-state view-state
:agent-states agent-states)))
:channels {:new-states new-states
:>remote >remote
:events events}
:state state)))
(defn stop [{:keys [channels ws]} system]
(doseq [c (vals channels)] (a/close! c))

View file

@ -11,7 +11,6 @@
(def empty-state sync/empty-state)
(defmulti handle-event (fn [event]
#+cljs (logs (:type event))
(:type event)))
(defmethod handle-event :diff [{:keys [hash diff states shadow client?] :as msg}]
@ -25,9 +24,7 @@
new-hash (hasch/uuid new-shadow)]
{:out-event (when-not (sync/empty-diff? diff)
(message/diff-msg new-diff new-hash))
:new-states (if client?
(sync/new-state new-state)
new-states)
:new-states new-states
:new-shadow new-shadow})
(if client?
{:out-event message/full-sync-request
@ -45,54 +42,58 @@
{:new-states (sync/new-state state)
:new-shadow state})
(defmethod handle-event :new-state [{:keys [client? state states shadow] :as event}]
(defmethod handle-event :default [msg]
#+cljs (logs "Unhandled message:" msg)
#+clj (println "Unhandled message:" msg)
{})
(defn diff-msg [shadow state]
(let [diff (diff/diff-states shadow state)
hash (hasch/uuid shadow)]
{:new-states (sync/add-history-state states state)
:out-event (when-not (sync/empty-diff? diff) (message/diff-msg diff hash))}))
(message/diff-msg diff hash)))
(defn make-agent [client? <remote >remote states* initial-shadow]
(go (loop [shadow initial-shadow]
(when-let [msg (<! <remote)]
(let [states @states*
event (assoc msg :states states :client? client? :shadow shadow)
{:keys [new-states new-shadow out-event]} (handle-event event)]
(when (and new-states (not= states new-states)) (reset! states* new-states))
(when out-event (a/put! >remote out-event))
(recur (if new-shadow new-shadow shadow)))))))
(defn make-agent
([client? >remote events new-states state]
(make-agent client? >remote events new-states state sync/empty-state))
([client? >remote events new-states state initial-shadow]
(go (loop [shadow initial-shadow
states (sync/new-state @state)]
(let [[v c] (a/alts! [new-states events] :priority true)]
(when v
(cond (= c new-states)
(let [current (sync/get-current-state states)]
(when-not (= shadow v)
(>! >remote (diff-msg shadow v)))
(recur shadow
(if (= v current)
states
(sync/add-history-state states v))))
(= c events)
(let [event (assoc v
:states states
:client? client?
:shadow shadow)
{:keys [new-states new-shadow out-event]} (handle-event event)]
(when (and new-states (not= states new-states))
(let [new-state (sync/get-current-state new-states)]
(reset! state new-state)))
(when out-event (a/put! >remote out-event))
(recur (if new-shadow new-shadow shadow)
(if new-states new-states states))))))))))
(def make-server-agent (partial make-agent false))
(def make-client-agent (partial make-agent true))
#+clj
(defn sync-new-client! [<remote >remote state]
(let [states (atom (sync/new-state @state))
client-id (java.util.UUID/randomUUID)
state-change-events (chan 1 (map (fn [s] {:type :new-state :state s})))
client-events (chan)]
(add-watch states client-id (fn [_ _ _ new-states]
(let [new-state (sync/get-current-state new-states)]
(when new-state
(reset! state new-state)
(a/put! state-change-events new-state)))))
(a/go-loop []
(let [[val _] (a/alts! [<remote state-change-events])]
(if val
(do (>! client-events val)
(recur))
(do (remove-watch states client-id)
(a/close! <remote)
(a/close! state-change-events)))))
(make-server-agent client-events >remote states sync/empty-state)))
(defn sync-new-client! [>remote events state]
(let [client-id (java.util.UUID/randomUUID)
new-states (chan)]
(add-watch state client-id (fn [_ _ old new]
(when-not (= old new)
(a/put! new-states new))))
(make-server-agent >remote events new-states state)))
#+cljs
(defn sync-client! [<remote >remote <view state]
(let [states (atom (sync/initial-state {} {}))
local-events (chan 1 (map (fn [s] {:type :new-state :state s})))]
(add-watch states :render (fn [_ _ _ new-states]
(let [new-state (sync/get-current-state new-states)]
(reset! state new-state))))
(a/pipe <view local-events)
(make-client-agent (a/merge [local-events <remote]) >remote states sync/empty-state)
(a/put! >remote message/full-sync-request)
states))
(defn sync-client! [>remote events new-states state]
(make-client-agent >remote events new-states state)
(a/put! >remote message/full-sync-request))

View file

@ -7,11 +7,6 @@
(def empty-state {:grubs {} :recipes {}})
(defn initial-state [grubs recipes]
(let [state {:grubs (util/map-by-key :id grubs)
:recipes (util/map-by-key :id recipes)}]
[{:state state :hash (hasch/uuid state)}]))
(defn new-state [state]
[{:hash (hasch/uuid state)
:state state}])