Refactor server to use new core.async goodness

This commit is contained in:
Nicholas Kariniemi 2013-09-30 13:11:59 +03:00
parent 331f74fa4d
commit 6bc7582255
4 changed files with 102 additions and 159 deletions

View file

@ -1,85 +0,0 @@
(ns grub.async
(:require [clojure.core.async :as async :refer [<! >! chan go put! alts!]]))
(defmacro go-loop [& body]
`(clojure.core.async/go
(while true
~@body)))
(defn put-all! [cs x]
(doseq [c cs]
(put! c x)))
(defn fan-out [in cs-or-n]
(let [cs (if (number? cs-or-n)
(repeatedly cs-or-n chan)
cs-or-n)]
(go (loop []
(let [x (<! in)]
(if-not (nil? x)
(do
(put-all! cs x)
(recur))
:done))))
cs))
(defn copy-chan
([c]
(first (fan-out c 1)))
([out c]
(first (fan-out c [out]))))
(defn put-all! [cs x]
(doseq [c cs]
(put! c x)))
(defn fan-out [in cs-or-n]
(let [cs (if (number? cs-or-n)
(repeatedly cs-or-n chan)
cs-or-n)]
(go (loop []
(let [x (<! in)]
(if-not (nil? x)
(do
(put-all! cs x)
(recur))
:done))))
cs))
(defn fan-in
([ins] (fan-in (chan) ins))
([c ins]
(go-loop
(let [[x] (alts! ins)]
(>! c x)))
c))
(defn map-chan
([f source] (map-chan (chan) f source))
([c f source]
(go-loop
(>! c (f (<! source))))
c))
(defn filter-chan
([f source] (filter-chan (chan) f source))
([c f source]
(go-loop
(let [v (<! source)]
(when (f v)
(>! c v))))
c))
(defn do-chan! [f source]
(go-loop
(let [v (<! source)]
(f v))))
(defn do-chan [f source]
(let [out (chan)]
(go-loop
(let [v (<! source)]
(>! out v)
(f v)))
out))

View file

@ -53,10 +53,19 @@
(integration-test/run integration-test-port)
(stop-server)))
(defn start-production-server []
(reset! js-file "/js/grub.js")
(let [db-chan (db/connect-production-database)]
(ws/pass-received-events-to-clients-and-db db-chan)
(start-server default-port)))
(defn start-development-server []
(let [db-chan (db/connect-development-database)]
(ws/pass-received-events-to-clients-and-db db-chan)
(start-server default-port)))
(defn -main [& args]
(cond
(some #(= % "integration") args) (run-integration-test)
(some #(= % "production") args) (do (reset! js-file "/js/grub.js")
(start-server default-port))
:else (do (db/connect-and-handle-events "grub-dev")
(start-server default-port))))
(some #(= % "production") args) (start-production-server)
:else (start-development-server)))

View file

@ -1,9 +1,8 @@
(ns grub.db
(:require [grub.async :refer [go-loop]]
[monger.core :as m]
(:require [monger.core :as m]
[monger.collection :as mc]
[monger.operators :as mo]
[clojure.core.async :as async :refer [<! >! >!! chan go close! timeout]]))
[clojure.core.async :as a :refer [<! >! chan go]]))
(def grub-collection "grubs")
(def recipe-collection "recipes")
@ -11,8 +10,6 @@
(defn clear-grubs []
(mc/drop grub-collection))
(def incoming-events (atom nil))
(defmulti handle-event :event :default :unknown-event)
(defmethod handle-event :add [event]
@ -48,40 +45,46 @@
(defmethod handle-event :unknown-event [event]
(println "Cannot handle unknown event:" event))
(defn handle-incoming-events! []
(reset! incoming-events (chan))
(go-loop (let [event (<! @incoming-events)]
(handle-event event))))
(defn get-current-grubs-as-events []
(let [grubs (mc/find-maps grub-collection)
sorted-grubs (sort-by :_id (vec grubs))
events (map (fn [g] (-> g
(select-keys [:_id :grub :completed])
(assoc :event :add)))
sorted-grubs)
out (chan)]
(go (doseq [grub sorted-grubs]
(let [grub-event (-> grub
(select-keys [:_id :grub :completed])
(assoc :event :add))]
(>! out grub-event))))
(a/onto-chan out events)
out))
(defn get-current-recipes-as-events []
(let [recipes (mc/find-maps recipe-collection)
sorted-recipes (sort-by :_id (vec recipes))
events (map (fn [r] (-> r
(select-keys [:_id :name :steps])
(assoc :event :add-recipe)))
sorted-recipes)
out (chan)]
(go (doseq [recipe sorted-recipes]
(let [recipe-event (-> recipe
(select-keys [:_id :name :steps])
(assoc :event :add-recipe))]
(>! out recipe-event))))
(a/onto-chan out events)
out))
(def default-db "grub")
(def production-db "grub")
(def development-db "grub-dev")
(defn connect-and-handle-events
([] (connect-and-handle-events default-db))
([db-name]
(handle-incoming-events!)
(m/connect!)
(m/set-db! (m/get-db db-name))))
(defn handle-incoming-events [in]
(a/go-loop [] (let [event (<! in)]
(println "db received event:" event)
(handle-event event)
(recur))))
(connect-and-handle-events default-db)
(defn connect-and-handle-events [db-name]
(let [in (chan)]
(handle-incoming-events in)
(m/connect!)
(m/set-db! (m/get-db db-name))
in))
(defn connect-production-database []
(connect-and-handle-events production-db))
(defn connect-development-database []
(connect-and-handle-events development-db))

View file

@ -1,59 +1,75 @@
(ns grub.websocket
(:require [grub.async :refer [go-loop fan-out do-chan! copy-chan]]
[grub.db :as db]
(:require [grub.db :as db]
[org.httpkit.server :as httpkit]
[clojure.core.async :refer [<! >! chan go]]))
[clojure.core.async :as a :refer [<! >! chan go]]))
(def incoming-events (chan))
(defn get-incoming-events []
incoming-events)
(def connected-clients (atom []))
(def ws-channels (atom []))
(def ws-channel-id-count (atom 0))
(defn push-event-to-others [orig-event]
(let [my-ws-channel-id (:ws-channel orig-event)
other-channels (fn [] (filter #(not (= (:id %) my-ws-channel-id)) @ws-channels))
event (dissoc orig-event :ws-channel)]
(go (doseq [{ch :channel} (other-channels)]
(>! ch event)))))
(defn get-unique-ws-id []
(swap! ws-channel-id-count inc))
(defn push-current-grubs-to-client [c ws-channel]
(copy-chan c (db/get-current-grubs-as-events)))
(defn add-connected-client! [ws-channel]
(let [ws-channel-id (get-unique-ws-id)
client-chan (chan)]
(swap! connected-clients
conj
{:id ws-channel-id :channel client-chan})
[ws-channel-id client-chan]))
(defn push-current-recipes-to-client [c ws-channel]
(copy-chan c (db/get-current-recipes-as-events)))
(defn push-received-events-to-client [c ws-channel]
(go-loop (let [event (<! c)
event-str (str event)]
(println "Send to client" event-str)
(httpkit/send! ws-channel event-str))))
(defn add-incoming-event [raw-event ws-channel-id]
(defn add-event-to-incoming-channel [raw-event ws-channel-id]
(let [parsed-event (read-string raw-event)
event (assoc parsed-event :ws-channel ws-channel-id)]
(println "Received event" event)
(go (>! (get-incoming-events) event))))
(go (>! incoming-events event))))
(defn handle-incoming-events []
(let [[incoming incoming'] (fan-out incoming-events 2)]
(do-chan! push-event-to-others incoming)
(go-loop (let [event (<! incoming')
parsed-event (dissoc event :ws-channel)]
(>! @db/incoming-events event)))))
(defn forward-client-events-to-others [ws-channel ws-channel-id]
(httpkit/on-receive ws-channel
#(add-event-to-incoming-channel % ws-channel-id)))
(defn forward-other-events-to-client [c ws-channel]
(a/go-loop [] (let [event (<! c)
event-str (str event)]
(println "Send to client '" event-str "'")
(httpkit/send! ws-channel event-str))
(recur)))
(defn send-current-grubs-and-recipes-to-client [client-chan]
(a/pipe (db/get-current-grubs-as-events) client-chan false)
(a/pipe (db/get-current-recipes-as-events) client-chan false))
(defn setup-new-connection [ws-channel]
(let [[ws-channel-id client-chan] (add-connected-client! ws-channel)]
(println "Channel connected:" (.toString ws-channel))
(forward-client-events-to-others ws-channel ws-channel-id)
(forward-other-events-to-client client-chan ws-channel)
(send-current-grubs-and-recipes-to-client client-chan)))
(defn websocket-handler [request]
(httpkit/with-channel request ws-channel
(let [ws-channel-id (swap! ws-channel-id-count inc)
c (chan)]
(swap! ws-channels conj {:id ws-channel-id :channel c})
(println "Channel connected:" (.toString ws-channel))
(println "Request:" request)
(httpkit/on-receive ws-channel #(add-incoming-event % ws-channel-id))
(push-current-grubs-to-client c ws-channel)
(push-current-recipes-to-client c ws-channel)
(push-received-events-to-client c ws-channel))))
(handle-incoming-events)
(httpkit/with-channel request ws-channel (setup-new-connection ws-channel)))
(defn get-other-clients [my-ws-channel-id]
(filter #(not (= (:id %) my-ws-channel-id))
@connected-clients))
(defn push-event-to-others [orig-event]
(let [my-ws-channel-id (:ws-channel orig-event)
event (dissoc orig-event :ws-channel)]
(go (doseq [{ch :channel} (get-other-clients my-ws-channel-id)]
(>! ch event)))))
(defn pass-received-events-to-clients-and-db [db-chan]
(let [in' (a/mult incoming-events)
to-others (chan)
to-database (chan)]
(a/tap in' to-others)
(a/tap in' to-database)
(a/go-loop [] (let [event (<! to-others)]
(push-event-to-others event)
(recur)))
(a/pipe to-database (a/map> #(dissoc % :ws-channel) db-chan))))