diff --git a/backend/src/app/srepl/helpers.clj b/backend/src/app/srepl/helpers.clj index 715fb77329..13b399f1bd 100644 --- a/backend/src/app/srepl/helpers.clj +++ b/backend/src/app/srepl/helpers.clj @@ -110,7 +110,7 @@ The `on-file` parameter should be a function that receives the file and the previous state and returns the new state." - [system & {:keys [chunk-size max-items start-at on-file on-error on-end] + [system & {:keys [chunk-size max-items start-at on-file on-error on-end on-init] :or {chunk-size 10 max-items Long/MAX_VALUE}}] (letfn [(get-chunk [conn cursor] (let [rows (db/exec! conn [sql:retrieve-files-chunk cursor chunk-size])] @@ -124,24 +124,27 @@ (take max-items) (map #(update % :data blob/decode)))) - (on-error* [file cause] + (on-error* [cause file] (println "unexpected exception happened on processing file: " (:id file)) (strace/print-stack-trace cause))] - (db/with-atomic [conn (:app.db/pool system)] - (loop [state {} - files (get-candidates conn)] - (if-let [file (first files)] - (let [state' (try - (on-file file state) - (catch Throwable cause - (let [on-error (or on-error on-error*)] - (on-error file cause))))] - (recur (or state' state) (rest files))) + (when (fn? on-init) (on-init)) - (if (fn? on-end) - (on-end state) - state)))))) + (db/with-atomic [conn (:app.db/pool system)] + (doseq [file (get-candidates conn)] + (binding [*conn* conn + pmap/*tracked* (atom {}) + pmap/*load-fn* (partial files/load-pointer conn (:id file)) + ffeat/*wrap-with-pointer-map-fn* + (if (contains? (:features file) "storage/pointer-map") pmap/wrap identity) + ffeat/*wrap-with-objects-map-fn* + (if (contains? (:features file) "storage/objectd-map") omap/wrap identity)] + (try + (on-file file) + (catch Throwable cause + ((or on-error on-error*) cause file)))))) + + (when (fn? on-end) (on-end)))) (defn update-pages "Apply a function to all pages of one file. The function receives a page and returns an updated page."