diff --git a/project.clj b/project.clj index c273d97..b9c1da8 100644 --- a/project.clj +++ b/project.clj @@ -11,7 +11,8 @@ [ring/ring-core "1.2.0"] [hiccup "1.0.4"] [prismatic/dommy "0.1.1"] - [core.async "0.1.0-SNAPSHOT"]] + [core.async "0.1.0-SNAPSHOT"] + [com.novemberain/monger "1.5.0"]] :plugins [[lein-cljsbuild "0.3.2"] [lein-ring "0.8.6"]] :cljsbuild { diff --git a/src-clj/grub/async.clj b/src-clj/grub/async.clj new file mode 100644 index 0000000..6cae084 --- /dev/null +++ b/src-clj/grub/async.clj @@ -0,0 +1,85 @@ +(ns grub.async + (:require [clojure.core.async :as async :refer [! chan go put! alts!]])) + +(defmacro go-loop [& body] + `(clojure.core.async/go + (while true + ~@body))) + +(defn put-all! [cs x] + (doseq [c cs] + (put! c x))) + +(defn fan-out [in cs-or-n] + (let [cs (if (number? cs-or-n) + (repeatedly cs-or-n chan) + cs-or-n)] + (go (loop [] + (let [x (! c x))) + c)) + +(defn map-chan + ([f source] (map-chan (chan) f source)) + ([c f source] + (go-loop + (>! c (f (! c v)))) + c)) + +(defn do-chan! [f source] + (go-loop + (let [v (! out v) + (f v))) + out)) + diff --git a/src-clj/grub/core.clj b/src-clj/grub/core.clj index 12f502a..0a30406 100644 --- a/src-clj/grub/core.clj +++ b/src-clj/grub/core.clj @@ -1,36 +1,15 @@ (ns grub.core - (:require [ring.middleware.reload :as reload] + (:require [grub.websocket :as ws] + [grub.db :as db] + [ring.middleware.reload :as reload] [compojure.core :refer [defroutes GET POST]] [compojure.handler :as handler] [compojure.route :as route] [org.httpkit.server :as httpkit] [hiccup [page :refer [html5]] - [page :refer [include-js include-css]]] - [clojure.core.async :as async :refer [! >!! chan go close! timeout]])) + [page :refer [include-js include-css]]])) -(def out-channels (atom [])) -(def channel-id-count (atom 0)) - -(defn push-grub-to-others [grub my-channel-id] - (let [other-channels (fn [] (filter #(not (= (:id %) my-channel-id)) @out-channels))] - (go (doseq [{ch :channel} (other-channels)] - (>! ch grub))))) - -(defn push-new-grubs-to-client [c ws-channel] - (go (while true - (let [grub (Page not found.

")) + (def app (let [dev? true] (if dev? @@ -56,4 +36,4 @@ (handler/site routes)))) (defn -main [& args] - (httpkit/run-server app {:port 3000})) + (httpkit/run-server app {:port 3000})) diff --git a/src-clj/grub/db.clj b/src-clj/grub/db.clj new file mode 100644 index 0000000..dd0c80a --- /dev/null +++ b/src-clj/grub/db.clj @@ -0,0 +1,62 @@ +(ns grub.db + (:require [grub.async :refer [go-loop]] + [monger.core :as m] + [monger.collection :as mc] + [monger.operators :as mo] + [clojure.core.async :as async :refer [! >!! chan go close! timeout]])) + +(def grub-collection "grubs") + +(def incoming-events (chan)) + +(defn get-incoming-events [] + incoming-events) + +(defmulti handle-event :event :default :unknown-event) + +(defmethod handle-event :create [event] + (let [grub (-> event + (select-keys [:_id :grub]) + (assoc :completed false))] + (mc/insert grub-collection grub))) + +(defmethod handle-event :complete [event] + (mc/update grub-collection + {:_id (:_id event)} + {mo/$set {:completed true}})) + +(defmethod handle-event :uncomplete [event] + (mc/update grub-collection + {:_id (:_id event)} + {mo/$set {:completed false}})) + +(defmethod handle-event :delete [event] + (mc/remove grub-collection {:_id (:_id event)})) + +(defmethod handle-event :unknown-event [event] + (println "Cannot handle unknown event:" event)) + +(defn handle-incoming-events [] + (go-loop (let [event ( grub + (select-keys [:_id :grub :completed]) + (assoc :event :create))] + (>! out grub-event)))) + out)) + +(defn connect-to-db [] + (println "Connect to db") + (m/connect!) + (m/set-db! (m/get-db "monger-test"))) + +(connect-to-db) +(handle-incoming-events) diff --git a/src-clj/grub/websocket.clj b/src-clj/grub/websocket.clj new file mode 100644 index 0000000..a16d799 --- /dev/null +++ b/src-clj/grub/websocket.clj @@ -0,0 +1,53 @@ +(ns grub.websocket + (:require [grub.async :refer [go-loop fan-out do-chan! copy-chan]] + [grub.db :as db] + [org.httpkit.server :as httpkit] + [clojure.core.async :refer [! chan go]])) + +(def incoming-events (chan)) + +(defn get-incoming-events [] + incoming-events) + +(def ws-channels (atom [])) +(def ws-channel-id-count (atom 0)) + +(defn push-event-to-others [orig-event] + (let [my-ws-channel-id (:ws-channel orig-event) + other-channels (fn [] (filter #(not (= (:id %) my-ws-channel-id)) @ws-channels)) + event (dissoc orig-event :ws-channel)] + (go (doseq [{ch :channel} (other-channels)] + (>! ch event))))) + +(defn push-current-grubs-to-client [c ws-channel] + (copy-chan c (db/get-current-grubs-as-events))) + +(defn push-received-events-to-client [c ws-channel] + (go-loop (let [event (! (get-incoming-events) event)))) + +(defn handle-incoming-events [] + (let [[incoming incoming'] (fan-out incoming-events 2)] + (do-chan! push-event-to-others incoming) + (go-loop (let [event (! (db/get-incoming-events) event))))) + +(defn websocket-handler [request] + (httpkit/with-channel request ws-channel + (let [ws-channel-id (swap! ws-channel-id-count inc) + c (chan)] + (swap! ws-channels conj {:id ws-channel-id :channel c}) + (println "Channel connected:" (.toString ws-channel)) + (httpkit/on-receive ws-channel #(add-incoming-event % ws-channel-id)) + (push-current-grubs-to-client c ws-channel) + (push-received-events-to-client c ws-channel)))) + +(handle-incoming-events) diff --git a/src-cljs/grub_client/view.cljs b/src-cljs/grub_client/view.cljs index b8128b9..ecdc97d 100644 --- a/src-cljs/grub_client/view.cljs +++ b/src-cljs/grub_client/view.cljs @@ -8,7 +8,7 @@ [cljs.core.async.macros :refer [go]])) (deftemplate grub-template [grub] - [:tr {:id (:id grub)} + [:tr {:id (:_id grub)} [:td [:div.checkbox.grubCheckbox [:label [:input {:type "checkbox"}] @@ -40,7 +40,7 @@ (dommy/prepend! (sel1 :body) (main-template))) (defn add-grub-to-dom [grub-obj] - (logs "Adding" grub-obj) + (logs "Add" grub-obj) (dommy/append! (sel1 :#grubList) (grub-template grub-obj))) (defn add-grub [grub] @@ -48,14 +48,14 @@ (defn complete-grub [grub] (logs "Complete" grub) - (aset (sel1 [(str "#" (:id grub)) "input"]) "checked" true)) + (aset (sel1 [(str "#" (:_id grub)) "input"]) "checked" true)) (defn uncomplete-grub [grub] (logs "Uncomplete" grub) - (aset (sel1 [(str "#" (:id grub)) "input"]) "checked" false)) + (aset (sel1 [(str "#" (:_id grub)) "input"]) "checked" false)) (defn delete-grub [grub] - (let [elem (sel1 (str "#" (:id grub)))] + (let [elem (sel1 (str "#" (:_id grub)))] (.removeChild (.-parentNode elem) elem))) (defn get-add-grub-text [] @@ -84,17 +84,16 @@ (get-grubs-from-enter)])] (->> grubs (filter-chan #(not (empty? %))) - (map-chan (fn [g] {:event :create :grub g :id (str "grub-" (.now js/Date))}))))) + (map-chan (fn [g] {:event :create :grub g :_id (str "grub-" (.now js/Date))}))))) (defn get-completed-event [event] - (logs "completed-event:" event) (let [target (.-target event) checked (.-checked target) event-type (if checked :complete :uncomplete) label (aget (.-labels (.-target event)) 0) grub (.-textContent label) id (.-id (.-parentNode (.-parentNode (.-parentNode (.-parentNode target)))))] - {:grub grub :id id :event event-type})) + {:grub grub :_id id :event event-type})) (defn get-completed-events [] (let [events (:chan (event-chan (sel1 :#grubList) "change")) @@ -107,6 +106,6 @@ :click #(go (>! click-events %))) (let [ids (map-chan #(.-id (.-parentNode (.-parentNode (.-target %)))) click-events) - grub-events (map-chan (fn [id] {:event :delete :id id}) ids)] + grub-events (map-chan (fn [id] {:event :delete :_id id}) ids)] grub-events)))