Add climit impl and config for file snapshot methods (#9722)

* Add dedicated concurrency limit for restore-file-snapshot

This adds a dedicated climit configuration for the restore-file-snapshot
RPC method with :permits 1 per profile (plus queue of 2 and 60s timeout)
and a global limit of 3. Previously the method only used the generic
root/by-profile and root/global limits, allowing up to 10 concurrent
restore operations per profile, which caused database row lock contention
on FOR UPDATE and connection pool exhaustion.
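
As a rough illustration of what `:permits`, `:queue` and `:timeout` mean
together, here is a minimal sketch built on `java.util.concurrent.Semaphore`.
It is not the app.rpc.climit implementation; `make-limiter` and
`run-limited` are hypothetical names, and the error codes are assumptions.

```clojure
(import '(java.util.concurrent Semaphore TimeUnit))

(defn make-limiter
  "Hypothetical helper: allows `permits` concurrent executions and at
  most `queue` additional callers waiting, each for up to `timeout` ms."
  [{:keys [permits queue timeout]}]
  {:running (Semaphore. (int permits) true)
   ;; One slot per caller that is either running or waiting.
   :slots   (Semaphore. (int (+ permits queue)))
   :timeout (long timeout)})

(defn run-limited
  "Runs `f` under the limiter, failing fast when the queue is full or
  when no permit becomes available within the timeout."
  [{:keys [^Semaphore running ^Semaphore slots timeout]} f]
  (when-not (.tryAcquire slots)
    (throw (ex-info "queue is full"
                    {:type :concurrency-limit :code :queue-full})))
  (try
    (when-not (.tryAcquire running (long timeout) TimeUnit/MILLISECONDS)
      (throw (ex-info "timed out waiting for a permit"
                      {:type :concurrency-limit :code :timeout})))
    (try
      (f)
      (finally (.release running)))
    (finally (.release slots))))

;; Mirrors the :restore-file-snapshot/by-profile values from this commit.
(def by-profile (make-limiter {:permits 1 :queue 2 :timeout 60000}))
```

With these values a fourth simultaneous caller for the same profile is
rejected immediately, and a queued caller gives up after 60 seconds.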

* Skip locking on restore! to avoid blocking other operations

Changes the row lock acquisition in restore! from a blocking FOR UPDATE
to FOR UPDATE SKIP LOCKED. If the file row is already locked by another
concurrent operation (e.g., another restore or an update-file), the query
returns no rows and the caller fails fast with a clear conflict error
instead of blocking indefinitely while holding a database connection.
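
The fail-fast behaviour can be sketched with two small pure functions.
This is an illustration only: `with-skip-locked` and `require-locked-row`
are hypothetical names, and the row is whatever the query returned (nil
when the row was skipped because another transaction holds the lock).

```clojure
(defn with-skip-locked
  "Appends a non-blocking row lock clause for the table aliased as `f`."
  [sql]
  (str sql " FOR UPDATE OF f SKIP LOCKED"))

(defn require-locked-row
  "Returns `row` when the lock was acquired; raises a conflict when the
  query returned nothing because the row is locked elsewhere."
  [row]
  (or row
      (throw (ex-info "the file is currently locked by another operation, retry later"
                      {:type :conflict
                       :code :file-locked}))))
```

One caveat of this approach: a query that returns no rows because the
file does not exist is indistinguishable from one that skipped a locked
row, so callers that need to tell the two apart have to check existence
separately.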

* Add queue and timeout limits to root/by-profile concurrency limit

Previously root/by-profile had no queue limit (unbounded Integer/MAX_VALUE)
and no timeout, allowing requests to pile up indefinitely behind a profile
whose permits were exhausted by long-running operations. This could lead
to memory pressure and cascading failures. Now limited to 30 queued
requests with a 30-second timeout so excess requests fail fast.

* Move backup snapshot creation outside restore transaction

The backup snapshot (fsnap/create!) is now created in its own short-lived
connection before the actual restore transaction begins. This ensures the
backup is persisted independently of the restore outcome and reduces the
restore transaction window.

The restore itself runs inside a db/tx-run! block with an optimistic
locking check: it reads the file with FOR UPDATE and compares its revn
against the value captured at backup time. If the file was edited
concurrently, the restore aborts with a conflict error to prevent data
loss.

This depends on the SKIP LOCKED change in restore!: the FOR UPDATE lock
acquired here is held by the same transaction that runs restore!, so the
SKIP LOCKED query inside restore! still returns the row instead of
skipping it (a transaction is never blocked by its own row locks).
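
The revn comparison described above amounts to a small optimistic
locking check. A minimal sketch, assuming the current file is a map with
a `:revn` key; `check-revn!` is a hypothetical name and uses plain
`ex-info` instead of the project's own error helper:

```clojure
(defn check-revn!
  "Returns `current` when its :revn still matches the value captured
  before the backup snapshot was taken; otherwise raises a conflict."
  [current expected-revn]
  (when (not= (:revn current) expected-revn)
    (throw (ex-info "the file was modified during the restore process, please retry"
                    {:type :conflict
                     :code :file-modified
                     :expected-revn expected-revn
                     :current-revn (:revn current)})))
  current)
```

Because the check runs after the row is locked with FOR UPDATE, no other
transaction can change revn between the comparison and the restore.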

* ♻️ Remove unused private function get-minimal-file

The local get-minimal-file function in file_snapshots.clj is no longer
used since restore! switched to direct exec-one! with FOR UPDATE SKIP
LOCKED. The sql:get-minimal-file SQL constant is still used directly.

* Add minor improvements on db connection management

* ♻️ Refactor create-file-snapshot to use explicit transaction management

Remove automatic transaction wrapping (`::db/transaction true`) and
pass `cfg` through the call chain instead of destructured `conn`.
Wrap `fsnap/create!` in an explicit `db/tx-run!` for clearer
transaction boundaries.

Signed-off-by: Andrey Antukh <niwi@niwi.nz>

* Add dedicated concurrency limit for create-file-snapshot

This adds a dedicated climit configuration for the create-file-snapshot
RPC method with :permits 1 per profile (plus queue of 2 and 60s timeout)
and a global limit of 3. Previously the method only used the generic
root/by-profile and root/global limits, allowing up to 10 concurrent
snapshot creation operations per profile which could cause database
contention and connection pool exhaustion.

Signed-off-by: Andrey Antukh <niwi@niwi.nz>

---------

Signed-off-by: Andrey Antukh <niwi@niwi.nz>
Andrey Antukh 2026-05-19 14:30:44 +02:00 committed by GitHub
parent 87b969bd05
commit 405a73e8ba
6 changed files with 190 additions and 98 deletions


@ -19,7 +19,7 @@
{:permits 40}
:root/by-profile
{:permits 10}
{:permits 10 :queue 30 :timeout 30000}
:file-thumbnail-ops/global
{:permits 20}
@ -27,4 +27,16 @@
{:permits 2}
:submit-audit-events/by-profile
{:permits 1 :queue 3}}
{:permits 1 :queue 3}
:restore-file-snapshot/global
{:permits 3}
:restore-file-snapshot/by-profile
{:permits 1 :queue 2 :timeout 60000}
:create-file-snapshot/global
{:permits 3}
:create-file-snapshot/by-profile
{:permits 1 :queue 2 :timeout 60000}}


@ -27,7 +27,9 @@
[next.jdbc.transaction])
(:import
com.zaxxer.hikari.HikariConfig
com.zaxxer.hikari.HikariConfigMXBean
com.zaxxer.hikari.HikariDataSource
com.zaxxer.hikari.HikariPoolMXBean
com.zaxxer.hikari.metrics.prometheus.PrometheusMetricsTrackerFactory
io.whitfin.siphash.SipHasher
io.whitfin.siphash.SipHasherContainer
@ -67,9 +69,8 @@
(def defaults
{::name :main
::min-size 0
::max-size 60
::connection-timeout 10000
::connection-timeout 30000
::validation-timeout 10000
::idle-timeout 120000 ; 2min
::max-lifetime 1800000 ; 30m
@ -82,7 +83,7 @@
(defmethod ig/init-key ::pool
[_ cfg]
(let [{:keys [::uri ::read-only] :as cfg}
(merge defaults cfg)]
(merge defaults (d/without-nils cfg))]
(when uri
(l/info :hint "initialize connection pool"
:name (d/name (::name cfg))
@ -90,7 +91,8 @@
:read-only read-only
:credentials (and (contains? cfg ::username)
(contains? cfg ::password))
:min-size (::min-size cfg)
:min-size (or (::min-size cfg)
(::max-size cfg))
:max-size (::max-size cfg))
(create-pool cfg))))
@ -111,7 +113,9 @@
[{:keys [::uri] :as cfg}]
;; (app.common.pprint/pprint cfg)
(let [config (HikariConfig.)]
(let [config (HikariConfig.)
max-size (::max-size cfg)
min-size (or (::min-size cfg) max-size)]
(doto config
(.setJdbcUrl (str "jdbc:" uri))
(.setPoolName (d/name (::name cfg)))
@ -121,8 +125,8 @@
(.setValidationTimeout (::validation-timeout cfg))
(.setIdleTimeout (::idle-timeout cfg))
(.setMaxLifetime (::max-lifetime cfg))
(.setMinimumIdle (::min-size cfg))
(.setMaximumPoolSize (::max-size cfg))
(.setMinimumIdle min-size)
(.setMaximumPoolSize max-size)
(.setConnectionInitSql initsql)
(.setInitializationFailTimeout -1))
@ -180,6 +184,20 @@
:code :invalid-connection
:hint "invalid connection provided")))
(defn pool-stats
"Given a HikariDataSource instance, returns a map with current pool
statistics: active/idle connections, threads awaiting connection,
total connections, maximum pool size, and minimum idle connections."
[^HikariDataSource pool]
(let [^HikariPoolMXBean pool-mxbean (.getHikariPoolMXBean pool)
^HikariConfigMXBean cfg-mxbean (.getHikariConfigMXBean pool)]
{:active-connections (.getActiveConnections pool-mxbean)
:idle-connections (.getIdleConnections pool-mxbean)
:threads-awaiting-connection (.getThreadsAwaitingConnection pool-mxbean)
:total-connections (.getTotalConnections pool-mxbean)
:maximum-pool-size (.getMaximumPoolSize cfg-mxbean)
:minimum-idle (.getMinimumIdle cfg-mxbean)}))
(defn create-pool
[cfg]
(let [dsc (create-datasource-config cfg)]


@ -66,11 +66,6 @@
LEFT JOIN file_data AS fd ON (fd.file_id = f.id AND fd.id = f.id)
WHERE f.id = ?")
(defn- get-minimal-file
[cfg id & {:as opts}]
(-> (db/get-with-sql cfg [sql:get-minimal-file id] opts)
(d/update-when :metadata fdata/decode-metadata)))
(def ^:private sql:get-snapshot-without-data
(str "WITH snapshots AS (" sql:snapshots ")"
"SELECT c.id,
@ -319,79 +314,87 @@
(defn restore!
[{:keys [::db/conn] :as cfg} file-id snapshot-id]
(let [file (get-minimal-file conn file-id {::db/for-update true})
vern (rand-int Integer/MAX_VALUE)
(let [lock-sql (str sql:get-minimal-file " FOR UPDATE OF f SKIP LOCKED")
row (db/exec-one! conn [lock-sql file-id])]
storage
(sto/resolve cfg {::db/reuse-conn true})
(when-not row
(ex/raise :type :conflict
:code :file-locked
:hint "the file is currently locked by another operation, retry later"))
snapshot
(get-snapshot cfg file-id snapshot-id)]
(let [file (d/update-when row :metadata fdata/decode-metadata)
vern (rand-int Integer/MAX_VALUE)
(when-not snapshot
(ex/raise :type :not-found
:code :snapshot-not-found
:hint "unable to find snapshot with the provided label"
:snapshot-id snapshot-id
:file-id file-id))
storage
(sto/resolve cfg {::db/reuse-conn true})
(when-not (:data snapshot)
(ex/raise :type :internal
:code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
snapshot
(get-snapshot cfg file-id snapshot-id)]
(let [;; If the snapshot has applied migrations stored, we reuse
;; them, if not, we take a safest set of migrations as
;; starting point. This is because, at the time of
;; implementing snapshots, migrations were not taken into
;; account so we need to make this backward compatible in
;; some way.
migrations
(or (:migrations snapshot)
(fmg/generate-migrations-from-version 67))
(when-not snapshot
(ex/raise :type :not-found
:code :snapshot-not-found
:hint "unable to find snapshot with the provided label"
:snapshot-id snapshot-id
:file-id file-id))
file
(-> file
(update :revn inc)
(assoc :migrations migrations)
(assoc :data (:data snapshot))
(assoc :vern vern)
(assoc :version (:version snapshot))
(assoc :has-media-trimmed false)
(assoc :modified-at (:modified-at snapshot))
(assoc :features (:features snapshot)))]
(when-not (:data snapshot)
(ex/raise :type :internal
:code :snapshot-without-data
:hint "snapshot has no data"
:label (:label snapshot)
:file-id file-id))
(l/dbg :hint "restoring snapshot"
:file-id (str file-id)
:label (:label snapshot)
:snapshot-id (str (:id snapshot)))
(let [;; If the snapshot has applied migrations stored, we reuse
;; them, if not, we take a safest set of migrations as
;; starting point. This is because, at the time of
;; implementing snapshots, migrations were not taken into
;; account so we need to make this backward compatible in
;; some way.
migrations
(or (:migrations snapshot)
(fmg/generate-migrations-from-version 67))
;; In the same way, on reseting the file data, we need to restore
;; the applied migrations on the moment of taking the snapshot
(bfc/update-file! cfg file ::bfc/reset-migrations? true)
file
(-> file
(update :revn inc)
(assoc :migrations migrations)
(assoc :data (:data snapshot))
(assoc :vern vern)
(assoc :version (:version snapshot))
(assoc :has-media-trimmed false)
(assoc :modified-at (:modified-at snapshot))
(assoc :features (:features snapshot)))]
;; FIXME: this should be separated functions, we should not have
;; inline sql here.
(l/dbg :hint "restoring snapshot"
:file-id (str file-id)
:label (:label snapshot)
:snapshot-id (str (:id snapshot)))
;; clean object thumbnails
(let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; In the same way, on reseting the file data, we need to restore
;; the applied migrations on the moment of taking the snapshot
(bfc/update-file! cfg file ::bfc/reset-migrations? true)
;; clean file thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; FIXME: this should be separated functions, we should not have
;; inline sql here.
vern)))
;; clean object thumbnails
(let [sql (str "update file_tagged_object_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
;; clean file thumbnails
(let [sql (str "update file_thumbnail "
" set deleted_at = now() "
" where file_id=? returning media_id")
res (db/exec! conn [sql file-id])]
(doseq [media-id (into #{} (keep :media-id) res)]
(sto/touch-object! storage media-id)))
vern))))
(defn delete!
[cfg & {:keys [id file-id deleted-at]}]


@ -154,8 +154,8 @@
::db/username (cf/get :database-username)
::db/password (cf/get :database-password)
::db/read-only (cf/get :database-readonly false)
::db/min-size (cf/get :database-min-pool-size 0)
::db/max-size (cf/get :database-max-pool-size 60)
::db/min-size (cf/get :database-min-pool-size)
::db/max-size (cf/get :database-max-pool-size)
::mtx/metrics (ig/ref ::mtx/metrics)}
;; Default netty IO pool (shared between several services)


@ -17,6 +17,7 @@
[app.main :as-alias main]
[app.msgbus :as mbus]
[app.rpc :as-alias rpc]
[app.rpc.climit :as-alias climit]
[app.rpc.commands.files :as files]
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
@ -43,9 +44,10 @@
(sv/defmethod ::create-file-snapshot
{::doc/added "1.20"
::sm/params schema:create-file-snapshot
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id label]}]
(files/check-edition-permissions! conn profile-id file-id)
::climit/id [[:create-file-snapshot/by-profile ::rpc/profile-id]
[:create-file-snapshot/global]]}
[cfg {:keys [::rpc/profile-id file-id label]}]
(files/check-edition-permissions! cfg profile-id file-id)
(let [file (bfc/get-file cfg file-id :realize? true)
project (db/get-by-id cfg :project (:project-id file))]
@ -57,10 +59,10 @@
(quotes/check! {::quotes/id ::quotes/snapshots-per-file}
{::quotes/id ::quotes/snapshots-per-team}))
(fsnap/create! cfg file
{:label label
:profile-id profile-id
:created-by "user"})))
(db/tx-run! cfg fsnap/create! file
{:label label
:profile-id profile-id
:created-by "user"})))
(def ^:private schema:restore-file-snapshot
[:map {:title "restore-file-snapshot"}
@ -70,29 +72,43 @@
(sv/defmethod ::restore-file-snapshot
{::doc/added "1.20"
::sm/params schema:restore-file-snapshot
::db/transaction true}
[{:keys [::db/conn ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id ::rpc/session-id file-id id] :as params}]
(files/check-edition-permissions! conn profile-id file-id)
::climit/id [[:restore-file-snapshot/by-profile ::rpc/profile-id]
[:restore-file-snapshot/global]]}
[{:keys [::db/pool ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id ::rpc/session-id file-id id] :as params}]
;; Check permissions and read current file state (short-lived, outside restore transaction)
(files/check-edition-permissions! pool profile-id file-id)
(let [file (bfc/get-file cfg file-id)
team (teams/get-team conn
team (teams/get-team pool
:profile-id profile-id
:file-id file-id)
delay (ldel/get-deletion-delay team)]
delay (ldel/get-deletion-delay team)
file-revn (:revn file)]
;; Create backup snapshot of the current state (committed immediately
;; independently of the restore outcome)
(fsnap/create! cfg file
{:profile-id profile-id
:deleted-at (ct/in-future delay)
:created-by "system"})
(let [vern (fsnap/restore! cfg file-id id)]
;; Send to the clients a notification to reload the file
(mbus/pub! msgbus
:topic (:id file)
:message {:type :file-restored
:session-id session-id
:file-id (:id file)
:vern vern})
nil)))
;; Restore snapshot inside its own transaction; the revn check
;; ensures no data is lost if the file was edited concurrently
(db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}]
(let [current (bfc/get-minimal-file conn file-id {::db/for-update true})]
(when (not= (:revn current) file-revn)
(ex/raise :type :conflict
:code :file-modified
:hint "the file was modified during the restore process, please retry")))
(let [vern (fsnap/restore! cfg file-id id)]
(mbus/pub! msgbus
:topic (:id file)
:message {:type :file-restored
:session-id session-id
:file-id (:id file)
:vern vern})
nil)))))
(def ^:private schema:update-file-snapshot
[:map {:title "update-file-snapshot"}


@ -0,0 +1,43 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns backend-tests.db-test
(:require
[app.db :as db]
[backend-tests.helpers :as th]
[clojure.test :as t])
(:import
com.zaxxer.hikari.HikariConfig
com.zaxxer.hikari.HikariDataSource
java.sql.Connection))
(t/use-fixtures :once th/state-init)
(t/deftest pool-stats-returns-expected-keys
(let [stats (db/pool-stats th/*pool*)]
(t/testing "all expected keys are present"
(t/is (contains? stats :active-connections))
(t/is (contains? stats :idle-connections))
(t/is (contains? stats :threads-awaiting-connection))
(t/is (contains? stats :total-connections))
(t/is (contains? stats :maximum-pool-size))
(t/is (contains? stats :minimum-idle)))
(t/testing "values are non-negative integers"
(t/is (>= (:active-connections stats) 0))
(t/is (>= (:idle-connections stats) 0))
(t/is (>= (:threads-awaiting-connection stats) 0))
(t/is (>= (:total-connections stats) 0))
(t/is (>= (:maximum-pool-size stats) 0))
(t/is (>= (:minimum-idle stats) 0)))
(t/testing "total connections equals active + idle"
(t/is (= (:total-connections stats)
(+ (:active-connections stats)
(:idle-connections stats)))))
(t/testing "maximum pool size is reasonable"
(t/is (pos? (:maximum-pool-size stats))))))