diff --git a/backend/src/app/http/sse.clj b/backend/src/app/http/sse.clj index 1e6ef1f69b..7459d9224b 100644 --- a/backend/src/app/http/sse.clj +++ b/backend/src/app/http/sse.clj @@ -9,7 +9,6 @@ (:refer-clojure :exclude [tap]) (:require [app.common.data :as d] - [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.transit :as t] [app.http.errors :as errors] @@ -54,16 +53,20 @@ ::yres/status 200 ::yres/body (yres/stream-body (fn [_ output] - (binding [events/*channel* (sp/chan :buf buf :xf (keep encode))] - (let [listener (events/start-listener - (partial write! output) - (partial pu/close! output))] - (try + (let [channel (sp/chan :buf buf :xf (keep encode)) + listener (events/start-listener + channel + (partial write! output) + (partial pu/close! output))] + (try + (binding [events/*channel* channel] (let [result (handler)] - (events/tap :end result)) - (catch Throwable cause - (let [result (errors/handle' cause request)] - (events/tap :error result))) - (finally - (sp/close! events/*channel*) - (px/await! listener)))))))})) + (events/tap :end result))) + + (catch Throwable cause + (let [result (errors/handle' cause request)] + (events/tap channel :error result))) + + (finally + (sp/close! channel) + (px/await! listener))))))})) diff --git a/backend/src/app/util/events.clj b/backend/src/app/util/events.clj index a41843c6b1..b26810cb04 100644 --- a/backend/src/app/util/events.clj +++ b/backend/src/app/util/events.clj @@ -10,7 +10,6 @@ to them. Mainly used in http.sse for progress reporting." (:refer-clojure :exclude [tap run!]) (:require - [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] [promesa.exec :as px] @@ -18,33 +17,30 @@ (def ^:dynamic *channel* nil) -(defn channel - [] - (sp/chan :buf 32)) - (defn tap - [type data] - (when-let [channel *channel*] - (sp/put! channel [type data]) - nil)) + ([type data] + (when-let [channel *channel*] + (sp/put! channel [type data]) + nil)) + ([channel type data] + (when channel + (sp/put! channel [type data]) + nil))) (defn start-listener - [on-event on-close] - - (dm/assert! - "expected active events channel" - (sp/chan? *channel*)) + [channel on-event on-close] + (assert (sp/chan? channel) "expected active events channel") (px/thread {:virtual true} (try (loop [] - (when-let [event (sp/take! *channel*)] + (when-let [event (sp/take! channel)] (let [result (ex/try! (on-event event))] (if (ex/exception? result) (do (l/wrn :hint "unexpected exception" :cause result) - (sp/close! *channel*)) + (sp/close! channel)) (recur))))) (finally (on-close))))) @@ -55,7 +51,7 @@ [f on-event] (binding [*channel* (sp/chan :buf 32)] - (let [listener (start-listener on-event (constantly nil))] + (let [listener (start-listener *channel* on-event (constantly nil))] (try (f) (finally