From 405a73e8bad48d14ed72588418d9a4d1f2d342ed Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 19 May 2026 14:30:44 +0200 Subject: [PATCH] :sparkles: Add climit impl and config for file snapshot methods (#9722) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * :sparkles: Add dedicated concurrency limit for restore-file-snapshot This adds a dedicated climit configuration for the restore-file-snapshot RPC method with :permits 1 per profile (plus queue of 2 and 60s timeout) and a global limit of 3. Previously the method only used the generic root/by-profile and root/global limits, allowing up to 10 concurrent restore operations per profile which caused database row lock contention on FOR UPDATE and connection pool exhaustion. * :sparkles: Skip locking on restore! to avoid blocking other operations Changes the row lock acquisition in restore! from a blocking FOR UPDATE to FOR UPDATE SKIP LOCKED. If the file row is already locked by another concurrent operation (e.g., another restore or an update-file), the query returns no rows and the caller fails fast with a clear conflict error instead of blocking indefinitely while holding a database connection. * :sparkles: Add queue and timeout limits to root/by-profile concurrency limit Previously root/by-profile had no queue limit (unbounded Integer/MAX_VALUE) and no timeout, allowing requests to pile up indefinitely behind a profile whose permits were exhausted by long-running operations. This could lead to memory pressure and cascading failures. Now limited to 30 queued requests with a 30-second timeout so excess requests fail fast. * :sparkles: Move backup snapshot creation outside restore transaction The backup snapshot (fsnap/create!) is now created in its own short-lived connection before the actual restore transaction begins. This ensures the backup is persisted independently of the restore outcome and reduces the restore transaction window. 
The restore itself runs inside a db/tx-run! block with an optimistic locking check: it reads the file with FOR UPDATE and compares its revn against the value captured at backup time. If the file was edited concurrently, the restore aborts with a conflict error to prevent data loss. Co-dependent with the SKIP LOCKED change in restore! — the FOR UPDATE acquired here is in the same transaction as restore!, so the SKIP LOCKED inside restore! does not skip the row (a lock held by the same transaction is not a conflicting lock). * :recycle: Remove unused private function get-minimal-file The local get-minimal-file function in file_snapshots.clj is no longer used since restore! switched to direct exec-one! with FOR UPDATE SKIP LOCKED. The sql:get-minimal-file SQL constant is still used directly. * :sparkles: Add minor improvements to db connection management * :recycle: Refactor create-file-snapshot to use explicit transaction management Remove automatic transaction wrapping (`::db/transaction true`) and pass `cfg` through the call chain instead of destructured `conn`. Wrap `fsnap/create!` in an explicit `db/tx-run!` for clearer transaction boundaries. Signed-off-by: Andrey Antukh * :sparkles: Add dedicated concurrency limit for create-file-snapshot This adds a dedicated climit configuration for the create-file-snapshot RPC method with :permits 1 per profile (plus queue of 2 and 60s timeout) and a global limit of 3. Previously the method only used the generic root/by-profile and root/global limits, allowing up to 10 concurrent snapshot creation operations per profile which could cause database contention and connection pool exhaustion. 
Signed-off-by: Andrey Antukh --------- Signed-off-by: Andrey Antukh --- backend/resources/climit.edn | 16 ++- backend/src/app/db.clj | 32 ++++- backend/src/app/features/file_snapshots.clj | 135 +++++++++--------- backend/src/app/main.clj | 4 +- .../src/app/rpc/commands/files_snapshot.clj | 58 +++++--- backend/test/backend_tests/db_test.clj | 43 ++++++ 6 files changed, 190 insertions(+), 98 deletions(-) create mode 100644 backend/test/backend_tests/db_test.clj diff --git a/backend/resources/climit.edn b/backend/resources/climit.edn index 34d2184153..7d8234499b 100644 --- a/backend/resources/climit.edn +++ b/backend/resources/climit.edn @@ -19,7 +19,7 @@ {:permits 40} :root/by-profile - {:permits 10} + {:permits 10 :queue 30 :timeout 30000} :file-thumbnail-ops/global {:permits 20} @@ -27,4 +27,16 @@ {:permits 2} :submit-audit-events/by-profile - {:permits 1 :queue 3}} + {:permits 1 :queue 3} + + :restore-file-snapshot/global + {:permits 3} + + :restore-file-snapshot/by-profile + {:permits 1 :queue 2 :timeout 60000} + + :create-file-snapshot/global + {:permits 3} + + :create-file-snapshot/by-profile + {:permits 1 :queue 2 :timeout 60000}} diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index c23ea07524..de80175040 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -27,7 +27,9 @@ [next.jdbc.transaction]) (:import com.zaxxer.hikari.HikariConfig + com.zaxxer.hikari.HikariConfigMXBean com.zaxxer.hikari.HikariDataSource + com.zaxxer.hikari.HikariPoolMXBean com.zaxxer.hikari.metrics.prometheus.PrometheusMetricsTrackerFactory io.whitfin.siphash.SipHasher io.whitfin.siphash.SipHasherContainer @@ -67,9 +69,8 @@ (def defaults {::name :main - ::min-size 0 ::max-size 60 - ::connection-timeout 10000 + ::connection-timeout 30000 ::validation-timeout 10000 ::idle-timeout 120000 ; 2min ::max-lifetime 1800000 ; 30m @@ -82,7 +83,7 @@ (defmethod ig/init-key ::pool [_ cfg] (let [{:keys [::uri ::read-only] :as cfg} - (merge defaults cfg)] + (merge defaults 
(d/without-nils cfg))] (when uri (l/info :hint "initialize connection pool" :name (d/name (::name cfg)) @@ -90,7 +91,8 @@ :read-only read-only :credentials (and (contains? cfg ::username) (contains? cfg ::password)) - :min-size (::min-size cfg) + :min-size (or (::min-size cfg) + (::max-size cfg)) :max-size (::max-size cfg)) (create-pool cfg)))) @@ -111,7 +113,9 @@ [{:keys [::uri] :as cfg}] ;; (app.common.pprint/pprint cfg) - (let [config (HikariConfig.)] + (let [config (HikariConfig.) + max-size (::max-size cfg) + min-size (or (::min-size cfg) max-size)] (doto config (.setJdbcUrl (str "jdbc:" uri)) (.setPoolName (d/name (::name cfg))) @@ -121,8 +125,8 @@ (.setValidationTimeout (::validation-timeout cfg)) (.setIdleTimeout (::idle-timeout cfg)) (.setMaxLifetime (::max-lifetime cfg)) - (.setMinimumIdle (::min-size cfg)) - (.setMaximumPoolSize (::max-size cfg)) + (.setMinimumIdle min-size) + (.setMaximumPoolSize max-size) (.setConnectionInitSql initsql) (.setInitializationFailTimeout -1)) @@ -180,6 +184,20 @@ :code :invalid-connection :hint "invalid connection provided"))) +(defn pool-stats + "Given a HikariDataSource instance, returns a map with current pool + statistics: active/idle connections, threads awaiting connection, + total connections, maximum pool size, and minimum idle connections." 
+ [^HikariDataSource pool] + (let [^HikariPoolMXBean pool-mxbean (.getHikariPoolMXBean pool) + ^HikariConfigMXBean cfg-mxbean (.getHikariConfigMXBean pool)] + {:active-connections (.getActiveConnections pool-mxbean) + :idle-connections (.getIdleConnections pool-mxbean) + :threads-awaiting-connection (.getThreadsAwaitingConnection pool-mxbean) + :total-connections (.getTotalConnections pool-mxbean) + :maximum-pool-size (.getMaximumPoolSize cfg-mxbean) + :minimum-idle (.getMinimumIdle cfg-mxbean)})) + (defn create-pool [cfg] (let [dsc (create-datasource-config cfg)] diff --git a/backend/src/app/features/file_snapshots.clj b/backend/src/app/features/file_snapshots.clj index 192030cbf8..e4c2985f10 100644 --- a/backend/src/app/features/file_snapshots.clj +++ b/backend/src/app/features/file_snapshots.clj @@ -66,11 +66,6 @@ LEFT JOIN file_data AS fd ON (fd.file_id = f.id AND fd.id = f.id) WHERE f.id = ?") -(defn- get-minimal-file - [cfg id & {:as opts}] - (-> (db/get-with-sql cfg [sql:get-minimal-file id] opts) - (d/update-when :metadata fdata/decode-metadata))) - (def ^:private sql:get-snapshot-without-data (str "WITH snapshots AS (" sql:snapshots ")" "SELECT c.id, @@ -319,79 +314,87 @@ (defn restore! [{:keys [::db/conn] :as cfg} file-id snapshot-id] - (let [file (get-minimal-file conn file-id {::db/for-update true}) - vern (rand-int Integer/MAX_VALUE) + (let [lock-sql (str sql:get-minimal-file " FOR UPDATE OF f SKIP LOCKED") + row (db/exec-one! 
conn [lock-sql file-id])] - storage - (sto/resolve cfg {::db/reuse-conn true}) + (when-not row + (ex/raise :type :conflict + :code :file-locked + :hint "the file is currently locked by another operation, retry later")) - snapshot - (get-snapshot cfg file-id snapshot-id)] + (let [file (d/update-when row :metadata fdata/decode-metadata) + vern (rand-int Integer/MAX_VALUE) - (when-not snapshot - (ex/raise :type :not-found - :code :snapshot-not-found - :hint "unable to find snapshot with the provided label" - :snapshot-id snapshot-id - :file-id file-id)) + storage + (sto/resolve cfg {::db/reuse-conn true}) - (when-not (:data snapshot) - (ex/raise :type :internal - :code :snapshot-without-data - :hint "snapshot has no data" - :label (:label snapshot) - :file-id file-id)) + snapshot + (get-snapshot cfg file-id snapshot-id)] - (let [;; If the snapshot has applied migrations stored, we reuse - ;; them, if not, we take a safest set of migrations as - ;; starting point. This is because, at the time of - ;; implementing snapshots, migrations were not taken into - ;; account so we need to make this backward compatible in - ;; some way. 
- migrations - (or (:migrations snapshot) - (fmg/generate-migrations-from-version 67)) + (when-not snapshot + (ex/raise :type :not-found + :code :snapshot-not-found + :hint "unable to find snapshot with the provided label" + :snapshot-id snapshot-id + :file-id file-id)) - file - (-> file - (update :revn inc) - (assoc :migrations migrations) - (assoc :data (:data snapshot)) - (assoc :vern vern) - (assoc :version (:version snapshot)) - (assoc :has-media-trimmed false) - (assoc :modified-at (:modified-at snapshot)) - (assoc :features (:features snapshot)))] + (when-not (:data snapshot) + (ex/raise :type :internal + :code :snapshot-without-data + :hint "snapshot has no data" + :label (:label snapshot) + :file-id file-id)) - (l/dbg :hint "restoring snapshot" - :file-id (str file-id) - :label (:label snapshot) - :snapshot-id (str (:id snapshot))) + (let [;; If the snapshot has applied migrations stored, we reuse + ;; them, if not, we take a safest set of migrations as + ;; starting point. This is because, at the time of + ;; implementing snapshots, migrations were not taken into + ;; account so we need to make this backward compatible in + ;; some way. + migrations + (or (:migrations snapshot) + (fmg/generate-migrations-from-version 67)) - ;; In the same way, on reseting the file data, we need to restore - ;; the applied migrations on the moment of taking the snapshot - (bfc/update-file! cfg file ::bfc/reset-migrations? true) + file + (-> file + (update :revn inc) + (assoc :migrations migrations) + (assoc :data (:data snapshot)) + (assoc :vern vern) + (assoc :version (:version snapshot)) + (assoc :has-media-trimmed false) + (assoc :modified-at (:modified-at snapshot)) + (assoc :features (:features snapshot)))] - ;; FIXME: this should be separated functions, we should not have - ;; inline sql here. 
+ (l/dbg :hint "restoring snapshot" + :file-id (str file-id) + :label (:label snapshot) + :snapshot-id (str (:id snapshot))) - ;; clean object thumbnails - (let [sql (str "update file_tagged_object_thumbnail " - " set deleted_at = now() " - " where file_id=? returning media_id") - res (db/exec! conn [sql file-id])] - (doseq [media-id (into #{} (keep :media-id) res)] - (sto/touch-object! storage media-id))) + ;; In the same way, on reseting the file data, we need to restore + ;; the applied migrations on the moment of taking the snapshot + (bfc/update-file! cfg file ::bfc/reset-migrations? true) - ;; clean file thumbnails - (let [sql (str "update file_thumbnail " - " set deleted_at = now() " - " where file_id=? returning media_id") - res (db/exec! conn [sql file-id])] - (doseq [media-id (into #{} (keep :media-id) res)] - (sto/touch-object! storage media-id))) + ;; FIXME: this should be separated functions, we should not have + ;; inline sql here. - vern))) + ;; clean object thumbnails + (let [sql (str "update file_tagged_object_thumbnail " + " set deleted_at = now() " + " where file_id=? returning media_id") + res (db/exec! conn [sql file-id])] + (doseq [media-id (into #{} (keep :media-id) res)] + (sto/touch-object! storage media-id))) + + ;; clean file thumbnails + (let [sql (str "update file_thumbnail " + " set deleted_at = now() " + " where file_id=? returning media_id") + res (db/exec! conn [sql file-id])] + (doseq [media-id (into #{} (keep :media-id) res)] + (sto/touch-object! storage media-id))) + + vern)))) (defn delete! 
[cfg & {:keys [id file-id deleted-at]}] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 940775bdf0..2c154ea95c 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -154,8 +154,8 @@ ::db/username (cf/get :database-username) ::db/password (cf/get :database-password) ::db/read-only (cf/get :database-readonly false) - ::db/min-size (cf/get :database-min-pool-size 0) - ::db/max-size (cf/get :database-max-pool-size 60) + ::db/min-size (cf/get :database-min-pool-size) + ::db/max-size (cf/get :database-max-pool-size) ::mtx/metrics (ig/ref ::mtx/metrics)} ;; Default netty IO pool (shared between several services) diff --git a/backend/src/app/rpc/commands/files_snapshot.clj b/backend/src/app/rpc/commands/files_snapshot.clj index 8325772361..e4f7d3d18d 100644 --- a/backend/src/app/rpc/commands/files_snapshot.clj +++ b/backend/src/app/rpc/commands/files_snapshot.clj @@ -17,6 +17,7 @@ [app.main :as-alias main] [app.msgbus :as mbus] [app.rpc :as-alias rpc] + [app.rpc.climit :as-alias climit] [app.rpc.commands.files :as files] [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] @@ -43,9 +44,10 @@ (sv/defmethod ::create-file-snapshot {::doc/added "1.20" ::sm/params schema:create-file-snapshot - ::db/transaction true} - [{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id label]}] - (files/check-edition-permissions! conn profile-id file-id) + ::climit/id [[:create-file-snapshot/by-profile ::rpc/profile-id] + [:create-file-snapshot/global]]} + [cfg {:keys [::rpc/profile-id file-id label]}] + (files/check-edition-permissions! cfg profile-id file-id) (let [file (bfc/get-file cfg file-id :realize? true) project (db/get-by-id cfg :project (:project-id file))] @@ -57,10 +59,10 @@ (quotes/check! {::quotes/id ::quotes/snapshots-per-file} {::quotes/id ::quotes/snapshots-per-team})) - (fsnap/create! cfg file - {:label label - :profile-id profile-id - :created-by "user"}))) + (db/tx-run! cfg fsnap/create! 
file + {:label label + :profile-id profile-id + :created-by "user"}))) (def ^:private schema:restore-file-snapshot [:map {:title "restore-file-snapshot"} @@ -70,29 +72,43 @@ (sv/defmethod ::restore-file-snapshot {::doc/added "1.20" ::sm/params schema:restore-file-snapshot - ::db/transaction true} - [{:keys [::db/conn ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id ::rpc/session-id file-id id] :as params}] - (files/check-edition-permissions! conn profile-id file-id) + ::climit/id [[:restore-file-snapshot/by-profile ::rpc/profile-id] + [:restore-file-snapshot/global]]} + [{:keys [::db/pool ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id ::rpc/session-id file-id id] :as params}] + + ;; Check permissions and read current file state (short-lived, outside restore transaction) + (files/check-edition-permissions! pool profile-id file-id) (let [file (bfc/get-file cfg file-id) - team (teams/get-team conn + team (teams/get-team pool :profile-id profile-id :file-id file-id) - delay (ldel/get-deletion-delay team)] + delay (ldel/get-deletion-delay team) + file-revn (:revn file)] + ;; Create backup snapshot of the current state (committed immediately + ;; independently of the restore outcome) (fsnap/create! cfg file {:profile-id profile-id :deleted-at (ct/in-future delay) :created-by "system"}) - (let [vern (fsnap/restore! cfg file-id id)] - ;; Send to the clients a notification to reload the file - (mbus/pub! msgbus - :topic (:id file) - :message {:type :file-restored - :session-id session-id - :file-id (:id file) - :vern vern}) - nil))) + ;; Restore snapshot inside its own transaction; the revn check + ;; ensures no data is lost if the file was edited concurrently + (db/tx-run! 
cfg + (fn [{:keys [::db/conn] :as cfg}] + (let [current (bfc/get-minimal-file conn file-id {::db/for-update true})] + (when (not= (:revn current) file-revn) + (ex/raise :type :conflict + :code :file-modified + :hint "the file was modified during the restore process, please retry"))) + (let [vern (fsnap/restore! cfg file-id id)] + (mbus/pub! msgbus + :topic (:id file) + :message {:type :file-restored + :session-id session-id + :file-id (:id file) + :vern vern}) + nil))))) (def ^:private schema:update-file-snapshot [:map {:title "update-file-snapshot"} diff --git a/backend/test/backend_tests/db_test.clj b/backend/test/backend_tests/db_test.clj new file mode 100644 index 0000000000..b61ce6c920 --- /dev/null +++ b/backend/test/backend_tests/db_test.clj @@ -0,0 +1,43 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns backend-tests.db-test + (:require + [app.db :as db] + [backend-tests.helpers :as th] + [clojure.test :as t]) + (:import + com.zaxxer.hikari.HikariConfig + com.zaxxer.hikari.HikariDataSource + java.sql.Connection)) + +(t/use-fixtures :once th/state-init) + +(t/deftest pool-stats-returns-expected-keys + (let [stats (db/pool-stats th/*pool*)] + (t/testing "all expected keys are present" + (t/is (contains? stats :active-connections)) + (t/is (contains? stats :idle-connections)) + (t/is (contains? stats :threads-awaiting-connection)) + (t/is (contains? stats :total-connections)) + (t/is (contains? stats :maximum-pool-size)) + (t/is (contains? 
stats :minimum-idle))) + + (t/testing "values are non-negative integers" + (t/is (>= (:active-connections stats) 0)) + (t/is (>= (:idle-connections stats) 0)) + (t/is (>= (:threads-awaiting-connection stats) 0)) + (t/is (>= (:total-connections stats) 0)) + (t/is (>= (:maximum-pool-size stats) 0)) + (t/is (>= (:minimum-idle stats) 0))) + + (t/testing "total connections equals active + idle" + (t/is (= (:total-connections stats) + (+ (:active-connections stats) + (:idle-connections stats))))) + + (t/testing "maximum pool size is reasonable" + (t/is (pos? (:maximum-pool-size stats))))))