This commit is contained in:
Andrey Antukh 2026-05-08 14:30:17 +02:00
parent c635d0532b
commit d58c1b88a9
4 changed files with 115 additions and 83 deletions

View File

@ -259,11 +259,16 @@
[config]
(let [public-uri (c/get config :public-uri)
public-uri (some-> public-uri (u/uri))
extra-flags (if (and public-uri
(= (:scheme public-uri) "http")
(not= (:host public-uri) "localhost"))
#{:disable-secure-session-cookies}
#{})]
extra-flags (cond-> #{}
;; When public-uri is http (non-localhost), disable secure cookies
(and public-uri
(= (:scheme public-uri) "http")
(not= (:host public-uri) "localhost"))
(conj :disable-secure-session-cookies)
;; When telemetry-enabled config is true, add :telemetry flag
(true? (c/get config :telemetry-enabled))
(conj :enable-telemetry))]
(flags/parse flags/default extra-flags (:flags config))))
(defn read-env

View File

@ -209,3 +209,14 @@
:cause cause)))
(rph/wrap nil)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; GET-ENABLED-FLAGS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(sv/defmethod ::get-enabled-flags
{::audit/skip true
::doc/skip true
::doc/added "1.20"}
[_cfg _params]
cf/flags)

View File

@ -68,11 +68,8 @@
ptk/WatchEvent
(watch [_ _ stream]
(rx/merge
(if (contains? cf/flags :audit-log)
(rx/of (ev/initialize))
(rx/empty))
(rx/of (dp/refresh-profile))
(rx/of (ev/initialize)
(dp/refresh-profile))
;; Watch for profile deletion events
(->> stream

View File

@ -25,6 +25,7 @@
[app.util.storage :as storage]
[beicon.v2.core :as rx]
[beicon.v2.operators :as rxo]
[cuerdas.core :as str]
[lambdaisland.uri :as u]
[potok.v2.core :as ptk]))
@ -376,83 +377,101 @@
(l/debug :hint "event instrumentation initialized")
(->> (rx/merge
(->> (rx/from-atom buffer)
(rx/filter #(pos? (count %)))
(rx/debounce 2000))
(->> stream
(rx/filter (ptk/type? :app.main.data.profile/logout))
(rx/observe-on :async)))
(rx/map (fn [_]
(into [] (take max-chunk-size) @buffer)))
(rx/with-latest-from profile)
(rx/mapcat (fn [[chunk profile-id]]
(let [events (filterv #(= profile-id (:profile-id %)) chunk)]
(->> (persist-events events)
(rx/tap (fn [_]
(l/debug :hint "events chunk persisted" :total (count chunk))))
(rx/map (constantly chunk))))))
(rx/take-until stopper)
(rx/subs! (fn [chunk]
(swap! buffer remove-from-buffer (count chunk)))
(fn [cause]
(l/error :hint "unexpected error on audit persistence" :cause cause))
(fn []
(l/debug :hint "audit persistence terminated"))))
(->> (rx/merge
(->> stream
(rx/with-latest-from profile)
(rx/map make-event))
(->> (user-input-observer)
(rx/with-latest-from profile)
(rx/map make-performance-event)
(rx/debounce debounce-browser-event-time))
(->> (longtask-observer)
(rx/with-latest-from profile)
(rx/map make-performance-event)
(rx/debounce debounce-longtask-time))
(if (and (exists? js/globalThis)
(exists? (.-requestAnimationFrame js/globalThis))
(exists? (.-scheduler js/globalThis))
(exists? (.-postTask (.-scheduler js/globalThis))))
(->> stream
;; Fetch backend flags and only start event collection if
;; :audit-log or :telemetry is enabled
(->> (rp/cmd! :get-enabled-flags)
(rx/mapcat (fn [flags]
(if (or (contains? flags :audit-log)
(contains? flags :telemetry))
(do
(l/debug :hint "event collection enabled" :flags (str/join " " (map name flags)))
(rx/of true))
(do
(l/debug :hint "event collection disabled (no audit-log or telemetry flag)")
(rx/empty)))))
(rx/take 1)
(rx/subs!
(fn [_]
;; Start the event collection pipeline
(->> (rx/merge
(->> (rx/from-atom buffer)
(rx/filter #(pos? (count %)))
(rx/debounce 2000))
(->> stream
(rx/filter (ptk/type? :app.main.data.profile/logout))
(rx/observe-on :async)))
(rx/map (fn [_]
(into [] (take max-chunk-size) @buffer)))
(rx/with-latest-from profile)
(rx/merge-map process-performance-event)
(rx/debounce debounce-performance-event-time))
(rx/empty)))
(rx/mapcat (fn [[chunk profile-id]]
(let [events (filterv #(= profile-id (:profile-id %)) chunk)]
(->> (persist-events events)
(rx/tap (fn [_]
(l/debug :hint "events chunk persisted" :total (count chunk))))
(rx/map (constantly chunk))))))
(rx/take-until stopper)
(rx/subs! (fn [chunk]
(swap! buffer remove-from-buffer (count chunk)))
(fn [cause]
(l/error :hint "unexpected error on audit persistence" :cause cause))
(fn []
(l/debug :hint "audit persistence terminated"))))
(rx/filter :profile-id)
(rx/map (fn [event]
(let [session* (or @session (ct/now))
context (-> @context
(merge (:context event))
(assoc :session session*)
(assoc :session-id cf/session-id)
(assoc :external-session-id (cf/external-session-id))
(add-external-context-info)
(d/without-nils))]
(reset! session session*)
(-> event
(assoc :timestamp (ct/now))
(assoc :context context)))))
(->> (rx/merge
(->> stream
(rx/with-latest-from profile)
(rx/map make-event))
(rx/tap (fn [event]
(l/debug :hint "event enqueued")
(swap! buffer append-to-buffer event)))
(->> (user-input-observer)
(rx/with-latest-from profile)
(rx/map make-performance-event)
(rx/debounce debounce-browser-event-time))
(rx/switch-map #(rx/timer session-timeout))
(rx/take-until stopper)
(rx/subs! (fn [_]
(l/debug :hint "session reinitialized")
(reset! session nil))
(fn [cause]
(l/error :hint "error on event batching stream" :cause cause))
(fn []
(l/debug :hitn "events batching stream terminated"))))))))
(->> (longtask-observer)
(rx/with-latest-from profile)
(rx/map make-performance-event)
(rx/debounce debounce-longtask-time))
(if (and (exists? js/globalThis)
(exists? (.-requestAnimationFrame js/globalThis))
(exists? (.-scheduler js/globalThis))
(exists? (.-postTask (.-scheduler js/globalThis))))
(->> stream
(rx/with-latest-from profile)
(rx/merge-map process-performance-event)
(rx/debounce debounce-performance-event-time))
(rx/empty)))
(rx/filter :profile-id)
(rx/map (fn [event]
(let [session* (or @session (ct/now))
context (-> @context
(merge (:context event))
(assoc :session session*)
(assoc :session-id cf/session-id)
(assoc :external-session-id (cf/external-session-id))
(add-external-context-info)
(d/without-nils))]
(reset! session session*)
(-> event
(assoc :timestamp (ct/now))
(assoc :context context)))))
(rx/tap (fn [event]
(l/debug :hint "event enqueued")
(swap! buffer append-to-buffer event)))
(rx/switch-map #(rx/timer session-timeout))
(rx/take-until stopper)
(rx/subs! (fn [_]
(l/debug :hint "session reinitialized")
(reset! session nil))
(fn [cause]
(l/error :hint "error on event batching stream" :cause cause))
(fn []
(l/debug :hint "events batching stream terminated")))))
(fn [cause]
(l/debug :hint "unable to fetch backend flags" :cause cause))))))))
(defn event
[props]