2013-08-05 15:19:52 +00:00
|
|
|
(ns grub.websocket
|
2013-09-30 10:11:59 +00:00
|
|
|
(:require [grub.db :as db]
|
2013-08-05 15:19:52 +00:00
|
|
|
[org.httpkit.server :as httpkit]
|
2013-09-30 10:11:59 +00:00
|
|
|
[clojure.core.async :as a :refer [<! >! chan go]]))
|
2013-08-05 15:19:52 +00:00
|
|
|
|
|
|
|
(def incoming-events (chan))
|
|
|
|
|
2013-10-09 06:04:51 +00:00
|
|
|
(def connected-clients (atom {}))
|
2013-08-05 15:19:52 +00:00
|
|
|
|
|
|
|
(def ws-channel-id-count (atom 0))
|
|
|
|
|
2013-09-30 10:11:59 +00:00
|
|
|
(defn get-unique-ws-id []
|
|
|
|
(swap! ws-channel-id-count inc))
|
2013-08-05 15:19:52 +00:00
|
|
|
|
2013-09-30 10:11:59 +00:00
|
|
|
(defn add-connected-client! [ws-channel]
|
|
|
|
(let [ws-channel-id (get-unique-ws-id)
|
|
|
|
client-chan (chan)]
|
2013-10-09 06:04:51 +00:00
|
|
|
(swap! connected-clients #(assoc % ws-channel-id client-chan))
|
2013-09-30 10:11:59 +00:00
|
|
|
[ws-channel-id client-chan]))
|
2013-08-05 15:19:52 +00:00
|
|
|
|
2013-10-09 06:04:51 +00:00
|
|
|
(defn remove-connected-client! [status ws-channel ws-channel-id client-chan]
|
|
|
|
(println "Client disconnected:"
|
|
|
|
(.toString ws-channel)
|
|
|
|
(str "(" ws-channel-id ")")
|
|
|
|
"with status" status)
|
|
|
|
(swap! connected-clients #(dissoc % ws-channel-id))
|
|
|
|
(println (count @connected-clients) "client(s) still connected")
|
|
|
|
(a/close! client-chan))
|
|
|
|
|
2013-09-05 09:39:10 +00:00
|
|
|
|
2013-09-30 10:11:59 +00:00
|
|
|
(defn add-event-to-incoming-channel [raw-event ws-channel-id]
|
2013-08-05 15:19:52 +00:00
|
|
|
(let [parsed-event (read-string raw-event)
|
|
|
|
event (assoc parsed-event :ws-channel ws-channel-id)]
|
|
|
|
(println "Received event" event)
|
2013-09-30 10:11:59 +00:00
|
|
|
(go (>! incoming-events event))))
|
|
|
|
|
|
|
|
(defn forward-other-events-to-client [c ws-channel]
|
2013-10-09 06:04:51 +00:00
|
|
|
(a/go-loop []
|
|
|
|
(when-let [event (<! c)]
|
|
|
|
(println "Send to client '" (str event) "'")
|
|
|
|
(httpkit/send! ws-channel (str event))
|
|
|
|
(recur))))
|
2013-09-30 10:11:59 +00:00
|
|
|
|
2013-08-05 15:19:52 +00:00
|
|
|
|
2013-09-30 10:11:59 +00:00
|
|
|
(defn send-current-grubs-and-recipes-to-client [client-chan]
|
|
|
|
(a/pipe (db/get-current-grubs-as-events) client-chan false)
|
|
|
|
(a/pipe (db/get-current-recipes-as-events) client-chan false))
|
|
|
|
|
|
|
|
(defn setup-new-connection [ws-channel]
|
|
|
|
(let [[ws-channel-id client-chan] (add-connected-client! ws-channel)]
|
2013-10-09 06:04:51 +00:00
|
|
|
(println "Client connected:" (.toString ws-channel) (str "(" ws-channel-id ")"))
|
|
|
|
(println (count @connected-clients) "client(s) connected")
|
|
|
|
(httpkit/on-close ws-channel #(remove-connected-client! % ws-channel ws-channel-id client-chan))
|
|
|
|
(httpkit/on-receive ws-channel #(add-event-to-incoming-channel % ws-channel-id))
|
2013-09-30 10:11:59 +00:00
|
|
|
(forward-other-events-to-client client-chan ws-channel)
|
|
|
|
(send-current-grubs-and-recipes-to-client client-chan)))
|
2013-08-05 15:19:52 +00:00
|
|
|
|
|
|
|
(defn websocket-handler [request]
|
2013-09-30 10:11:59 +00:00
|
|
|
(httpkit/with-channel request ws-channel (setup-new-connection ws-channel)))
|
|
|
|
|
2013-10-09 06:04:51 +00:00
|
|
|
(defn get-other-client-channels [my-ws-channel-id]
|
|
|
|
(-> @connected-clients
|
|
|
|
(dissoc my-ws-channel-id)
|
|
|
|
(vals)))
|
2013-09-30 10:11:59 +00:00
|
|
|
|
|
|
|
(defn push-event-to-others [orig-event]
|
|
|
|
(let [my-ws-channel-id (:ws-channel orig-event)
|
|
|
|
event (dissoc orig-event :ws-channel)]
|
2013-10-09 06:04:51 +00:00
|
|
|
(go (doseq [c (get-other-client-channels my-ws-channel-id)]
|
|
|
|
(>! c event)))))
|
2013-09-30 10:11:59 +00:00
|
|
|
|
|
|
|
(defn pass-received-events-to-clients-and-db [db-chan]
|
|
|
|
(let [in' (a/mult incoming-events)
|
|
|
|
to-others (chan)
|
|
|
|
to-database (chan)]
|
|
|
|
(a/tap in' to-others)
|
|
|
|
(a/tap in' to-database)
|
|
|
|
(a/go-loop [] (let [event (<! to-others)]
|
|
|
|
(push-event-to-others event)
|
|
|
|
(recur)))
|
|
|
|
(a/pipe to-database (a/map> #(dissoc % :ws-channel) db-chan))))
|