diff --git a/backend/resources/migrations/0007.tasks.sql b/backend/resources/migrations/0007.tasks.sql new file mode 100644 index 0000000000..b6adb3a1be --- /dev/null +++ b/backend/resources/migrations/0007.tasks.sql @@ -0,0 +1,17 @@ +CREATE TABLE IF NOT EXISTS tasks ( + id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + + created_at timestamptz NOT NULL DEFAULT clock_timestamp(), + modified_at timestamptz NOT NULL DEFAULT clock_timestamp(), + completed_at timestamptz NULL DEFAULT NULL, + scheduled_at timestamptz NOT NULL, + + name text NOT NULL, + props bytea NOT NULL, + + retry_num smallint NOT NULL DEFAULT 0, + status text NOT NULL DEFAULT 'new' +); + +CREATE INDEX tasks__scheduled_at__idx + ON tasks (scheduled_at); diff --git a/backend/src/uxbox/emails.clj b/backend/src/uxbox/emails.clj index dafedba3a8..652d321cc5 100644 --- a/backend/src/uxbox/emails.clj +++ b/backend/src/uxbox/emails.clj @@ -13,9 +13,9 @@ [uxbox.common.exceptions :as ex] [uxbox.common.spec :as us] [uxbox.db :as db] + [uxbox.tasks :as tasks] [uxbox.media :as media] - [uxbox.util.emails :as emails] - [uxbox.util.blob :as blob])) + [uxbox.util.emails :as emails])) ;; --- Defaults @@ -33,19 +33,17 @@ (defn send! "Schedule the email for sending." - [email context] - (us/assert fn? email) - (us/assert map? context) - (let [defaults {:from (:email-from cfg/config) - :reply-to (:email-reply-to cfg/config)} - data (->> (merge defaults context) - (email) - (blob/encode)) - priority (case (:priority context :high) :low 1 :high 10) - sql "insert into email_queue (data, priority) - values ($1, $2) returning *"] - (-> (db/query-one db/pool [sql data priority]) - (p/then' (constantly nil))))) + ([email context] (send! db/pool email context)) + ([conn email context] + (us/assert fn? email) + (us/assert map? context) + (let [defaults {:from (:email-from cfg/config) + :reply-to (:email-reply-to cfg/config)} + data (->> (merge defaults context) + (email))] + (tasks/schedule! conn {:name "sendmail" + :delay 0 + :props data})))) ;; --- Emails diff --git a/backend/src/uxbox/http.clj b/backend/src/uxbox/http.clj index 64bd9275da..f4098841a9 100644 --- a/backend/src/uxbox/http.clj +++ b/backend/src/uxbox/http.clj @@ -9,8 +9,8 @@ [clojure.tools.logging :as log] [mount.core :as mount :refer [defstate]] [promesa.core :as p] - [uxbox.config :as cfg] [uxbox.core :refer [system]] + [uxbox.config :as cfg] [uxbox.http.errors :as errors] [uxbox.http.interceptors :as interceptors] [uxbox.http.session :as session] @@ -23,13 +23,6 @@ [vertx.web :as vw] [vertx.web.interceptors :as vxi])) -(declare login-handler) -(declare logout-handler) -(declare register-handler) -(declare mutation-handler) -(declare query-handler) -(declare echo-handler) - (defn- on-start [ctx] (let [cors-opts {:origin (:http-server-cors cfg/config "http://localhost:3449") @@ -78,9 +71,11 @@ (vh/server ctx {:handler handler :port (:http-server-port cfg/config)}))) -(defstate instances - :start (.availableProcessors (Runtime/getRuntime))) +(def num-cpus + (delay (.availableProcessors (Runtime/getRuntime)))) (defstate server - :start (let [factory (vc/verticle {:on-start on-start})] - @(vc/deploy! system factory {:instances instances}))) + :start (let [vf (vc/verticle {:on-start on-start})] + @(vc/deploy! system vf {:instances @num-cpus}))) + + diff --git a/backend/src/uxbox/jobs/gc.clj b/backend/src/uxbox/jobs/gc.clj deleted file mode 100644 index 04ae9f6ef3..0000000000 --- a/backend/src/uxbox/jobs/gc.clj +++ /dev/null @@ -1,38 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) 2016-2019 Andrey Antukh - -(ns uxbox.jobs.gc - (:require - [promesa.core :as p] - [uxbox.core :refer [system]] - [uxbox.db :as db] - [uxbox.util.jobs :as uj] - [mount.core :as mount :refer [defstate]])) - -;; TODO: add images-gc -;; TODO: add icons-gc -;; TODO: add pages-gc - -;; --- Delete Projects - -(def sql:remove-deleted-projects - "DELETE FROM projects - WHERE deleted_at is not null - AND (now()-deleted_at)::interval > '10 day'::interval - RETURNING id;") - -(defn clean-deleted-projects - "Clean deleted projects." - [opts] - (db/with-atomic [conn db/pool] - (-> (db/query-one conn sql:remove-deleted-projects) - (p/then (constantly nil))))) - -(defstate projects-cleaner-task - :start (uj/schedule! system #'clean-deleted-projects - {::uj/interval 3600000})) ;; 1h - - diff --git a/backend/src/uxbox/jobs/sendmail.clj b/backend/src/uxbox/jobs/sendmail.clj deleted file mode 100644 index 9c8cd570fa..0000000000 --- a/backend/src/uxbox/jobs/sendmail.clj +++ /dev/null @@ -1,137 +0,0 @@ -;; This Source Code Form is subject to the terms of the Mozilla Public -;; License, v. 2.0. If a copy of the MPL was not distributed with this -;; file, You can obtain one at http://mozilla.org/MPL/2.0/. -;; -;; Copyright (c) 2016-2019 Andrey Antukh - -(ns uxbox.jobs.sendmail - "Email sending jobs." - (:require - [clojure.tools.logging :as log] - [cuerdas.core :as str] - [promesa.core :as p] - [uxbox.core :refer [system]] - [postal.core :as postal] - [uxbox.common.exceptions :as ex] - [uxbox.config :as cfg] - [uxbox.db :as db] - [uxbox.util.jobs :as uj] - [uxbox.util.blob :as blob] - [mount.core :as mount :refer [defstate]])) - -;; TODO: implement low priority sending emails. - -(defn- decode-email-row - [{:keys [data] :as row}] - (when row - (cond-> row - data (assoc :data (blob/decode data))))) - -(defn- fetch-emails - [conn] - (let [sql "select eq.* from email_queue as eq - where eq.status = 'pending' - and eq.priority = 10 - and eq.deleted_at is null - order by eq.priority desc, - eq.created_at desc;"] - (-> (db/query conn sql) - (p/then (partial mapv decode-email-row))))) - -(defn- fetch-failed-emails - [conn] - (let [sql "select eq.* from email_queue as eq - where eq.status = 'failed' - and eq.deleted_at is null - and eq.retries < $1 - order by eq.priority desc, - eq.created_at desc;"] - (-> (db/query conn sql) - (p/then (partial mapv decode-email-row))))) - -(defn- mark-email-as-sent - [conn id] - (let [sql "update email_queue - set status = 'ok' - where id = $1"] - (-> (db/query-one conn [sql id]) - (p/then (constantly nil))))) - -(defn- mark-email-as-failed - [conn id] - (let [sql "update email_queue - set status = 'failed', - retries = retries + 1 - where id = $1 - and deleted_at is null;"] - (-> (db/query-one conn [sql id]) - (p/then (constantly nil))))) - -(defn- get-smtp-config - [config] - {:host (:smtp-host config) - :port (:smtp-port config) - :user (:smtp-user config) - :pass (:smtp-password config) - :ssl (:smtp-ssl config) - :tls (:smtp-tls config) - :enabled (:smtp-enabled config)}) - -(defn- send-email-to-console - [email] - (let [out (with-out-str - (println "email console dump:") - (println "******** start email" (:id email) "**********") - (println " from: " (:from email)) - (println " to: " (:to email "---")) - (println " reply-to: " (:reply-to email)) - (println " subject: " (:subject email)) - (println " content:") - (doseq [item (rest (:body email))] - (when (str/starts-with? (:type item) "text/plain") - (println (:content item)))) - (println "******** end email "(:id email) "**********"))] - (log/info out) - {:error :SUCCESS})) - -(defn impl-sendmail - [email] - (p/future - (let [config (get-smtp-config cfg/config) - result (if (:enabled config) - (postal/send-message config email) - (send-email-to-console email))] - (when (not= (:error result) :SUCCESS) - (ex/raise :type :sendmail-error - :code :email-not-sent - :context result)) - nil))) - -(defn send-email - [conn {:keys [id data] :as entry}] - (-> (impl-sendmail data) - (p/handle (fn [v e] - (if e - (do - (log/error e "Error on sending email" id) - (mark-email-as-failed conn id)) - (mark-email-as-sent conn id)))))) - - -;; --- Main Task Functions - -(defn send-emails - [opts] - (db/with-atomic [conn db/pool] - (p/let [items (fetch-emails conn)] - (-> (p/run! (partial send-email conn) items) - (p/then' (constantly (count items))))))) - -(defn send-failed-emails - [opts] - (db/with-atomic [conn db/pool] - (p/let [items (fetch-failed-emails conn)] - (p/run! (partial send-email conn) items)))) - -(defstate sendmail-task - :start (uj/schedule! system #'send-emails {::uj/interval (* 10 1000)})) ;; 20s diff --git a/backend/src/uxbox/main.clj b/backend/src/uxbox/main.clj index 3a9ec4ce49..b8e6fc3e34 100644 --- a/backend/src/uxbox/main.clj +++ b/backend/src/uxbox/main.clj @@ -8,15 +8,16 @@ ;; Copyright (c) 2016-2020 Andrey Antukh (ns uxbox.main - (:require [mount.core :as mount] - [uxbox.config :as cfg] - [uxbox.migrations] - [uxbox.db] - [uxbox.http] - #_[uxbox.scheduled-jobs]) + (:require + [mount.core :as mount] + [uxbox.config :as cfg] + [uxbox.migrations] + [uxbox.db] + [uxbox.http] + #_[uxbox.scheduled-jobs]) (:gen-class)) -;; --- Entry point (only for uberjar) +;; --- Entry point (defn -main [& args] diff --git a/backend/src/uxbox/migrations.clj b/backend/src/uxbox/migrations.clj index 22de8de269..aec662b42c 100644 --- a/backend/src/uxbox/migrations.clj +++ b/backend/src/uxbox/migrations.clj @@ -35,6 +35,9 @@ {:desc "Initial icons tables" :name "0006-icons" :fn (mg/resource "migrations/0006.icons.sql")} + {:desc "Initial tasks tables" + :name "0007-tasks" + :fn (mg/resource "migrations/0007.tasks.sql")} ]}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/uxbox/tasks.clj b/backend/src/uxbox/tasks.clj new file mode 100644 index 0000000000..1282cf58b1 --- /dev/null +++ b/backend/src/uxbox/tasks.clj @@ -0,0 +1,50 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 Andrey Antukh + +(ns uxbox.tasks + "Async tasks abstraction (impl)." + (:require + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [mount.core :as mount :refer [defstate]] + [promesa.core :as p] + [uxbox.common.spec :as us] + [uxbox.config :as cfg] + [uxbox.core :refer [system]] + [uxbox.db :as db] + [uxbox.tasks.demo-gc] + [uxbox.tasks.sendmail] + [uxbox.tasks.impl :as impl] + [vertx.core :as vc] + [vertx.timers :as vt])) + +;; --- Public API + +(s/def ::name ::us/string) +(s/def ::delay ::us/integer) +(s/def ::props map?) +(s/def ::task-spec + (s/keys :req-un [::name ::delay] :opt-un [::props])) + +(defn schedule! + ([task] (schedule! db/pool task)) + ([conn task] + (us/assert ::task-spec task) + (impl/schedule! conn task))) + +;; --- State initialization + +(def ^:private tasks + [#'uxbox.tasks.demo-gc/handler + #'uxbox.tasks.sendmail/handler]) + +(defstate tasks + :start (as-> (impl/verticle tasks) $$ + (vc/deploy! system $$ {:instances 1}) + (deref $$))) diff --git a/backend/src/uxbox/tasks/demo_gc.clj b/backend/src/uxbox/tasks/demo_gc.clj new file mode 100644 index 0000000000..4c8b569a6e --- /dev/null +++ b/backend/src/uxbox/tasks/demo_gc.clj @@ -0,0 +1,20 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 Andrey Antukh + +(ns uxbox.tasks.demo-gc + "Demo accounts garbage collector." + (:require + [clojure.tools.logging :as log])) + +;; TODO + +(defn handler + {:uxbox.tasks/name "demo-gc"} + [{:keys [props] :as task}] + ) diff --git a/backend/src/uxbox/tasks/gc.clj b/backend/src/uxbox/tasks/gc.clj new file mode 100644 index 0000000000..0fd6ce7492 --- /dev/null +++ b/backend/src/uxbox/tasks/gc.clj @@ -0,0 +1,50 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2016-2020 Andrey Antukh + +(ns uxbox.tasks.gc + (:require + [clojure.tools.logging :as log] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [postal.core :as postal] + [promesa.core :as p] + [uxbox.common.exceptions :as ex] + [uxbox.common.spec :as us] + [uxbox.config :as cfg] + [uxbox.db :as db] + [uxbox.util.blob :as blob])) + +;; TODO: add images-gc with proper resource removal +;; TODO: add icons-gc +;; TODO: add pages-gc +;; TODO: test this + +;; --- Delete Projects + +(def ^:private sql:delete-project + "delete from projects + where id = $1 + and deleted_at is not null;") + +(s/def ::id ::us/uuid) +(s/def ::delete-project + (s/keys :req-un [::id])) + +(defn- delete-project + "Clean deleted projects." + [{:keys [id] :as props}] + (us/assert ::delete-project props) + (db/with-atomic [conn db/pool] + (-> (db/query-one conn [sql:delete-project id]) + (p/then (constantly nil))))) + +(defn handler + {:uxbox.tasks/name "delete-project"} + [{:keys [props] :as task}] + (delete-project props)) diff --git a/backend/src/uxbox/tasks/impl.clj b/backend/src/uxbox/tasks/impl.clj new file mode 100644 index 0000000000..50f51f0d7d --- /dev/null +++ b/backend/src/uxbox/tasks/impl.clj @@ -0,0 +1,137 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 Andrey Antukh + +(ns uxbox.tasks.impl + "Async tasks implementation." + (:require + [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] + [mount.core :as mount :refer [defstate]] + [promesa.core :as p] + [uxbox.common.spec :as us] + [uxbox.config :as cfg] + [uxbox.core :refer [system]] + [uxbox.db :as db] + [uxbox.util.blob :as blob] + [uxbox.util.time :as tm] + [vertx.core :as vc] + [vertx.timers :as vt]) + (:import java.time.Duration)) + +(def ^:private num-cpus + (delay (.availableProcessors (Runtime/getRuntime)))) + +(def ^:private sql:update-failed-task + "update tasks + set scheduled_at = now() + cast($1::text as interval), + status = 'error' + retry_num = retry_num + 1 + where id = $2;") + +(defn- reschedule + [conn task] + (let [duration (io.vertx.pgclient.data.Interval/of 0 0 0 0 0 5) + sqlv [sql:update-failed-task duration (:id task)]] + (-> (db/query-one conn sqlv) + (p/then' (constantly nil))))) + +(def ^:private sql:update-completed-task + "update tasks + set completed_at = clock_timestamp(), + status = 'completed' + where id = $1") + +(defn- mark-as-completed + [conn task] + (-> (db/query-one conn [sql:update-completed-task (:id task)]) + (p/then' (constantly nil)))) + +(defn- handle-task + [handlers {:keys [name] :as task}] + (let [task-fn (get handlers name)] + (if task-fn + (task-fn task) + (do + (log/warn "no task handler found for" (pr-str name)) + nil)))) + +(def ^:private sql:select-next-task + "select * from tasks as t + where t.scheduled_at <= now() + and (t.status = 'new' or (t.status = 'error' and t.retry_num < 3)) + order by t.scheduled_at + limit 1 + for update skip locked") + +(defn- decode-task-row + [{:keys [props] :as row}] + (when row + (cond-> row + props (assoc :props (blob/decode props))))) + +(defn- event-loop + [{:keys [handlers] :as opts}] + (db/with-atomic [conn db/pool] + (-> (db/query-one conn sql:select-next-task) + (p/then decode-task-row) + (p/then (fn [item] + (when item + (-> (p/do! (handle-task handlers item)) + (p/handle (fn [v e] + (if e + (reschedule conn item) + (mark-as-completed conn item)))) + (p/then' (constantly ::handled))))))))) + +(defn- event-loop-handler + [{:keys [::counter max-barch-size] + :or {counter 1 max-barch-size 10} + :as opts}] + (-> (event-loop opts) + (p/then (fn [result] + (when (and (= result ::handled) + (> max-barch-size counter)) + (event-loop-handler (assoc opts ::counter (inc counter)))))))) + +(def ^:private sql:insert-new-task + "insert into tasks (name, props, scheduled_at) + values ($1, $2, now()+cast($3::text as interval)) returning id") + +(defn schedule! + [conn {:keys [name delay props] :as task}] + (let [delay (tm/duration delay) + duration (->> (/ (.toMillis ^Duration delay) 1000.0) + (format "%s seconds")) + props (blob/encode props)] + (-> (db/query-one conn [sql:insert-new-task name props duration]) + (p/then' (fn [task] (:id task)))))) + +(defn- on-start + [ctx handlers] + (vt/schedule! ctx {::vt/fn #'event-loop-handler + ::vt/delay 1000 + ::vt/repeat true + :max-batch-size 10 + :handlers handlers})) + +(defn verticle + [tasks] + (s/assert (s/coll-of (s/or :fn fn? :var var?)) tasks) + (let [handlers (reduce (fn [acc f] + (let [task-name (:uxbox.tasks/name (meta f))] + (if task-name + (assoc acc task-name f) + (do + (log/warn "skiping task, no name provided in metadata" (pr-str f)) + acc)))) + {} + tasks) + on-start #(on-start % handlers)] + (vc/verticle {:on-start on-start}))) + diff --git a/backend/src/uxbox/tasks/sendmail.clj b/backend/src/uxbox/tasks/sendmail.clj new file mode 100644 index 0000000000..c4b29ebb77 --- /dev/null +++ b/backend/src/uxbox/tasks/sendmail.clj @@ -0,0 +1,66 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 Andrey Antukh + +(ns uxbox.tasks.sendmail + "Email sending jobs." + (:require + [clojure.tools.logging :as log] + [cuerdas.core :as str] + [postal.core :as postal] + [promesa.core :as p] + [uxbox.common.exceptions :as ex] + [uxbox.config :as cfg] + [uxbox.core :refer [system]] + [uxbox.util.blob :as blob])) + +(defn- get-smtp-config + [config] + {:host (:smtp-host config) + :port (:smtp-port config) + :user (:smtp-user config) + :pass (:smtp-password config) + :ssl (:smtp-ssl config) + :tls (:smtp-tls config) + :enabled (:smtp-enabled config)}) + +(defn- send-email-to-console + [email] + (let [out (with-out-str + (println "email console dump:") + (println "******** start email" (:id email) "**********") + (println " from: " (:from email)) + (println " to: " (:to email "---")) + (println " reply-to: " (:reply-to email)) + (println " subject: " (:subject email)) + (println " content:") + (doseq [item (rest (:body email))] + (when (str/starts-with? (:type item) "text/plain") + (println (:content item)))) + (println "******** end email "(:id email) "**********"))] + (log/info out) + {:error :SUCCESS})) + +(defn send-email + [email] + (p/future + (let [config (get-smtp-config cfg/config) + result (if (:enabled config) + (postal/send-message config email) + (send-email-to-console email))] + (when (not= (:error result) :SUCCESS) + (ex/raise :type :sendmail-error + :code :email-not-sent + :context result)) + nil))) + +(defn handler + {:uxbox.tasks/name "sendmail"} + [{:keys [props] :as task}] + (send-email props)) +