Rework sync - wip

- Fetch full sync state from database, save upsert changes to database
This commit is contained in:
Nicholas Kariniemi 2015-07-17 00:25:24 +03:00
parent 8b11c119f2
commit 755d06f32b
3 changed files with 109 additions and 74 deletions

View file

@ -2,7 +2,6 @@
(:gen-class)
(:require [grub.websocket :as ws]
[grub.db :as db]
[grub.state :as state]
[grub.server-sync :as sync]
[ring.middleware.resource :as resource]
[ring.middleware.content-type :as content-type]
@ -41,32 +40,27 @@
:database-uri (System/getenv "GRUB_DATABASE_URI")
:db-conn nil
:port 3000
:stop-server nil
:states (atom nil)})
:stop-server nil})
(def dev-system
{:index dev-index-page
:database-uri (or (System/getenv "GRUB_DATABASE_URI") "datomic:mem://grub")
:db-conn nil
:port 3000
:stop-server nil
:states (atom nil)})
:stop-server nil})
(defn handle-websocket [handler states new-states-pub]
(defn handle-websocket [handler db-conn]
(fn [{:keys [websocket?] :as request}]
(if websocket?
(httpkit/with-channel request ws-channel
(let [to-client (chan)
from-client (chan)
new-states (chan (a/sliding-buffer 1))
(let [up (chan)
down (chan)
saved (chan)
on-close (fn []
(a/unsub new-states-pub :new-state new-states)
(a/close! new-states)
(a/close! from-client)
(a/close! to-client))]
(a/sub new-states-pub :new-state new-states)
(ws/add-connected-client! ws-channel to-client from-client on-close)
(sync/make-server-agent to-client from-client new-states states)))
(a/close! up)
(a/close! down))]
(ws/add-connected-client! ws-channel down up on-close)
(sync/make-server-agent up down saved db-conn)))
(handler request))))
(defn handle-root [handler index]
@ -81,37 +75,25 @@
(resp/not-found "")
(handler req))))
(defn make-handler [{:keys [index states]} new-states-pub]
(defn make-handler [{:keys [index]} db-conn]
(-> (fn [req] (resp/not-found "Not found"))
(resource/wrap-resource "public")
(content-type/wrap-content-type)
(handle-root index)
(handle-websocket states new-states-pub)
(handle-websocket db-conn)
(wrap-bounce-favicon)))
(defn start [{:keys [port database-uri states] :as system}]
(defn start [{:keys [port database-uri] :as system}]
(let [db-conn (db/connect database-uri)
new-states (chan)
new-states-pub (a/pub new-states (fn [_] :new-state))
db-state (db/get-current-state db-conn)
_ (reset! states (state/new-states (if db-state db-state state/empty-state)))
stop-server (httpkit/run-server (make-handler system new-states-pub) {:port port})]
(add-watch states :db (fn [_ _ old new]
(when-not (= old new)
(let [new-state (state/get-latest new)]
(a/put! new-states new-state)
(db/update-db! db-conn new-state)))))
stop-server (httpkit/run-server (make-handler system db-conn) {:port port})]
(println "Started server on localhost:" port)
(assoc system
:db-conn db-conn
:stop-server stop-server
:states states)))
:stop-server stop-server)))
(defn stop [{:keys [db-conn stop-server states] :as system}]
(remove-watch states :db)
(defn stop [{:keys [db-conn stop-server] :as system}]
(stop-server)
(db/disconnect db-conn)
(reset! states nil)
system)
(defn usage [options-summary]

View file

@ -1,7 +1,8 @@
(ns grub.db
(:require [datomic.api :as d]
[clojure.core.async :as a :refer [<! >! chan go]]
[grub.util :as util]))
[grub.util :as util]
[clojure.pprint :refer [pprint]]))
(def schema-tx [
;; grubs
@ -91,30 +92,35 @@
{:grubs grubs
:recipes recipes}))
(defn grub-tx [grub]
[{:db/id (d/tempid :db.part/user)
:grub/id (:id grub)
:grub/text (:text grub)
:grub/completed (:completed grub)}])
(defn remove-keys-with-nil-vals [mapcoll]
(->> mapcoll
(remove (fn [[k v]] (nil? v)))
(reduce (fn [cur [k v]] (assoc cur k v)) {})))
(defn recipe-tx [recipe]
[{:db/id (d/tempid :db.part/user)
:recipe/id (:id recipe)
:recipe/name (:name recipe)
:recipe/grubs (:grubs recipe)
:recipe/directions (:directions recipe)}])
(defn upsert-grub-tx [grub]
[(remove-keys-with-nil-vals {:db/id (d/tempid :db.part/user)
:grub/id (:id grub)
:grub/text (:text grub)
:grub/completed (:completed grub)})])
(defn upsert-recipe-tx [recipe]
[(remove-keys-with-nil-vals {:db/id (d/tempid :db.part/user)
:recipe/id (:id recipe)
:recipe/name (:name recipe)
:recipe/grubs (:grubs recipe)
:recipe/directions (:directions recipe)})])
(defn update-db! [conn state]
(let [grubs-tx (->> state
:grubs
(vals)
(map grub-tx)
(map upsert-grub-tx)
(flatten)
(vec))
recipes-tx (->> state
:recipes
(vals)
(map recipe-tx)
(map upsert-recipe-tx)
(flatten)
(vec))
tx (into grubs-tx recipes-tx)]
@ -123,3 +129,49 @@
(defn disconnect [conn]
(d/release conn))
(defn get-history-state [db-conn tag]
(get-current-state db-conn))
(defn patch-map [state diff]
(-> state
(#(apply dissoc % (into [] (:- diff))))
(#(merge-with merge % (:+ diff)))))
(defn patch-state [state diff]
(->> state
(keys)
(map (fn [k] [k (patch-map (k state) (k diff))]))
(into {})))
(def empty-diff {:grubs {:- #{} :+ nil}
:recipes {:- #{} :+ nil}})
(def added-diff
{:grubs {:- #{}
:+ {"grub-completed" {:completed true}
"grub-updated" {:text "Ketchup"}
"grub-added" {:completed false :text "Toothpaste"}}}
:recipes {:- #{} :+ nil}})
(defn diff-tx [diff]
(let [grubs-tx (->> diff
:grubs
:+
(map (fn [[k v]] (assoc v :id k)))
(map upsert-grub-tx)
(flatten)
(vec))
recipes-tx (->> diff
:recipes
:+
(map (fn [[k v]] (assoc v :id k)))
(map upsert-recipe-tx)
(flatten)
(vec))]
(into grubs-tx recipes-tx)))
(defn patch-state! [conn diff]
(pprint (diff-tx diff))
@(d/transact conn (diff-tx diff)))

View file

@ -1,10 +1,9 @@
(ns grub.server-sync
(:require [grub.diff :as diff]
[grub.state :as state]
#?(:cljs [cljs.core.async :as a :refer [<! >! chan]]
:clj [clojure.core.async :as a :refer [<! >! chan go]]))
#?(:cljs (:require-macros [grub.macros :refer [log logs]]
[cljs.core.async.macros :refer [go]])))
[clojure.core.async :as a :refer [<! >! chan go]]
[grub.db :as db]
[clojure.pprint :refer [pprint]]))
(defn full-sync [state]
{:type :full-sync
@ -58,28 +57,30 @@
(assoc latest-state :tag (inc (:tag shadow))))}))
(defmethod handle-event :default [msg]
#?(:clj (println "Unhandled message:" msg))
(println "Unhandled message:" msg)
{})
(defn make-server-agent
([>remote events new-states states]
(make-server-agent >remote events new-states states state/empty-state))
([>remote events new-states states initial-shadow]
(go (loop [shadow initial-shadow]
(let [[v c] (a/alts! [new-states events] :priority true)]
(cond (nil? v) nil ;; drop out of loop
(= c new-states)
(let [event {:type :new-state
:new-state v
:shadow shadow
:states states}
{:keys [out-event new-shadow]} (handle-event event)]
(when out-event (a/put! >remote out-event))
(recur (if new-shadow new-shadow shadow)))
(= c events)
(let [event (assoc v
:states states
:shadow shadow)
{:keys [new-shadow out-event]} (handle-event event)]
(when out-event (a/put! >remote out-event))
(recur (if new-shadow new-shadow shadow)))))))))
([up down saved db-conn]
(go (loop [shadow (db/get-current-state db-conn)]
(let [[event c] (a/alts! [up saved] :priority true)]
(println "Handling event:")
(pprint event)
(when-not (nil? event)
(case (:type event)
:diff
(let [history-state (db/get-history-state db-conn (:shadow-tag event))
new-state (db/patch-state! db-conn (:diff event))
new-shadow (diff/patch-state history-state (:diff event))
return-diff (diff/diff-states new-shadow new-state)]
(>! down return-diff)
(recur new-shadow))
:full-sync-request
(do (println "full sync!")
(>! down (full-sync (db/get-current-state db-conn)))
(recur shadow))
(do (println "Unhandled event")
(println event)
(recur shadow)))))))))