diff --git a/backend/deps.edn b/backend/deps.edn
index b57ee55ec1..b75718a73f 100644
--- a/backend/deps.edn
+++ b/backend/deps.edn
@@ -29,8 +29,6 @@
org.postgresql/postgresql {:mvn/version "42.4.0"}
com.zaxxer/HikariCP {:mvn/version "5.0.1"}
- funcool/datoteka {:mvn/version "3.0.64"}
-
buddy/buddy-hashers {:mvn/version "1.8.158"}
buddy/buddy-sign {:mvn/version "3.4.333"}
diff --git a/backend/resources/log4j2-devenv.xml b/backend/resources/log4j2-devenv.xml
index 7cc9abe3b3..fe69b81978 100644
--- a/backend/resources/log4j2-devenv.xml
+++ b/backend/resources/log4j2-devenv.xml
@@ -30,6 +30,8 @@
+
+
diff --git a/backend/scripts/repl b/backend/scripts/repl
index d200ae3f34..984c87bdd4 100755
--- a/backend/scripts/repl
+++ b/backend/scripts/repl
@@ -2,7 +2,7 @@
export PENPOT_HOST=devenv
export PENPOT_TENANT=dev
-export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enable-transit-readable-response enable-demo-users disable-secure-session-cookies"
+export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enable-transit-readable-response enable-demo-users disable-secure-session-cookies enable-rpc-rate-limit enable-warn-rpc-rate-limits"
# export PENPOT_DATABASE_URI="postgresql://172.17.0.1:5432/penpot"
# export PENPOT_DATABASE_USERNAME="penpot"
@@ -16,6 +16,8 @@ export PENPOT_FLAGS="$PENPOT_FLAGS enable-backend-asserts enable-audit-log enabl
# export PENPOT_LOGGERS_LOKI_URI="http://172.17.0.1:3100/loki/api/v1/push"
# export PENPOT_AUDIT_LOG_ARCHIVE_URI="http://localhost:6070/api/audit"
+export PENPOT_DEFAULT_RATE_LIMIT="default,window,10000/h"
+
# Initialize MINIO config
mc alias set penpot-s3/ http://minio:9000 minioadmin minioadmin
mc admin user add penpot-s3 penpot-devenv penpot-devenv
diff --git a/backend/src/app/auth/oidc.clj b/backend/src/app/auth/oidc.clj
index 39572bb18c..d660eef25f 100644
--- a/backend/src/app/auth/oidc.clj
+++ b/backend/src/app/auth/oidc.clj
@@ -15,9 +15,11 @@
[app.common.uri :as u]
[app.config :as cf]
[app.db :as db]
+ [app.http.client :as http]
[app.http.middleware :as hmw]
[app.loggers.audit :as audit]
[app.rpc.queries.profile :as profile]
+ [app.tokens :as tokens]
[app.util.json :as json]
[app.util.time :as dt]
[app.worker :as wrk]
@@ -47,7 +49,7 @@
(defn- discover-oidc-config
[{:keys [http-client]} {:keys [base-uri] :as opts}]
(let [discovery-uri (u/join base-uri ".well-known/openid-configuration")
- response (ex/try (http-client {:method :get :uri (str discovery-uri)} {:sync? true}))]
+ response (ex/try (http/req! http-client {:method :get :uri (str discovery-uri)} {:sync? true}))]
(cond
(ex/exception? response)
(do
@@ -158,10 +160,10 @@
(defn- retrieve-github-email
[{:keys [http-client]} tdata info]
(or (some-> info :email p/resolved)
- (-> (http-client {:uri "https://api.github.com/user/emails"
- :headers {"Authorization" (dm/str (:type tdata) " " (:token tdata))}
- :timeout 6000
- :method :get})
+ (-> (http/req! http-client {:uri "https://api.github.com/user/emails"
+ :headers {"Authorization" (dm/str (:type tdata) " " (:token tdata))}
+ :timeout 6000
+ :method :get})
(p/then (fn [{:keys [status body] :as response}]
(when-not (s/int-in-range? 200 300 status)
(ex/raise :type :internal
@@ -278,7 +280,7 @@
:uri (:token-uri provider)
:body (u/map->query-string params)}]
(p/then
- (http-client req)
+ (http/req! http-client req)
(fn [{:keys [status body] :as res}]
(if (= status 200)
(let [data (json/read body)]
@@ -292,11 +294,10 @@
(defn- retrieve-user-info
[{:keys [provider http-client] :as cfg} tdata]
(letfn [(retrieve []
- (http-client {:uri (:user-uri provider)
- :headers {"Authorization" (str (:type tdata) " " (:token tdata))}
- :timeout 6000
- :method :get}))
-
+ (http/req! http-client {:uri (:user-uri provider)
+ :headers {"Authorization" (str (:type tdata) " " (:token tdata))}
+ :timeout 6000
+ :method :get}))
(validate-response [response]
(when-not (s/int-in-range? 200 300 (:status response))
(ex/raise :type :internal
@@ -353,7 +354,7 @@
::props]))
(defn retrieve-info
- [{:keys [tokens provider] :as cfg} {:keys [params] :as request}]
+ [{:keys [sprops provider] :as cfg} {:keys [params] :as request}]
(letfn [(validate-oidc [info]
;; If the provider is OIDC, we can proceed to check
;; roles if they are defined.
@@ -392,7 +393,7 @@
(let [state (get params :state)
code (get params :code)
- state (tokens :verify {:token state :iss :oauth})]
+ state (tokens/verify sprops {:token state :iss :oauth})]
(-> (p/resolved code)
(p/then #(retrieve-access-token cfg %))
(p/then #(retrieve-user-info cfg %))
@@ -420,13 +421,13 @@
(redirect-response uri)))
(defn- generate-redirect
- [{:keys [tokens session audit] :as cfg} request info profile]
+ [{:keys [sprops session audit] :as cfg} request info profile]
(if profile
(let [sxf ((:create session) (:id profile))
token (or (:invitation-token info)
- (tokens :generate {:iss :auth
- :exp (dt/in-future "15m")
- :profile-id (:id profile)}))
+ (tokens/generate sprops {:iss :auth
+ :exp (dt/in-future "15m")
+ :profile-id (:id profile)}))
params {:token token}
uri (-> (u/uri (:public-uri cfg))
@@ -448,7 +449,7 @@
:iss :prepared-register
:is-active true
:exp (dt/in-future {:hours 48}))
- token (tokens :generate info)
+ token (tokens/generate sprops info)
params (d/without-nils
{:token token
:fullname (:fullname info)})
@@ -458,13 +459,13 @@
(redirect-response uri))))
(defn- auth-handler
- [{:keys [tokens] :as cfg} {:keys [params] :as request}]
+ [{:keys [sprops] :as cfg} {:keys [params] :as request}]
(let [props (audit/extract-utm-params params)
- state (tokens :generate
- {:iss :oauth
- :invitation-token (:invitation-token params)
- :props props
- :exp (dt/in-future "15m")})
+ state (tokens/generate sprops
+ {:iss :oauth
+ :invitation-token (:invitation-token params)
+ :props props
+ :exp (dt/in-future "15m")})
uri (build-auth-uri cfg state)]
(yrs/response 200 {:redirect-uri uri})))
@@ -496,16 +497,16 @@
:hint "provider not configured"))))))})
(s/def ::public-uri ::us/not-empty-string)
-(s/def ::http-client fn?)
+(s/def ::http-client ::http/client)
(s/def ::session map?)
-(s/def ::tokens fn?)
+(s/def ::sprops map?)
(s/def ::providers map?)
(defmethod ig/pre-init-spec ::routes
[_]
(s/keys :req-un [::public-uri
::session
- ::tokens
+ ::sprops
::http-client
::providers
::db/pool
diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj
index 5bb0119b2a..0d6fd2e2a4 100644
--- a/backend/src/app/config.clj
+++ b/backend/src/app/config.clj
@@ -20,6 +20,7 @@
[clojure.pprint :as pprint]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
+ [datoteka.fs :as fs]
[environ.core :refer [env]]
[integrant.core :as ig]))
@@ -83,16 +84,18 @@
;; a server prop key where initial project is stored.
:initial-project-skey "initial-project"})
+(s/def ::default-rpc-rlimit ::us/vector-of-strings)
+(s/def ::rpc-rlimit-config ::fs/path)
(s/def ::media-max-file-size ::us/integer)
-(s/def ::flags ::us/vec-of-valid-keywords)
+(s/def ::flags ::us/vector-of-keywords)
(s/def ::telemetry-enabled ::us/boolean)
(s/def ::audit-log-archive-uri ::us/string)
(s/def ::audit-log-gc-max-age ::dt/duration)
-(s/def ::admins ::us/set-of-non-empty-strings)
+(s/def ::admins ::us/set-of-strings)
(s/def ::file-change-snapshot-every ::us/integer)
(s/def ::file-change-snapshot-timeout ::dt/duration)
@@ -131,8 +134,8 @@
(s/def ::oidc-token-uri ::us/string)
(s/def ::oidc-auth-uri ::us/string)
(s/def ::oidc-user-uri ::us/string)
-(s/def ::oidc-scopes ::us/set-of-non-empty-strings)
-(s/def ::oidc-roles ::us/set-of-non-empty-strings)
+(s/def ::oidc-scopes ::us/set-of-strings)
+(s/def ::oidc-roles ::us/set-of-strings)
(s/def ::oidc-roles-attr ::us/keyword)
(s/def ::oidc-email-attr ::us/keyword)
(s/def ::oidc-name-attr ::us/keyword)
@@ -165,11 +168,14 @@
(s/def ::profile-complaint-threshold ::us/integer)
(s/def ::public-uri ::us/string)
(s/def ::redis-uri ::us/string)
-(s/def ::registration-domain-whitelist ::us/set-of-non-empty-strings)
-(s/def ::rlimit-font ::us/integer)
-(s/def ::rlimit-file-update ::us/integer)
-(s/def ::rlimit-image ::us/integer)
-(s/def ::rlimit-password ::us/integer)
+(s/def ::registration-domain-whitelist ::us/set-of-strings)
+
+
+
+(s/def ::rpc-semaphore-permits-font ::us/integer)
+(s/def ::rpc-semaphore-permits-file-update ::us/integer)
+(s/def ::rpc-semaphore-permits-image ::us/integer)
+(s/def ::rpc-semaphore-permits-password ::us/integer)
(s/def ::smtp-default-from ::us/string)
(s/def ::smtp-default-reply-to ::us/string)
(s/def ::smtp-host ::us/string)
@@ -217,6 +223,7 @@
::database-min-pool-size
::database-max-pool-size
::default-blob-version
+ ::default-rpc-rlimit
::error-report-webhook
::default-executor-parallelism
::blocking-executor-parallelism
@@ -272,10 +279,11 @@
::public-uri
::redis-uri
::registration-domain-whitelist
- ::rlimit-font
- ::rlimit-file-update
- ::rlimit-image
- ::rlimit-password
+ ::rpc-semaphore-permits-font
+ ::rpc-semaphore-permits-file-update
+ ::rpc-semaphore-permits-image
+ ::rpc-semaphore-permits-password
+ ::rpc-rlimit-config
::sentry-dsn
::sentry-debug
::sentry-attach-stack-trace
diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj
index 8785f272ea..d3f76df802 100644
--- a/backend/src/app/db.clj
+++ b/backend/src/app/db.clj
@@ -150,7 +150,7 @@
;; When metrics namespace is provided
(when metrics
- (->> (:registry metrics)
+ (->> (::mtx/registry metrics)
(PrometheusMetricsTrackerFactory.)
(.setMetricsTrackerFactory config)))
diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj
index 7bef64e789..a86227e60c 100644
--- a/backend/src/app/http.clj
+++ b/backend/src/app/http.clj
@@ -114,18 +114,18 @@
;; HTTP ROUTER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
-(s/def ::oauth map?)
-(s/def ::storage map?)
(s/def ::assets map?)
-(s/def ::feedback fn?)
-(s/def ::ws fn?)
(s/def ::audit-handler fn?)
(s/def ::awsns-handler fn?)
-(s/def ::session map?)
-(s/def ::rpc-routes (s/nilable vector?))
(s/def ::debug-routes (s/nilable vector?))
-(s/def ::oidc-routes (s/nilable vector?))
(s/def ::doc-routes (s/nilable vector?))
+(s/def ::feedback fn?)
+(s/def ::oauth map?)
+(s/def ::oidc-routes (s/nilable vector?))
+(s/def ::rpc-routes (s/nilable vector?))
+(s/def ::session map?)
+(s/def ::storage map?)
+(s/def ::ws fn?)
(defmethod ig/pre-init-spec ::router [_]
(s/keys :req-un [::mtx/metrics
@@ -151,7 +151,7 @@
[middleware/errors errors/handle]
[middleware/restrict-methods]]}
- ["/metrics" {:handler (:handler metrics)}]
+ ["/metrics" {:handler (::mtx/handler metrics)}]
["/assets" {:middleware [(:middleware session)]}
["/by-id/:id" {:handler (:objects-handler assets)}]
["/by-file-media-id/:id" {:handler (:file-objects-handler assets)}]
diff --git a/backend/src/app/http/awsns.clj b/backend/src/app/http/awsns.clj
index dd5bc35a3c..c14be0b268 100644
--- a/backend/src/app/http/awsns.clj
+++ b/backend/src/app/http/awsns.clj
@@ -11,6 +11,8 @@
[app.common.logging :as l]
[app.db :as db]
[app.db.sql :as sql]
+ [app.http.client :as http]
+ [app.tokens :as tokens]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
@@ -24,10 +26,11 @@
(declare parse-notification)
(declare process-report)
-(s/def ::http-client fn?)
+(s/def ::http-client ::http/client)
+(s/def ::sprops map?)
(defmethod ig/pre-init-spec ::handler [_]
- (s/keys :req-un [::db/pool ::http-client]))
+ (s/keys :req-un [::db/pool ::http-client ::sprops]))
(defmethod ig/init-key ::handler
[_ {:keys [executor] :as cfg}]
@@ -46,7 +49,7 @@
(let [surl (get body "SubscribeURL")
stopic (get body "TopicArn")]
(l/info :action "subscription received" :topic stopic :url surl)
- (http-client {:uri surl :method :post :timeout 10000} {:sync? true}))
+ (http/req! http-client {:uri surl :method :post :timeout 10000} {:sync? true}))
(= mtype "Notification")
(when-let [message (parse-json (get body "Message"))]
@@ -97,10 +100,10 @@
(get mail "headers")))
(defn- extract-identity
- [{:keys [tokens] :as cfg} headers]
+ [{:keys [sprops]} headers]
(let [tdata (get headers "x-penpot-data")]
(when-not (str/empty? tdata)
- (let [result (tokens :verify {:token tdata :iss :profile-identity})]
+ (let [result (tokens/verify sprops {:token tdata :iss :profile-identity})]
(:profile-id result)))))
(defn- parse-notification
diff --git a/backend/src/app/http/client.clj b/backend/src/app/http/client.clj
index 8718606b1d..e9cddbff5c 100644
--- a/backend/src/app/http/client.clj
+++ b/backend/src/app/http/client.clj
@@ -31,7 +31,6 @@
(http/send-async req {:client client :as response-type}))))
{::client client})))
-
(defn req!
"A convencience toplevel function for gradual migration to a new API
convention."
diff --git a/backend/src/app/http/errors.clj b/backend/src/app/http/errors.clj
index 852cd1cafc..7939b309b5 100644
--- a/backend/src/app/http/errors.clj
+++ b/backend/src/app/http/errors.clj
@@ -10,6 +10,7 @@
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.common.spec :as us]
+ [app.http :as-alias http]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[yetti.request :as yrq]
@@ -50,6 +51,11 @@
[err _]
(yrs/response 400 (ex-data err)))
+(defmethod handle-exception :rate-limit
+ [err _]
+ (let [headers (-> err ex-data ::http/headers)]
+ (yrs/response :status 429 :body "" :headers headers)))
+
(defmethod handle-exception :validation
[err _]
(let [{:keys [code] :as data} (ex-data err)]
diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj
index f3e4016415..20ec1b94b4 100644
--- a/backend/src/app/http/session.clj
+++ b/backend/src/app/http/session.clj
@@ -11,6 +11,7 @@
[app.config :as cf]
[app.db :as db]
[app.db.sql :as sql]
+ [app.tokens :as tokens]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
@@ -39,7 +40,7 @@
(delete-session [store key]))
(defn- make-database-store
- [{:keys [pool tokens executor]}]
+ [{:keys [pool sprops executor]}]
(reify ISessionStore
(read-session [_ token]
(px/with-dispatch executor
@@ -50,9 +51,9 @@
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
created-at (or (:created-at data) (dt/now))
- token (tokens :generate {:iss "authentication"
- :iat created-at
- :uid profile-id})
+ token (tokens/generate sprops {:iss "authentication"
+ :iat created-at
+ :uid profile-id})
params {:user-agent user-agent
:profile-id profile-id
:created-at created-at
@@ -68,14 +69,13 @@
{:id (:id data)})
(assoc data :updated-at updated-at))))
-
(delete-session [_ token]
(px/with-dispatch executor
(db/delete! pool :http-session {:id token})
nil))))
(defn make-inmemory-store
- [{:keys [tokens]}]
+ [{:keys [sprops]}]
(let [cache (atom {})]
(reify ISessionStore
(read-session [_ token]
@@ -86,9 +86,9 @@
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
created-at (or (:created-at data) (dt/now))
- token (tokens :generate {:iss "authentication"
- :iat created-at
- :uid profile-id})
+ token (tokens/generate sprops {:iss "authentication"
+ :iat created-at
+ :uid profile-id})
params {:user-agent user-agent
:created-at created-at
:updated-at created-at
@@ -108,9 +108,9 @@
(swap! cache dissoc token)
nil)))))
-(s/def ::tokens fn?)
+(s/def ::sprops map?)
(defmethod ig/pre-init-spec ::store [_]
- (s/keys :req-un [::db/pool ::wrk/executor ::tokens]))
+ (s/keys :req-un [::db/pool ::wrk/executor ::sprops]))
(defmethod ig/init-key ::store
[_ {:keys [pool] :as cfg}]
diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj
index dc1289a094..ac9de126bb 100644
--- a/backend/src/app/loggers/audit.clj
+++ b/backend/src/app/loggers/audit.clj
@@ -15,6 +15,7 @@
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
+ [app.tokens :as tokens]
[app.util.async :as aa]
[app.util.time :as dt]
[app.worker :as wrk]
@@ -237,10 +238,10 @@
(s/def ::http-client fn?)
(s/def ::uri ::us/string)
-(s/def ::tokens fn?)
+(s/def ::sprops map?)
(defmethod ig/pre-init-spec ::archive-task [_]
- (s/keys :req-un [::db/pool ::tokens ::http-client]
+ (s/keys :req-un [::db/pool ::sprops ::http-client]
:opt-un [::uri]))
(defmethod ig/init-key ::archive-task
@@ -276,7 +277,7 @@
for update skip locked;")
(defn archive-events
- [{:keys [pool uri tokens http-client] :as cfg}]
+ [{:keys [pool uri sprops http-client] :as cfg}]
(letfn [(decode-row [{:keys [props ip-addr context] :as row}]
(cond-> row
(db/pgobject? props)
@@ -300,9 +301,9 @@
:context]))
(send [events]
- (let [token (tokens :generate {:iss "authentication"
- :iat (dt/now)
- :uid uuid/zero})
+ (let [token (tokens/generate sprops {:iss "authentication"
+ :iat (dt/now)
+ :uid uuid/zero})
body (t/encode {:events events})
headers {"content-type" "application/transit+json"
"origin" (cf/get :public-uri)
diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj
index 47054d5d4e..45636727a3 100644
--- a/backend/src/app/main.clj
+++ b/backend/src/app/main.clj
@@ -64,13 +64,14 @@
:app.migrations/all
{:main (ig/ref :app.migrations/migrations)}
+ :app.redis/redis
+ {:uri (cf/get :redis-uri)
+ :metrics (ig/ref :app.metrics/metrics)}
+
:app.msgbus/msgbus
{:backend (cf/get :msgbus-backend :redis)
:executor (ig/ref [::default :app.worker/executor])
- :redis-uri (cf/get :redis-uri)}
-
- :app.tokens/tokens
- {:keys (ig/ref :app.setup/keys)}
+ :redis (ig/ref :app.redis/redis)}
:app.storage.tmp/cleaner
{:executor (ig/ref [::worker :app.worker/executor])
@@ -92,7 +93,7 @@
:app.http.session/store
{:pool (ig/ref :app.db/pool)
- :tokens (ig/ref :app.tokens/tokens)
+ :sprops (ig/ref :app.setup/props)
:executor (ig/ref [::default :app.worker/executor])}
:app.http.session/gc-task
@@ -100,7 +101,7 @@
:max-age (cf/get :auth-token-cookie-max-age)}
:app.http.awsns/handler
- {:tokens (ig/ref :app.tokens/tokens)
+ {:sprops (ig/ref :app.setup/props)
:pool (ig/ref :app.db/pool)
:http-client (ig/ref :app.http/client)
:executor (ig/ref [::worker :app.worker/executor])}
@@ -168,13 +169,14 @@
:github (ig/ref :app.auth.oidc/github-provider)
:gitlab (ig/ref :app.auth.oidc/gitlab-provider)
:oidc (ig/ref :app.auth.oidc/generic-provider)}
- :tokens (ig/ref :app.tokens/tokens)
+ :sprops (ig/ref :app.setup/props)
:http-client (ig/ref :app.http/client)
:pool (ig/ref :app.db/pool)
:session (ig/ref :app.http/session)
:public-uri (cf/get :public-uri)
:executor (ig/ref [::default :app.worker/executor])}
+ ;; TODO: revisit the dependencies of this service, looks they are too much unused of them
:app.http/router
{:assets (ig/ref :app.http.assets/handlers)
:feedback (ig/ref :app.http.feedback/handler)
@@ -186,7 +188,6 @@
:metrics (ig/ref :app.metrics/metrics)
:public-uri (cf/get :public-uri)
:storage (ig/ref :app.storage/storage)
- :tokens (ig/ref :app.tokens/tokens)
:audit-handler (ig/ref :app.loggers.audit/http-handler)
:rpc-routes (ig/ref :app.rpc/routes)
:doc-routes (ig/ref :app.rpc.doc/routes)
@@ -218,11 +219,12 @@
:app.rpc/methods
{:pool (ig/ref :app.db/pool)
:session (ig/ref :app.http/session)
- :tokens (ig/ref :app.tokens/tokens)
+ :sprops (ig/ref :app.setup/props)
:metrics (ig/ref :app.metrics/metrics)
:storage (ig/ref :app.storage/storage)
:msgbus (ig/ref :app.msgbus/msgbus)
:public-uri (cf/get :public-uri)
+ :redis (ig/ref :app.redis/redis)
:audit (ig/ref :app.loggers.audit/collector)
:ldap (ig/ref :app.auth.ldap/provider)
:http-client (ig/ref :app.http/client)
@@ -293,9 +295,6 @@
{:pool (ig/ref :app.db/pool)
:key (cf/get :secret-key)}
- :app.setup/keys
- {:props (ig/ref :app.setup/props)}
-
:app.loggers.zmq/receiver
{:endpoint (cf/get :loggers-zmq-uri)}
@@ -309,7 +308,7 @@
:app.loggers.audit/archive-task
{:uri (cf/get :audit-log-archive-uri)
- :tokens (ig/ref :app.tokens/tokens)
+ :sprops (ig/ref :app.setup/props)
:pool (ig/ref :app.db/pool)
:http-client (ig/ref :app.http/client)}
diff --git a/backend/src/app/media.clj b/backend/src/app/media.clj
index 99cbe15cdd..796047f106 100644
--- a/backend/src/app/media.clj
+++ b/backend/src/app/media.clj
@@ -20,7 +20,7 @@
[clojure.java.shell :as sh]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
- [datoteka.core :as fs])
+ [datoteka.fs :as fs])
(:import
org.im4java.core.ConvertCmd
org.im4java.core.IMOperation
diff --git a/backend/src/app/metrics.clj b/backend/src/app/metrics.clj
index 4c5e10e198..e170f9af77 100644
--- a/backend/src/app/metrics.clj
+++ b/backend/src/app/metrics.clj
@@ -8,6 +8,8 @@
(:refer-clojure :exclude [run!])
(:require
[app.common.logging :as l]
+ [app.common.spec :as us]
+ [app.metrics.definition :as-alias mdef]
[clojure.spec.alpha :as s]
[integrant.core :as ig])
(:import
@@ -16,11 +18,12 @@
io.prometheus.client.Counter$Child
io.prometheus.client.Gauge
io.prometheus.client.Gauge$Child
- io.prometheus.client.Summary
- io.prometheus.client.Summary$Child
- io.prometheus.client.Summary$Builder
io.prometheus.client.Histogram
io.prometheus.client.Histogram$Child
+ io.prometheus.client.SimpleCollector
+ io.prometheus.client.Summary
+ io.prometheus.client.Summary$Builder
+ io.prometheus.client.Summary$Child
io.prometheus.client.exporter.common.TextFormat
io.prometheus.client.hotspot.DefaultExports
java.io.StringWriter))
@@ -28,7 +31,7 @@
(set! *warn-on-reflection* true)
(declare create-registry)
-(declare create)
+(declare create-collector)
(declare handler)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@@ -37,120 +40,151 @@
(def default-metrics
{:update-file-changes
- {:name "rpc_update_file_changes_total"
- :help "A total number of changes submitted to update-file."
- :type :counter}
+ {::mdef/name "penpot_rpc_update_file_changes_total"
+ ::mdef/help "A total number of changes submitted to update-file."
+ ::mdef/type :counter}
:update-file-bytes-processed
- {:name "rpc_update_file_bytes_processed_total"
- :help "A total number of bytes processed by update-file."
- :type :counter}
+ {::mdef/name "penpot_rpc_update_file_bytes_processed_total"
+ ::mdef/help "A total number of bytes processed by update-file."
+ ::mdef/type :counter}
:rpc-mutation-timing
- {:name "rpc_mutation_timing"
- :help "RPC mutation method call timming."
- :labels ["name"]
- :type :histogram}
+ {::mdef/name "penpot_rpc_mutation_timing"
+ ::mdef/help "RPC mutation method call timming."
+ ::mdef/labels ["name"]
+ ::mdef/type :histogram}
:rpc-command-timing
- {:name "rpc_command_timing"
- :help "RPC command method call timming."
- :labels ["name"]
- :type :histogram}
+ {::mdef/name "penpot_rpc_command_timing"
+ ::mdef/help "RPC command method call timming."
+ ::mdef/labels ["name"]
+ ::mdef/type :histogram}
:rpc-query-timing
- {:name "rpc_query_timing"
- :help "RPC query method call timing."
- :labels ["name"]
- :type :histogram}
+ {::mdef/name "penpot_rpc_query_timing"
+ ::mdef/help "RPC query method call timing."
+ ::mdef/labels ["name"]
+ ::mdef/type :histogram}
:websocket-active-connections
- {:name "websocket_active_connections"
- :help "Active websocket connections gauge"
- :type :gauge}
+ {::mdef/name "penpot_websocket_active_connections"
+ ::mdef/help "Active websocket connections gauge"
+ ::mdef/type :gauge}
:websocket-messages-total
- {:name "websocket_message_total"
- :help "Counter of processed messages."
- :labels ["op"]
- :type :counter}
+ {::mdef/name "penpot_websocket_message_total"
+ ::mdef/help "Counter of processed messages."
+ ::mdef/labels ["op"]
+ ::mdef/type :counter}
:websocket-session-timing
- {:name "websocket_session_timing"
- :help "Websocket session timing (seconds)."
- :type :summary}
+ {::mdef/name "penpot_websocket_session_timing"
+ ::mdef/help "Websocket session timing (seconds)."
+ ::mdef/type :summary}
:session-update-total
- {:name "http_session_update_total"
- :help "A counter of session update batch events."
- :type :counter}
+ {::mdef/name "penpot_http_session_update_total"
+ ::mdef/help "A counter of session update batch events."
+ ::mdef/type :counter}
:tasks-timing
- {:name "penpot_tasks_timing"
- :help "Background tasks timing (milliseconds)."
- :labels ["name"]
- :type :summary}
+ {::mdef/name "penpot_tasks_timing"
+ ::mdef/help "Background tasks timing (milliseconds)."
+ ::mdef/labels ["name"]
+ ::mdef/type :summary}
- :rlimit-queued-submissions
- {:name "penpot_rlimit_queued_submissions"
- :help "Current number of queued submissions on RLIMIT."
- :labels ["name"]
- :type :gauge}
+ :redis-eval-timing
+ {::mdef/name "penpot_redis_eval_timing"
+ ::mdef/help "Redis EVAL commands execution timings (ms)"
+ ::mdef/labels ["name"]
+ ::mdef/type :summary}
- :rlimit-used-permits
- {:name "penpot_rlimit_used_permits"
- :help "Current number of used permits on RLIMIT."
- :labels ["name"]
- :type :gauge}
+ :rpc-semaphore-queued-submissions
+ {::mdef/name "penpot_rpc_semaphore_queued_submissions"
+ ::mdef/help "Current number of queued submissions on RPC-SEMAPHORE."
+ ::mdef/labels ["name"]
+ ::mdef/type :gauge}
- :rlimit-acquires-total
- {:name "penpot_rlimit_acquires_total"
- :help "Total number of acquire operations on RLIMIT."
- :labels ["name"]
- :type :counter}
+ :rpc-semaphore-used-permits
+ {::mdef/name "penpot_rpc_semaphore_used_permits"
+ ::mdef/help "Current number of used permits on RPC-SEMAPHORE."
+ ::mdef/labels ["name"]
+ ::mdef/type :gauge}
+
+ :rpc-semaphore-acquires-total
+ {::mdef/name "penpot_rpc_semaphore_acquires_total"
+ ::mdef/help "Total number of acquire operations on RPC-SEMAPHORE."
+ ::mdef/labels ["name"]
+ ::mdef/type :counter}
:executors-active-threads
- {:name "penpot_executors_active_threads"
- :help "Current number of threads available in the executor service."
- :labels ["name"]
- :type :gauge}
+ {::mdef/name "penpot_executors_active_threads"
+ ::mdef/help "Current number of threads available in the executor service."
+ ::mdef/labels ["name"]
+ ::mdef/type :gauge}
:executors-completed-tasks
- {:name "penpot_executors_completed_tasks_total"
- :help "Aproximate number of completed tasks by the executor."
- :labels ["name"]
- :type :counter}
+ {::mdef/name "penpot_executors_completed_tasks_total"
+ ::mdef/help "Aproximate number of completed tasks by the executor."
+ ::mdef/labels ["name"]
+ ::mdef/type :counter}
:executors-running-threads
- {:name "penpot_executors_running_threads"
- :help "Current number of threads with state RUNNING."
- :labels ["name"]
- :type :gauge}
+ {::mdef/name "penpot_executors_running_threads"
+ ::mdef/help "Current number of threads with state RUNNING."
+ ::mdef/labels ["name"]
+ ::mdef/type :gauge}
:executors-queued-submissions
- {:name "penpot_executors_queued_submissions"
- :help "Current number of queued submissions."
- :labels ["name"]
- :type :gauge}})
+ {::mdef/name "penpot_executors_queued_submissions"
+ ::mdef/help "Current number of queued submissions."
+ ::mdef/labels ["name"]
+ ::mdef/type :gauge}})
+
+(s/def ::mdef/name string?)
+(s/def ::mdef/help string?)
+(s/def ::mdef/labels (s/every string? :kind vector?))
+(s/def ::mdef/type #{:gauge :counter :summary :histogram})
+
+(s/def ::mdef/instance
+ #(instance? SimpleCollector %))
+
+(s/def ::mdef/definition
+ (s/keys :req [::mdef/name
+ ::mdef/help
+ ::mdef/type]
+ :opt [::mdef/labels
+ ::mdef/instance]))
+
+(s/def ::definitions
+ (s/map-of keyword? ::mdef/definition))
+
+(s/def ::registry
+ #(instance? CollectorRegistry %))
+
+(s/def ::handler fn?)
+(s/def ::metrics
+ (s/keys :req [::registry
+ ::handler
+ ::definitions]))
(defmethod ig/init-key ::metrics
[_ _]
(l/info :action "initialize metrics")
(let [registry (create-registry)
definitions (reduce-kv (fn [res k v]
- (->> (assoc v :registry registry)
- (create)
+ (->> (assoc v ::registry registry)
+ (create-collector)
(assoc res k)))
{}
default-metrics)]
- {:handler (partial handler registry)
- :definitions definitions
- :registry registry}))
-(s/def ::handler fn?)
-(s/def ::registry #(instance? CollectorRegistry %))
-(s/def ::metrics
- (s/keys :req-un [::registry ::handler]))
+ (us/verify! ::definitions definitions)
+
+ {::handler (partial handler registry)
+ ::definitions definitions
+ ::registry registry}))
(defn- handler
[registry _ respond _]
@@ -174,13 +208,16 @@
(def default-histogram-buckets
[1 5 10 25 50 75 100 250 500 750 1000 2500 5000 7500])
+(defmulti run-collector! (fn [mdef _] (::mdef/type mdef)))
+(defmulti create-collector ::mdef/type)
+
(defn run!
- [{:keys [definitions]} {:keys [id] :as params}]
+ [{:keys [::definitions]} {:keys [id] :as params}]
(when-let [mobj (get definitions id)]
- ((::fn mobj) params)
+ (run-collector! mobj params)
true))
-(defn create-registry
+(defn- create-registry
[]
(let [registry (CollectorRegistry.)]
(DefaultExports/register registry)
@@ -192,79 +229,89 @@
(and (.isArray ^Class oc)
(= (.getComponentType oc) String))))
-(defn make-counter
- [{:keys [name help registry reg labels] :as props}]
+(defmethod run-collector! :counter
+ [{:keys [::mdef/instance]} {:keys [inc labels] :or {inc 1 labels default-empty-labels}}]
+ (let [instance (.labels instance (if (is-array? labels) labels (into-array String labels)))]
+ (.inc ^Counter$Child instance (double inc))))
+
+(defmethod run-collector! :gauge
+ [{:keys [::mdef/instance]} {:keys [inc dec labels val] :or {labels default-empty-labels}}]
+ (let [instance (.labels ^Gauge instance (if (is-array? labels) labels (into-array String labels)))]
+ (cond (number? inc) (.inc ^Gauge$Child instance (double inc))
+ (number? dec) (.dec ^Gauge$Child instance (double dec))
+ (number? val) (.set ^Gauge$Child instance (double val)))))
+
+(defmethod run-collector! :summary
+ [{:keys [::mdef/instance]} {:keys [val labels] :or {labels default-empty-labels}}]
+ (let [instance (.labels ^Summary instance (if (is-array? labels) labels (into-array String labels)))]
+ (.observe ^Summary$Child instance val)))
+
+(defmethod run-collector! :histogram
+ [{:keys [::mdef/instance]} {:keys [val labels] :or {labels default-empty-labels}}]
+ (let [instance (.labels ^Histogram instance (if (is-array? labels) labels (into-array String labels)))]
+ (.observe ^Histogram$Child instance val)))
+
+(defmethod create-collector :counter
+ [{::mdef/keys [name help reg labels]
+ ::keys [registry]
+ :as props}]
+
(let [registry (or registry reg)
instance (.. (Counter/build)
(name name)
- (help help))
- _ (when (seq labels)
- (.labelNames instance (into-array String labels)))
- instance (.register instance registry)]
+ (help help))]
+ (when (seq labels)
+ (.labelNames instance (into-array String labels)))
- {::instance instance
- ::fn (fn [{:keys [inc labels] :or {inc 1 labels default-empty-labels}}]
- (let [instance (.labels instance (if (is-array? labels) labels (into-array String labels)))]
- (.inc ^Counter$Child instance (double inc))))}))
+ (assoc props ::mdef/instance (.register instance registry))))
-(defn make-gauge
- [{:keys [name help registry reg labels] :as props}]
+(defmethod create-collector :gauge
+ [{::mdef/keys [name help reg labels]
+ ::keys [registry]
+ :as props}]
(let [registry (or registry reg)
instance (.. (Gauge/build)
(name name)
- (help help))
- _ (when (seq labels)
- (.labelNames instance (into-array String labels)))
- instance (.register instance registry)]
- {::instance instance
- ::fn (fn [{:keys [inc dec labels val] :or {labels default-empty-labels}}]
- (let [instance (.labels ^Gauge instance (if (is-array? labels) labels (into-array String labels)))]
- (cond (number? inc) (.inc ^Gauge$Child instance (double inc))
- (number? dec) (.dec ^Gauge$Child instance (double dec))
- (number? val) (.set ^Gauge$Child instance (double val)))))}))
+ (help help))]
+ (when (seq labels)
+ (.labelNames instance (into-array String labels)))
-(defn make-summary
- [{:keys [name help registry reg labels max-age quantiles buckets]
- :or {max-age 3600 buckets 12 quantiles default-quantiles} :as props}]
+ (assoc props ::mdef/instance (.register instance registry))))
+
+(defmethod create-collector :summary
+ [{::mdef/keys [name help reg labels max-age quantiles buckets]
+ ::keys [registry]
+ :or {max-age 3600 buckets 12 quantiles default-quantiles}
+ :as props}]
(let [registry (or registry reg)
builder (doto (Summary/build)
(.name name)
- (.help help))
- _ (when (seq quantiles)
- (.maxAgeSeconds ^Summary$Builder builder ^long max-age)
- (.ageBuckets ^Summary$Builder builder buckets))
- _ (doseq [[q e] quantiles]
- (.quantile ^Summary$Builder builder q e))
- _ (when (seq labels)
- (.labelNames ^Summary$Builder builder (into-array String labels)))
- instance (.register ^Summary$Builder builder registry)]
+ (.help help))]
- {::instance instance
- ::fn (fn [{:keys [val labels] :or {labels default-empty-labels}}]
- (let [instance (.labels ^Summary instance (if (is-array? labels) labels (into-array String labels)))]
- (.observe ^Summary$Child instance val)))}))
+ (when (seq quantiles)
+ (.maxAgeSeconds ^Summary$Builder builder ^long max-age)
+ (.ageBuckets ^Summary$Builder builder buckets))
-(defn make-histogram
- [{:keys [name help registry reg labels buckets]
- :or {buckets default-histogram-buckets}}]
+ (doseq [[q e] quantiles]
+ (.quantile ^Summary$Builder builder q e))
+
+ (when (seq labels)
+ (.labelNames ^Summary$Builder builder (into-array String labels)))
+
+ (assoc props ::mdef/instance (.register ^Summary$Builder builder registry))))
+
+(defmethod create-collector :histogram
+ [{::mdef/keys [name help reg labels buckets]
+ ::keys [registry]
+ :or {buckets default-histogram-buckets}
+ :as props}]
(let [registry (or registry reg)
instance (doto (Histogram/build)
(.name name)
(.help help)
- (.buckets (into-array Double/TYPE buckets)))
- _ (when (seq labels)
- (.labelNames instance (into-array String labels)))
- instance (.register instance registry)]
+ (.buckets (into-array Double/TYPE buckets)))]
- {::instance instance
- ::fn (fn [{:keys [val labels] :or {labels default-empty-labels}}]
- (let [instance (.labels ^Histogram instance (if (is-array? labels) labels (into-array String labels)))]
- (.observe ^Histogram$Child instance val)))}))
+ (when (seq labels)
+ (.labelNames instance (into-array String labels)))
-(defn create
- [{:keys [type] :as props}]
- (case type
- :counter (make-counter props)
- :gauge (make-gauge props)
- :summary (make-summary props)
- :histogram (make-histogram props)))
+ (assoc props ::mdef/instance (.register instance registry))))
diff --git a/backend/src/app/msgbus.clj b/backend/src/app/msgbus.clj
index e14bf9e126..b4c1a6a772 100644
--- a/backend/src/app/msgbus.clj
+++ b/backend/src/app/msgbus.clj
@@ -13,28 +13,14 @@
[app.common.spec :as us]
[app.common.transit :as t]
[app.config :as cfg]
+ [app.redis :as redis]
[app.util.async :as aa]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
- [promesa.core :as p])
- (:import
- io.lettuce.core.RedisClient
- io.lettuce.core.RedisURI
- io.lettuce.core.api.StatefulConnection
- io.lettuce.core.api.StatefulRedisConnection
- io.lettuce.core.api.async.RedisAsyncCommands
- io.lettuce.core.codec.ByteArrayCodec
- io.lettuce.core.codec.RedisCodec
- io.lettuce.core.codec.StringCodec
- io.lettuce.core.pubsub.RedisPubSubListener
- io.lettuce.core.pubsub.StatefulRedisPubSubConnection
- io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
- io.lettuce.core.resource.ClientResources
- io.lettuce.core.resource.DefaultClientResources
- java.time.Duration))
+ [promesa.core :as p]))
(set! *warn-on-reflection* true)
@@ -62,18 +48,14 @@
:timeout (dt/duration {:seconds 30})}
(d/without-nils cfg)))
-(s/def ::timeout ::dt/duration)
-(s/def ::redis-uri ::us/string)
(s/def ::buffer-size ::us/integer)
(defmethod ig/pre-init-spec ::msgbus [_]
- (s/keys :req-un [::buffer-size ::redis-uri ::timeout ::wrk/executor]))
+ (s/keys :req-un [::buffer-size ::redis/timeout ::redis/redis ::wrk/executor]))
(defmethod ig/init-key ::msgbus
- [_ {:keys [buffer-size redis-uri] :as cfg}]
- (l/info :hint "initialize msgbus"
- :buffer-size buffer-size
- :redis-uri redis-uri)
+ [_ {:keys [buffer-size] :as cfg}]
+ (l/info :hint "initialize msgbus" :buffer-size buffer-size)
(let [cmd-ch (a/chan buffer-size)
rcv-ch (a/chan (a/dropping-buffer buffer-size))
pub-ch (a/chan (a/dropping-buffer buffer-size) xform-prefix-topic)
@@ -106,33 +88,17 @@
;; --- IMPL
(defn- redis-connect
- [{:keys [redis-uri timeout] :as cfg}]
- (let [codec (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE)
-
- resources (.. (DefaultClientResources/builder)
- (ioThreadPoolSize 4)
- (computationThreadPoolSize 4)
- (build))
-
- uri (RedisURI/create redis-uri)
- rclient (RedisClient/create ^ClientResources resources ^RedisURI uri)
-
- pconn (.connect ^RedisClient rclient ^RedisCodec codec)
- sconn (.connectPubSub ^RedisClient rclient ^RedisCodec codec)]
-
- (.setTimeout ^StatefulRedisConnection pconn ^Duration timeout)
- (.setTimeout ^StatefulRedisPubSubConnection sconn ^Duration timeout)
-
+ [{:keys [timeout redis] :as cfg}]
+ (let [pconn (redis/connect redis :timeout timeout)
+ sconn (redis/connect redis :type :pubsub :timeout timeout)]
(-> cfg
- (assoc ::resources resources)
(assoc ::pconn pconn)
(assoc ::sconn sconn))))
(defn- redis-disconnect
- [{:keys [::pconn ::sconn ::resources] :as cfg}]
- (.. ^StatefulConnection pconn close)
- (.. ^StatefulConnection sconn close)
- (.shutdown ^ClientResources resources))
+ [{:keys [::pconn ::sconn] :as cfg}]
+ (redis/close! pconn)
+ (redis/close! sconn))
(defn- conj-subscription
"A low level function that is responsible to create on-demand
@@ -204,27 +170,18 @@
(defn- create-listener
[rcv-ch]
- (reify RedisPubSubListener
- (message [_ _pattern _topic _message])
- (message [_ topic message]
- ;; There are no back pressure, so we use a slidding
- ;; buffer for cases when the pubsub broker sends
- ;; more messages that we can process.
- (let [val {:topic topic :message (t/decode message)}]
- (when-not (a/offer! rcv-ch val)
- (l/warn :msg "dropping message on subscription loop"))))
- (psubscribed [_ _pattern _count])
- (punsubscribed [_ _pattern _count])
- (subscribed [_ _topic _count])
- (unsubscribed [_ _topic _count])))
+ (redis/pubsub-listener
+ :on-message (fn [_ topic message]
+ ;; There are no back pressure, so we use a slidding
+ ;; buffer for cases when the pubsub broker sends
+ ;; more messages that we can process.
+ (let [val {:topic topic :message (t/decode message)}]
+ (when-not (a/offer! rcv-ch val)
+ (l/warn :msg "dropping message on subscription loop"))))))
(defn start-io-loop
[{:keys [::sconn ::rcv-ch ::pub-ch ::state executor] :as cfg}]
-
- ;; Add a single listener to the pubsub connection
- (.addListener ^StatefulRedisPubSubConnection sconn
- ^RedisPubSubListener (create-listener rcv-ch))
-
+ (redis/add-listener! sconn (create-listener rcv-ch))
(letfn [(send-to-topic [topic message]
(a/go-loop [chans (seq (get-in @state [:topics topic]))
closed #{}]
@@ -270,11 +227,10 @@
intended to be used in core.async go blocks."
[{:keys [::pconn] :as cfg} {:keys [topic message]}]
(let [message (t/encode message)
- res (a/chan 1)
- pcomm (.async ^StatefulRedisConnection pconn)]
- (-> (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)
+ res (a/chan 1)]
+ (-> (redis/publish! pconn topic message)
(p/finally (fn [_ cause]
- (when (and cause (.isOpen ^StatefulConnection pconn))
+ (when (and cause (redis/open? pconn))
(a/offer! res cause))
(a/close! res))))
res))
@@ -283,14 +239,10 @@
"Create redis subscription. Blocking operation, intended to be used
inside an agent."
[{:keys [::sconn] :as cfg} topic]
- (let [topic (into-array String [topic])
- scomm (.sync ^StatefulRedisPubSubConnection sconn)]
- (.subscribe ^RedisPubSubCommands scomm topic)))
+ (redis/subscribe! sconn topic))
(defn redis-unsub
"Removes redis subscription. Blocking operation, intended to be used
inside an agent."
[{:keys [::sconn] :as cfg} topic]
- (let [topic (into-array String [topic])
- scomm (.sync ^StatefulRedisPubSubConnection sconn)]
- (.unsubscribe ^RedisPubSubCommands scomm topic)))
+ (redis/unsubscribe! sconn topic))
diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj
new file mode 100644
index 0000000000..06b7e5e7a4
--- /dev/null
+++ b/backend/src/app/redis.clj
@@ -0,0 +1,319 @@
+;; This Source Code Form is subject to the terms of the Mozilla Public
+;; License, v. 2.0. If a copy of the MPL was not distributed with this
+;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
+;;
+;; Copyright (c) UXBOX Labs SL
+
+(ns app.redis
+ "The msgbus abstraction implemented using redis as underlying backend."
+ (:require
+ [app.common.data :as d]
+ [app.common.logging :as l]
+ [app.common.spec :as us]
+ [app.metrics :as mtx]
+ [app.redis.script :as-alias rscript]
+ [app.util.time :as dt]
+ [clojure.core :as c]
+ [clojure.java.io :as io]
+ [clojure.spec.alpha :as s]
+ [cuerdas.core :as str]
+ [integrant.core :as ig]
+ [promesa.core :as p])
+ (:import
+ clojure.lang.IDeref
+ io.lettuce.core.RedisClient
+ io.lettuce.core.RedisURI
+ io.lettuce.core.ScriptOutputType
+ io.lettuce.core.api.StatefulConnection
+ io.lettuce.core.api.StatefulRedisConnection
+ io.lettuce.core.api.async.RedisAsyncCommands
+ io.lettuce.core.api.async.RedisScriptingAsyncCommands
+ io.lettuce.core.codec.ByteArrayCodec
+ io.lettuce.core.codec.RedisCodec
+ io.lettuce.core.codec.StringCodec
+ io.lettuce.core.pubsub.RedisPubSubListener
+ io.lettuce.core.pubsub.StatefulRedisPubSubConnection
+ io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
+ io.lettuce.core.resource.ClientResources
+ io.lettuce.core.resource.DefaultClientResources
+ io.netty.util.HashedWheelTimer
+ io.netty.util.Timer
+ java.lang.AutoCloseable
+ java.time.Duration))
+
+(set! *warn-on-reflection* true)
+
+(declare initialize-resources)
+(declare shutdown-resources)
+(declare connect)
+(declare close!)
+
+(s/def ::timer
+ #(instance? Timer %))
+
+(s/def ::connection
+ #(or (instance? StatefulRedisConnection %)
+ (and (instance? IDeref %)
+ (instance? StatefulRedisConnection (deref %)))))
+
+(s/def ::pubsub-connection
+ #(or (instance? StatefulRedisPubSubConnection %)
+ (and (instance? IDeref %)
+ (instance? StatefulRedisPubSubConnection (deref %)))))
+
+(s/def ::redis-uri
+ #(instance? RedisURI %))
+
+(s/def ::resources
+ #(instance? ClientResources %))
+
+(s/def ::pubsub-listener
+ #(instance? RedisPubSubListener %))
+
+(s/def ::uri ::us/not-empty-string)
+(s/def ::timeout ::dt/duration)
+(s/def ::connect? ::us/boolean)
+(s/def ::io-threads ::us/integer)
+(s/def ::worker-threads ::us/integer)
+
+(s/def ::redis
+ (s/keys :req [::resources ::redis-uri ::timer ::mtx/metrics]
+ :opt [::connection]))
+
+(defmethod ig/pre-init-spec ::redis [_]
+ (s/keys :req-un [::uri ::mtx/metrics]
+ :opt-un [::timeout
+ ::connect?
+ ::io-threads
+ ::worker-threads]))
+
+(defmethod ig/prep-key ::redis
+ [_ cfg]
+ (let [runtime (Runtime/getRuntime)
+ cpus (.availableProcessors ^Runtime runtime)]
+ (merge {:timeout (dt/duration 5000)
+ :io-threads (max 3 cpus)
+ :worker-threads (max 3 cpus)}
+ (d/without-nils cfg))))
+
+(defmethod ig/init-key ::redis
+ [_ {:keys [connect?] :as cfg}]
+ (let [cfg (initialize-resources cfg)]
+ (cond-> cfg
+ connect? (assoc ::connection (connect cfg)))))
+
+(defmethod ig/halt-key! ::redis
+ [_ state]
+ (shutdown-resources state))
+
+(def default-codec
+ (RedisCodec/of StringCodec/UTF8 ByteArrayCodec/INSTANCE))
+
+(def string-codec
+ (RedisCodec/of StringCodec/UTF8 StringCodec/UTF8))
+
+(defn- initialize-resources
+ "Initialize redis connection resources"
+ [{:keys [uri io-threads worker-threads connect? metrics] :as cfg}]
+ (l/info :hint "initialize redis resources"
+ :uri uri
+ :io-threads io-threads
+ :worker-threads worker-threads
+ :connect? connect?)
+
+ (let [timer (HashedWheelTimer.)
+ resources (.. (DefaultClientResources/builder)
+ (ioThreadPoolSize ^long io-threads)
+ (computationThreadPoolSize ^long worker-threads)
+ (timer ^Timer timer)
+ (build))
+
+ redis-uri (RedisURI/create ^String uri)]
+
+ (-> cfg
+ (assoc ::mtx/metrics metrics)
+ (assoc ::cache (atom {}))
+ (assoc ::timer timer)
+ (assoc ::redis-uri redis-uri)
+ (assoc ::resources resources))))
+
+(defn- shutdown-resources
+ [{:keys [::resources ::cache ::timer]}]
+ (run! close! (vals @cache))
+ (when resources
+ (.shutdown ^ClientResources resources))
+ (when timer
+ (.stop ^Timer timer)))
+
+(defn connect
+ [{:keys [::resources ::redis-uri] :as cfg}
+ & {:keys [timeout codec type] :or {codec default-codec type :default}}]
+
+ (us/assert! ::resources resources)
+
+ (let [client (RedisClient/create ^ClientResources resources ^RedisURI redis-uri)
+ timeout (or timeout (:timeout cfg))
+ conn (case type
+ :default (.connect ^RedisClient client ^RedisCodec codec)
+ :pubsub (.connectPubSub ^RedisClient client ^RedisCodec codec))]
+
+ (.setTimeout ^StatefulConnection conn ^Duration timeout)
+
+ (reify
+ IDeref
+ (deref [_] conn)
+
+ AutoCloseable
+ (close [_]
+ (.close ^StatefulConnection conn)
+ (.shutdown ^RedisClient client)))))
+
+(defn get-or-connect
+ [{:keys [::cache] :as state} key options]
+ (assoc state ::connection
+ (or (get @cache key)
+ (-> (swap! cache (fn [cache]
+ (when-let [prev (get cache key)]
+ (close! prev))
+ (assoc cache key (connect state options))))
+ (get key)))))
+
+(defn add-listener!
+ [conn listener]
+ (us/assert! ::pubsub-connection @conn)
+ (us/assert! ::pubsub-listener listener)
+
+ (.addListener ^StatefulRedisPubSubConnection @conn
+ ^RedisPubSubListener listener)
+ conn)
+
+(defn publish!
+ [conn topic message]
+ (us/assert! ::us/string topic)
+ (us/assert! ::us/bytes message)
+ (us/assert! ::connection @conn)
+
+ (let [pcomm (.async ^StatefulRedisConnection @conn)]
+ (.publish ^RedisAsyncCommands pcomm ^String topic ^bytes message)))
+
+(defn subscribe!
+ "Blocking operation, intended to be used on a worker/agent thread."
+ [conn & topics]
+ (us/assert! ::pubsub-connection @conn)
+ (let [topics (into-array String (map str topics))
+ cmd (.sync ^StatefulRedisPubSubConnection @conn)]
+ (.subscribe ^RedisPubSubCommands cmd topics)))
+
+(defn unsubscribe!
+ "Blocking operation, intended to be used on a worker/agent thread."
+ [conn & topics]
+ (us/assert! ::pubsub-connection @conn)
+ (let [topics (into-array String (map str topics))
+ cmd (.sync ^StatefulRedisPubSubConnection @conn)]
+ (.unsubscribe ^RedisPubSubCommands cmd topics)))
+
+(defn open?
+ [conn]
+ (.isOpen ^StatefulConnection @conn))
+
+(defn pubsub-listener
+ [& {:keys [on-message on-subscribe on-unsubscribe]}]
+ (reify RedisPubSubListener
+ (message [_ pattern topic message]
+ (when on-message
+ (on-message pattern topic message)))
+
+ (message [_ topic message]
+ (when on-message
+ (on-message nil topic message)))
+
+ (psubscribed [_ pattern count]
+ (when on-subscribe
+ (on-subscribe pattern nil count)))
+
+ (punsubscribed [_ pattern count]
+ (when on-unsubscribe
+ (on-unsubscribe pattern nil count)))
+
+ (subscribed [_ topic count]
+ (when on-subscribe
+ (on-subscribe nil topic count)))
+
+ (unsubscribed [_ topic count]
+ (when on-unsubscribe
+ (on-unsubscribe nil topic count)))))
+
+(defn close!
+ [o]
+ (.close ^AutoCloseable o))
+
+(def ^:private scripts-cache (atom {}))
+(def noop-fn (constantly nil))
+
+(s/def ::rscript/name qualified-keyword?)
+(s/def ::rscript/path ::us/not-empty-string)
+(s/def ::rscript/keys (s/every any? :kind vector?))
+(s/def ::rscript/vals (s/every any? :kind vector?))
+
+(s/def ::rscript/script
+ (s/keys :req [::rscript/name
+ ::rscript/path]
+ :opt [::rscript/keys
+ ::rscript/vals]))
+
+(defn eval!
+ [{:keys [::mtx/metrics] :as state} script]
+ (us/assert! ::rscript/script script)
+ (us/assert! ::redis state)
+
+ (let [rconn (-> state ::connection deref)
+ cmd (.async ^StatefulRedisConnection rconn)
+ keys (into-array String (map str (::rscript/keys script)))
+ vals (into-array String (map str (::rscript/vals script)))
+ sname (::rscript/name script)]
+
+ (letfn [(on-error [cause]
+ (if (instance? io.lettuce.core.RedisNoScriptException cause)
+ (do
+ (l/error :hint "no script found" :name sname :cause cause)
+ (-> (load-script)
+ (p/then eval-script)))
+ (if-let [on-error (::rscript/on-error script)]
+ (on-error cause)
+ (p/rejected cause))))
+
+ (eval-script [sha]
+ (let [tpoint (dt/tpoint)]
+ (-> (.evalsha ^RedisScriptingAsyncCommands cmd
+ ^String sha
+ ^ScriptOutputType ScriptOutputType/MULTI
+ ^"[Ljava.lang.String;" keys
+ ^"[Ljava.lang.String;" vals)
+ (p/then (fn [result]
+ (let [elapsed (tpoint)]
+ (mtx/run! metrics {:id :redis-eval-timing
+ :labels [(name sname)]
+ :val (inst-ms elapsed)})
+ (l/trace :hint "eval script"
+ :name (name sname)
+ :sha sha
+ :params (str/join "," (::rscript/vals script))
+ :elapsed (dt/format-duration elapsed))
+ result)))
+ (p/catch on-error))))
+
+ (read-script []
+ (-> script ::rscript/path io/resource slurp))
+
+ (load-script []
+ (l/trace :hint "load script" :name sname)
+ (-> (.scriptLoad ^RedisScriptingAsyncCommands cmd
+ ^String (read-script))
+ (p/then (fn [sha]
+ (swap! scripts-cache assoc sname sha)
+ sha))))]
+
+ (if-let [sha (get @scripts-cache sname)]
+ (eval-script sha)
+ (-> (load-script)
+ (p/then eval-script))))))
diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj
index 5608ffcbda..162d484449 100644
--- a/backend/src/app/rpc.clj
+++ b/backend/src/app/rpc.clj
@@ -10,10 +10,12 @@
[app.common.logging :as l]
[app.common.spec :as us]
[app.db :as db]
+ [app.http :as-alias http]
[app.loggers.audit :as audit]
[app.metrics :as mtx]
[app.rpc.retry :as retry]
[app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
[app.util.async :as async]
[app.util.services :as sv]
[app.worker :as wrk]
@@ -39,81 +41,72 @@
(ex/ignoring (hook-fn)))
response)
+(defn- handle-response
+ [request result]
+ (let [mdata (meta result)]
+ (p/-> (yrs/response 200 result (::http/headers mdata {}))
+ (handle-response-transformation request mdata)
+ (handle-before-comple-hook mdata))))
+
(defn- rpc-query-handler
"Ring handler that dispatches query requests and convert between
internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise]
- (letfn [(handle-response [result]
- (let [mdata (meta result)]
- (-> (yrs/response 200 result)
- (handle-response-transformation request mdata))))]
+ (let [type (keyword (:type params))
+ data (into {::http/request request} params)
+ data (if profile-id
+ (assoc data :profile-id profile-id ::session-id session-id)
+ (dissoc data :profile-id))
+ method (get methods type default-handler)]
- (let [type (keyword (:type params))
- data (into {::request request} params)
- data (if profile-id
- (assoc data :profile-id profile-id ::session-id session-id)
- (dissoc data :profile-id))
- method (get methods type default-handler)]
-
- (-> (method data)
- (p/then handle-response)
- (p/then respond)
- (p/catch (fn [cause]
- (let [context {:profile-id profile-id}]
- (raise (ex/wrap-with-context cause context)))))))))
+ (-> (method data)
+ (p/then (partial handle-response request))
+ (p/then respond)
+ (p/catch (fn [cause]
+ (let [context {:profile-id profile-id}]
+ (raise (ex/wrap-with-context cause context))))))))
(defn- rpc-mutation-handler
"Ring handler that dispatches mutation requests and convert between
internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise]
- (letfn [(handle-response [result]
- (let [mdata (meta result)]
- (p/-> (yrs/response 200 result)
- (handle-response-transformation request mdata)
- (handle-before-comple-hook mdata))))]
+ (let [type (keyword (:type params))
+ data (into {::request request} params)
+ data (if profile-id
+ (assoc data :profile-id profile-id ::session-id session-id)
+ (dissoc data :profile-id))
- (let [type (keyword (:type params))
- data (into {::request request} params)
- data (if profile-id
- (assoc data :profile-id profile-id ::session-id session-id)
- (dissoc data :profile-id))
-
- method (get methods type default-handler)]
- (-> (method data)
- (p/then handle-response)
- (p/then respond)
- (p/catch (fn [cause]
- (let [context {:profile-id profile-id}]
- (raise (ex/wrap-with-context cause context)))))))))
+ method (get methods type default-handler)]
+ (-> (method data)
+ (p/then (partial handle-response request))
+ (p/then respond)
+ (p/catch (fn [cause]
+ (let [context {:profile-id profile-id}]
+ (raise (ex/wrap-with-context cause context))))))))
(defn- rpc-command-handler
"Ring handler that dispatches cmd requests and convert between
internal async flow into ring async flow."
[methods {:keys [profile-id session-id params] :as request} respond raise]
- (letfn [(handle-response [result]
- (let [mdata (meta result)]
- (p/-> (yrs/response 200 result)
- (handle-response-transformation request mdata)
- (handle-before-comple-hook mdata))))]
+ (let [cmd (keyword (:command params))
+ data (into {::request request} params)
+ data (if profile-id
+ (assoc data :profile-id profile-id ::session-id session-id)
+ (dissoc data :profile-id))
- (let [cmd (keyword (:command params))
- data (into {::request request} params)
- data (if profile-id
- (assoc data :profile-id profile-id ::session-id session-id)
- (dissoc data :profile-id))
-
- method (get methods cmd default-handler)]
- (-> (method data)
- (p/then handle-response)
- (p/then respond)
- (p/catch (fn [cause]
- (let [context {:profile-id profile-id}]
- (raise (ex/wrap-with-context cause context)))))))))
+ method (get methods cmd default-handler)]
+ (-> (method data)
+ (p/then (partial handle-response request))
+ (p/then respond)
+ (p/catch (fn [cause]
+ (let [context {:profile-id profile-id}]
+ (raise (ex/wrap-with-context cause context))))))))
(defn- wrap-metrics
"Wrap service method with metrics measurement."
[{:keys [metrics ::metrics-id]} f mdata]
(let [labels (into-array String [(::sv/name mdata)])]
+
(fn [cfg params]
(let [start (System/nanoTime)]
(p/finally
@@ -177,7 +170,8 @@
[cfg f mdata]
(let [f (as-> f $
(wrap-dispatch cfg $ mdata)
- (rlimit/wrap-rlimit cfg $ mdata)
+ (rsem/wrap cfg $ mdata)
+ (rlimit/wrap cfg $ mdata)
(retry/wrap-retry cfg $ mdata)
(wrap-audit cfg $ mdata)
(wrap-metrics cfg $ mdata)
@@ -258,12 +252,12 @@
(s/def ::public-uri ::us/not-empty-string)
(s/def ::session map?)
(s/def ::storage some?)
-(s/def ::tokens fn?)
+(s/def ::sprops map?)
(defmethod ig/pre-init-spec ::methods [_]
(s/keys :req-un [::storage
::session
- ::tokens
+ ::sprops
::audit
::executors
::public-uri
diff --git a/backend/src/app/rpc/commands/auth.clj b/backend/src/app/rpc/commands/auth.clj
index 6c9afdb197..d052c6b203 100644
--- a/backend/src/app/rpc/commands/auth.clj
+++ b/backend/src/app/rpc/commands/auth.clj
@@ -16,7 +16,8 @@
[app.rpc.doc :as-alias doc]
[app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile]
- [app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
+ [app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[buddy.hashers :as hashers]
@@ -80,7 +81,7 @@
;; ---- COMMAND: login with password
(defn login-with-password
- [{:keys [pool session tokens] :as cfg} {:keys [email password] :as params}]
+ [{:keys [pool session sprops] :as cfg} {:keys [email password] :as params}]
(when-not (contains? cf/flags :login)
(ex/raise :type :restriction
@@ -114,7 +115,7 @@
(profile/decode-profile-row))
invitation (when-let [token (:invitation-token params)]
- (tokens :verify {:token token :iss :team-invitation}))
+ (tokens/verify sprops {:token token :iss :team-invitation}))
;; If invitation member-id does not matches the profile-id, we just proceed to ignore the
;; invitation because invitations matches exactly; and user can't loging with other email and
@@ -135,7 +136,7 @@
(sv/defmethod ::login-with-password
"Performs authentication using penpot password."
{:auth false
- ::rlimit/permits (cf/get :rlimit-password)
+ ::rsem/permits (cf/get :rpc-semaphore-permits-password)
::doc/added "1.15"}
[cfg params]
(login-with-password cfg params))
@@ -156,9 +157,9 @@
;; ---- COMMAND: Recover Profile
(defn recover-profile
- [{:keys [pool tokens] :as cfg} {:keys [token password]}]
+ [{:keys [pool sprops] :as cfg} {:keys [token password]}]
(letfn [(validate-token [token]
- (let [tdata (tokens :verify {:token token :iss :password-recovery})]
+ (let [tdata (tokens/verify sprops {:token token :iss :password-recovery})]
(:profile-id tdata)))
(update-password [conn profile-id]
@@ -176,7 +177,7 @@
(sv/defmethod ::recover-profile
{:auth false
- ::rlimit/permits (cf/get :rlimit-password)
+ ::rsem/permits (cf/get :rpc-semaphore-permits-password)
::doc/added "1.15"}
[cfg params]
(recover-profile cfg params))
@@ -184,12 +185,12 @@
;; ---- COMMAND: Prepare Register
(defn prepare-register
- [{:keys [pool tokens] :as cfg} params]
+ [{:keys [pool sprops] :as cfg} params]
(when-not (contains? cf/flags :registration)
(if-not (contains? params :invitation-token)
(ex/raise :type :restriction
:code :registration-disabled)
- (let [invitation (tokens :verify {:token (:invitation-token params) :iss :team-invitation})]
+ (let [invitation (tokens/verify sprops {:token (:invitation-token params) :iss :team-invitation})]
(when-not (= (:email params) (:member-email invitation))
(ex/raise :type :restriction
:code :email-does-not-match-invitation
@@ -222,7 +223,7 @@
:iss :prepared-register
:exp (dt/in-future "48h")}
- token (tokens :generate params)]
+ token (tokens/generate sprops params)]
(with-meta {:token token}
{::audit/profile-id uuid/zero})))
@@ -297,8 +298,8 @@
(assoc :default-project-id (:default-project-id team)))))
(defn register-profile
- [{:keys [conn tokens session] :as cfg} {:keys [token] :as params}]
- (let [claims (tokens :verify {:token token :iss :prepared-register})
+ [{:keys [conn sprops session] :as cfg} {:keys [token] :as params}]
+ (let [claims (tokens/verify sprops {:token token :iss :prepared-register})
params (merge params claims)]
(check-profile-existence! conn params)
(let [is-active (or (:is-active params)
@@ -308,14 +309,14 @@
(create-profile-relations conn)
(profile/decode-profile-row))
invitation (when-let [token (:invitation-token params)]
- (tokens :verify {:token token :iss :team-invitation}))]
+ (tokens/verify sprops {:token token :iss :team-invitation}))]
(cond
;; If invitation token comes in params, this is because the user comes from team-invitation process;
;; in this case, regenerate token and send back to the user a new invitation token (and mark current
;; session as logged). This happens only if the invitation email matches with the register email.
(and (some? invitation) (= (:email profile) (:member-email invitation)))
(let [claims (assoc invitation :member-id (:id profile))
- token (tokens :generate claims)
+ token (tokens/generate sprops claims)
resp {:invitation-token token}]
(with-meta resp
{:transform-response ((:create session) (:id profile))
@@ -341,14 +342,15 @@
;; In all other cases, send a verification email.
:else
- (let [vtoken (tokens :generate
- {:iss :verify-email
- :exp (dt/in-future "48h")
- :profile-id (:id profile)
- :email (:email profile)})
- ptoken (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})]
+ (let [vtoken (tokens/generate sprops
+ {:iss :verify-email
+ :exp (dt/in-future "48h")
+ :profile-id (:id profile)
+ :email (:email profile)})
+ ptoken (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)
+ :exp (dt/in-future {:days 30})})]
(eml/send! {::eml/conn conn
::eml/factory eml/register
:public-uri (:public-uri cfg)
@@ -366,7 +368,7 @@
(sv/defmethod ::register-profile
{:auth false
- ::rlimit/permits (cf/get :rlimit-password)
+ ::rsem/permits (cf/get :rpc-semaphore-permits-password)
::doc/added "1.15"}
[{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool]
@@ -376,18 +378,19 @@
;; ---- COMMAND: Request Profile Recovery
(defn request-profile-recovery
- [{:keys [pool tokens] :as cfg} {:keys [email] :as params}]
+ [{:keys [pool sprops] :as cfg} {:keys [email] :as params}]
(letfn [(create-recovery-token [{:keys [id] :as profile}]
- (let [token (tokens :generate
- {:iss :password-recovery
- :exp (dt/in-future "15m")
- :profile-id id})]
+ (let [token (tokens/generate sprops
+ {:iss :password-recovery
+ :exp (dt/in-future "15m")
+ :profile-id id})]
(assoc profile :token token)))
(send-email-notification [conn profile]
- (let [ptoken (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})]
+ (let [ptoken (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)
+ :exp (dt/in-future {:days 30})})]
(eml/send! {::eml/conn conn
::eml/factory eml/password-recovery
:public-uri (:public-uri cfg)
diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj
index ff45fb6c6f..305ac8e2cd 100644
--- a/backend/src/app/rpc/mutations/files.clj
+++ b/backend/src/app/rpc/mutations/files.clj
@@ -20,7 +20,7 @@
[app.rpc.permissions :as perms]
[app.rpc.queries.files :as files]
[app.rpc.queries.projects :as proj]
- [app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
[app.storage.impl :as simpl]
[app.util.blob :as blob]
[app.util.services :as sv]
@@ -318,7 +318,7 @@
(contains? o :changes-with-metadata)))))
(sv/defmethod ::update-file
- {::rlimit/permits (cf/get :rlimit-file-update)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-file-update)}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(db/xact-lock! conn id)
diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj
index b24544a88b..2fa930b6e4 100644
--- a/backend/src/app/rpc/mutations/fonts.clj
+++ b/backend/src/app/rpc/mutations/fonts.clj
@@ -15,7 +15,7 @@
[app.media :as media]
[app.rpc.doc :as-alias doc]
[app.rpc.queries.teams :as teams]
- [app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
[app.storage :as sto]
[app.util.services :as sv]
[app.util.time :as dt]
@@ -42,7 +42,7 @@
::font-id ::font-family ::font-weight ::font-style]))
(sv/defmethod ::create-font-variant
- {::rlimit/permits (cf/get :rlimit-font)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-font)}
[{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
(teams/check-edition-permissions! pool profile-id team-id)
diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj
index 25d894d770..50ab06e0f4 100644
--- a/backend/src/app/rpc/mutations/media.clj
+++ b/backend/src/app/rpc/mutations/media.clj
@@ -15,7 +15,7 @@
[app.db :as db]
[app.media :as media]
[app.rpc.queries.teams :as teams]
- [app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
[app.storage :as sto]
[app.storage.tmp :as tmp]
[app.util.bytes :as bs]
@@ -53,7 +53,7 @@
:opt-un [::id]))
(sv/defmethod ::upload-file-media-object
- {::rlimit/permits (cf/get :rlimit-image)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}]
(let [file (select-file pool file-id)
cfg (update cfg :storage media/configure-assets-storage)]
@@ -181,7 +181,7 @@
:opt-un [::id ::name]))
(sv/defmethod ::create-file-media-object-from-url
- {::rlimit/permits (cf/get :rlimit-image)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(let [file (select-file pool file-id)
cfg (update cfg :storage media/configure-assets-storage)]
diff --git a/backend/src/app/rpc/mutations/profile.clj b/backend/src/app/rpc/mutations/profile.clj
index dea366f5d7..a72952be92 100644
--- a/backend/src/app/rpc/mutations/profile.clj
+++ b/backend/src/app/rpc/mutations/profile.clj
@@ -17,8 +17,9 @@
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile]
- [app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
[app.storage :as sto]
+ [app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
@@ -86,7 +87,7 @@
(s/keys :req-un [::profile-id ::password ::old-password]))
(sv/defmethod ::update-profile-password
- {::rlimit/permits (cf/get :rlimit-password)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[{:keys [pool] :as cfg} {:keys [password] :as params}]
(db/with-atomic [conn pool]
(let [profile (validate-password! conn params)
@@ -129,7 +130,7 @@
(s/keys :req-un [::profile-id ::file]))
(sv/defmethod ::update-profile-photo
- {::rlimit/permits (cf/get :rlimit-image)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[cfg {:keys [file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
@@ -183,15 +184,16 @@
{:changed true})
(defn- request-email-change
- [{:keys [conn tokens] :as cfg} {:keys [profile email] :as params}]
- (let [token (tokens :generate
- {:iss :change-email
- :exp (dt/in-future "15m")
- :profile-id (:id profile)
- :email email})
- ptoken (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})]
+ [{:keys [conn sprops] :as cfg} {:keys [profile email] :as params}]
+ (let [token (tokens/generate sprops
+ {:iss :change-email
+ :exp (dt/in-future "15m")
+ :profile-id (:id profile)
+ :email email})
+ ptoken (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)
+ :exp (dt/in-future {:days 30})})]
(when (not= email (:email profile))
(cmd.auth/check-profile-existence! conn params))
@@ -303,7 +305,7 @@
(s/def ::login ::cmd.auth/login-with-password)
(sv/defmethod ::login
- {:auth false ::rlimit/permits (cf/get :rlimit-password)}
+ {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[cfg params]
(cmd.auth/login-with-password cfg params))
@@ -321,7 +323,7 @@
(s/def ::recover-profile ::cmd.auth/recover-profile)
(sv/defmethod ::recover-profile
- {:auth false ::rlimit/permits (cf/get :rlimit-password)}
+ {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[cfg params]
(cmd.auth/recover-profile cfg params))
@@ -338,7 +340,7 @@
(s/def ::register-profile ::cmd.auth/register-profile)
(sv/defmethod ::register-profile
- {:auth false ::rlimit/permits (cf/get :rlimit-password)}
+ {:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
[{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool]
(-> (assoc cfg :conn conn)
diff --git a/backend/src/app/rpc/mutations/teams.clj b/backend/src/app/rpc/mutations/teams.clj
index 7be2a96aab..8e9c1d2c53 100644
--- a/backend/src/app/rpc/mutations/teams.clj
+++ b/backend/src/app/rpc/mutations/teams.clj
@@ -20,8 +20,9 @@
[app.rpc.permissions :as perms]
[app.rpc.queries.profile :as profile]
[app.rpc.queries.teams :as teams]
- [app.rpc.rlimit :as rlimit]
+ [app.rpc.semaphore :as rsem]
[app.storage :as sto]
+ [app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
@@ -289,7 +290,7 @@
(s/keys :req-un [::profile-id ::team-id ::file]))
(sv/defmethod ::update-team-photo
- {::rlimit/permits (cf/get :rlimit-image)}
+ {::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[cfg {:keys [file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
@@ -398,20 +399,21 @@
update set role = ?, valid_until = ?, updated_at = now();")
(defn- create-team-invitation
- [{:keys [conn tokens team profile role email] :as cfg}]
+ [{:keys [conn sprops team profile role email] :as cfg}]
(let [member (profile/retrieve-profile-data-by-email conn email)
token-exp (dt/in-future "168h") ;; 7 days
- itoken (tokens :generate
- {:iss :team-invitation
- :exp token-exp
- :profile-id (:id profile)
- :role role
- :team-id (:id team)
- :member-email (:email member email)
- :member-id (:id member)})
- ptoken (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})]
+ itoken (tokens/generate sprops
+ {:iss :team-invitation
+ :exp token-exp
+ :profile-id (:id profile)
+ :role role
+ :team-id (:id team)
+ :member-email (:email member email)
+ :member-id (:id member)})
+ ptoken (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)
+ :exp (dt/in-future {:days 30})})]
(when (contains? cf/flags :log-invitation-tokens)
(l/trace :hint "invitation token" :token itoken))
diff --git a/backend/src/app/rpc/mutations/verify_token.clj b/backend/src/app/rpc/mutations/verify_token.clj
index a8016c0bbc..3211f3bece 100644
--- a/backend/src/app/rpc/mutations/verify_token.clj
+++ b/backend/src/app/rpc/mutations/verify_token.clj
@@ -12,6 +12,8 @@
[app.loggers.audit :as audit]
[app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile]
+ [app.tokens :as tokens]
+ [app.tokens.spec.team-invitation :as-alias spec.team-invitation]
[app.util.services :as sv]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]))
@@ -23,9 +25,9 @@
:opt-un [::profile-id]))
(sv/defmethod ::verify-token {:auth false}
- [{:keys [pool tokens] :as cfg} {:keys [token] :as params}]
+ [{:keys [pool sprops] :as cfg} {:keys [token] :as params}]
(db/with-atomic [conn pool]
- (let [claims (tokens :verify {:token token})
+ (let [claims (tokens/verify sprops {:token token})
cfg (assoc cfg :conn conn)]
(process-token cfg params claims))))
@@ -76,19 +78,19 @@
(s/def ::iss keyword?)
(s/def ::exp ::us/inst)
-(s/def :internal.tokens.team-invitation/profile-id ::us/uuid)
-(s/def :internal.tokens.team-invitation/role ::us/keyword)
-(s/def :internal.tokens.team-invitation/team-id ::us/uuid)
-(s/def :internal.tokens.team-invitation/member-email ::us/email)
-(s/def :internal.tokens.team-invitation/member-id (s/nilable ::us/uuid))
+(s/def ::spec.team-invitation/profile-id ::us/uuid)
+(s/def ::spec.team-invitation/role ::us/keyword)
+(s/def ::spec.team-invitation/team-id ::us/uuid)
+(s/def ::spec.team-invitation/member-email ::us/email)
+(s/def ::spec.team-invitation/member-id (s/nilable ::us/uuid))
(s/def ::team-invitation-claims
(s/keys :req-un [::iss ::exp
- :internal.tokens.team-invitation/profile-id
- :internal.tokens.team-invitation/role
- :internal.tokens.team-invitation/team-id
- :internal.tokens.team-invitation/member-email]
- :opt-un [:internal.tokens.team-invitation/member-id]))
+ ::spec.team-invitation/profile-id
+ ::spec.team-invitation/role
+ ::spec.team-invitation/team-id
+ ::spec.team-invitation/member-email]
+ :opt-un [::spec.team-invitation/member-id]))
(defn- accept-invitation
[{:keys [conn] :as cfg} {:keys [member-id team-id role member-email] :as claims}]
diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj
index af04af269f..2a90a8aa10 100644
--- a/backend/src/app/rpc/rlimit.clj
+++ b/backend/src/app/rpc/rlimit.clj
@@ -5,63 +5,266 @@
;; Copyright (c) UXBOX Labs SL
(ns app.rpc.rlimit
- "Resource usage limits (in other words: semaphores)."
+ "Rate limit strategies implementation for RPC services.
+
+ It mainly implements two strategies: fixed window and bucket. You
+ can use one of them or both to create a combination of limits. All
+ limits are updated in each request and the most restrictive one
+ blocks the user activity.
+
+ On the HTTP layer it translates to the 429 http response.
+
+ The limits are defined as vector of 3 elements:
+ [ ]
+
+ The opts format is strategy dependent. With fixed `:window` strategy
+ you have the following format:
+ [:somename :window \"1000/m\"]
+
+ Where the first number means the quantity of allowed request and the
+ letter indicates the window unit, that can be `w` for weeks, `h` for
+ hours, `m` for minutes and `s` for seconds.
+
+ The the `:bucket` strategy you will have something like this:
+ [:somename :bucket \"100/10/15s]
+
+ Where the first number indicates the total tokens capacity (or
+ available burst), the second number indicates the refill rate and
+ the last number suffixed with the unit indicates the time window (or
+ interval) of the refill. This means that this limit configurations
+ allow burst of 100 elements and will refill 10 tokens each 15s (1
+ token each 1.5segons).
+
+ The bucket strategy works well for small intervals and window
+ strategy works better for large intervals.
+
+ All limits uses the profile-id as user identifier. In case of the
+ profile-id is not available, the IP address is used as fallback
+ value.
+ "
(:require
[app.common.data :as d]
+ [app.common.data.macros :as dm]
+ [app.common.exceptions :as ex]
[app.common.logging :as l]
- [app.metrics :as mtx]
- [app.util.services :as sv]
+ [app.common.spec :as us]
+ [app.common.uri :as uri]
+ [app.common.uuid :as uuid]
+ [app.config :as cf]
+ [app.http :as-alias http]
+ [app.loggers.audit :refer [parse-client-ip]]
+ [app.redis :as redis]
+ [app.redis.script :as-alias rscript]
+ [app.rpc.rlimit.result :as-alias lresult]
+ [app.util.services :as-alias sv]
+ [app.util.time :as dt]
+ [clojure.spec.alpha :as s]
+ [cuerdas.core :as str]
[promesa.core :as p]))
-(defprotocol IAsyncSemaphore
- (acquire! [_])
- (release! [_]))
+(def ^:private default-timeout
+ (dt/duration 400))
-(defn semaphore
- [{:keys [permits metrics name]}]
- (let [name (d/name name)
- used (volatile! 0)
- queue (volatile! (d/queue))
- labels (into-array String [name])]
- (reify IAsyncSemaphore
- (acquire! [this]
- (let [d (p/deferred)]
- (locking this
- (if (< @used permits)
- (do
- (vswap! used inc)
- (p/resolve! d))
- (vswap! queue conj d)))
+(def ^:private default-options
+ {:codec redis/string-codec
+ :timeout default-timeout})
- (mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels })
- (mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels})
- (mtx/run! metrics {:id :rlimit-acquires-total :inc 1 :labels labels})
- d))
+(def ^:private bucket-rate-limit-script
+ {::rscript/name ::bucket-rate-limit
+ ::rscript/path "app/rpc/rlimit/bucket.lua"})
- (release! [this]
- (locking this
- (if-let [item (peek @queue)]
- (do
- (vswap! queue pop)
- (p/resolve! item))
- (when (pos? @used)
- (vswap! used dec))))
+(def ^:private window-rate-limit-script
+ {::rscript/name ::window-rate-limit
+ ::rscript/path "app/rpc/rlimit/window.lua"})
- (mtx/run! metrics {:id :rlimit-used-permits :val @used :labels labels})
- (mtx/run! metrics {:id :rlimit-queued-submissions :val (count @queue) :labels labels})
- ))))
+(def enabled?
+ "Allows on runtime completly disable rate limiting."
+ (atom true))
-(defn wrap-rlimit
- [{:keys [metrics executors] :as cfg} f mdata]
- (if-let [permits (::permits mdata)]
- (let [sem (semaphore {:permits permits
- :metrics metrics
- :name (::sv/name mdata)})]
- (l/debug :hint "wrapping rlimit" :handler (::sv/name mdata) :permits permits)
- (fn [cfg params]
- (-> (acquire! sem)
- (p/then (fn [_] (f cfg params)) (:default executors))
- (p/finally (fn [_ _] (release! sem))))))
- f))
+(def ^:private window-opts-re
+ #"^(\d+)/([wdhms])$")
+(def ^:private bucket-opts-re
+ #"^(\d+)/(\d+)/(\d+[hms])$")
+(s/def ::strategy (s/and ::us/keyword #{:window :bucket}))
+
+(s/def ::limit-definition
+ (s/tuple ::us/keyword ::strategy string?))
+
+(defmulti parse-limit (fn [[_ strategy _]] strategy))
+(defmulti process-limit (fn [_ _ _ o] (::strategy o)))
+
+(defmethod parse-limit :window
+ [[name strategy opts :as vlimit]]
+ (us/assert! ::limit-definition vlimit)
+ (merge
+ {::name name
+ ::strategy strategy}
+ (if-let [[_ nreq unit] (re-find window-opts-re opts)]
+ (let [nreq (parse-long nreq)]
+ {::nreq nreq
+ ::unit (case unit
+ "d" :days
+ "h" :hours
+ "m" :minutes
+ "s" :seconds
+ "w" :weeks)
+ ::key (dm/str "ratelimit.window." (d/name name))
+ ::opts opts})
+ (ex/raise :type :validation
+ :code :invalid-window-limit-opts
+ :hint (str/ffmt "looks like '%' does not have a valid format" opts)))))
+
+(defmethod parse-limit :bucket
+ [[name strategy opts :as vlimit]]
+ (us/assert! ::limit-definition vlimit)
+ (merge
+ {::name name
+ ::strategy strategy}
+ (if-let [[_ capacity rate interval] (re-find bucket-opts-re opts)]
+ (let [interval (dt/duration interval)
+ rate (parse-long rate)
+ capacity (parse-long capacity)]
+ {::capacity capacity
+ ::rate rate
+ ::interval interval
+ ::opts opts
+ ::params [(dt/->seconds interval) rate capacity]
+ ::key (dm/str "ratelimit.bucket." (d/name name))})
+ (ex/raise :type :validation
+ :code :invalid-bucket-limit-opts
+ :hint (str/ffmt "looks like '%' does not have a valid format" opts)))))
+
+(defmethod process-limit :bucket
+ [redis user-id now {:keys [::key ::params ::service ::capacity ::interval ::rate] :as limit}]
+ (let [script (-> bucket-rate-limit-script
+ (assoc ::rscript/keys [(dm/str key "." service "." user-id)])
+ (assoc ::rscript/vals (conj params (dt/->seconds now))))]
+ (-> (redis/eval! redis script)
+ (p/then (fn [result]
+ (let [allowed? (boolean (nth result 0))
+ remaining (nth result 1)
+ reset (* (/ (inst-ms interval) rate)
+ (- capacity remaining))]
+ (l/trace :hint "limit processed"
+ :service service
+ :limit (name (::name limit))
+ :strategy (name (::strategy limit))
+ :opts (::opts limit)
+ :allowed? allowed?
+ :remaining remaining)
+ (-> limit
+ (assoc ::lresult/allowed? allowed?)
+ (assoc ::lresult/reset (dt/plus now reset))
+ (assoc ::lresult/remaining remaining))))))))
+
+(defmethod process-limit :window
+ [redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}]
+ (let [ts (dt/truncate now unit)
+ ttl (dt/diff now (dt/plus ts {unit 1}))
+ script (-> window-rate-limit-script
+ (assoc ::rscript/keys [(dm/str key "." service "." user-id "." (dt/format-instant ts))])
+ (assoc ::rscript/vals [nreq (dt/->seconds ttl)]))]
+ (-> (redis/eval! redis script)
+ (p/then (fn [result]
+ (let [allowed? (boolean (nth result 0))
+ remaining (nth result 1)]
+ (l/trace :hint "limit processed"
+ :service service
+ :limit (name (::name limit))
+ :strategy (name (::strategy limit))
+ :opts (::opts limit)
+ :allowed? allowed?
+ :remaining remaining)
+ (-> limit
+ (assoc ::lresult/allowed? allowed?)
+ (assoc ::lresult/remaining remaining)
+ (assoc ::lresult/reset (dt/plus ts {unit 1})))))))))
+
+(defn- process-limits
+ [redis user-id limits now]
+ (-> (p/all (map (partial process-limit redis user-id now) (reverse limits)))
+ (p/then (fn [results]
+ (let [remaining (->> results
+ (d/index-by ::name ::lresult/remaining)
+ (uri/map->query-string))
+ reset (->> results
+ (d/index-by ::name (comp dt/->seconds ::lresult/reset))
+ (uri/map->query-string))
+ rejected (->> results
+ (filter (complement ::lresult/allowed?))
+ (first))]
+ (when (and rejected (contains? cf/flags :warn-rpc-rate-limits))
+ (l/warn :hint "rejected rate limit"
+ :user-id (dm/str user-id)
+ :limit-service (-> rejected ::service name)
+ :limit-name (-> rejected ::name name)
+ :limit-strategy (-> rejected ::strategy name)))
+
+ {:enabled? true
+ :allowed? (some? rejected)
+ :headers {"x-rate-limit-remaining" remaining
+ "x-rate-limit-reset" reset}})))))
+
+(defn- parse-limits
+ [service limits]
+ (let [default (some->> (cf/get :default-rpc-rlimit)
+ (us/conform ::limit-definition))
+
+ limits (cond->> limits
+ (some? default) (cons default))]
+
+ (->> (reverse limits)
+ (sequence (comp (map parse-limit)
+ (map #(assoc % ::service service))
+ (d/distinct-xf ::name))))))
+
+(defn- handle-response
+ [f cfg params rres]
+ (if (:enabled? rres)
+ (let [headers {"x-rate-limit-remaining" (:remaining rres)
+ "x-rate-limit-reset" (:reset rres)}]
+ (when-not (:allowed? rres)
+ (ex/raise :type :rate-limit
+ :code :request-blocked
+ :hint "rate limit reached"
+ ::http/headers headers))
+ (-> (f cfg params)
+ (p/then (fn [response]
+ (with-meta response
+ {::http/headers headers})))))
+
+ (f cfg params)))
+
+(defn wrap
+ [{:keys [redis] :as cfg} f {service ::sv/name :as mdata}]
+ (let [limits (parse-limits service (::limits mdata))
+ default-rresp (p/resolved {:enabled? false})]
+
+ (if (and (seq limits)
+ (or (contains? cf/flags :rpc-rate-limit)
+ (contains? cf/flags :soft-rpc-rate-limit)))
+ (fn [cfg {:keys [::http/request] :as params}]
+ (let [user-id (or (:profile-id params)
+ (some-> request parse-client-ip)
+ uuid/zero)
+
+ rresp (when (and user-id @enabled?)
+ (let [redis (redis/get-or-connect redis ::rlimit default-options)
+ rresp (-> (process-limits redis user-id limits (dt/now))
+ (p/catch (fn [cause]
+ ;; If we have an error on processing the
+ ;; rate-limit we just skip it for do not cause
+ ;; service interruption because of redis downtime
+ ;; or similar situation.
+ (l/error :hint "error on processing rate-limit" :cause cause)
+ {:enabled? false})))]
+
+ ;; If soft rate are enabled, we process the rate-limit but return
+ ;; unprotected response.
+ (and (contains? cf/flags :soft-rpc-rate-limit) rresp)))]
+
+ (p/then (or rresp default-rresp)
+ (partial handle-response f cfg params))))
+ f)))
diff --git a/backend/src/app/rpc/rlimit/bucket.lua b/backend/src/app/rpc/rlimit/bucket.lua
new file mode 100644
index 0000000000..4200dec4d1
--- /dev/null
+++ b/backend/src/app/rpc/rlimit/bucket.lua
@@ -0,0 +1,33 @@
+local tokensKey = KEYS[1]
+
+local interval = tonumber(ARGV[1])
+local rate = tonumber(ARGV[2])
+local capacity = tonumber(ARGV[3])
+local timestamp = tonumber(ARGV[4])
+local requested = tonumber(ARGV[5] or 1)
+
+local fillTime = capacity / (rate / interval);
+local ttl = math.floor(fillTime * 2)
+
+local lastTokens = tonumber(redis.call("hget", tokensKey, "tokens"))
+if lastTokens == nil then
+ lastTokens = capacity
+end
+
+local lastRefreshed = tonumber(redis.call("hget", tokensKey, "timestamp"))
+if lastRefreshed == nil then
+ lastRefreshed = 0
+end
+
+local delta = math.max(0, (timestamp - lastRefreshed) / interval)
+local filled = math.min(capacity, lastTokens + math.floor(delta * rate));
+local allowed = filled >= requested
+local newTokens = filled
+if allowed then
+ newTokens = filled - requested
+end
+
+redis.call("hset", tokensKey, "tokens", newTokens, "timestamp", timestamp)
+redis.call("expire", tokensKey, ttl)
+
+return { allowed, newTokens }
diff --git a/backend/src/app/rpc/rlimit/window.lua b/backend/src/app/rpc/rlimit/window.lua
new file mode 100644
index 0000000000..d5e8e8af63
--- /dev/null
+++ b/backend/src/app/rpc/rlimit/window.lua
@@ -0,0 +1,18 @@
+local windowKey = KEYS[1]
+
+local nreq = tonumber(ARGV[1])
+local ttl = tonumber(ARGV[2])
+
+local total = tonumber(redis.call("incr", windowKey))
+redis.call("expire", windowKey, ttl)
+
+local allowed = total <= nreq
+local remaining = nreq - total
+
+if remaining < 0 then
+ remaining = 0
+end
+
+return {allowed, remaining}
+
+
diff --git a/backend/src/app/rpc/semaphore.clj b/backend/src/app/rpc/semaphore.clj
new file mode 100644
index 0000000000..45f90839d5
--- /dev/null
+++ b/backend/src/app/rpc/semaphore.clj
@@ -0,0 +1,68 @@
+;; This Source Code Form is subject to the terms of the Mozilla Public
+;; License, v. 2.0. If a copy of the MPL was not distributed with this
+;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
+;;
+;; Copyright (c) UXBOX Labs SL
+
+(ns app.rpc.semaphore
+ "Resource usage limits (in other words: semaphores)."
+ (:require
+ [app.common.data :as d]
+ [app.common.logging :as l]
+ [app.metrics :as mtx]
+ [app.util.locks :as locks]
+ [app.util.services :as-alias sv]
+ [promesa.core :as p]))
+
+(defprotocol IAsyncSemaphore
+ (acquire! [_])
+ (release! [_]))
+
+(defn create
+ [& {:keys [permits metrics name]}]
+ (let [name (d/name name)
+ used (volatile! 0)
+ queue (volatile! (d/queue))
+ labels (into-array String [name])
+ lock (locks/create)]
+
+ (reify IAsyncSemaphore
+ (acquire! [_]
+ (let [d (p/deferred)]
+ (locks/locking lock
+ (if (< @used permits)
+ (do
+ (vswap! used inc)
+ (p/resolve! d))
+ (vswap! queue conj d)))
+
+ (mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels })
+ (mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels})
+ (mtx/run! metrics {:id :rpc-semaphore-acquires-total :inc 1 :labels labels})
+ d))
+
+ (release! [_]
+ (locks/locking lock
+ (if-let [item (peek @queue)]
+ (do
+ (vswap! queue pop)
+ (p/resolve! item))
+ (when (pos? @used)
+ (vswap! used dec))))
+
+ (mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels})
+ (mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels})))))
+
+(defn wrap
+ [{:keys [metrics executors] :as cfg} f mdata]
+ (if-let [permits (::permits mdata)]
+ (let [sem (create {:permits permits
+ :metrics metrics
+ :name (::sv/name mdata)})]
+ (l/debug :hint "wrapping semaphore" :handler (::sv/name mdata) :permits permits)
+ (fn [cfg params]
+ (-> (acquire! sem)
+ (p/then (fn [_] (f cfg params)) (:default executors))
+ (p/finally (fn [_ _] (release! sem))))))
+ f))
+
diff --git a/backend/src/app/setup.clj b/backend/src/app/setup.clj
index 7bc0960ba5..1fe5dc7648 100644
--- a/backend/src/app/setup.clj
+++ b/backend/src/app/setup.clj
@@ -11,6 +11,7 @@
[app.common.uuid :as uuid]
[app.db :as db]
[app.setup.builtin-templates]
+ [app.setup.keys :as keys]
[buddy.core.codecs :as bc]
[buddy.core.nonce :as bn]
[clojure.spec.alpha :as s]
@@ -59,6 +60,8 @@
"all sessions on each restart, it is hightly recommeded setting up the "
"PENPOT_SECRET_KEY environment variable")))
- (let [stored (-> (retrieve-all conn)
- (assoc :secret-key (or key (generate-random-key))))]
- (update stored :instance-id handle-instance-id conn (db/read-only? pool)))))
+ (let [secret (or key (generate-random-key))]
+ (-> (retrieve-all conn)
+ (assoc :secret-key secret)
+ (assoc :tokens-key (keys/derive secret :salt "tokens" :size 32))
+ (update :instance-id handle-instance-id conn (db/read-only? pool))))))
diff --git a/backend/src/app/setup/builtin_templates.clj b/backend/src/app/setup/builtin_templates.clj
index d052550582..11cfe0fa92 100644
--- a/backend/src/app/setup/builtin_templates.clj
+++ b/backend/src/app/setup/builtin_templates.clj
@@ -14,7 +14,7 @@
[clojure.edn :as edn]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
- [datoteka.core :as fs]
+ [datoteka.fs :as fs]
[integrant.core :as ig]))
(declare download-all!)
diff --git a/backend/src/app/setup/keys.clj b/backend/src/app/setup/keys.clj
index 372b83a201..468081304a 100644
--- a/backend/src/app/setup/keys.clj
+++ b/backend/src/app/setup/keys.clj
@@ -6,24 +6,17 @@
(ns app.setup.keys
"Keys derivation service."
+ (:refer-clojure :exclude [derive])
(:require
[app.common.spec :as us]
- [buddy.core.kdf :as bk]
- [clojure.spec.alpha :as s]
- [integrant.core :as ig]))
-
-(s/def ::secret-key ::us/string)
-(s/def ::props (s/keys :req-un [::secret-key]))
-
-(defmethod ig/pre-init-spec :app.setup/keys [_]
- (s/keys :req-un [::props]))
-
-(defmethod ig/init-key :app.setup/keys
- [_ {:keys [props] :as cfg}]
- (fn [& {:keys [salt _]}]
- (let [engine (bk/engine {:key (:secret-key props)
- :salt salt
- :alg :hkdf
- :digest :blake2b-512})]
- (bk/get-bytes engine 32))))
+ [buddy.core.kdf :as bk]))
+(defn derive
+ "Derive a key from secret-key"
+ [secret-key & {:keys [salt size]}]
+ (us/assert! ::us/not-empty-string secret-key)
+ (let [engine (bk/engine {:key secret-key
+ :salt salt
+ :alg :hkdf
+ :digest :blake2b-512})]
+ (bk/get-bytes engine size)))
diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj
index 4fbf05a5a2..a4e7209ea9 100644
--- a/backend/src/app/storage.clj
+++ b/backend/src/app/storage.clj
@@ -20,7 +20,7 @@
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
- [datoteka.core :as fs]
+ [datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]))
diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj
index 4feaaf6242..fbfbc8369f 100644
--- a/backend/src/app/storage/fs.clj
+++ b/backend/src/app/storage/fs.clj
@@ -14,7 +14,7 @@
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
- [datoteka.core :as fs]
+ [datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px])
diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj
index 72480dd539..99113f833d 100644
--- a/backend/src/app/storage/s3.clj
+++ b/backend/src/app/storage/s3.clj
@@ -17,7 +17,7 @@
[app.worker :as wrk]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
- [datoteka.core :as fs]
+ [datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px])
diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj
index cdb1b0cc71..69503a4552 100644
--- a/backend/src/app/storage/tmp.clj
+++ b/backend/src/app/storage/tmp.clj
@@ -16,7 +16,7 @@
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
- [datoteka.core :as fs]
+ [datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.exec :as px]))
diff --git a/backend/src/app/tokens.clj b/backend/src/app/tokens.clj
index dc68c6897a..3a991609ee 100644
--- a/backend/src/app/tokens.clj
+++ b/backend/src/app/tokens.clj
@@ -5,7 +5,7 @@
;; Copyright (c) UXBOX Labs SL
(ns app.tokens
- "Tokens generation service."
+ "Tokens generation API."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
@@ -13,20 +13,22 @@
[app.common.transit :as t]
[app.util.time :as dt]
[buddy.sign.jwe :as jwe]
- [clojure.spec.alpha :as s]
- [integrant.core :as ig]))
+ [clojure.spec.alpha :as s]))
-(defn- generate
- [cfg claims]
+(s/def ::tokens-key bytes?)
+
+(defn generate
+ [{:keys [tokens-key]} claims]
+ (us/assert! ::tokens-key tokens-key)
(let [payload (-> claims
(assoc :iat (dt/now))
(d/without-nils)
(t/encode))]
- (jwe/encrypt payload (::secret cfg) {:alg :a256kw :enc :a256gcm})))
+ (jwe/encrypt payload tokens-key {:alg :a256kw :enc :a256gcm})))
-(defn- verify
- [cfg {:keys [token] :as params}]
- (let [payload (jwe/decrypt token (::secret cfg) {:alg :a256kw :enc :a256gcm})
+(defn verify
+ [{:keys [tokens-key]} {:keys [token] :as params}]
+ (let [payload (jwe/decrypt token tokens-key {:alg :a256kw :enc :a256gcm})
claims (t/decode payload)]
(when (and (dt/instant? (:exp claims))
(dt/is-before? (:exp claims) (dt/now)))
@@ -45,30 +47,7 @@
:params params))
claims))
-(defn- generate-predefined
- [cfg {:keys [iss profile-id] :as params}]
- (case iss
- :profile-identity
- (do
- (us/verify uuid? profile-id)
- (generate cfg (assoc params
- :exp (dt/in-future {:days 30}))))
- (ex/raise :type :internal
- :code :not-implemented
- :hint "no predefined token")))
-(s/def ::keys fn?)
-(defmethod ig/pre-init-spec ::tokens [_]
- (s/keys :req-un [::keys]))
-(defmethod ig/init-key ::tokens
- [_ {:keys [keys] :as cfg}]
- (let [secret (keys :salt "tokens" :size 32)
- cfg (assoc cfg ::secret secret)]
- (fn [action params]
- (case action
- :generate-predefined (generate-predefined cfg params)
- :verify (verify cfg params)
- :generate (generate cfg params)))))
diff --git a/backend/src/app/util/bytes.clj b/backend/src/app/util/bytes.clj
index 50a73d3350..5e5c2dca43 100644
--- a/backend/src/app/util/bytes.clj
+++ b/backend/src/app/util/bytes.clj
@@ -8,7 +8,7 @@
"Bytes & Byte Streams helpers"
(:require
[clojure.java.io :as io]
- [datoteka.core :as fs]
+ [datoteka.fs :as fs]
[yetti.adapter :as yt])
(:import
com.github.luben.zstd.ZstdInputStream
@@ -23,6 +23,8 @@
org.apache.commons.io.IOUtils
org.apache.commons.io.input.BoundedInputStream))
+;; TODO: migrate to datoteka.io
+
(set! *warn-on-reflection* true)
(def ^:const default-buffer-size
diff --git a/backend/src/app/util/locks.clj b/backend/src/app/util/locks.clj
new file mode 100644
index 0000000000..05a69166d9
--- /dev/null
+++ b/backend/src/app/util/locks.clj
@@ -0,0 +1,26 @@
+;; This Source Code Form is subject to the terms of the Mozilla Public
+;; License, v. 2.0. If a copy of the MPL was not distributed with this
+;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
+;;
+;; Copyright (c) UXBOX Labs SL
+
+(ns app.util.locks
+ "A syntactic helpers for using locks."
+ (:refer-clojure :exclude [locking])
+ (:import
+ java.util.concurrent.locks.ReentrantLock
+ java.util.concurrent.locks.Lock))
+
+(defn create
+ []
+ (ReentrantLock.))
+
+(defmacro locking
+ [lsym & body]
+ (let [lsym (vary-meta lsym assoc :tag `Lock)]
+ `(do
+ (.lock ~lsym)
+ (try
+ ~@body
+ (finally
+ (.unlock ~lsym))))))
diff --git a/backend/src/app/util/time.clj b/backend/src/app/util/time.clj
index 422c92fb3e..5858a46e69 100644
--- a/backend/src/app/util/time.clj
+++ b/backend/src/app/util/time.clj
@@ -27,16 +27,29 @@
;; Instant & Duration
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+(defn temporal-unit
+ [o]
+ (if (instance? TemporalUnit o)
+ o
+ (case o
+ :nanos ChronoUnit/NANOS
+ :millis ChronoUnit/MILLIS
+ :micros ChronoUnit/MICROS
+ :seconds ChronoUnit/SECONDS
+ :minutes ChronoUnit/MINUTES
+ :hours ChronoUnit/HOURS
+ :days ChronoUnit/DAYS
+ :weeks ChronoUnit/WEEKS
+ :monts ChronoUnit/MONTHS)))
+
;; --- DURATION
(defn- obj->duration
- [{:keys [days minutes seconds hours nanos millis]}]
- (cond-> (Duration/ofMillis (if (int? millis) ^long millis 0))
- (int? days) (.plusDays ^long days)
- (int? hours) (.plusHours ^long hours)
- (int? minutes) (.plusMinutes ^long minutes)
- (int? seconds) (.plusSeconds ^long seconds)
- (int? nanos) (.plusNanos ^long nanos)))
+ [params]
+ (reduce-kv (fn [o k v]
+ (.plus ^Duration o ^long v ^TemporalUnit (temporal-unit k)))
+ (Duration/ofMillis 0)
+ params))
(defn duration?
[v]
@@ -57,20 +70,17 @@
:else
(obj->duration ms-or-obj)))
+(defn ->seconds
+ [d]
+ (-> d inst-ms (/ 1000) int))
+
(defn diff
[t1 t2]
(Duration/between t1 t2))
(defn truncate
[o unit]
- (let [unit (if (instance? TemporalUnit unit)
- unit
- (case unit
- :nanos ChronoUnit/NANOS
- :millis ChronoUnit/MILLIS
- :micros ChronoUnit/MICROS
- :seconds ChronoUnit/SECONDS
- :minutes ChronoUnit/MINUTES))]
+ (let [unit (temporal-unit unit)]
(cond
(instance? Instant o)
(.truncatedTo ^Instant o ^TemporalUnit unit)
@@ -159,11 +169,11 @@
(defn in-future
[v]
- (plus (now) (duration v)))
+ (plus (now) v))
(defn in-past
[v]
- (minus (now) (duration v)))
+ (minus (now) v))
(defn instant->zoned-date-time
[v]
@@ -315,3 +325,13 @@
CronExpression
(-edn [o] (pr-str o)))
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;; Measurement Helpers
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+(defn tpoint
+ "Create a measurement checkpoint for time measurement of potentially
+ asynchronous flow."
+ []
+ (let [p1 (System/nanoTime)]
+ #(duration {:nanos (- (System/nanoTime) p1)})))
diff --git a/backend/test/app/bounce_handling_test.clj b/backend/test/app/bounce_handling_test.clj
index 3d423f73f3..490a8c1b9f 100644
--- a/backend/test/app/bounce_handling_test.clj
+++ b/backend/test/app/bounce_handling_test.clj
@@ -10,6 +10,7 @@
[app.emails :as emails]
[app.http.awsns :as awsns]
[app.test-helpers :as th]
+ [app.tokens :as tokens]
[app.util.time :as dt]
[clojure.pprint :refer [pprint]]
[clojure.test :as t]
@@ -100,11 +101,11 @@
(t/deftest test-parse-bounce-report
(let [profile (th/create-profile* 1)
- tokens (:app.tokens/tokens th/*system*)
- cfg {:tokens tokens}
- report (bounce-report {:token (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})})
+ sprops (:app.setup/props th/*system*)
+ cfg {:sprops sprops}
+ report (bounce-report {:token (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)})})
result (#'awsns/parse-notification cfg report)]
;; (pprint result)
@@ -117,11 +118,11 @@
(t/deftest test-parse-complaint-report
(let [profile (th/create-profile* 1)
- tokens (:app.tokens/tokens th/*system*)
- cfg {:tokens tokens}
- report (complaint-report {:token (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})})
+ sprops (:app.setup/props th/*system*)
+ cfg {:sprops sprops}
+ report (complaint-report {:token (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)})})
result (#'awsns/parse-notification cfg report)]
;; (pprint result)
(t/is (= "complaint" (:type result)))
@@ -132,8 +133,8 @@
))
(t/deftest test-parse-complaint-report-without-token
- (let [tokens (:app.tokens/tokens th/*system*)
- cfg {:tokens tokens}
+ (let [sprops (:app.setup/props th/*system*)
+ cfg {:sprops sprops}
report (complaint-report {:token ""})
result (#'awsns/parse-notification cfg report)]
(t/is (= "complaint" (:type result)))
@@ -145,12 +146,12 @@
(t/deftest test-process-bounce-report
(let [profile (th/create-profile* 1)
- tokens (:app.tokens/tokens th/*system*)
+ sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*)
- cfg {:tokens tokens :pool pool}
- report (bounce-report {:token (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})})
+ cfg {:sprops sprops :pool pool}
+ report (bounce-report {:token (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)]
(#'awsns/process-report cfg report)
@@ -174,12 +175,12 @@
(t/deftest test-process-complaint-report
(let [profile (th/create-profile* 1)
- tokens (:app.tokens/tokens th/*system*)
+ sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*)
- cfg {:tokens tokens :pool pool}
- report (complaint-report {:token (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})})
+ cfg {:sprops sprops :pool pool}
+ report (complaint-report {:token (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)]
(#'awsns/process-report cfg report)
@@ -205,13 +206,13 @@
(t/deftest test-process-bounce-report-to-self
(let [profile (th/create-profile* 1)
- tokens (:app.tokens/tokens th/*system*)
+ sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*)
- cfg {:tokens tokens :pool pool}
+ cfg {:sprops sprops :pool pool}
report (bounce-report {:email (:email profile)
- :token (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})})
+ :token (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)]
(#'awsns/process-report cfg report)
@@ -227,13 +228,13 @@
(t/deftest test-process-complaint-report-to-self
(let [profile (th/create-profile* 1)
- tokens (:app.tokens/tokens th/*system*)
+ sprops (:app.setup/props th/*system*)
pool (:app.db/pool th/*system*)
- cfg {:tokens tokens :pool pool}
+ cfg {:sprops sprops :pool pool}
report (complaint-report {:email (:email profile)
- :token (tokens :generate-predefined
- {:iss :profile-identity
- :profile-id (:id profile)})})
+ :token (tokens/generate sprops
+ {:iss :profile-identity
+ :profile-id (:id profile)})})
report (#'awsns/parse-notification cfg report)]
(#'awsns/process-report cfg report)
diff --git a/backend/test/app/services_profile_test.clj b/backend/test/app/services_profile_test.clj
index 68f14c3b48..e750ea6d25 100644
--- a/backend/test/app/services_profile_test.clj
+++ b/backend/test/app/services_profile_test.clj
@@ -9,9 +9,10 @@
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
- [app.rpc.mutations.profile :as profile]
[app.rpc.commands.auth :as cauth]
+ [app.rpc.mutations.profile :as profile]
[app.test-helpers :as th]
+ [app.tokens :as tokens]
[app.util.time :as dt]
[clojure.java.io :as io]
[clojure.test :as t]
@@ -196,13 +197,13 @@
(t/deftest prepare-and-register-with-invitation-and-disabled-registration-1
(with-redefs [app.config/flags [:disable-registration]]
- (let [tokens-fn (:app.tokens/tokens th/*system*)
- itoken (tokens-fn :generate
- {:iss :team-invitation
- :exp (dt/in-future "48h")
- :role :editor
- :team-id uuid/zero
- :member-email "user@example.com"})
+ (let [sprops (:app.setup/props th/*system*)
+ itoken (tokens/generate sprops
+ {:iss :team-invitation
+ :exp (dt/in-future "48h")
+ :role :editor
+ :team-id uuid/zero
+ :member-email "user@example.com"})
data {::th/type :prepare-register-profile
:invitation-token itoken
:email "user@example.com"
@@ -226,13 +227,13 @@
(t/deftest prepare-and-register-with-invitation-and-disabled-registration-2
(with-redefs [app.config/flags [:disable-registration]]
- (let [tokens-fn (:app.tokens/tokens th/*system*)
- itoken (tokens-fn :generate
- {:iss :team-invitation
- :exp (dt/in-future "48h")
- :role :editor
- :team-id uuid/zero
- :member-email "user2@example.com"})
+ (let [sprops (:app.setup/props th/*system*)
+ itoken (tokens/generate sprops
+ {:iss :team-invitation
+ :exp (dt/in-future "48h")
+ :role :editor
+ :team-id uuid/zero
+ :member-email "user2@example.com"})
data {::th/type :prepare-register-profile
:invitation-token itoken
diff --git a/backend/test/app/test_helpers.clj b/backend/test/app/test_helpers.clj
index 6b4e0ec470..a3dec3285b 100644
--- a/backend/test/app/test_helpers.clj
+++ b/backend/test/app/test_helpers.clj
@@ -59,7 +59,7 @@
:path (-> "app/test_files/template.penpot" io/resource fs/path)}]
config (-> main/system-config
(merge main/worker-config)
- (assoc-in [:app.msgbus/msgbus :redis-uri] (:redis-uri config))
+ (assoc-in [:app.redis/redis :uri] (:redis-uri config))
(assoc-in [:app.db/pool :uri] (:database-uri config))
(assoc-in [:app.db/pool :username] (:database-username config))
(assoc-in [:app.db/pool :password] (:database-password config))
diff --git a/common/deps.edn b/common/deps.edn
index 202f57b3e7..64ad27bf0a 100644
--- a/common/deps.edn
+++ b/common/deps.edn
@@ -28,7 +28,8 @@
:exclusions [org.clojure/data.json]}
frankiesardo/linked {:mvn/version "1.3.0"}
- commons-io/commons-io {:mvn/version "2.11.0"}
+
+ funcool/datoteka {:mvn/version "3.0.65"}
com.sun.mail/jakarta.mail {:mvn/version "2.0.1"}
;; exception printing
diff --git a/common/scripts/repl b/common/scripts/repl
index 4570f636f4..4317cc23ea 100755
--- a/common/scripts/repl
+++ b/common/scripts/repl
@@ -1,7 +1,7 @@
#!/usr/bin/env bash
export PENPOT_FLAGS="enable-asserts enable-audit-log $PENPOT_FLAGS"
-export OPTIONS="-A:dev -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -J-XX:+UseZGC -J-XX:ConcGCThreads=1 -J-XX:-OmitStackTraceInFastThrow -J-Xms50m -J-Xmx512m";
+export OPTIONS="-A:dev -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager -J-XX:+UseG1GC -J-XX:-OmitStackTraceInFastThrow -J-Xms50m -J-Xmx512m";
export OPTIONS_EVAL="nil"
# export OPTIONS_EVAL="(set! *warn-on-reflection* true)"
diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc
index d03a935a25..6c0a195d27 100644
--- a/common/src/app/common/data.cljc
+++ b/common/src/app/common/data.cljc
@@ -10,6 +10,7 @@
parse-double group-by iteration])
#?(:cljs
(:require-macros [app.common.data]))
+
(:require
[app.common.math :as mth]
[clojure.set :as set]
diff --git a/common/src/app/common/exceptions.cljc b/common/src/app/common/exceptions.cljc
index 424a594a85..142b67fffe 100644
--- a/common/src/app/common/exceptions.cljc
+++ b/common/src/app/common/exceptions.cljc
@@ -81,8 +81,8 @@
(deref [_] cause)))
-(ns-unmap 'app.common.exceptions '->WrappedException)
-(ns-unmap 'app.common.exceptions 'map->WrappedException)
+#?(:clj (ns-unmap 'app.common.exceptions '->WrappedException))
+#?(:clj (ns-unmap 'app.common.exceptions 'map->WrappedException))
(defn wrapped?
[o]
diff --git a/common/src/app/common/spec.cljc b/common/src/app/common/spec.cljc
index 654270206c..29d747aa0c 100644
--- a/common/src/app/common/spec.cljc
+++ b/common/src/app/common/spec.cljc
@@ -133,9 +133,9 @@
(dm/str v))]
(s/def ::rgb-color-str (s/conformer conformer unformer)))
-;; --- SPEC: set/vec of valid Keywords
+;; --- SPEC: set/vector of Keywords
-(letfn [(conform-fn [dest s]
+(letfn [(conformer-fn [dest s]
(let [xform (keep (fn [s]
(cond
(string? s) (keyword s)
@@ -144,17 +144,38 @@
(cond
(set? s) (into dest xform s)
(string? s) (into dest xform (str/words s))
- :else ::s/invalid)))]
+ :else ::s/invalid)))
+ (unformer-fn [v]
+ (str/join " " (map name v)))]
- (s/def ::set-of-valid-keywords
- (s/conformer
- (fn [s] (conform-fn #{} s))
- (fn [s] (str/join " " (map name s)))))
+ (s/def ::set-of-keywords
+ (s/conformer (partial conformer-fn #{}) unformer-fn))
- (s/def ::vec-of-valid-keywords
- (s/conformer
- (fn [s] (conform-fn [] s))
- (fn [s] (str/join " " (map name s))))))
+ (s/def ::vector-of-keywords
+ (s/conformer (partial conformer-fn []) unformer-fn)))
+
+;; --- SPEC: set/vector of strings
+
+(def non-empty-strings-xf
+ (comp
+ (filter string?)
+ (remove str/empty?)
+ (remove str/blank?)))
+
+(letfn [(conformer-fn [dest v]
+ (cond
+ (string? v) (into dest non-empty-strings-xf (str/split v #"[\s,]+"))
+ (vector? v) (into dest non-empty-strings-xf v)
+ (set? v) (into dest non-empty-strings-xf v)
+ :else ::s/invalid))
+ (unformer-fn [v]
+ (str/join "," v))]
+
+ (s/def ::set-of-strings
+ (s/conformer (partial conformer-fn #{}) unformer-fn))
+
+ (s/def ::vector-of-strings
+ (s/conformer (partial conformer-fn []) unformer-fn)))
;; --- SPEC: set-of-valid-emails
@@ -173,23 +194,15 @@
(str/join " " v))]
(s/def ::set-of-valid-emails (s/conformer conformer unformer)))
-;; --- SPEC: set-of-non-empty-strings
-
-(def non-empty-strings-xf
- (comp
- (filter string?)
- (remove str/empty?)
- (remove str/blank?)))
+;; --- SPEC: query-string
(letfn [(conformer [s]
- (cond
- (string? s) (->> (str/split s #"\s*,\s*")
- (into #{} non-empty-strings-xf))
- (set? s) (into #{} non-empty-strings-xf s)
- :else ::s/invalid))
+ (if (string? s)
+ (ex/try* #(u/query-string->map s) (constantly ::s/invalid))
+ s))
(unformer [s]
- (str/join "," s))]
- (s/def ::set-of-non-empty-strings (s/conformer conformer unformer)))
+ (u/map->query-string s))]
+ (s/def ::query-string (s/conformer conformer unformer)))
;; --- SPECS WITHOUT CONFORMER
diff --git a/frontend/deps.edn b/frontend/deps.edn
index 24e203ac30..e6090eaa82 100644
--- a/frontend/deps.edn
+++ b/frontend/deps.edn
@@ -32,7 +32,7 @@
:dev
{:extra-paths ["dev"]
:extra-deps
- {thheller/shadow-cljs {:mvn/version "2.19.8"}
+ {thheller/shadow-cljs {:mvn/version "2.19.9"}
org.clojure/tools.namespace {:mvn/version "RELEASE"}
cider/cider-nrepl {:mvn/version "0.28.4"}}}
diff --git a/frontend/package.json b/frontend/package.json
index 79a5e6b366..c526579593 100644
--- a/frontend/package.json
+++ b/frontend/package.json
@@ -48,7 +48,7 @@
"prettier": "^2.7.1",
"rimraf": "^3.0.0",
"sass": "^1.53.0",
- "shadow-cljs": "2.19.8"
+ "shadow-cljs": "2.19.9"
},
"dependencies": {
"@sentry/browser": "^6.17.4",
diff --git a/frontend/src/app/main/ui/comments.cljs b/frontend/src/app/main/ui/comments.cljs
index 31fe477678..cddc4c23a9 100644
--- a/frontend/src/app/main/ui/comments.cljs
+++ b/frontend/src/app/main/ui/comments.cljs
@@ -116,6 +116,7 @@
(some? position-modifier)
(gpt/transform position-modifier))
content (:content draft)
+
pos-x (* (:x position) zoom)
pos-y (* (:y position) zoom)
diff --git a/frontend/src/app/main/ui/settings.cljs b/frontend/src/app/main/ui/settings.cljs
index 334b3feb87..299cf92e56 100644
--- a/frontend/src/app/main/ui/settings.cljs
+++ b/frontend/src/app/main/ui/settings.cljs
@@ -14,8 +14,8 @@
[app.main.ui.settings.options :refer [options-page]]
[app.main.ui.settings.password :refer [password-page]]
[app.main.ui.settings.profile :refer [profile-page]]
- [app.main.ui.settings.sidebar :refer [sidebar]]
- [app.util.i18n :as i18n :refer [tr]]
+ [app.main.ui.settings.sidebar :refer [sidebar]]
+ [app.util.i18n :as i18n :refer [tr]]
[app.util.router :as rt]
[rumext.alpha :as mf]))
@@ -31,10 +31,11 @@
(let [section (get-in route [:data :name])
profile (mf/deref refs/profile)
locale (mf/deref i18n/locale)]
+
(mf/use-effect
#(when (nil? profile)
(st/emit! (rt/nav :auth-login))))
-
+
[:section.dashboard-layout
[:& sidebar {:profile profile
:locale locale
diff --git a/frontend/yarn.lock b/frontend/yarn.lock
index a563315694..6c1da5b6b9 100644
--- a/frontend/yarn.lock
+++ b/frontend/yarn.lock
@@ -4711,10 +4711,10 @@ shadow-cljs-jar@1.3.2:
resolved "https://registry.yarnpkg.com/shadow-cljs-jar/-/shadow-cljs-jar-1.3.2.tgz#97273afe1747b6a2311917c1c88d9e243c81957b"
integrity sha512-XmeffAZHv8z7451kzeq9oKh8fh278Ak+UIOGGrapyqrFBB773xN8vMQ3O7J7TYLnb9BUwcqadKkmgaq7q6fhZg==
-shadow-cljs@2.19.8:
- version "2.19.8"
- resolved "https://registry.yarnpkg.com/shadow-cljs/-/shadow-cljs-2.19.8.tgz#1ce96cab3e4903bed8d401ffbe88b8939f5454d3"
- integrity sha512-6qek3mcAP0hrnC5FxrTebBrgLGpOuhlnp06vdxp6g0M5Gl6w2Y0hzSwa1s2K8fMOkzE4/ciQor75b2y64INgaw==
+shadow-cljs@2.19.9:
+ version "2.19.9"
+ resolved "https://registry.yarnpkg.com/shadow-cljs/-/shadow-cljs-2.19.9.tgz#1e6b6115241666504c705ca8e6d6c9c1bc64add2"
+ integrity sha512-rLPv98HBIKf/4Kxjxpq0gXaNBVupJ/ii+OApHuqXFuwJOgU94DbNDSP4Bzjo+az2tTD11yTzXiYZsTqyAy+VBQ==
dependencies:
node-libs-browser "^2.2.1"
readline-sync "^1.4.7"