Synchronize added grubs with other clients

This commit is contained in:
Nicholas Kariniemi 2013-07-28 09:24:01 +03:00
parent 34ea58e11b
commit 8891e402d0
3 changed files with 68 additions and 38 deletions

View file

@ -5,7 +5,7 @@
:url "http://www.eclipse.org/legal/epl-v10.html"} :url "http://www.eclipse.org/legal/epl-v10.html"}
:source-paths ["src-clj"] :source-paths ["src-clj"]
:dependencies [[org.clojure/clojure "1.5.1"] :dependencies [[org.clojure/clojure "1.5.1"]
[http-kit "2.1.5"] [http-kit "2.1.8"]
[compojure "1.1.5"] [compojure "1.1.5"]
[ring/ring-devel "1.2.0"] [ring/ring-devel "1.2.0"]
[ring/ring-core "1.2.0"] [ring/ring-core "1.2.0"]

View file

@ -1,23 +1,36 @@
(ns grub.core (ns grub.core
(:use [org.httpkit.server
:only [run-server with-channel on-receive send! websocket?]]
[compojure.handler :only [site]]
[compojure.core :only [defroutes GET POST]])
(:require [ring.middleware.reload :as reload] (:require [ring.middleware.reload :as reload]
[compojure.core :refer [defroutes GET POST]]
[compojure.handler :as handler]
[compojure.route :as route] [compojure.route :as route]
[org.httpkit.server :as httpkit]
[hiccup [hiccup
[page :refer [html5]] [page :refer [html5]]
[page :refer [include-js include-css]]])) [page :refer [include-js include-css]]]
[clojure.core.async :as async :refer [<! >! >!! chan go close! timeout]]))
(defn async-handler [request] (def out-channels (atom []))
(if-not (:websocket? request) (def channel-id-count (atom 0))
{:status 200 :body "WebSocket server"}
(with-channel request channel (defn push-grub-to-others [grub my-channel-id]
(on-receive channel (fn [data] (let [other-channels (fn [] (filter #(not (= (:id %) my-channel-id)) @out-channels))]
(send! channel data))) (go (doseq [{ch :channel} (other-channels)]
(send! channel {:status 200 (>! ch grub)))))
:headers {"Content-Type" "text/plain"}
:body "Long polling?"})))) (defn push-new-grubs-to-client [c ws-channel]
(go (while true
(let [grub (<! c)]
(httpkit/send! ws-channel grub)))))
(defn websocket-handler [request]
(httpkit/with-channel request ws-channel
(let [channel-id (swap! channel-id-count inc)
c (chan)]
(swap! out-channels conj {:id channel-id :channel c})
(println "channel connected:" (.toString ws-channel))
(httpkit/on-receive ws-channel #(push-grub-to-others % channel-id))
(push-new-grubs-to-client c ws-channel))))
(defn index-page [] (defn index-page []
(html5 (html5
[:head [:head
@ -30,7 +43,7 @@
(include-js "/js/main.js")])) (include-js "/js/main.js")]))
(defroutes routes (defroutes routes
(GET "/ws" [] async-handler) (GET "/ws" [] websocket-handler)
(GET "/" [] (index-page)) (GET "/" [] (index-page))
(route/files "/") (route/files "/")
(route/not-found "<p>Page not found.</p>")) (route/not-found "<p>Page not found.</p>"))
@ -38,8 +51,8 @@
(def app (def app
(let [dev? true] (let [dev? true]
(if dev? (if dev?
(reload/wrap-reload (site #'routes)) (reload/wrap-reload (handler/site #'routes))
(site routes)))) (handler/site routes))))
(defn -main [& args] (defn -main [& args]
(run-server app {:port 8080})) (httpkit/run-server app {:port 3000}))

View file

@ -2,7 +2,7 @@
(:require [dommy.core :as dommy] (:require [dommy.core :as dommy]
[cljs.core.async :as async :refer [<! >! chan close! timeout]]) [cljs.core.async :as async :refer [<! >! chan close! timeout]])
(:require-macros [dommy.macros :refer [deftemplate sel1 node]] (:require-macros [dommy.macros :refer [deftemplate sel1 node]]
[cljs.core.async.macros :as m :refer [go alt!]] [cljs.core.async.macros :as m :refer [go alt! alts!]]
[grub-client.macros :refer [log]])) [grub-client.macros :refer [log]]))
(deftemplate grub-template [grub] (deftemplate grub-template [grub]
@ -27,12 +27,11 @@
[:span.input-group-btn [:span.input-group-btn
add-grub-btn]] add-grub-btn]]
[:table.table.table-condensed [:table.table.table-condensed
[:tbody#grubList [:tbody#grubList]]]
(for [grub grubs] (grub-template grub))]]]
[:div.col-lg-4]]]) [:div.col-lg-4]]])
(defn render-body [grubs] (defn render-body []
(dommy/prepend! (sel1 :body) (main-template grubs))) (dommy/prepend! (sel1 :body) (main-template)))
(defn push-new-grub [channel] (defn push-new-grub [channel]
(let [new-grub (dommy/value add-grub-text)] (let [new-grub (dommy/value add-grub-text)]
@ -60,10 +59,14 @@
(defn append-new-grub [grub] (defn append-new-grub [grub]
(dommy/append! (sel1 :#grubList) (grub-template grub))) (dommy/append! (sel1 :#grubList) (grub-template grub)))
(defn append-new-grubs [chan]
(go (while true
(let [grub (<! chan)]
(append-new-grub grub)))))
(defn add-grubs-to-list [in] (defn add-grubs-to-list [in]
(go (while true (go (while true
(let [new-grub (<! in)] (let [new-grub (<! in)]
(log "Added grub: " new-grub)
(append-new-grub new-grub))))) (append-new-grub new-grub)))))
(defn filter-empty-grubs [in] (defn filter-empty-grubs [in]
@ -73,21 +76,35 @@
(when-not (empty? grub) (>! out grub))))) (when-not (empty? grub) (>! out grub)))))
out)) out))
(defn add-new-grubs-to-list [] (def websocket* (atom nil))
(let [added-grubs (get-added-grubs)
filtered-grubs (filter-empty-grubs added-grubs)]
(add-grubs-to-list filtered-grubs)))
(def test-grubs (defn push-grubs-to-server [chan]
["8 dl water" (let [websocket (js/WebSocket. "ws://localhost:3000/ws")]
"8 whole peppercorns" (aset websocket "onmessage" (fn [event]
"2 bay leaves" (let [grub (.-data event)]
"1 - 2 (150 g) onions" (log "Received grub:" grub)
"2 dl cream" (append-new-grub grub))))
"1 dl dill"]) (go (while true
(let [grub (<! chan)]
(.send websocket grub))))))
(defn fan-out [in num-chans]
(let [out-channels (repeatedly num-chans chan)]
(go (while true
(let [x (<! in)]
(doseq [out out-channels]
(>! out x)))))
out-channels))
(defn add-new-grubs-as-they-come []
(let [added-grubs (get-added-grubs)
filtered-grubs (filter-empty-grubs added-grubs)
out-channels (fan-out filtered-grubs 2)]
(append-new-grubs (first out-channels))
(push-grubs-to-server (second out-channels))))
(defn init [] (defn init []
(render-body test-grubs) (render-body)
(add-new-grubs-to-list)) (add-new-grubs-as-they-come))
(init) (init)