From cd53d3659c65f817c7a6c34d7f36bc8e5f73d02f Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 5 Nov 2025 10:15:21 +0100 Subject: [PATCH 1/2] :bug: Truncate worker scheduled-at to milliseconds The nanosecond precision has a problem with the transit serialization roundtrip used for passing data to the worker scheduler through redis, and it generates unnecessary rescheduling. --- backend/src/app/worker.clj | 36 +++++++++++++++++-------------- backend/src/app/worker/runner.clj | 7 ++++-- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index d1f728cdb7..a5d794e6f7 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -77,8 +77,8 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (def ^:private sql:insert-new-task - "insert into task (id, name, props, queue, label, priority, max_retries, scheduled_at) - values (?, ?, ?, ?, ?, ?, ?, now() + ?) + "insert into task (id, name, props, queue, label, priority, max_retries, created_at, modified_at, scheduled_at) + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) returning id") (def ^:private @@ -88,7 +88,7 @@ AND queue=? AND label=? AND status = 'new' - AND scheduled_at > now()") + AND scheduled_at > ?") (def ^:private schema:options [:map {:title "submit-options"} @@ -111,17 +111,19 @@ (check-options! options) - (let [duration (ct/duration delay) - interval (db/interval duration) - props (db/tjson params) - id (uuid/next) - tenant (cf/get :tenant) - task (d/name task) - queue (str/ffmt "%:%" tenant (d/name queue)) - conn (db/get-connectable options) - deleted (when dedupe - (-> (db/exec-one! 
conn [sql:remove-not-started-tasks task queue label]) - :next.jdbc/update-count))] + (let [delay (ct/duration delay) + now (ct/now) + scheduled-at (-> (ct/plus now delay) + (ct/truncate :millisecond)) + props (db/tjson params) + id (uuid/next) + tenant (cf/get :tenant) + task (d/name task) + queue (str/ffmt "%:%" tenant (d/name queue)) + conn (db/get-connectable options) + deleted (when dedupe + (-> (db/exec-one! conn [sql:remove-not-started-tasks task queue label now]) + (db/get-update-count)))] (l/trc :hint "submit task" :name task @@ -129,11 +131,13 @@ :queue queue :label label :dedupe (boolean dedupe) - :delay (ct/format-duration duration) + :delay (ct/format-duration delay) :replace (or deleted 0)) (db/exec-one! conn [sql:insert-new-task id task props queue - label priority max-retries interval]) + label priority max-retries + now now scheduled-at]) + id)) (defn invoke! diff --git a/backend/src/app/worker/runner.clj b/backend/src/app/worker/runner.clj index f3f44bfd30..37fc55471a 100644 --- a/backend/src/app/worker/runner.clj +++ b/backend/src/app/worker/runner.clj @@ -158,7 +158,9 @@ (inst-ms (:scheduled-at task))) (l/wrn :hint "skiping task, rescheduled" :task-id task-id - :runner-id id) + :runner-id id + :scheduled-at (ct/format-inst (:scheduled-at task)) + :expected-scheduled-at (ct/format-inst scheduled-at)) :else (let [result (run-task cfg task)] @@ -179,7 +181,8 @@ {:error explain :status "retry" :modified-at now - :scheduled-at (ct/plus now delay) + :scheduled-at (-> (ct/plus now delay) + (ct/truncate :millisecond)) :retry-num nretry} {:id (:id task)}) nil)) From 7d5c1c9b5f3c788dab96934b716d6cf252390812 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 5 Nov 2025 10:18:09 +0100 Subject: [PATCH 2/2] :sparkles: Make file-gc-scheduler task compatible with virtual clock And simplify implementation --- backend/src/app/tasks/file_gc.clj | 3 +++ backend/src/app/tasks/file_gc_scheduler.clj | 26 ++++++++++----------- 2 files changed, 16 insertions(+), 13 
deletions(-) diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index 946b9aa42b..e1f6d2c542 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -218,6 +218,9 @@ (when (or (nil? revn) (= revn (:revn file))) file))) +;; FIXME: we should skip files that do not match the revn on the +;; props and add a proper schema for this task props + (defn- process-file! [cfg {:keys [file-id] :as props}] (if-let [file (get-file cfg props)] diff --git a/backend/src/app/tasks/file_gc_scheduler.clj b/backend/src/app/tasks/file_gc_scheduler.clj index 32c1d4152c..ee1cb7c673 100644 --- a/backend/src/app/tasks/file_gc_scheduler.clj +++ b/backend/src/app/tasks/file_gc_scheduler.clj @@ -8,6 +8,7 @@ "A maintenance task that is responsible of properly scheduling the file-gc task for all files that matches the eligibility threshold." (:require + [app.common.logging :as l] [app.common.time :as ct] [app.config :as cf] [app.db :as db] @@ -21,25 +22,24 @@ f.modified_at FROM file AS f WHERE f.has_media_trimmed IS false - AND f.modified_at < now() - ?::interval + AND f.modified_at < ? AND f.deleted_at IS NULL ORDER BY f.modified_at DESC FOR UPDATE OF f SKIP LOCKED") -(defn- get-candidates - [{:keys [::db/conn ::min-age] :as cfg}] - (let [min-age (db/interval min-age)] - (db/plan conn [sql:get-candidates min-age] {:fetch-size 10}))) - (defn- schedule! - [cfg] + [{:keys [::db/conn] :as cfg} threshold] (let [total (reduce (fn [total {:keys [id modified-at revn]}] - (let [params {:file-id id :modified-at modified-at :revn revn}] + (let [params {:file-id id :revn revn}] + (l/trc :hint "schedule" + :file-id (str id) + :revn revn + :modified-at (ct/format-inst modified-at)) (wrk/submit! 
(assoc cfg ::wrk/params params)) (inc total))) 0 - (get-candidates cfg))] + (db/plan conn [sql:get-candidates threshold] {:fetch-size 10}))] {:processed total})) (defmethod ig/assert-key ::handler @@ -53,12 +53,12 @@ (defmethod ig/init-key ::handler [_ cfg] (fn [{:keys [props] :as task}] - (let [min-age (ct/duration (or (:min-age props) (::min-age cfg)))] + (let [threshold (-> (ct/duration (or (:min-age props) (::min-age cfg))) + (ct/in-past))] (-> cfg (assoc ::db/rollback (:rollback? props)) - (assoc ::min-age min-age) (assoc ::wrk/task :file-gc) (assoc ::wrk/priority 10) (assoc ::wrk/mark-retries 0) - (assoc ::wrk/delay 1000) - (db/tx-run! schedule!))))) + (assoc ::wrk/delay 10000) + (db/tx-run! schedule! threshold)))))