This commit is contained in:
Andrey Antukh 2025-08-08 11:26:49 +02:00
parent 872b8fec85
commit 3a0870690b
2 changed files with 46 additions and 35 deletions

View File

@ -386,59 +386,57 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def sql:get-unmigrated-files
"SELECT f.id, f.data,
row_number() OVER w AS index
"SELECT f.id, f.data
FROM file AS f
WHERE f.data IS NOT NULL
WINDOW w AS (order by f.modified_at ASC)
ORDER BY f.modified_at ASC
LIMIT 100000")
LIMIT ?
FOR UPDATE")
(defn migrate-to-storage
"Migrate the current existing files to store data in new storage
tables."
[system]
[system & {:keys [chunk-size] :or {chunk-size 5000}}]
(let [timestamp (ct/now)]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(run! (fn [{:keys [id data index]}]
(l/dbg :hint "migrating file" :file-id (str id) :index index)
(db/update! conn :file {:data nil} {:id id} ::db/return-keys false)
(db/insert! conn :file-data
{:backend "db"
:metadata nil
:type "main"
:data data
:created-at timestamp
:modified-at timestamp
:file-id id
:id id}
{::db/return-keys false}))
(db/plan conn [sql:get-unmigrated-files]
{:fetch-size 1}))))))
(reduce (fn [total {:keys [id data index]}]
(l/dbg :hint "migrating file" :file-id (str id))
(db/update! conn :file {:data nil} {:id id} ::db/return-keys false)
(db/insert! conn :file-data
{:backend "db"
:metadata nil
:type "main"
:data data
:created-at timestamp
:modified-at timestamp
:file-id id
:id id}
{::db/return-keys false})
(inc total))
0
(db/plan conn [sql:get-unmigrated-files chunk-size]
{:fetch-size 1}))))))
(def sql:get-migrated-files
"SELECT f.id, f.data,
row_number() OVER w AS index
"SELECT f.id, f.data
FROM file_data AS f
WHERE f.data IS NOT NULL
AND f.id = f.file_id
WINDOW w AS (order by f.id ASC)
ORDER BY f.id ASC")
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE")
(defn rollback-from-storage
"Migrate back to the file table storage."
[system]
[system & {:keys [chunk-size] :or {chunk-size 5000}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(run! (fn [{:keys [id data index]}]
(l/dbg :hint "rollback file" :file-id (str id) :index index)
(db/update! conn :file {:data data} {:id id} ::db/return-keys false)
(db/delete! conn :file-data {:id id} ::db/return-keys false))
(db/plan conn [sql:get-migrated-files]
(reduce (fn [total {:keys [id data index]}]
(l/dbg :hint "rollback file" :file-id (str id) :index index)
(db/update! conn :file {:data data} {:id id} ::db/return-keys false)
(db/delete! conn :file-data {:id id} ::db/return-keys false)
(inc total))
0
(db/plan conn [sql:get-migrated-files]
{:fetch-size 1})))))

View File

@ -833,6 +833,19 @@
(with-open [reader (io/reader path)]
(process-data! system deleted-at (line-seq reader))))))))
(defn process-chunks
"A generic function that executes the specified proc iterativelly
until 0 results is returned"
[cfg proc-fn & params]
(loop [total 0]
(let [result (apply proc-fn cfg params)]
(if (pos? result)
(do
(l/trc :hint "chunk processed" :size result :total total)
(recur (+ total result)))
total))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; CASCADE FIXING
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;