From 758c70f7c317194fc29f7158cc3320aa56c286c9 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Thu, 19 Dec 2019 13:13:08 +0100 Subject: [PATCH] :construction: More work on collaborative edition (in real time). --- backend/src/uxbox/http/ws.clj | 41 +++-- .../services/mutations/project_pages.clj | 22 ++- frontend/src/uxbox/main/data/workspace.cljs | 164 +++++++++++++++--- .../uxbox/main/data/workspace_websocket.cljs | 60 +++++-- frontend/src/uxbox/main/ui/workspace.cljs | 12 +- .../src/uxbox/main/ui/workspace/header.cljs | 15 +- .../src/uxbox/main/ui/workspace/viewport.cljs | 77 ++++---- frontend/src/uxbox/main/websockets.cljs | 1 - 8 files changed, 282 insertions(+), 110 deletions(-) diff --git a/backend/src/uxbox/http/ws.clj b/backend/src/uxbox/http/ws.clj index 734c56e764..1ccc60e608 100644 --- a/backend/src/uxbox/http/ws.clj +++ b/backend/src/uxbox/http/ws.clj @@ -48,7 +48,7 @@ (fn [ws message] (:type message))) (defmethod handle-message :connect - [ws {:keys [file-id user-id] :as message}] + [{:keys [file-id user-id] :as ws} message] (let [local (swap! state assoc-in [file-id user-id] ws) sessions (get local file-id) message {:type :who :users (set (keys sessions))}] @@ -66,22 +66,35 @@ (let [users (keys (get @state file-id))] (send! ws {:type :who :users (set users)}))) -;; --- Handler +(defmethod handle-message :pointer-update + [{:keys [user-id file-id] :as ws} message] + (let [sessions (->> (vals (get @state file-id)) + (remove #(= user-id (:user-id %)))) + message (assoc message :user-id user-id)] + (run! #(send! % message) sessions))) -(declare start-eventbus-consumer!) +(defn- on-eventbus-message + [{:keys [file-id user-id] :as ws} {:keys [body] :as message}] + (send! ws body)) + +(defn- start-eventbus-consumer! + [vsm ws fid] + (let [topic (str "internal.uxbox.file." fid)] + (ve/consumer vsm topic #(on-eventbus-message ws %2)))) + +;; --- Handler (defn handler [{:keys [user] :as req}] (letfn [(on-init [ws] (let [vsm (::vw/execution-context req) fid (get-in req [:path-params :file-id]) + ws (assoc ws + :user-id user + :file-id fid) sem (start-eventbus-consumer! vsm ws fid)] - - (handle-message ws {:type :connect :file-id fid :user-id user}) - (assoc ws - ::sem sem - :user-id user - :file-id fid))) + (handle-message ws {:type :connect}) + (assoc ws ::sem sem))) (on-message [ws message] (try @@ -103,16 +116,6 @@ :on-message on-message :on-close on-close)))) -(defn- on-eventbus-message - [ws {:keys [body] :as message}] - ;; TODO - (ws-send! ws body)) - -(defn- start-eventbus-consumer! - [vsm ws fid] - (let [topic (str "internal.uxbox.file." fid)] - (ve/consumer vsm topic #(on-eventbus-message ws %2)))) - ;; --- Internal (vertx api) (experimental) (defrecord WebSocket [on-init on-message on-close] diff --git a/backend/src/uxbox/services/mutations/project_pages.clj b/backend/src/uxbox/services/mutations/project_pages.clj index fe81faa7f8..1e27df39b7 100644 --- a/backend/src/uxbox/services/mutations/project_pages.clj +++ b/backend/src/uxbox/services/mutations/project_pages.clj @@ -8,17 +8,18 @@ (:require [clojure.spec.alpha :as s] [promesa.core :as p] + [uxbox.common.pages :as cp] [uxbox.db :as db] [uxbox.services.mutations :as sm] [uxbox.services.mutations.project-files :as files] [uxbox.services.queries.project-pages :refer [decode-row]] [uxbox.services.util :as su] - [uxbox.common.pages :as cp] - [uxbox.util.exceptions :as ex] [uxbox.util.blob :as blob] - [uxbox.util.sql :as sql] + [uxbox.util.exceptions :as ex] [uxbox.util.spec :as us] - [uxbox.util.uuid :as uuid])) + [uxbox.util.sql :as sql] + [uxbox.util.uuid :as uuid] + [vertx.eventbus :as ve])) ;; --- Helpers & Specs @@ -100,7 +101,7 @@ [conn {:keys [user-id id version data operations]}] (let [sql "insert into project_page_snapshots (user_id, page_id, version, data, operations) values ($1, $2, $3, $4, $5) - returning id, version, operations"] + returning id, page_id, user_id, version, operations"] (db/query-one conn [sql user-id id version data operations]))) ;; --- Mutation: Rename Page @@ -169,7 +170,14 @@ (-> (update-page-data conn page) (p/then (fn [_] (insert-page-snapshot conn page))) - (p/then (fn [s] (retrieve-lagged-operations conn s params)))))) + (p/then (fn [s] + (let [topic (str "internal.uxbox.file." (:file-id page))] + (p/do! (ve/publish! uxbox.core/system topic {:type :page-snapshot + :user-id (:user-id s) + :page-id (:page-id s) + :version (:version s) + :operations ops}) + (retrieve-lagged-operations conn s params)))))))) (su/defstr sql:lagged-snapshots "select s.id, s.operations @@ -182,7 +190,7 @@ (let [sql sql:lagged-snapshots] (-> (db/query conn [sql (:id params) (:version params) #_(:id snapshot)]) (p/then (fn [rows] - {:id (:id params) + {:page-id (:id params) :version (:version snapshot) :operations (into [] (comp (map decode-row) (map :operations) diff --git a/frontend/src/uxbox/main/data/workspace.cljs b/frontend/src/uxbox/main/data/workspace.cljs index 1d9615d993..3cb19ef29d 100644 --- a/frontend/src/uxbox/main/data/workspace.cljs +++ b/frontend/src/uxbox/main/data/workspace.cljs @@ -30,7 +30,9 @@ [uxbox.util.spec :as us] [uxbox.util.transit :as t] [uxbox.util.time :as dt] - [uxbox.util.uuid :as uuid])) + [uxbox.util.uuid :as uuid] + [vendor.randomcolor])) + ;; TODO: temporal workaround (def clear-ruler nil) @@ -113,6 +115,128 @@ (defn interrupt? [e] (= e :interrupt)) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Websockets Events +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; --- Initialize WebSocket + +(declare fetch-users) +(declare handle-who) +(declare handle-pointer-update) +(declare handle-page-snapshot) +(declare shapes-changes-commited) + +(s/def ::type keyword?) +(s/def ::message + (s/keys :req-un [::type])) + +(defn initialize-ws + [file-id] + (ptk/reify ::initialize + ptk/UpdateEvent + (update [_ state] + (let [uri (str "ws://localhost:6060/sub/" file-id)] + (assoc-in state [:ws file-id] (ws/open uri)))) + + ptk/WatchEvent + (watch [_ state stream] + (let [wsession (get-in state [:ws file-id])] + (->> (rx/merge + (rx/of (fetch-users file-id)) + (->> (ws/-stream wsession) + (rx/filter #(= :message (:type %))) + (rx/map (comp t/decode :payload)) + (rx/filter #(s/valid? ::message %)) + (rx/map (fn [{:keys [type] :as msg}] + (case type + :who (handle-who msg) + :pointer-update (handle-pointer-update msg) + :page-snapshot (handle-page-snapshot msg) + ::unknown))))) + + + (rx/take-until + (rx/filter #(= ::finalize %) stream))))))) + +;; --- Finalize Websocket + +(defn finalize-ws + [file-id] + (ptk/reify ::finalize + ptk/WatchEvent + (watch [_ state stream] + (ws/-close (get-in state [:ws file-id])) + (rx/of ::finalize)))) + +;; --- Fetch Workspace Users + +(declare users-fetched) + +(defn fetch-users + [file-id] + (ptk/reify ::fetch-users + ptk/WatchEvent + (watch [_ state stream] + (->> (rp/query :project-file-users {:file-id file-id}) + (rx/map users-fetched))))) + +(defn users-fetched + [users] + (ptk/reify ::users-fetched + ptk/UpdateEvent + (update [_ state] + (reduce (fn [state user] + (update-in state [:workspace-users :by-id (:id user)] merge user)) + state + users)))) + +;; --- Handle: Who + +;; TODO: assign color + +(defn- assign-user-color + [state user-id] + (let [user (get-in state [:workspace-users :by-id user-id]) + color (js/randomcolor) + user (if (string? (:color user)) + user + (assoc user :color color))] + (prn "assign-user-color" user-id) + (assoc-in state [:workspace-users :by-id user-id] user))) + +(defn handle-who + [{:keys [users] :as msg}] + (s/assert set? users) + (ptk/reify ::handle-who + ptk/UpdateEvent + (update [_ state] + (prn "handle-who" users) + (as-> state $$ + (assoc-in $$ [:workspace-users :active] users) + (reduce assign-user-color $$ users))))) + +(defn handle-pointer-update + [{:keys [user-id page-id x y] :as msg}] + (ptk/reify ::handle-pointer-update + ptk/UpdateEvent + (update [_ state] + (assoc-in state [:workspace-users :pointer user-id] + {:page-id page-id + :user-id user-id + :x x + :y y})))) + +(defn handle-page-snapshot + [{:keys [user-id page-id version operations] :as msg}] + (ptk/reify ::handle-page-snapshot + ptk/WatchEvent + (watch [_ state stream] + (let [local (:workspace-local state)] + (when (= (:page-id local) page-id) + (prn "handle-page-snapshot" msg) + (rx/of (shapes-changes-commited msg))))))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; General workspace events ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -132,7 +256,6 @@ :tooltip nil}) (declare initialized) -;; (declare watch-events) (defn initialize "Initialize the workspace state." @@ -147,9 +270,6 @@ :page-id page-id)] (-> state (assoc :workspace-layout default-layout) - ;; (update :workspace-layout - ;; (fn [data] - ;; (if (nil? data) default-layout data))) (assoc :workspace-local local)))) ptk/WatchEvent @@ -172,16 +292,18 @@ (rx/mapcat #(rx/of (initialized file-id page-id) #_(initialize-alignment page-id)))) - ;; When workspace is initialized, run the event watchers. - (->> (rx/filter (ptk/type? ::initialized) stream) - (rx/take 1) - (rx/ignore)))) + (->> stream + (rx/filter uxbox.main.ui.workspace.streams/pointer-event?) + (rx/sample 150) + (rx/tap (fn [{:keys [pt] :as event}] + (let [msg {:type :pointer-update + :page-id page-id + :x (:x pt) + :y (:y pt)}] + (ws/-send (get-in state [:ws file-id]) (t/encode msg))))) + (rx/ignore) + (rx/take-until (rx/filter #(= ::stop-watcher %) stream))))))) - ptk/EffectEvent - (effect [_ state stream] - ;; Optimistic prefetch of projects if them are not already fetched - #_(when-not (seq (:projects state)) - (st/emit! (dp/fetch-projects)))))) (defn- initialized [file-id page-id] @@ -212,7 +334,6 @@ (disj flags flag) (conj flags flag))))))) - ;; --- Workspace Flags (defn activate-flag @@ -235,7 +356,6 @@ (update [_ state] (update-in state [:workspace-local :flags] disj flag)))) - (defn toggle-flag [flag] (s/assert keyword? flag) @@ -851,8 +971,6 @@ (rx/of (commit-shapes-changes changes) #(dissoc state ::tmp-changes))))))) -(declare shapes-changes-commited) - (defn commit-shapes-changes [operations] (s/assert ::cp/operations operations) @@ -871,21 +989,23 @@ :version (:version page) :operations operations}] (->> (rp/mutation :update-project-page params) + (rx/tap #(prn "KAKAKAKA" %)) (rx/map shapes-changes-commited)))))) (s/def ::shapes-changes-commited - (s/keys :req-un [::id ::version ::cp/operations])) + (s/keys :req-un [::page-id ::version ::cp/operations])) (defn shapes-changes-commited - [{:keys [id version operations] :as params}] + [{:keys [page-id version operations] :as params}] + (prn "shapes-changes-commited" params) (s/assert ::shapes-changes-commited params) (ptk/reify ::shapes-changes-commited ptk/UpdateEvent (update [_ state] (-> state (assoc-in [:workspace-page :version] version) - (assoc-in [:pages id :version] version) - (update-in [:pages-data id] cp/process-ops operations) + (assoc-in [:pages page-id :version] version) + (update-in [:pages-data page-id] cp/process-ops operations) (update :workspace-data cp/process-ops operations))))) ;; --- Start shape "edition mode" diff --git a/frontend/src/uxbox/main/data/workspace_websocket.cljs b/frontend/src/uxbox/main/data/workspace_websocket.cljs index aae776dc3f..353ddafa6f 100644 --- a/frontend/src/uxbox/main/data/workspace_websocket.cljs +++ b/frontend/src/uxbox/main/data/workspace_websocket.cljs @@ -24,6 +24,8 @@ (declare fetch-users) (declare handle-who) +(declare handle-pointer-update) +(declare handle-page-snapshot) (s/def ::type keyword?) (s/def ::message @@ -35,29 +37,37 @@ ptk/UpdateEvent (update [_ state] (let [uri (str "ws://localhost:6060/sub/" file-id)] - (assoc-in state [::ws file-id] (ws/open uri)))) + (assoc-in state [:ws file-id] (ws/open uri)))) ptk/WatchEvent (watch [_ state stream] - (rx/merge - (rx/of (fetch-users file-id)) - (->> (ws/-stream (get-in state [::ws file-id])) - (rx/filter #(= :message (:type %))) - (rx/map (comp t/decode :payload)) - (rx/filter #(s/valid? ::message %)) - (rx/map (fn [{:keys [type] :as msg}] - (case type - :who (handle-who msg) - ::unknown)))))))) + (let [wsession (get-in state [:ws file-id])] + (->> (rx/merge + (rx/of (fetch-users file-id)) + (->> (ws/-stream wsession) + (rx/filter #(= :message (:type %))) + (rx/map (comp t/decode :payload)) + (rx/filter #(s/valid? ::message %)) + (rx/map (fn [{:keys [type] :as msg}] + (case type + :who (handle-who msg) + :pointer-update (handle-pointer-update msg) + :page-snapshot (handle-page-snapshot msg) + ::unknown))))) + + + (rx/take-until + (rx/filter #(= ::finalize %) stream))))))) ;; --- Finalize Websocket (defn finalize [file-id] (ptk/reify ::finalize - ptk/EffectEvent - (effect [_ state stream] - (ws/-close (get-in state [::ws file-id]))))) + ptk/WatchEvent + (watch [_ state stream] + (ws/-close (get-in state [:ws file-id])) + (rx/of ::finalize)))) ;; --- Fetch Workspace Users @@ -93,3 +103,25 @@ ptk/UpdateEvent (update [_ state] (assoc-in state [:workspace-users :active] users)))) + +(defn handle-pointer-update + [{:keys [user-id page-id x y] :as msg}] + (ptk/reify ::handle-pointer-update + ptk/UpdateEvent + (update [_ state] + (assoc-in state [:workspace-users :pointer user-id] + {:page-id page-id + :user-id user-id + :x x + :y y})))) + +(defn handle-page-snapshot + [{:keys [user-id page-id version operations :as msg]}] + (ptk/reify ::handle-page-snapshot + ptk/UpdateEvent + (update [_ state] + (-> state + (assoc-in [:workspace-page :version] version) + (assoc-in [:pages page-id :version] version) + (update-in [:pages-data page-id] cp/process-ops operations) + (update :workspace-data cp/process-ops operations))))) diff --git a/frontend/src/uxbox/main/ui/workspace.cljs b/frontend/src/uxbox/main/ui/workspace.cljs index bab162d0e0..b0feb5a002 100644 --- a/frontend/src/uxbox/main/ui/workspace.cljs +++ b/frontend/src/uxbox/main/ui/workspace.cljs @@ -96,6 +96,12 @@ (mf/defc workspace [{:keys [file-id page-id] :as props}] + (mf/use-effect + {:deps #js [(str file-id)] + :fn (fn [] + (st/emit! (dw/initialize-ws file-id)) + #(st/emit! (dw/finalize-ws file-id)))}) + (mf/use-effect {:deps #js [(str file-id) (str page-id)] @@ -104,12 +110,6 @@ (st/emit! (dw/initialize file-id page-id)) #(rx/cancel! sub)))}) - (mf/use-effect - {:deps #js [(str file-id)] - :fn (fn [] - (st/emit! (dws/initialize file-id)) - #(st/emit! (dws/finalize file-id)))}) - (let [layout (mf/deref refs/workspace-layout) file (mf/deref refs/workspace-file) page (mf/deref refs/workspace-page) diff --git a/frontend/src/uxbox/main/ui/workspace/header.cljs b/frontend/src/uxbox/main/ui/workspace/header.cljs index 665b1fcb6a..22bf6adc30 100644 --- a/frontend/src/uxbox/main/ui/workspace/header.cljs +++ b/frontend/src/uxbox/main/ui/workspace/header.cljs @@ -41,24 +41,25 @@ ;; --- Header Users -(mf/defc user-item +(mf/defc user-widget [{:keys [user self?] :as props}] [:li.tooltip.tooltip-bottom {:alt (:fullname user) :on-click (when self? #(st/emit! (rt/navigate :settings/profile)))} - [:img {:src "/images/avatar.jpg"}]]) + [:img {:style {:border-color (:color user)} + :src "/images/avatar.jpg"}]]) -(mf/defc users-list +(mf/defc active-users [props] (let [profile (mf/deref refs/profile) users (mf/deref refs/workspace-users)] [:ul.user-multi - [:& user-item {:user profile :self? true}] + [:& user-widget {:user profile :self? true}] (for [id (->> (:active users) (remove #(= % (:id profile))))] - [:& user-item {:user (get-in users [:by-id id]) - :key id}])])) + [:& user-widget {:user (get-in users [:by-id id]) + :key id}])])) ;; --- Header Component @@ -80,7 +81,7 @@ :on-click #(st/emit! (dw/toggle-layout-flag :sitemap))} [:span (:project-name file) " / " (:name file)]] - [:& users-list] + [:& active-users] [:div.workspace-options [:ul.options-btn diff --git a/frontend/src/uxbox/main/ui/workspace/viewport.cljs b/frontend/src/uxbox/main/ui/workspace/viewport.cljs index 9c4b032798..9810cc8121 100644 --- a/frontend/src/uxbox/main/ui/workspace/viewport.cljs +++ b/frontend/src/uxbox/main/ui/workspace/viewport.cljs @@ -143,7 +143,7 @@ ;; --- Viewport -(declare remote-user-cursor) +(declare remote-user-cursors) (mf/defc canvas-and-shapes {:wrap [mf/wrap-memo]} @@ -295,42 +295,51 @@ ;; -- METER CURSOR MULTIUSUARIO - ;;[:& remote-user-cursor] + [:& remote-user-cursors {:page page}] [:& selrect {:data (:selrect local)}]]]))) (mf/defc remote-user-cursor - [props] - [:g.multiuser-cursor #_{:transform "translate(100, 100) scale(2)"} - [:svg {:x "100" - :y "100" - :style {:fill "#000"} - :width "106.824" - :height "20.176" - :viewBox "0 0 28.264 5.338"} - [:path {:d "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 0l-.313.314L1.139.95l.314-.314zm-.5.5l-.315.316-3.39-3.39.315-.315 3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z" - :font-family "sans-serif"}] - [:g {:transform "translate(0 -291.708)"} - [:rect {:width "21.415" - :height "5.292" - :x "6.849" - :y "291.755" - :fill-opacity ".893" - :paint-order "stroke fill markers" - :rx ".794" - :ry ".794"}] - [:text {:x "9.811" - :y "295.216" - :fill "#fff" - :stroke-width ".265" - :font-family "Open Sans" - :font-size"2.91" - :font-weight "400" - :letter-spacing"0" - :line-height "1.25" - :word-spacing "0" - ;; :style="line-height:1 - } - "User 2"]]]]) + [{:keys [pointer user] :as props}] + [:g.multiuser-cursor {:key (:user-id pointer) + :transform (str "translate(" (:x pointer) "," (:y pointer) ") scale(4)")} + [:path {:fill (:color user) + :d "M5.292 4.027L1.524.26l-.05-.01L0 0l.258 1.524 3.769 3.768zm-.45 0l-.313.314L1.139.95l.314-.314zm-.5.5l-.315.316-3.39-3.39.315-.315 3.39 3.39zM1.192.526l-.668.667L.431.646.64.43l.552.094z" + :font-family "sans-serif"}] + [:g {:transform "translate(0 -291.708)"} + [:rect {:width "21.415" + :height "5.292" + :x "6.849" + :y "291.755" + :fill (:color user) + :fill-opacity ".893" + :paint-order "stroke fill markers" + :rx ".794" + :ry ".794"}] + [:text {:x "9.811" + :y "295.216" + :fill "#fff" + :stroke-width ".265" + :font-family "Open Sans" + :font-size"2.91" + :font-weight "400" + :letter-spacing"0" + :style {:line-height "1.25"} + :word-spacing "0" + ;; :style="line-height:1 + } + (:fullname user)]]]) + +(mf/defc remote-user-cursors + [{:keys [page] :as props}] + (let [users (mf/deref refs/workspace-users) + pointers (->> (vals (:pointer users)) + (remove #(not= (:id page) (:page-id %))) + (filter #((:active users) (:user-id %))))] + (for [pointer pointers] + (let [user (get-in users [:by-id (:user-id pointer)])] + [:& remote-user-cursor {:pointer pointer + :user user + :key (:user-id pointer)}])))) diff --git a/frontend/src/uxbox/main/websockets.cljs b/frontend/src/uxbox/main/websockets.cljs index ce17975d8c..27a691b7ad 100644 --- a/frontend/src/uxbox/main/websockets.cljs +++ b/frontend/src/uxbox/main/websockets.cljs @@ -48,4 +48,3 @@ (ev/unlistenByKey lk1) (ev/unlistenByKey lk2) (ev/unlistenByKey lk3))))) -