Add support for emit messages without waiting response on worker

This commit is contained in:
Andrey Antukh 2025-09-25 09:55:34 +02:00
parent 255f5af2e3
commit 1d54fe2e24
2 changed files with 29 additions and 7 deletions

View File

@ -21,7 +21,6 @@
:config {:public-uri cf/public-uri
:build-data cf/build-date
:version cf/version}})
(set! instance worker)))
(defn ask!
@ -34,6 +33,16 @@
(uw/ask! instance message transfer)
(rx/empty))))
(defn emit!
([message]
(if instance
(uw/emit! instance message)
(rx/empty)))
([message transfer]
(if instance
(uw/emit! instance message transfer)
(rx/empty))))
(defn ask-buffered!
([message]
(if instance

View File

@ -19,7 +19,7 @@
([worker message]
(send-message! worker message nil))
([worker {sender-id :sender-id :as message} {:keys [many?] :or {many? false}}]
([worker {sender-id :sender-id :as message} {:keys [many? ignore-response?] :or {many? false ignore-response? false}}]
(let [take-messages
(fn [ob]
(if many?
@ -34,11 +34,13 @@
(if (some? instance)
(do (.postMessage instance data transfer)
(->> (:stream worker)
(rx/filter #(= (:reply-to %) sender-id))
(take-messages)
(rx/filter (complement :dropped))
(rx/map handle-response)))
(if (not ignore-response?)
(->> (:stream worker)
(rx/filter #(= (:reply-to %) sender-id))
(take-messages)
(rx/filter (complement :dropped))
(rx/map handle-response))
(rx/empty)))
(rx/empty)))))
(defn ask!
@ -51,6 +53,17 @@
:payload message
:transfer transfer})))
(defn emit!
([worker message]
(emit! worker message nil))
([worker message transfer]
(send-message!
worker
{:sender-id (uuid/next)
:payload message
:transfer transfer}
{:ignore-response? true})))
(defn ask-many!
([worker message]
(ask-many! worker message nil))