Possibly fully working (poorly tested)
This commit is contained in:
parent
d174b2236e
commit
48ba2c5449
11 changed files with 126 additions and 189 deletions
|
@ -42,7 +42,7 @@
|
|||
{:source-paths ["src/cljx"]
|
||||
:output-path "target/generated/cljs"
|
||||
:rules :cljs}]}
|
||||
:source-paths ["src/clj" "src/test"]
|
||||
:source-paths ["src/clj" "src/test" "target/classes"]
|
||||
:test-paths ["src/test"]
|
||||
:ring {:handler grub.core/app}
|
||||
:uberjar-name "grub-standalone.jar"
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
(:require [grub.websocket :as ws]
|
||||
[grub.db :as db]
|
||||
[grub.test.integration.core :as integration-test]
|
||||
[grub.shared-state :as state]
|
||||
[grub.state :as state]
|
||||
[ring.middleware.file :as file]
|
||||
[ring.util.response :as resp]
|
||||
[compojure.core :refer [defroutes GET POST]]
|
||||
|
@ -73,14 +73,14 @@
|
|||
(reset! index-page prod-index-page)
|
||||
(let [to-db (chan)]
|
||||
(db/connect-production-database to-db mongo-url)
|
||||
(state/init to-db (db/get-current-grubs) (db/get-current-recipes))
|
||||
(state/init-server to-db (db/get-current-grubs) (db/get-current-recipes))
|
||||
(println "Starting production server on localhost:" port)
|
||||
(start-server port)))
|
||||
|
||||
(defn start-development-server [{:keys [port]}]
|
||||
(let [to-db (chan)]
|
||||
(db/connect-development-database to-db)
|
||||
(state/init to-db (db/get-current-grubs) (db/get-current-recipes))
|
||||
(state/init-server to-db (db/get-current-grubs) (db/get-current-recipes))
|
||||
(println "Starting development server on localhost:" port)
|
||||
(start-server port)))
|
||||
|
||||
|
|
|
@ -5,14 +5,12 @@
|
|||
[cljs.core.async :as a :refer [<! >! chan]])
|
||||
(:require-macros [grub.macros :refer [log logs]]))
|
||||
|
||||
(defn connect-to-server [reset? state-changes]
|
||||
(let [to-remote (chan)
|
||||
(defn init-app []
|
||||
(let [current-state (atom state/empty-state)
|
||||
state-changes (view/render-app current-state)
|
||||
to-remote (chan)
|
||||
from-remote (chan)]
|
||||
(ws/connect-client! to-remote from-remote)
|
||||
(state/sync-state! from-remote to-remote reset? state-changes)))
|
||||
|
||||
(defn init-app []
|
||||
(let [state-changes (view/render-app state/state)]
|
||||
(connect-to-server true state-changes)))
|
||||
(state/init-client from-remote to-remote state-changes current-state)))
|
||||
|
||||
(init-app)
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
(ns grub.state
|
||||
(:require [grub.diff :as diff]
|
||||
[grub.common-state :as cs]
|
||||
[grub.message :as message]
|
||||
[grub.sync :as sync]
|
||||
[cljs.core.async :as a :refer [<! >! chan]]
|
||||
[hasch.core :as hasch])
|
||||
(:require-macros [grub.macros :refer [log logs]]
|
||||
[cljs.core.async.macros :refer [go go-loop]]))
|
||||
|
||||
(def state (atom sync/empty-state))
|
||||
(def server-state (atom sync/empty-state))
|
||||
|
||||
(def unacked-states (atom {}))
|
||||
|
||||
(defn get-server-state [hash]
|
||||
(if (= (hasch/uuid @server-state) hash)
|
||||
@server-state
|
||||
(get @unacked-states hash)))
|
||||
|
||||
(defn send-state-changes-to-server! [state-changes from]
|
||||
(go-loop []
|
||||
(when-let [current-state (<! state-changes)]
|
||||
(when-not (= @server-state current-state)
|
||||
(let [msg (cs/diff-states @server-state current-state)]
|
||||
(swap! unacked-states assoc (:hash msg) current-state)
|
||||
(a/put! from msg)))
|
||||
(recur))))
|
||||
|
||||
(defn handle-received-changes! [to from]
|
||||
(go-loop []
|
||||
(if-let [{:keys [type diff hash shadow-hash] :as msg} (<! to)]
|
||||
(do (condp = type
|
||||
:diff
|
||||
(if-let [acked-server-state (get-server-state shadow-hash)]
|
||||
(do (reset! server-state acked-server-state)
|
||||
(reset! unacked-states {})
|
||||
(let [new-server (swap! server-state #(diff/patch-state % diff))]
|
||||
(if (= (hasch/uuid new-server) hash)
|
||||
(swap! state diff/patch-state diff)
|
||||
(do (log "State update failure --> full sync")
|
||||
(a/put! from message/full-sync-request)))))
|
||||
(do (log "Could not find server state locally --> full sync")
|
||||
(a/put! from message/full-sync-request)))
|
||||
:full-sync (do
|
||||
(logs "Full sync")
|
||||
(reset! unacked-states {})
|
||||
(reset! server-state (:state msg))
|
||||
(reset! state (:state msg)))
|
||||
(logs "Invalid msg:" msg))
|
||||
(recur))
|
||||
(remove-watch state :state))))
|
||||
|
||||
(defn sync-state! [to from reset? state-changes]
|
||||
(send-state-changes-to-server! state-changes from)
|
||||
(handle-received-changes! to from)
|
||||
(a/put! from (if reset? message/full-sync-request (cs/diff-states @server-state @state))))
|
|
@ -14,6 +14,7 @@
|
|||
(defn send-pending-msg [websocket]
|
||||
(when (and (.isOpen websocket)
|
||||
(not (nil? @pending-msg)))
|
||||
(logs "Send message:" @pending-msg)
|
||||
(.send websocket (pr-str @pending-msg))
|
||||
(reset! pending-msg nil)))
|
||||
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
(ns grub.common-state
|
||||
(:require [grub.diff :as diff]
|
||||
[hasch.core :as hasch]))
|
||||
|
||||
(def empty-state {:grubs {} :recipes {}})
|
||||
|
||||
(def complete-sync-request {:type :complete})
|
||||
(defn complete-sync-response [state]
|
||||
{:type :complete
|
||||
:state state})
|
||||
|
||||
(defn diff-msg [diff hash shadow-hash]
|
||||
{:type :diff
|
||||
:diff diff
|
||||
:hash hash
|
||||
:shadow-hash shadow-hash})
|
||||
|
||||
(defn diff-states [shadow state]
|
||||
(let [diff (diff/diff-states shadow state)
|
||||
hash (hasch/uuid state)
|
||||
shadow-hash (hasch/uuid shadow)
|
||||
msg (diff-msg diff hash shadow-hash)]
|
||||
msg))
|
|
@ -1,88 +0,0 @@
|
|||
(ns grub.shared-state
|
||||
(:require [grub.diff :as diff]
|
||||
[grub.message :as message]
|
||||
[grub.sync :as sync]
|
||||
#+clj [clojure.core.async :as a :refer [<! >! chan go]]
|
||||
#+cljs [cljs.core.async :as a :refer [<! >! chan]])
|
||||
#+cljs (:require-macros [cljs.core.async.macros :refer [go]]))
|
||||
|
||||
;; Server state
|
||||
;; (def states (atom []))
|
||||
|
||||
(defn make-server-agent
|
||||
([in out states] (make-server-agent in out states sync/empty-state))
|
||||
([in out states initial-client-shadow]
|
||||
(go (loop [client-shadow initial-client-shadow]
|
||||
(when-let [msg (<! in)]
|
||||
(condp = (:type msg)
|
||||
:diff
|
||||
(let [states* @states
|
||||
shadow (sync/get-history-state states* (:hash msg))]
|
||||
(if shadow
|
||||
(let [new-states (sync/apply-diff states* (:diff msg))
|
||||
new-shadow (diff/patch-state shadow (:diff msg))
|
||||
{:keys [diff hash]} (sync/diff-states new-states new-shadow)]
|
||||
(reset! states new-states)
|
||||
(>! out (message/diff-msg diff hash))
|
||||
(recur new-shadow))
|
||||
(let [state (sync/get-current-state @states)]
|
||||
(>! out (message/full-sync state))
|
||||
(recur state))))
|
||||
|
||||
:full-sync
|
||||
(let [state (sync/get-current-state @states)]
|
||||
(>! out (message/full-sync state))
|
||||
(recur state))
|
||||
|
||||
:new-state
|
||||
(let [{:keys [diff hash]} (sync/diff-states (:new-states msg) client-shadow)]
|
||||
(>! out (message/diff-msg diff hash))
|
||||
(recur client-shadow))
|
||||
(recur client-shadow)))))))
|
||||
|
||||
(defn make-client-agent
|
||||
([in out states] (make-client-agent in out states sync/empty-state))
|
||||
([in out states initial-server-shadow]
|
||||
(a/go-loop [server-shadow initial-server-shadow]
|
||||
(when-let [msg (<! in)]
|
||||
(condp = (:type msg)
|
||||
:diff
|
||||
(let [states* @states
|
||||
shadow (sync/get-history-state states* (:hash msg))]
|
||||
(if shadow
|
||||
(let [new-states (sync/apply-diff states* (:diff msg))
|
||||
new-shadow (diff/patch-state shadow (:diff msg))
|
||||
{:keys [diff hash]} (sync/diff-states new-states new-shadow)]
|
||||
(reset! states new-states)
|
||||
(recur new-shadow))
|
||||
(let [state (sync/get-current-state @states)]
|
||||
(>! out (message/full-sync state))
|
||||
(recur state))))
|
||||
|
||||
:full-sync
|
||||
(let [state (:state msg)]
|
||||
(reset! states [state])
|
||||
(recur state))
|
||||
|
||||
:new-state
|
||||
(let [{:keys [diff hash]} (sync/diff-states (:new-states msg) server-shadow)]
|
||||
(>! out (message/diff-msg diff hash))
|
||||
(recur server-shadow))
|
||||
(recur server-shadow))))))
|
||||
|
||||
;; TODO: Remove watch, close up channels properly
|
||||
(defn sync-new-client! [>client <client]
|
||||
nil)
|
||||
;; (let [client-id (java.util.UUID/randomUUID)
|
||||
;; state-changes (chan)
|
||||
;; state-change-events (a/map< (fn [s] {:type :new-state :new-states s}) state-changes)
|
||||
;; client-events (chan)]
|
||||
;; (add-watch states client-id (fn [_ _ _ new-states] (a/put! state-changes new-states)))
|
||||
;; (a/pipe (a/merge [<client state-change-events]) client-events)
|
||||
;; (make-server-agent client-events >client states)))
|
||||
|
||||
(defn init [to-db grubs recipes]
|
||||
nil)
|
||||
;; (reset! states (sync/initial-state grubs recipes))
|
||||
;; (add-watch states :to-db (fn [_ _ old-states new-states]
|
||||
;; (a/put! to-db (sync/get-current-state new-states)))))
|
103
src/cljx/grub/state.cljx
Normal file
103
src/cljx/grub/state.cljx
Normal file
|
@ -0,0 +1,103 @@
|
|||
(ns grub.state
|
||||
(:require [grub.diff :as diff]
|
||||
[grub.message :as message]
|
||||
[grub.sync :as sync]
|
||||
#+clj [clojure.core.async :as a :refer [<! >! chan go]]
|
||||
#+cljs [cljs.core.async :as a :refer [<! >! chan]])
|
||||
#+cljs (:require-macros [grub.macros :refer [log logs]]
|
||||
[cljs.core.async.macros :refer [go]]))
|
||||
|
||||
(defn make-agent
|
||||
([client? in out states] (make-agent client? in out states sync/empty-state))
|
||||
([client? in out states initial-shadow]
|
||||
(go (loop [shadow initial-shadow]
|
||||
(when-let [msg (<! in)]
|
||||
(condp = (:type msg)
|
||||
:diff
|
||||
(let [states* @states
|
||||
shadow (sync/get-history-state states* (:hash msg))]
|
||||
#+cljs (logs "diff msg:" msg)
|
||||
#+clj (println "diff msg:" msg)
|
||||
(if shadow
|
||||
(let [new-states (sync/apply-diff states* (:diff msg))
|
||||
new-shadow (diff/patch-state shadow (:diff msg))
|
||||
{:keys [diff hash]} (sync/diff-states (sync/get-current-state new-states) new-shadow)]
|
||||
(reset! states new-states)
|
||||
(when-not client? (>! out (message/diff-msg diff hash))) ;; HERE
|
||||
(recur new-shadow))
|
||||
(if client?
|
||||
(do (>! out message/full-sync-request)
|
||||
(recur shadow))
|
||||
(let [state (sync/get-current-state states*)]
|
||||
(>! out (message/full-sync state))
|
||||
(recur state)))))
|
||||
|
||||
:full-sync
|
||||
(if client?
|
||||
(let [state (:state msg)]
|
||||
#+cljs (logs "received full sync")
|
||||
(reset! states (sync/new-state state))
|
||||
(recur state))
|
||||
(let [state (sync/get-current-state @states)]
|
||||
#+clj (println "sending full sync")
|
||||
(>! out (message/full-sync state)) ;; HERE
|
||||
(recur state)))
|
||||
|
||||
:new-state
|
||||
(let [{:keys [diff hash]} (sync/diff-states (:state msg) shadow)]
|
||||
#+cljs (logs "new state")
|
||||
#+clj (println "new state")
|
||||
(if client?
|
||||
(when-not (sync/empty-diff? diff)
|
||||
(>! out (message/diff-msg diff hash)))
|
||||
(>! out (message/diff-msg diff hash)))
|
||||
(recur shadow))
|
||||
(recur shadow)))))))
|
||||
|
||||
(defn make-server-agent
|
||||
([in out states] (make-agent false in out states))
|
||||
([in out states initial-shadow] (make-agent false in out states initial-shadow)))
|
||||
|
||||
(defn make-client-agent
|
||||
([in out states] (make-agent true in out states))
|
||||
([in out states initial-shadow] (make-agent true in out states initial-shadow)))
|
||||
|
||||
(def states (atom []))
|
||||
(def empty-state sync/empty-state)
|
||||
|
||||
;; TODO: Remove watch, close up channels properly
|
||||
#+clj
|
||||
(defn sync-new-client! [>client <client]
|
||||
(let [client-id (java.util.UUID/randomUUID)
|
||||
state-changes (chan)
|
||||
state-change-events (a/map< (fn [s] {:type :new-state :state s}) state-changes)
|
||||
client-events (chan)]
|
||||
(add-watch states client-id (fn [_ _ _ new-states]
|
||||
(a/put! state-changes (sync/get-current-state new-states))))
|
||||
(a/go-loop []
|
||||
(let [[val _] (a/alts! [<client state-change-events])]
|
||||
(if val
|
||||
(do (>! client-events val)
|
||||
(recur))
|
||||
(do #+clj (println "client disconnected, clean up")
|
||||
(remove-watch states client-id)
|
||||
(a/close! <client)
|
||||
(a/close! state-change-events)))))
|
||||
(make-server-agent client-events >client states)))
|
||||
|
||||
(defn init-server [to-db grubs recipes]
|
||||
(reset! states (sync/initial-state grubs recipes))
|
||||
(add-watch states :to-db (fn [_ _ old-states new-states]
|
||||
(a/put! to-db (sync/get-current-state new-states)))))
|
||||
|
||||
#+cljs
|
||||
(defn init-client [in out state-changes current-state]
|
||||
(reset! states (sync/initial-state {} {}))
|
||||
(add-watch states :render (fn [_ _ _ new-states]
|
||||
(let [new-state (sync/get-current-state new-states)]
|
||||
(reset! current-state new-state))))
|
||||
(a/pipe (a/map< (fn [s]
|
||||
(swap! states sync/add-history-state s)
|
||||
{:type :new-state :state s}) state-changes) in)
|
||||
(make-client-agent in out states)
|
||||
(a/put! out message/full-sync-request))
|
|
@ -10,6 +10,10 @@
|
|||
:recipes (util/map-by-key :id recipes)}]
|
||||
[{:state state :hash (hasch/uuid state)}]))
|
||||
|
||||
(defn new-state [state]
|
||||
[{:hash (hasch/uuid state)
|
||||
:state state}])
|
||||
|
||||
(defn get-current-state [states]
|
||||
(:state (last states)))
|
||||
|
||||
|
@ -24,7 +28,8 @@
|
|||
(conj states {:hash new-hash :state new-state}))))
|
||||
|
||||
(defn diff-states [states shadow]
|
||||
(let [state (get-current-state states)]
|
||||
(let [state states;(get-current-state states)
|
||||
]
|
||||
{:hash (hasch/uuid shadow)
|
||||
:diff (diff/diff-states shadow state)}))
|
||||
|
||||
|
@ -32,3 +37,5 @@
|
|||
(let [new-state (diff/patch-state (get-current-state states) diff)]
|
||||
(add-history-state states new-state)))
|
||||
|
||||
(defn empty-diff? [diff]
|
||||
(= diff {:recipes {:deleted #{}, :updated nil}, :grubs {:deleted #{}, :updated nil}}))
|
||||
|
|
|
@ -75,7 +75,7 @@
|
|||
(defn start-db-and-websocket-server! []
|
||||
(let [to-db (chan)]
|
||||
(db/connect-and-handle-events to-db "grub-integration-test")
|
||||
(state/init to-db (db/get-current-grubs) (db/get-current-recipes))))
|
||||
(state/init-server to-db (db/get-current-grubs) (db/get-current-recipes))))
|
||||
|
||||
(defn run []
|
||||
(println "Starting integration test")
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
(ns grub.test.unit.state
|
||||
(:require [grub.shared-state :as state]
|
||||
(:require [grub.state :as state]
|
||||
[clojure.test :refer :all]
|
||||
[midje.sweet :refer :all]
|
||||
[hasch.core :as hasch]
|
||||
|
@ -121,7 +121,7 @@
|
|||
:recipes {}})
|
||||
client-state {:grubs {"1" {:text "2 apples" :completed false}} :recipes {}}
|
||||
msg {:type :new-state
|
||||
:new-states @states}
|
||||
:state (:state (last @states))}
|
||||
in (chan 1)
|
||||
out (chan 1)]
|
||||
(state/make-server-agent in out states client-state)
|
||||
|
@ -156,9 +156,7 @@
|
|||
server-out (chan)
|
||||
client-state-changes (chan 1)
|
||||
msg {:type :new-state
|
||||
:new-states (hashed-states
|
||||
{:grubs {"1" {:text "2 apples" :completed false}} :recipes {}}
|
||||
{:grubs {"1" {:text "2 apples" :completed true}} :recipes {}})}]
|
||||
:state {:grubs {"1" {:text "2 apples" :completed true}} :recipes {}}}]
|
||||
(a/pipe client-out server-in)
|
||||
(a/pipe server-out client-in)
|
||||
(state/make-client-agent client-in client-out client-states server-shadow)
|
||||
|
@ -185,9 +183,7 @@
|
|||
server-in (chan)
|
||||
server-out (chan)
|
||||
msg {:type :new-state
|
||||
:new-states (hashed-states
|
||||
{:grubs {"1" {:text "2 apples" :completed false}} :recipes {}}
|
||||
{:grubs {"1" {:text "2 apples" :completed true}} :recipes {}})}
|
||||
:state {:grubs {"1" {:text "2 apples" :completed true}} :recipes {}}}
|
||||
client-state-changes (chan 1)]
|
||||
(a/pipe client-out server-in)
|
||||
(a/pipe server-out client-in)
|
||||
|
|
Loading…
Reference in a new issue