This commit is contained in:
Andrey Antukh 2025-08-13 11:24:06 +02:00
parent 38066c73ee
commit 7e9c8e8f01
2 changed files with 71 additions and 10 deletions

View File

@ -391,12 +391,13 @@
WHERE f.data IS NOT NULL
ORDER BY f.modified_at ASC
LIMIT ?
FOR UPDATE")
FOR UPDATE
SKIP LOCKED")
(defn migrate-files-to-storage
"Migrate the current existing files to store data in new storage
tables."
[system & {:keys [chunk-size] :or {chunk-size 500}}]
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(reduce (fn [total {:keys [id data index created-at modified-at]}]
@ -424,11 +425,12 @@
AND f.id = f.file_id
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE")
FOR UPDATE
SKIP LOCKED")
(defn rollback-files-from-storage
"Migrate back to the file table storage."
[system & {:keys [chunk-size] :or {chunk-size 500}}]
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(reduce (fn [total {:keys [id data]}]
@ -447,16 +449,17 @@
AND f.label IS NOT NULL
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE")
FOR UPDATE
SKIP LOCKED")
(defn migrate-snapshots-to-storage
"Migrate the current existing files to store data in new storage
tables."
[system & {:keys [chunk-size] :or {chunk-size 500}}]
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(reduce (fn [total {:keys [id file-id data created-at modified-at]}]
(l/dbg :hint "migrating file" :file-id (str id))
(l/dbg :hint "migrating snapshot" :file-id (str file-id) :id (str id))
(db/update! conn :file-change {:data nil} {:id id :file-id file-id} ::db/return-keys false)
(db/insert! conn :file-data
{:backend "db"
@ -481,15 +484,19 @@
AND f.id != f.file_id
ORDER BY f.id ASC
LIMIT ?
FOR UPDATE")
FOR UPDATE
SKIP LOCKED")
(defn rollback-snapshots-from-storage
"Migrate back to the file table storage."
[system & {:keys [chunk-size] :or {chunk-size 500}}]
[system & {:keys [chunk-size] :or {chunk-size 100}}]
(db/tx-run! system
(fn [{:keys [::db/conn]}]
(db/exec! conn ["SET statement_timeout = 0"])
(db/exec! conn ["SET idle_in_transaction_session_timeout = 0"])
(reduce (fn [total {:keys [id file-id data]}]
(l/dbg :hint "rollback snapshot" :file-id (str id))
(l/dbg :hint "rollback snapshot" :file-id (str id) :id (str id))
(db/update! conn :file-change {:data data} {:id id :file-id file-id} ::db/return-keys false)
(db/delete! conn :file-data {:id id :file-id file-id} ::db/return-keys false)
(inc total))

View File

@ -550,6 +550,60 @@
:rollback rollback?
:elapsed elapsed))))))
(defn process!
"Apply a function to all files in the database"
[& {:keys [max-jobs
rollback?
max-chunks
proc-fn]
:or {max-chunks Long/MAX_VALUE
rollback? true}
:as opts}]
(l/dbg :hint "process:start"
:rollback rollback?
:max-jobs max-jobs
:max-chunks max-chunks)
(let [tpoint (ct/tpoint)
max-jobs (or max-jobs (px/get-available-processors))
chunks (atom 0)
start-job
(fn [jid]
(l/dbg :hint "start job thread" :jid jid)
(px/sleep 1000)
(loop [total 0]
(let [result (-> main/system
(assoc ::db/rollback rollback?)
(proc-fn opts))
total (+ total result)
chunks (swap! chunks inc)]
(l/dbg :hint "chunk processed" :jid jid :total total :chunk result)
(when (and (pos? result)
(< chunks max-chunks))
(recur total)))))]
(try
(let [jobs (->> (range max-jobs)
(map (fn [jid] (px/fn->thread (partial start-job jid))))
(doall))]
(doseq [job jobs]
(.join ^java.lang.Thread job)))
(catch Throwable cause
(l/dbg :hint "process:error" :cause cause))
(finally
(let [elapsed (ct/format-duration (tpoint))]
(l/dbg :hint "process:end"
:rollback rollback?
:elapsed elapsed))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; DELETE/RESTORE OBJECTS (WITH CASCADE, SOFT)