From 7e9c8e8f01d2c6d022fe19d57618a90fc65cc6f6 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 13 Aug 2025 11:24:06 +0200 Subject: [PATCH] WIP --- backend/src/app/features/fdata.clj | 27 +++++++++------ backend/src/app/srepl/main.clj | 54 ++++++++++++++++++++++++++++++ 2 files changed, 71 insertions(+), 10 deletions(-) diff --git a/backend/src/app/features/fdata.clj b/backend/src/app/features/fdata.clj index 65cca6f320..cfb0aea9f5 100644 --- a/backend/src/app/features/fdata.clj +++ b/backend/src/app/features/fdata.clj @@ -391,12 +391,13 @@ WHERE f.data IS NOT NULL ORDER BY f.modified_at ASC LIMIT ? - FOR UPDATE") + FOR UPDATE + SKIP LOCKED") (defn migrate-files-to-storage "Migrate the current existing files to store data in new storage tables." - [system & {:keys [chunk-size] :or {chunk-size 500}}] + [system & {:keys [chunk-size] :or {chunk-size 100}}] (db/tx-run! system (fn [{:keys [::db/conn]}] (reduce (fn [total {:keys [id data index created-at modified-at]}] @@ -424,11 +425,12 @@ AND f.id = f.file_id ORDER BY f.id ASC LIMIT ? - FOR UPDATE") + FOR UPDATE + SKIP LOCKED") (defn rollback-files-from-storage "Migrate back to the file table storage." - [system & {:keys [chunk-size] :or {chunk-size 500}}] + [system & {:keys [chunk-size] :or {chunk-size 100}}] (db/tx-run! system (fn [{:keys [::db/conn]}] (reduce (fn [total {:keys [id data]}] @@ -447,16 +449,17 @@ AND f.label IS NOT NULL ORDER BY f.id ASC LIMIT ? - FOR UPDATE") + FOR UPDATE + SKIP LOCKED") (defn migrate-snapshots-to-storage "Migrate the current existing files to store data in new storage tables." - [system & {:keys [chunk-size] :or {chunk-size 500}}] + [system & {:keys [chunk-size] :or {chunk-size 100}}] (db/tx-run! system (fn [{:keys [::db/conn]}] (reduce (fn [total {:keys [id file-id data created-at modified-at]}] - (l/dbg :hint "migrating file" :file-id (str id)) + (l/dbg :hint "migrating snapshot" :file-id (str file-id) :id (str id)) (db/update! conn :file-change {:data nil} {:id id :file-id file-id} ::db/return-keys false) (db/insert! conn :file-data {:backend "db" @@ -481,15 +484,19 @@ AND f.id != f.file_id ORDER BY f.id ASC LIMIT ? - FOR UPDATE") + FOR UPDATE + SKIP LOCKED") (defn rollback-snapshots-from-storage "Migrate back to the file table storage." - [system & {:keys [chunk-size] :or {chunk-size 500}}] + [system & {:keys [chunk-size] :or {chunk-size 100}}] (db/tx-run! system (fn [{:keys [::db/conn]}] + (db/exec! conn ["SET statement_timeout = 0"]) + (db/exec! conn ["SET idle_in_transaction_session_timeout = 0"]) + (reduce (fn [total {:keys [id file-id data]}] - (l/dbg :hint "rollback snapshot" :file-id (str id)) + (l/dbg :hint "rollback snapshot" :file-id (str id) :id (str id)) (db/update! conn :file-change {:data data} {:id id :file-id file-id} ::db/return-keys false) (db/delete! conn :file-data {:id id :file-id file-id} ::db/return-keys false) (inc total)) diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index 807c1b2a25..7683820b54 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -550,6 +550,60 @@ :rollback rollback? :elapsed elapsed)))))) +(defn process! + "Apply a function to all files in the database" + [& {:keys [max-jobs + rollback? + max-chunks + proc-fn] + :or {max-chunks Long/MAX_VALUE + rollback? true} + :as opts}] + + (l/dbg :hint "process:start" + :rollback rollback? + :max-jobs max-jobs + :max-chunks max-chunks) + + (let [tpoint (ct/tpoint) + max-jobs (or max-jobs (px/get-available-processors)) + chunks (atom 0) + + start-job + (fn [jid] + (l/dbg :hint "start job thread" :jid jid) + (px/sleep 1000) + + (loop [total 0] + (let [result (-> main/system + (assoc ::db/rollback rollback?) + (proc-fn opts)) + total (+ total result) + chunks (swap! chunks inc)] + + (l/dbg :hint "chunk processed" :jid jid :total total :chunk result) + (when (and (pos? result) + (< chunks max-chunks)) + (recur total)))))] + + (try + (let [jobs (->> (range max-jobs) + (map (fn [jid] (px/fn->thread (partial start-job jid)))) + (doall))] + (doseq [job jobs] + (.join ^java.lang.Thread job))) + + (catch Throwable cause + (l/dbg :hint "process:error" :cause cause)) + + (finally + (let [elapsed (ct/format-duration (tpoint))] + (l/dbg :hint "process:end" + :rollback rollback? + :elapsed elapsed)))))) + + + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; DELETE/RESTORE OBJECTS (WITH CASCADE, SOFT)