This commit is contained in:
Andrey Antukh 2026-05-08 14:09:26 +02:00
parent d56c452288
commit c635d0532b
5 changed files with 96 additions and 87 deletions

View File

@ -238,6 +238,16 @@
(some? tnow)
(assoc :tracked-at tnow))))
(def ^:private xf:filter-safe-props
"Transducer that keeps only map entries whose values are UUIDs."
(filter (fn [[_ v]] (uuid? v))))
(defn filter-safe-props
"Return only UUID-valued keys from a props map. This preserves
object relations while maintaining anonymity."
[props]
(into {} xf:filter-safe-props props))
(defn- append-audit-entry
[cfg params]
(let [params (-> params
@ -287,9 +297,10 @@
:version
:client-version
:client-user-agent]))
safe-props (filter-safe-props (:props params {}))
params (-> params
(assoc :source "telemetry")
(assoc :props {})
(assoc :source "telemetry:backend")
(assoc :props safe-props)
(assoc :context safe-context)
(assoc :ip-addr (db/inet "0.0.0.0"))
(assoc :created-at tday)

View File

@ -15,7 +15,7 @@
[app.config :as cf]
[app.db :as db]
[app.http :as-alias http]
[app.loggers.audit :as-alias audit]
[app.loggers.audit :as audit]
[app.loggers.database :as loggers.db]
[app.loggers.mattermost :as loggers.mm]
[app.rpc :as-alias rpc]
@ -124,16 +124,17 @@
(comp
(map adjust-timestamp)
(map (fn [event]
(let [tday (ct/truncate (::audit/created-at event) :days)]
(let [tday (ct/truncate (::audit/created-at event) :days)
safe-props (audit/filter-safe-props (::audit/props event {}))]
[(::audit/id event)
(::audit/name event)
"telemetry"
"telemetry:frontend"
(::audit/type event)
tday
tday
(::audit/profile-id event)
(db/inet "0.0.0.0")
(db/tjson {})
(db/tjson safe-props)
(db/tjson (filter-safe-context (::audit/context event {})))])))))
(defn- handle-events

View File

@ -145,7 +145,7 @@
(def ^:private sql:get-counters
"SELECT name, count(*) AS count
FROM audit_log
WHERE source IN ('backend', 'frontend', 'telemetry')
WHERE source IN ('backend', 'frontend', 'telemetry:backend', 'telemetry:frontend')
AND created_at >= date_trunc('day', now())
AND created_at < date_trunc('day', now()) + interval '1 day'
GROUP BY 1
@ -188,7 +188,7 @@
:uri (cf/get :telemetry-uri)
:headers {"content-type" "application/json"}
:body (json/encode-str data)}
response (http/req cfg request)]
response (http/req cfg request {:skip-ssrf-check? true})]
(when (> (:status response) 206)
(ex/raise :type :internal
:code :invalid-response
@ -212,39 +212,28 @@
;; AUDIT-EVENT BATCH (TELEMETRY MODE)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Maximum number of telemetry-mode audit rows allowed to accumulate in
;; the audit_log table. When this limit is exceeded the oldest rows are
;; deleted before collection (we accept losing events rather than
;; letting the table grow unboundedly).
;; Telemetry events older than this are purged by the GC step so the
;; buffer stays bounded.
(def ^:private batch-size 10000)
(def ^:private max-telemetry-events 100000)
(def ^:private sql:count-telemetry-events
"SELECT count(*) AS cnt FROM audit_log WHERE source = 'telemetry'")
(def ^:private sql:gc-events
"DELETE FROM audit_log
WHERE id IN (
SELECT id FROM audit_log
WHERE source = 'telemetry'
ORDER BY created_at ASC
LIMIT ?)")
WHERE source IN ('telemetry:backend', 'telemetry:frontend')
AND created_at < now() - interval '7 days'")
(defn- gc-events
"Delete the oldest telemetry-mode events when the table exceeds the
configured cap so that the buffer stays bounded."
"Delete telemetry-mode events older than `telemetry-retention-days`
so that the buffer stays bounded."
[{:keys [::db/conn]}]
(let [cnt (-> (db/exec-one! conn [sql:count-telemetry-events]) :cnt long)
excess (- cnt max-telemetry-events)]
(when (pos? excess)
(l/warn :hint "telemetry audit_log cap exceeded; dropping oldest events"
:count excess)
(db/exec-one! conn [sql:gc-events (int excess)]))))
(let [result (db/exec-one! conn [sql:gc-events])]
(when (pos? (:next.jdbc/update-count result))
(l/warn :hint "purged stale telemetry events"
:count (:next.jdbc/update-count result)))))
(def ^:private sql:fetch-telemetry-events
"SELECT id, name, type, source, tracked_at, profile_id, context
FROM audit_log
WHERE source = 'telemetry'
WHERE source IN ('telemetry:backend', 'telemetry:frontend')
ORDER BY created_at ASC
LIMIT ?")
@ -276,7 +265,7 @@
:uri (cf/get :telemetry-uri)
:headers {"content-type" "application/json"}
:body (json/encode-str payload)}
resp (http/req cfg request)]
resp (http/req cfg request {:skip-ssrf-check? true})]
(if (<= (:status resp) 206)
true
(do

View File

@ -137,13 +137,13 @@
(let [[row :as rows] (->> (th/db-exec! ["select * from audit_log"])
(mapv decode-row))]
(t/is (= 1 (count rows)))
;; source is telemetry, not frontend
(t/is (= "telemetry" (:source row)))
;; source is telemetry:frontend, not frontend
(t/is (= "telemetry:frontend" (:source row)))
;; profile-id preserved
(t/is (= (:id prof) (:profile-id row)))
;; event name preserved
(t/is (= "navigate" (:name row)))
;; props stripped to empty
;; props only contain UUID-valued keys (non-UUID stripped)
(t/is (= {} (:props row)))
;; ip zeroed
(t/is (= "0.0.0.0" (str (:ip-addr row))))

View File

@ -27,14 +27,15 @@
(defn- insert-telemetry-row!
"Insert a single anonymised audit_log row as the telemetry mode does."
([name] (insert-telemetry-row! name {}))
([name {:keys [tracked-at created-at]
([name {:keys [tracked-at created-at source]
:or {tracked-at (ct/now)
created-at (ct/now)}}]
created-at (ct/now)
source "telemetry:backend"}}]
(th/db-insert! :audit-log
{:id (uuid/next)
:name name
:type "action"
:source "telemetry"
:source source
:profile-id uuid/zero
:ip-addr (db/inet "0.0.0.0")
:props (db/tjson {})
@ -43,7 +44,7 @@
:created-at created-at})))
(defn- count-telemetry-rows []
(-> (th/db-exec-one! ["SELECT count(*) AS cnt FROM audit_log WHERE source = 'telemetry'"])
(-> (th/db-exec-one! ["SELECT count(*) AS cnt FROM audit_log WHERE source IN ('telemetry:backend', 'telemetry:frontend')"])
:cnt
long))
@ -293,72 +294,79 @@
(t/is (= 2 (count-telemetry-rows)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; GC / SAFETY-CAP TESTS
;; GC / RETENTION-WINDOW TESTS
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(t/deftest test-gc-removes-excess-rows-before-collection
;; Lower the cap to 2 and insert 5 rows. After the task runs the
;; 3 oldest rows must have been pruned and the 2 newest shipped.
(t/deftest test-gc-purges-events-older-than-7-days
;; Insert events from 8 days ago (stale) and from today (fresh).
;; After the task runs, stale events must be purged by GC and fresh
;; ones shipped by the batch sender.
(with-mocks [legacy-mock {:target 'app.tasks.telemetry/make-legacy-request
:return nil}
batch-mock {:target 'app.tasks.telemetry/send-event-batch
:return true}]
(with-redefs [cf/flags #{:telemetry}
telemetry/max-telemetry-events 2]
;; Insert rows with strictly ordered timestamps so we can reason
;; about which ones survive.
(let [t0 (ct/now)]
(doseq [i (range 5)]
(insert-telemetry-row!
(str "event-" i)
{:created-at (ct/plus t0 (ct/duration {:seconds i}))
:tracked-at (ct/plus t0 (ct/duration {:seconds i}))})))
(with-redefs [cf/flags #{:telemetry}]
(let [now (ct/now)
eight-days (ct/minus now (ct/duration {:days 8}))]
;; Stale events (older than 7 days)
(insert-telemetry-row! "stale-1" {:created-at eight-days :tracked-at eight-days})
(insert-telemetry-row! "stale-2" {:created-at eight-days :tracked-at eight-days})
;; Fresh events (today)
(insert-telemetry-row! "fresh-1" {:created-at now :tracked-at now})
(insert-telemetry-row! "fresh-2" {:created-at now :tracked-at now})
(t/is (= 5 (count-telemetry-rows)))
(t/is (= 4 (count-telemetry-rows)))
(th/run-task! :telemetry {:send? true :enabled? true})
(th/run-task! :telemetry {:send? true :enabled? true})
;; GC deleted 3, then the remaining 2 were shipped and deleted
(t/is (= 0 (count-telemetry-rows))))))
;; GC purged the 2 stale rows, batch sender shipped the 2 fresh ones
(t/is (= 0 (count-telemetry-rows)))))))
(t/deftest test-gc-does-not-run-when-under-cap
;; When the row count is below the cap, no GC deletion should occur
;; and all rows should be forwarded to the batch sender.
(t/deftest test-gc-keeps-events-within-7-day-window
;; When all events are within the 7-day window, GC must not delete
;; anything and all rows are forwarded to the batch sender.
(let [batch-events (atom nil)]
(with-mocks [legacy-mock {:target 'app.tasks.telemetry/make-legacy-request
:return nil}]
(with-redefs [cf/flags #{:telemetry}
telemetry/max-telemetry-events 100
(with-redefs [cf/flags #{:telemetry}
telemetry/send-event-batch
(fn [_cfg batch]
(reset! batch-events batch)
true)]
(insert-telemetry-row! "event-a")
(insert-telemetry-row! "event-b")
(let [six-days-ago (ct/minus (ct/now) (ct/duration {:days 6}))]
(insert-telemetry-row! "recent-1" {:created-at six-days-ago :tracked-at six-days-ago})
(insert-telemetry-row! "recent-2" {:created-at six-days-ago :tracked-at six-days-ago}))
(th/run-task! :telemetry {:send? true :enabled? true})
;; Both events forwarded to the batch — GC left them alone
;; Both events forwarded — GC left them alone
(t/is (= 2 (count @batch-events)))
(t/is (= 0 (count-telemetry-rows)))))))
(t/deftest test-gc-cap-exactly-at-limit-does-not-delete
;; Row count == cap means excess is zero; nothing should be deleted
;; by the GC step.
(with-mocks [legacy-mock {:target 'app.tasks.telemetry/make-legacy-request
:return nil}
batch-mock {:target 'app.tasks.telemetry/send-event-batch
:return true}]
(with-redefs [cf/flags #{:telemetry}
telemetry/max-telemetry-events 3]
(insert-telemetry-row! "a")
(insert-telemetry-row! "b")
(insert-telemetry-row! "c")
(t/deftest test-gc-deletes-only-stale-events
;; Insert a mix of stale (8 days old) and fresh (1 day old) events.
;; After GC, only fresh events should remain for the batch sender.
(let [batch-events (atom nil)]
(with-mocks [legacy-mock {:target 'app.tasks.telemetry/make-legacy-request
:return nil}]
(with-redefs [cf/flags #{:telemetry}
telemetry/send-event-batch
(fn [_cfg batch]
(reset! batch-events batch)
true)]
(let [eight-days (ct/minus (ct/now) (ct/duration {:days 8}))
one-day (ct/minus (ct/now) (ct/duration {:days 1}))]
(insert-telemetry-row! "stale" {:created-at eight-days :tracked-at eight-days})
(insert-telemetry-row! "fresh" {:created-at one-day :tracked-at one-day}))
(th/run-task! :telemetry {:send? true :enabled? true})
(t/is (= 2 (count-telemetry-rows)))
;; All 3 shipped — none dropped by GC
(t/is (= 0 (count-telemetry-rows))))))
(th/run-task! :telemetry {:send? true :enabled? true})
;; GC purged stale, batch shipped fresh
(t/is (= 1 (count @batch-events)))
(t/is (= "fresh" (:name (first @batch-events))))
(t/is (= 0 (count-telemetry-rows)))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; ANONYMITY TESTS
@ -380,7 +388,7 @@
{:id (uuid/next)
:name "create-project"
:type "action"
:source "telemetry"
:source "telemetry:backend"
:profile-id uuid/zero
:ip-addr (db/inet "0.0.0.0")
:props (db/tjson {})
@ -388,8 +396,8 @@
:tracked-at (ct/now)
:created-at (ct/now)})
(let [[row] (th/db-exec! ["SELECT * FROM audit_log WHERE source = 'telemetry'"])]
(t/is (= "telemetry" (:source row)))
(let [[row] (th/db-exec! ["SELECT * FROM audit_log WHERE source = 'telemetry:backend'"])]
(t/is (= "telemetry:backend" (:source row)))
;; props are always empty
(t/is (= "{}" (str (:props row))))
;; ip_addr is the sentinel zero address
@ -418,7 +426,7 @@
{:id (uuid/next)
:name "navigate"
:type "action"
:source "telemetry"
:source "telemetry:frontend"
:profile-id uuid/zero
:ip-addr (db/inet "0.0.0.0")
:props (db/tjson {})
@ -457,7 +465,7 @@
::audit/name "create-project"
::audit/profile-id (:id profile)}]
(db/tx-run! th/*system* handle-event! event)
(let [[row] (th/db-exec! ["SELECT * FROM audit_log WHERE source = 'telemetry'"])]
(let [[row] (th/db-exec! ["SELECT * FROM audit_log WHERE source = 'telemetry:backend'"])]
(t/is (some? row))
(let [created-at (:created-at row)
tracked-at (:tracked-at row)
@ -468,7 +476,7 @@
(t/deftest test-backend-ingest-full-row-shape
;; Verify the full row shape stored by handle-event! in telemetry mode:
;; source=telemetry, empty props, zeroed ip, context filtered to safe
;; source=telemetry:backend, empty props, zeroed ip, context filtered to safe
;; backend keys only, profile-id preserved, timestamps truncated.
(with-redefs [cf/flags #{:telemetry}
cf/telemetry-enabled? true]
@ -486,10 +494,10 @@
::audit/props {:some-prop "value"}}]
(db/tx-run! th/*system* handle-event! event)
(let [[row] (th/db-exec! ["SELECT * FROM audit_log WHERE source = 'telemetry'"])]
(let [[row] (th/db-exec! ["SELECT * FROM audit_log WHERE source = 'telemetry:backend'"])]
(t/is (some? row))
;; source
(t/is (= "telemetry" (:source row)))
(t/is (= "telemetry:backend" (:source row)))
;; profile-id preserved
(t/is (= (:id profile) (:profile-id row)))
;; name