This commit is contained in:
Andrey Antukh 2025-08-13 11:38:12 +02:00
parent 3f99b1b626
commit f75b7ea284

View File

@ -554,25 +554,23 @@
"Apply a function to all files in the database"
[& {:keys [max-jobs
rollback?
max-chunks
max-items
chunk-size
proc-fn]
:or {max-chunks Long/MAX_VALUE
:or {max-items Long/MAX_VALUE
chunk-size 100
rollback? true}
:as opts}]
(let [tpoint (ct/tpoint)
max-jobs (or max-jobs (px/get-available-processors))
max-chunks (max max-chunks max-jobs)
processed (atom 0)
opts (-> opts
(assoc :chunk-size chunk-size)
(dissoc :rollback?)
(dissoc :proc-fn)
(dissoc :max-jobs)
(dissoc :max-chunks))
(dissoc :max-items))
start-job
(fn [jid]
@ -585,20 +583,17 @@
(proc-fn opts))]
(l/dbg :hint "chunk processed" :jid jid :total total :chunk result)
(let [total (+ total result)
chunks (swap! processed inc)]
(when (and (pos? result)
(< chunks max-chunks))
(let [total (swap! processed + result)]
(l/dbg :hint "chunk processed" :jid jid :total total :chunk result)
(when (and (pos? result)
(< total max-items))
(recur total))))))]
(l/dbg :hint "process:start"
:rollback rollback?
:max-jobs max-jobs
:max-chunks max-chunks)
(add-watch processed ::watch
(fn [_ _ _ v]
(l/dbg :hint "total chunks processed" :chunks v :items (* chunk-size v))))
:max-items max-items)
(try
(let [jobs (->> (range max-jobs)
@ -613,6 +608,7 @@
(finally
(let [elapsed (ct/format-duration (tpoint))]
(l/dbg :hint "process:end"
:processed @processed
:rollback rollback?
:elapsed elapsed))))))