From 3a0870690bff3133948a7ed01da6336a600fe7dd Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 8 Aug 2025 11:26:49 +0200 Subject: [PATCH] WIP --- backend/src/app/features/fdata.clj | 68 +++++++++++++++--------------- backend/src/app/srepl/main.clj | 13 ++++++ 2 files changed, 46 insertions(+), 35 deletions(-) diff --git a/backend/src/app/features/fdata.clj b/backend/src/app/features/fdata.clj index fe5eeab65c..d80f39dc42 100644 --- a/backend/src/app/features/fdata.clj +++ b/backend/src/app/features/fdata.clj @@ -386,59 +386,57 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (def sql:get-unmigrated-files - "SELECT f.id, f.data, - row_number() OVER w AS index + "SELECT f.id, f.data FROM file AS f WHERE f.data IS NOT NULL - WINDOW w AS (order by f.modified_at ASC) ORDER BY f.modified_at ASC - LIMIT 100000") + LIMIT ? + FOR UPDATE") (defn migrate-to-storage "Migrate the current existing files to store data in new storage tables." - [system] + [system & {:keys [chunk-size] :or {chunk-size 5000}}] (let [timestamp (ct/now)] (db/tx-run! system (fn [{:keys [::db/conn]}] - (run! (fn [{:keys [id data index]}] - (l/dbg :hint "migrating file" :file-id (str id) :index index) - (db/update! conn :file {:data nil} {:id id} ::db/return-keys false) - (db/insert! conn :file-data - {:backend "db" - :metadata nil - :type "main" - :data data - :created-at timestamp - :modified-at timestamp - :file-id id - :id id} - {::db/return-keys false})) - (db/plan conn [sql:get-unmigrated-files] - {:fetch-size 1})))))) + (reduce (fn [total {:keys [id data index]}] + (l/dbg :hint "migrating file" :file-id (str id)) + (db/update! conn :file {:data nil} {:id id} ::db/return-keys false) + (db/insert! conn :file-data + {:backend "db" + :metadata nil + :type "main" + :data data + :created-at timestamp + :modified-at timestamp + :file-id id + :id id} + {::db/return-keys false}) + (inc total)) + 0 + (db/plan conn [sql:get-unmigrated-files chunk-size] + {:fetch-size 1})))))) (def sql:get-migrated-files - "SELECT f.id, f.data, - row_number() OVER w AS index + "SELECT f.id, f.data FROM file_data AS f WHERE f.data IS NOT NULL AND f.id = f.file_id - WINDOW w AS (order by f.id ASC) - ORDER BY f.id ASC") + ORDER BY f.id ASC + LIMIT ? + FOR UPDATE") (defn rollback-from-storage "Migrate back to the file table storage." - [system] + [system & {:keys [chunk-size] :or {chunk-size 5000}}] (db/tx-run! system (fn [{:keys [::db/conn]}] - (run! (fn [{:keys [id data index]}] - (l/dbg :hint "rollback file" :file-id (str id) :index index) - (db/update! conn :file {:data data} {:id id} ::db/return-keys false) - (db/delete! conn :file-data {:id id} ::db/return-keys false)) - (db/plan conn [sql:get-migrated-files] + (reduce (fn [total {:keys [id data index]}] + (l/dbg :hint "rollback file" :file-id (str id) :index index) + (db/update! conn :file {:data data} {:id id} ::db/return-keys false) + (db/delete! conn :file-data {:id id} ::db/return-keys false) + (inc total)) + 0 + (db/plan conn [sql:get-migrated-files] {:fetch-size 1}))))) - - - - - diff --git a/backend/src/app/srepl/main.clj b/backend/src/app/srepl/main.clj index a542f8fd2f..807c1b2a25 100644 --- a/backend/src/app/srepl/main.clj +++ b/backend/src/app/srepl/main.clj @@ -833,6 +833,19 @@ (with-open [reader (io/reader path)] (process-data! system deleted-at (line-seq reader)))))))) + +(defn process-chunks + "A generic function that executes the specified proc iterativelly + until 0 results is returned" + [cfg proc-fn & params] + (loop [total 0] + (let [result (apply proc-fn cfg params)] + (if (pos? result) + (do + (l/trc :hint "chunk processed" :size result :total total) + (recur (+ total result))) + total)))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; CASCADE FIXING ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;