From 6891826c787bb3a6b485bd880a777bf98a021d45 Mon Sep 17 00:00:00 2001 From: "alonso.torres" Date: Tue, 30 Mar 2021 14:31:57 +0200 Subject: [PATCH] :sparkles: Adds a worker message buffer for selection queries --- .../app/main/ui/workspace/viewport/hooks.cljs | 9 ++- frontend/src/app/main/worker.cljs | 4 + frontend/src/app/util/worker.cljs | 71 ++++++++++------- frontend/src/app/worker.cljs | 76 ++++++++++++++++++- 4 files changed, 128 insertions(+), 32 deletions(-) diff --git a/frontend/src/app/main/ui/workspace/viewport/hooks.cljs b/frontend/src/app/main/ui/workspace/viewport/hooks.cljs index 34d1acfcbe..44da4fc077 100644 --- a/frontend/src/app/main/ui/workspace/viewport/hooks.cljs +++ b/frontend/src/app/main/ui/workspace/viewport/hooks.cljs @@ -96,10 +96,11 @@ (mf/deps page-id) (fn [point] (let [rect (gsh/center->rect point 8 8)] - (uw/ask! {:cmd :selection/query - :page-id page-id - :rect rect - :include-frames? true})))) + (uw/ask-buffered! + {:cmd :selection/query + :page-id page-id + :rect rect + :include-frames? true})))) ;; We use ref so we don't recreate the stream on a change transform-ref (mf/use-ref nil) diff --git a/frontend/src/app/main/worker.cljs b/frontend/src/app/main/worker.cljs index b0569216aa..8c171f7117 100644 --- a/frontend/src/app/main/worker.cljs +++ b/frontend/src/app/main/worker.cljs @@ -25,3 +25,7 @@ (defn ask! [message] (uw/ask! instance message)) + +(defn ask-buffered! + [message] + (uw/ask-buffered! instance message)) diff --git a/frontend/src/app/util/worker.cljs b/frontend/src/app/util/worker.cljs index e3f9440113..acf34fd53a 100644 --- a/frontend/src/app/util/worker.cljs +++ b/frontend/src/app/util/worker.cljs @@ -17,39 +17,58 @@ (declare handle-response) (defrecord Worker [instance stream]) -(defn ask! - [w message] - (let [sender-id (uuid/next) - data (t/encode {:payload message :sender-id sender-id}) - instance (:instance w)] +(defn- send-message! [worker {sender-id :sender-id :as message}] + (let [data (t/encode message) + instance (:instance worker)] (.postMessage instance data) - (->> (:stream w) + (->> (:stream worker) (rx/filter #(= (:reply-to %) sender-id)) - (rx/map handle-response) - (rx/first)))) + (rx/take 1) + (rx/map handle-response)))) + +(defn ask! + [worker message] + (send-message! + worker + {:sender-id (uuid/next) + :payload message})) + +(defn ask-buffered! + [worker message] + (send-message! + worker + {:sender-id (uuid/next) + :payload message + :buffer? true})) (defn init "Return a initialized webworker instance." [path on-error] - (let [ins (js/Worker. path) - bus (rx/subject) - wrk (Worker. ins bus)] - (.addEventListener ins "message" - (fn [event] - (let [data (.-data event) - data (t/decode data)] - (if (:error data) - (on-error (:error data)) - (rx/push! bus data))))) - (.addEventListener ins "error" - (fn [error] - (on-error wrk (.-data error)))) + (let [instance (js/Worker. path) + bus (rx/subject) + worker (Worker. instance bus) - wrk)) + handle-message + (fn [event] + (let [data (.-data event) + data (t/decode data)] + (if (:error data) + (on-error (:error data)) + (rx/push! bus data)))) + + handle-error + (fn [error] + (on-error worker (.-data error)))] + + (.addEventListener instance "message" handle-message) + (.addEventListener instance "error" handle-error) + + worker)) (defn- handle-response - [{:keys [payload error] :as response}] - (if-let [{:keys [data message]} error] - (throw (ex-info message data)) - payload)) + [{:keys [payload error dropped] :as response}] + (when-not dropped + (if-let [{:keys [data message]} error] + (throw (ex-info message data)) + payload))) diff --git a/frontend/src/app/worker.cljs b/frontend/src/app/worker.cljs index d26963dfd2..604b7eb4f8 100644 --- a/frontend/src/app/worker.cljs +++ b/frontend/src/app/worker.cljs @@ -28,14 +28,23 @@ ;; --- Messages Handling (s/def ::cmd keyword?) + (s/def ::payload (s/keys :req-un [::cmd])) (s/def ::sender-id uuid?) + +(s/def ::buffer? boolean?) + (s/def ::message - (s/keys :req-un [::payload ::sender-id])) + (s/keys + :req-opt [::buffer?] + :req-un [::payload ::sender-id])) + +(def buffer (rx/subject)) (defn- handle-message + "Process the message and returns to the client" [{:keys [sender-id payload] :as message}] (us/assert ::message message) (try @@ -68,20 +77,83 @@ :message (ex-message e)}}] (.postMessage js/self (t/encode message)))))) +(defn- drop-message + "Sends to the client a notifiction that its messages have been dropped" + [{:keys [sender-id payload] :as message}] + (us/assert ::message message) + (.postMessage js/self (t/encode {:reply-to sender-id + :dropped true}))) + +(defn subscribe-buffer-messages + "Creates a subscription to process the buffer messages" + [] + (let [empty [{} [] ::clear]] + (->> buffer + + ;; We want async processing to not block the main loop + (rx/observe-on :async) + + ;; This scan will store the last message per type in `messages` + ;; when a previous message is dropped is stored in `dropped` + ;; we also store the last message processed in order to detect + ;; posible infinite loops + (rx/scan + (fn [[messages dropped last] message] + (let [cmd (get-in message [:payload :cmd]) + + ;; The previous message is dropped + dropped + (cond-> dropped + (contains? messages cmd) + (conj (get messages cmd))) + + ;; This is the new "head" for its type + messages + (assoc messages cmd message)] + + ;; When a "clear" message is detected we empty the buffer + (if (= message ::clear) + empty + [messages dropped message]))) + + empty) + + ;; 1ms debounce, after 1ms without messages will process the buffer + (rx/debounce 1) + + (rx/subs (fn [[messages dropped last]] + ;; Send back the dropped messages replies + (doseq [msg dropped] + (drop-message msg)) + + ;; Process the message + (doseq [msg (vals messages)] + (handle-message msg)) + + ;; After process the buffer we send a clear + (when-not (= last ::clear) + (rx/push! buffer ::clear))))))) + +(defonce process-message-sub (subscribe-buffer-messages)) + (defn- on-message [event] (when (nil? (.-source event)) (let [message (.-data event) message (t/decode message)] - (handle-message message)))) + (if (:buffer? message) + (rx/push! buffer message) + (handle-message message))))) (.addEventListener js/self "message" on-message) (defn ^:dev/before-load stop [] + (rx/-dispose process-message-sub) (.removeEventListener js/self "message" on-message)) (defn ^:dev/after-load start [] [] + (set! process-message-sub (subscribe-buffer-messages)) (.addEventListener js/self "message" on-message))