diff --git a/backend/resources/migrations/0007.tasks.sql b/backend/resources/migrations/0007.tasks.sql index b6adb3a1be..e1009ac1f1 100644 --- a/backend/resources/migrations/0007.tasks.sql +++ b/backend/resources/migrations/0007.tasks.sql @@ -1,6 +1,8 @@ CREATE TABLE IF NOT EXISTS tasks ( id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + queue text NOT NULL, + created_at timestamptz NOT NULL DEFAULT clock_timestamp(), modified_at timestamptz NOT NULL DEFAULT clock_timestamp(), completed_at timestamptz NULL DEFAULT NULL, @@ -9,9 +11,11 @@ CREATE TABLE IF NOT EXISTS tasks ( name text NOT NULL, props bytea NOT NULL, + error_text text NULL DEFAULT NULL, + retry_num smallint NOT NULL DEFAULT 0, status text NOT NULL DEFAULT 'new' ); CREATE INDEX tasks__scheduled_at__idx - ON tasks (scheduled_at); + ON tasks (scheduled_at, queue); diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj index 1282cf58b1..9a13bff94e 100644 --- a/backend/src/uxbox/tasks.clj +++ b/backend/src/uxbox/tasks.clj @@ -40,11 +40,16 @@ ;; --- State initialization +;; TODO: missing self maintanance task; when the queue table is full +;; of completed/failed task, the performance starts degrading +;; linearly, so after some arbitrary number of tasks is processed, we +;; need to perform a maintenance and delete some old tasks. + (def ^:private tasks [#'uxbox.tasks.demo-gc/handler #'uxbox.tasks.sendmail/handler]) -(defstate tasks - :start (as-> (impl/verticle tasks) $$ +(defstate small-tasks + :start (as-> (impl/verticle {:tasks tasks :queue "default"}) $$ (vc/deploy! system $$ {:instances 1}) (deref $$))) diff --git a/backend/src/uxbox/tasks/demo_gc.clj b/backend/src/uxbox/tasks/demo_gc.clj index 4c8b569a6e..7ddc29a4f9 100644 --- a/backend/src/uxbox/tasks/demo_gc.clj +++ b/backend/src/uxbox/tasks/demo_gc.clj @@ -17,4 +17,5 @@ (defn handler {:uxbox.tasks/name "demo-gc"} [{:keys [props] :as task}] + (prn "debug" props) ) diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj index 50f51f0d7d..22521e8c4c 100644 --- a/backend/src/uxbox/tasks/impl.clj +++ b/backend/src/uxbox/tasks/impl.clj @@ -24,20 +24,23 @@ [vertx.timers :as vt]) (:import java.time.Duration)) -(def ^:private num-cpus - (delay (.availableProcessors (Runtime/getRuntime)))) +(defn- string-strack-trace + [err] + (with-out-str + (.printStackTrace err (java.io.PrintWriter. *out*)))) (def ^:private sql:update-failed-task "update tasks - set scheduled_at = now() + cast($1::text as interval), + set scheduled_at = clock_timestamp() + '5 seconds'::interval, + error_text = $1 status = 'error' retry_num = retry_num + 1 where id = $2;") (defn- reschedule - [conn task] - (let [duration (io.vertx.pgclient.data.Interval/of 0 0 0 0 0 5) - sqlv [sql:update-failed-task duration (:id task)]] + [conn task error] + (let [error (string-strack-trace error) + sqlv [sql:update-failed-task error (:id task)]] (-> (db/query-one conn sqlv) (p/then' (constantly nil))))) @@ -64,6 +67,7 @@ (def ^:private sql:select-next-task "select * from tasks as t where t.scheduled_at <= now() + and t.queue = $1 and (t.status = 'new' or (t.status = 'error' and t.retry_num < 3)) order by t.scheduled_at limit 1 @@ -76,16 +80,16 @@ props (assoc :props (blob/decode props))))) (defn- event-loop - [{:keys [handlers] :as opts}] + [{:keys [handlers queue] :as opts}] (db/with-atomic [conn db/pool] - (-> (db/query-one conn sql:select-next-task) + (-> (db/query-one conn [sql:select-next-task queue]) (p/then decode-task-row) (p/then (fn [item] (when item (-> (p/do! (handle-task handlers item)) (p/handle (fn [v e] (if e - (reschedule conn item) + (reschedule conn item e) (mark-as-completed conn item)))) (p/then' (constantly ::handled))))))))) @@ -100,29 +104,37 @@ (event-loop-handler (assoc opts ::counter (inc counter)))))))) (def ^:private sql:insert-new-task - "insert into tasks (name, props, scheduled_at) - values ($1, $2, now()+cast($3::text as interval)) returning id") + "insert into tasks (name, props, queue, scheduled_at) + values ($1, $2, $3, clock_timestamp()+cast($4::text as interval)) + returning id") (defn schedule! - [conn {:keys [name delay props] :as task}] - (let [delay (tm/duration delay) + [conn {:keys [name delay props queue] :as task}] + (let [queue (if (string? queue) queue "default") + delay (tm/duration delay) duration (->> (/ (.toMillis ^Duration delay) 1000.0) (format "%s seconds")) props (blob/encode props)] - (-> (db/query-one conn [sql:insert-new-task name props duration]) + (-> (db/query-one conn [sql:insert-new-task name props queue duration]) (p/then' (fn [task] (:id task)))))) (defn- on-start - [ctx handlers] + [ctx queue handlers] (vt/schedule! ctx {::vt/fn #'event-loop-handler ::vt/delay 1000 ::vt/repeat true :max-batch-size 10 + :queue queue :handlers handlers})) +(s/def ::tasks (s/coll-of (s/or :fn fn? :var var?))) +(s/def ::queue ::us/string) +(s/def ::verticle-options + (s/keys :req-un [::tasks ::queue])) + (defn verticle - [tasks] - (s/assert (s/coll-of (s/or :fn fn? :var var?)) tasks) + [{:keys [tasks queue] :as options}] + (s/assert ::verticle-options options) (let [handlers (reduce (fn [acc f] (let [task-name (:uxbox.tasks/name (meta f))] (if task-name @@ -132,6 +144,6 @@ acc)))) {} tasks) - on-start #(on-start % handlers)] + on-start #(on-start % queue handlers)] (vc/verticle {:on-start on-start})))