Persist grubs on server, send on connect

This commit is contained in:
Nicholas Kariniemi 2013-08-05 18:19:52 +03:00
parent 7875059fcd
commit fb38e8d1f6
6 changed files with 221 additions and 41 deletions

View file

@ -11,7 +11,8 @@
[ring/ring-core "1.2.0"]
[hiccup "1.0.4"]
[prismatic/dommy "0.1.1"]
[core.async "0.1.0-SNAPSHOT"]]
[core.async "0.1.0-SNAPSHOT"]
[com.novemberain/monger "1.5.0"]]
:plugins [[lein-cljsbuild "0.3.2"]
[lein-ring "0.8.6"]]
:cljsbuild {

85
src-clj/grub/async.clj Normal file
View file

@ -0,0 +1,85 @@
(ns grub.async
(:require [clojure.core.async :as async :refer [<! >! chan go put! alts!]]))
(defmacro go-loop [& body]
`(clojure.core.async/go
(while true
~@body)))
(defn put-all! [cs x]
(doseq [c cs]
(put! c x)))
(defn fan-out [in cs-or-n]
(let [cs (if (number? cs-or-n)
(repeatedly cs-or-n chan)
cs-or-n)]
(go (loop []
(let [x (<! in)]
(if-not (nil? x)
(do
(put-all! cs x)
(recur))
:done))))
cs))
(defn copy-chan
([c]
(first (fan-out c 1)))
([out c]
(first (fan-out c [out]))))
(defn put-all! [cs x]
(doseq [c cs]
(put! c x)))
(defn fan-out [in cs-or-n]
(let [cs (if (number? cs-or-n)
(repeatedly cs-or-n chan)
cs-or-n)]
(go (loop []
(let [x (<! in)]
(if-not (nil? x)
(do
(put-all! cs x)
(recur))
:done))))
cs))
(defn fan-in
([ins] (fan-in (chan) ins))
([c ins]
(go-loop
(let [[x] (alts! ins)]
(>! c x)))
c))
(defn map-chan
([f source] (map-chan (chan) f source))
([c f source]
(go-loop
(>! c (f (<! source))))
c))
(defn filter-chan
([f source] (filter-chan (chan) f source))
([c f source]
(go-loop
(let [v (<! source)]
(when (f v)
(>! c v))))
c))
(defn do-chan! [f source]
(go-loop
(let [v (<! source)]
(f v))))
(defn do-chan [f source]
(let [out (chan)]
(go-loop
(let [v (<! source)]
(>! out v)
(f v)))
out))

View file

@ -1,36 +1,15 @@
(ns grub.core
(:require [ring.middleware.reload :as reload]
(:require [grub.websocket :as ws]
[grub.db :as db]
[ring.middleware.reload :as reload]
[compojure.core :refer [defroutes GET POST]]
[compojure.handler :as handler]
[compojure.route :as route]
[org.httpkit.server :as httpkit]
[hiccup
[page :refer [html5]]
[page :refer [include-js include-css]]]
[clojure.core.async :as async :refer [<! >! >!! chan go close! timeout]]))
[page :refer [include-js include-css]]]))
(def out-channels (atom []))
(def channel-id-count (atom 0))
(defn push-grub-to-others [grub my-channel-id]
(let [other-channels (fn [] (filter #(not (= (:id %) my-channel-id)) @out-channels))]
(go (doseq [{ch :channel} (other-channels)]
(>! ch grub)))))
(defn push-new-grubs-to-client [c ws-channel]
(go (while true
(let [grub (<! c)]
(httpkit/send! ws-channel grub)))))
(defn websocket-handler [request]
(httpkit/with-channel request ws-channel
(let [channel-id (swap! channel-id-count inc)
c (chan)]
(swap! out-channels conj {:id channel-id :channel c})
(println "channel connected:" (.toString ws-channel))
(httpkit/on-receive ws-channel #(push-grub-to-others % channel-id))
(push-new-grubs-to-client c ws-channel))))
(defn index-page []
(html5
[:head
@ -38,17 +17,18 @@
[:meta {:name "viewport" :content "width=device-width, initial-scale=1.0"}]
(include-css "/css/bootstrap.css")
(include-css "/css/styles.css")]
[:body
(include-js "http://code.jquery.com/jquery.js")
(include-js "/js/bootstrap.js")
(include-js "/js/main.js")]))
[:body
(include-js "http://code.jquery.com/jquery.js")
(include-js "/js/bootstrap.js")
(include-js "/js/main.js")]))
(defroutes routes
(GET "/ws" [] websocket-handler)
(GET "/ws" [] ws/websocket-handler)
(GET "/" [] (index-page))
(route/files "/")
(route/not-found "<p>Page not found.</p>"))
(def app
(let [dev? true]
(if dev?
@ -56,4 +36,4 @@
(handler/site routes))))
(defn -main [& args]
(httpkit/run-server app {:port 3000}))
(httpkit/run-server app {:port 3000}))

62
src-clj/grub/db.clj Normal file
View file

@ -0,0 +1,62 @@
(ns grub.db
(:require [grub.async :refer [go-loop]]
[monger.core :as m]
[monger.collection :as mc]
[monger.operators :as mo]
[clojure.core.async :as async :refer [<! >! >!! chan go close! timeout]]))
(def grub-collection "grubs")
(def incoming-events (chan))
(defn get-incoming-events []
incoming-events)
(defmulti handle-event :event :default :unknown-event)
(defmethod handle-event :create [event]
(let [grub (-> event
(select-keys [:_id :grub])
(assoc :completed false))]
(mc/insert grub-collection grub)))
(defmethod handle-event :complete [event]
(mc/update grub-collection
{:_id (:_id event)}
{mo/$set {:completed true}}))
(defmethod handle-event :uncomplete [event]
(mc/update grub-collection
{:_id (:_id event)}
{mo/$set {:completed false}}))
(defmethod handle-event :delete [event]
(mc/remove grub-collection {:_id (:_id event)}))
(defmethod handle-event :unknown-event [event]
(println "Cannot handle unknown event:" event))
(defn handle-incoming-events []
(go-loop (let [event (<! (get-incoming-events))]
(println "DB handling" event)
(handle-event event))))
(defn get-current-grubs-as-events []
(let [grubs (mc/find-maps grub-collection)
sorted-grubs (sort-by :_id (vec grubs))
out (chan)]
(println "sorted-grubs:" sorted-grubs)
(go (doseq [grub sorted-grubs]
(let [grub-event (-> grub
(select-keys [:_id :grub :completed])
(assoc :event :create))]
(>! out grub-event))))
out))
(defn connect-to-db []
(println "Connect to db")
(m/connect!)
(m/set-db! (m/get-db "monger-test")))
(connect-to-db)
(handle-incoming-events)

View file

@ -0,0 +1,53 @@
(ns grub.websocket
(:require [grub.async :refer [go-loop fan-out do-chan! copy-chan]]
[grub.db :as db]
[org.httpkit.server :as httpkit]
[clojure.core.async :refer [<! >! chan go]]))
(def incoming-events (chan))
(defn get-incoming-events []
incoming-events)
(def ws-channels (atom []))
(def ws-channel-id-count (atom 0))
(defn push-event-to-others [orig-event]
(let [my-ws-channel-id (:ws-channel orig-event)
other-channels (fn [] (filter #(not (= (:id %) my-ws-channel-id)) @ws-channels))
event (dissoc orig-event :ws-channel)]
(go (doseq [{ch :channel} (other-channels)]
(>! ch event)))))
(defn push-current-grubs-to-client [c ws-channel]
(copy-chan c (db/get-current-grubs-as-events)))
(defn push-received-events-to-client [c ws-channel]
(go-loop (let [event (<! c)
event-str (str event)]
(println "Send to client" event)
(httpkit/send! ws-channel event-str))))
(defn add-incoming-event [raw-event ws-channel-id]
(let [parsed-event (read-string raw-event)
event (assoc parsed-event :ws-channel ws-channel-id)]
(println "Received event" event)
(go (>! (get-incoming-events) event))))
(defn handle-incoming-events []
(let [[incoming incoming'] (fan-out incoming-events 2)]
(do-chan! push-event-to-others incoming)
(go-loop (let [event (<! incoming')]
(>! (db/get-incoming-events) event)))))
(defn websocket-handler [request]
(httpkit/with-channel request ws-channel
(let [ws-channel-id (swap! ws-channel-id-count inc)
c (chan)]
(swap! ws-channels conj {:id ws-channel-id :channel c})
(println "Channel connected:" (.toString ws-channel))
(httpkit/on-receive ws-channel #(add-incoming-event % ws-channel-id))
(push-current-grubs-to-client c ws-channel)
(push-received-events-to-client c ws-channel))))
(handle-incoming-events)

View file

@ -8,7 +8,7 @@
[cljs.core.async.macros :refer [go]]))
(deftemplate grub-template [grub]
[:tr {:id (:id grub)}
[:tr {:id (:_id grub)}
[:td
[:div.checkbox.grubCheckbox [:label
[:input {:type "checkbox"}]
@ -40,7 +40,7 @@
(dommy/prepend! (sel1 :body) (main-template)))
(defn add-grub-to-dom [grub-obj]
(logs "Adding" grub-obj)
(logs "Add" grub-obj)
(dommy/append! (sel1 :#grubList) (grub-template grub-obj)))
(defn add-grub [grub]
@ -48,14 +48,14 @@
(defn complete-grub [grub]
(logs "Complete" grub)
(aset (sel1 [(str "#" (:id grub)) "input"]) "checked" true))
(aset (sel1 [(str "#" (:_id grub)) "input"]) "checked" true))
(defn uncomplete-grub [grub]
(logs "Uncomplete" grub)
(aset (sel1 [(str "#" (:id grub)) "input"]) "checked" false))
(aset (sel1 [(str "#" (:_id grub)) "input"]) "checked" false))
(defn delete-grub [grub]
(let [elem (sel1 (str "#" (:id grub)))]
(let [elem (sel1 (str "#" (:_id grub)))]
(.removeChild (.-parentNode elem) elem)))
(defn get-add-grub-text []
@ -84,17 +84,16 @@
(get-grubs-from-enter)])]
(->> grubs
(filter-chan #(not (empty? %)))
(map-chan (fn [g] {:event :create :grub g :id (str "grub-" (.now js/Date))})))))
(map-chan (fn [g] {:event :create :grub g :_id (str "grub-" (.now js/Date))})))))
(defn get-completed-event [event]
(logs "completed-event:" event)
(let [target (.-target event)
checked (.-checked target)
event-type (if checked :complete :uncomplete)
label (aget (.-labels (.-target event)) 0)
grub (.-textContent label)
id (.-id (.-parentNode (.-parentNode (.-parentNode (.-parentNode target)))))]
{:grub grub :id id :event event-type}))
{:grub grub :_id id :event event-type}))
(defn get-completed-events []
(let [events (:chan (event-chan (sel1 :#grubList) "change"))
@ -107,6 +106,6 @@
:click
#(go (>! click-events %)))
(let [ids (map-chan #(.-id (.-parentNode (.-parentNode (.-target %)))) click-events)
grub-events (map-chan (fn [id] {:event :delete :id id}) ids)]
grub-events (map-chan (fn [id] {:event :delete :_id id}) ids)]
grub-events)))