mirror of
https://github.com/penpot/penpot.git
synced 2026-04-25 11:18:36 +00:00
597 lines
17 KiB
Clojure
597 lines
17 KiB
Clojure
;; This Source Code Form is subject to the terms of the Mozilla Public
|
|
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
;;
|
|
;; Copyright (c) KALEIDOS INC
|
|
|
|
(ns app.redis
|
|
"The msgbus abstraction implemented using redis as underlying backend."
|
|
(:refer-clojure :exclude [eval get set run!])
|
|
(:require
|
|
[app.common.data :as d]
|
|
[app.common.exceptions :as ex]
|
|
[app.common.generic-pool :as gpool]
|
|
[app.common.logging :as l]
|
|
[app.common.schema :as sm]
|
|
[app.common.time :as ct]
|
|
[app.metrics :as mtx]
|
|
[app.redis.script :as-alias rscript]
|
|
[app.worker :as wrk]
|
|
[app.worker.executor]
|
|
[clojure.core :as c]
|
|
[clojure.java.io :as io]
|
|
[cuerdas.core :as str]
|
|
[integrant.core :as ig])
|
|
(:import
|
|
clojure.lang.MapEntry
|
|
io.lettuce.core.api.StatefulRedisConnection
|
|
io.lettuce.core.api.sync.RedisCommands
|
|
io.lettuce.core.api.sync.RedisScriptingCommands
|
|
io.lettuce.core.codec.RedisCodec
|
|
io.lettuce.core.codec.StringCodec
|
|
io.lettuce.core.KeyValue
|
|
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
|
|
io.lettuce.core.pubsub.RedisPubSubListener
|
|
io.lettuce.core.pubsub.StatefulRedisPubSubConnection
|
|
io.lettuce.core.RedisClient
|
|
io.lettuce.core.RedisCommandInterruptedException
|
|
io.lettuce.core.RedisCommandTimeoutException
|
|
io.lettuce.core.RedisException
|
|
io.lettuce.core.RedisURI
|
|
io.lettuce.core.resource.ClientResources
|
|
io.lettuce.core.resource.DefaultClientResources
|
|
io.lettuce.core.ScriptOutputType
|
|
io.lettuce.core.SetArgs
|
|
io.netty.channel.nio.NioEventLoopGroup
|
|
io.netty.util.concurrent.EventExecutorGroup
|
|
io.netty.util.HashedWheelTimer
|
|
io.netty.util.Timer
|
|
java.lang.AutoCloseable
|
|
java.time.Duration))
|
|
|
|
(set! *warn-on-reflection* true)
|
|
|
|
(def ^:const MAX-EVAL-RETRIES 18)
|
|
|
|
(def default-timeout
|
|
(ct/duration "10s"))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
;; IMPL & PRIVATE API
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(defprotocol IConnection
|
|
(-set-timeout [_ timeout] "set connection timeout")
|
|
(-get-timeout [_] "get current timeout")
|
|
(-reset-timeout [_] "reset to default timeout"))
|
|
|
|
(defprotocol IDefaultConnection
|
|
"Public API of default redis connection"
|
|
(-publish [_ topic message])
|
|
(-rpush [_ key payload])
|
|
(-blpop [_ timeout keys])
|
|
(-eval [_ script])
|
|
(-get [_ key])
|
|
(-set [_ key val args])
|
|
(-del [_ key-or-keys])
|
|
(-ping [_]))
|
|
|
|
(defprotocol IPubSubConnection
|
|
(-add-listener [_ listener])
|
|
(-subscribe [_ topics])
|
|
(-unsubscribe [_ topics]))
|
|
|
|
(def ^:private default-codec
|
|
(RedisCodec/of StringCodec/UTF8 StringCodec/UTF8))
|
|
|
|
(defn- impl-eval
|
|
[cmd cache metrics script]
|
|
(let [keys (into-array String (map str (::rscript/keys script)))
|
|
vals (into-array String (map str (::rscript/vals script)))
|
|
sname (::rscript/name script)
|
|
|
|
read-script
|
|
(fn []
|
|
(-> script ::rscript/path io/resource slurp))
|
|
|
|
load-script
|
|
(fn []
|
|
(let [id (.scriptLoad ^RedisScriptingCommands cmd
|
|
^String (read-script))]
|
|
(swap! cache assoc sname id)
|
|
(l/trc :hint "load script" :name sname :id id)
|
|
|
|
id))
|
|
|
|
eval-script
|
|
(fn [id]
|
|
(try
|
|
(let [tpoint (ct/tpoint)
|
|
result (.evalsha ^RedisScriptingCommands cmd
|
|
^String id
|
|
^ScriptOutputType ScriptOutputType/MULTI
|
|
^"[Ljava.lang.String;" keys
|
|
^"[Ljava.lang.String;" vals)
|
|
elapsed (tpoint)]
|
|
|
|
(mtx/run! metrics {:id :redis-eval-timing
|
|
:labels [(name sname)]
|
|
:val (inst-ms elapsed)})
|
|
|
|
(l/trc :hint "eval script"
|
|
:name (name sname)
|
|
:id id
|
|
:params (str/join "," (::rscript/vals script))
|
|
:elapsed (ct/format-duration elapsed))
|
|
|
|
result)
|
|
|
|
(catch io.lettuce.core.RedisNoScriptException _cause
|
|
::load)
|
|
|
|
(catch Throwable cause
|
|
(when-let [on-error (::rscript/on-error script)]
|
|
(on-error cause))
|
|
(throw cause))))
|
|
|
|
eval-script'
|
|
(fn [id]
|
|
(loop [id id
|
|
retries 0]
|
|
(if (> retries MAX-EVAL-RETRIES)
|
|
(ex/raise :type :internal
|
|
:code ::max-eval-retries-reached
|
|
:hint (str "unable to eval redis script " sname))
|
|
(let [result (eval-script id)]
|
|
(if (= result ::load)
|
|
(recur (load-script)
|
|
(inc retries))
|
|
result)))))]
|
|
|
|
(if-let [id (c/get @cache sname)]
|
|
(eval-script' id)
|
|
(-> (load-script)
|
|
(eval-script')))))
|
|
|
|
(deftype Connection [^StatefulRedisConnection conn
|
|
^RedisCommands cmd
|
|
^Duration timeout
|
|
cache metrics]
|
|
AutoCloseable
|
|
(close [_]
|
|
(ex/ignoring (.close conn)))
|
|
|
|
IConnection
|
|
(-set-timeout [_ timeout]
|
|
(.setTimeout conn ^Duration timeout))
|
|
|
|
(-reset-timeout [_]
|
|
(.setTimeout conn timeout))
|
|
|
|
(-get-timeout [_]
|
|
(.getTimeout conn))
|
|
|
|
IDefaultConnection
|
|
(-publish [_ topic message]
|
|
(.publish cmd ^String topic ^String message))
|
|
|
|
(-rpush [_ key elements]
|
|
(try
|
|
(let [vals (make-array String (count elements))]
|
|
(loop [i 0 xs (seq elements)]
|
|
(when xs
|
|
(aset ^"[[Ljava.lang.String;" vals i ^String (first xs))
|
|
(recur (inc i) (next xs))))
|
|
|
|
(.rpush cmd
|
|
^String key
|
|
^"[[Ljava.lang.String;" vals))
|
|
|
|
(catch RedisCommandInterruptedException cause
|
|
(throw (InterruptedException. (ex-message cause))))))
|
|
|
|
(-blpop [_ keys timeout]
|
|
(try
|
|
(let [keys (into-array String keys)]
|
|
(when-let [res (.blpop cmd
|
|
^double timeout
|
|
^"[Ljava.lang.String;" keys)]
|
|
(MapEntry/create
|
|
(.getKey ^KeyValue res)
|
|
(.getValue ^KeyValue res))))
|
|
(catch RedisCommandInterruptedException cause
|
|
(throw (InterruptedException. (ex-message cause))))))
|
|
|
|
(-get [_ key]
|
|
(assert (string? key) "key expected to be string")
|
|
(.get cmd ^String key))
|
|
|
|
(-set [_ key val args]
|
|
(.set cmd
|
|
^String key
|
|
^bytes val
|
|
^SetArgs args))
|
|
|
|
(-del [_ keys]
|
|
(let [keys (into-array String keys)]
|
|
(.del cmd ^String/1 keys)))
|
|
|
|
(-ping [_]
|
|
(.ping cmd))
|
|
|
|
(-eval [_ script]
|
|
(impl-eval cmd cache metrics script)))
|
|
|
|
|
|
(deftype SubscriptionConnection [^StatefulRedisPubSubConnection conn
|
|
^RedisPubSubCommands cmd
|
|
^Duration timeout]
|
|
AutoCloseable
|
|
(close [_]
|
|
(ex/ignoring (.close conn)))
|
|
|
|
IConnection
|
|
(-set-timeout [_ timeout]
|
|
(.setTimeout conn ^Duration timeout))
|
|
|
|
(-reset-timeout [_]
|
|
(.setTimeout conn timeout))
|
|
|
|
(-get-timeout [_]
|
|
(.getTimeout conn))
|
|
|
|
IPubSubConnection
|
|
(-add-listener [_ listener]
|
|
(.addListener conn ^RedisPubSubListener listener))
|
|
|
|
(-subscribe [_ topics]
|
|
(try
|
|
(let [topics (into-array String topics)]
|
|
(.subscribe cmd topics))
|
|
(catch RedisCommandInterruptedException cause
|
|
(throw (InterruptedException. (ex-message cause))))))
|
|
|
|
(-unsubscribe [_ topics]
|
|
(try
|
|
(let [topics (into-array String topics)]
|
|
(.unsubscribe cmd topics))
|
|
(catch RedisCommandInterruptedException cause
|
|
(throw (InterruptedException. (ex-message cause)))))))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
;; PUBLIC API
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(defn build-set-args
|
|
[options]
|
|
(reduce-kv (fn [^SetArgs args k v]
|
|
(case k
|
|
:ex (if (instance? Duration v)
|
|
(.ex args ^Duration v)
|
|
(.ex args (long v)))
|
|
:px (.px args (long v))
|
|
:nx (if v (.nx args) args)
|
|
:keep-ttl (if v (.keepttl args) args)))
|
|
(SetArgs.)
|
|
options))
|
|
|
|
(defn pubsub-listener
|
|
[& {:keys [on-message on-subscribe on-unsubscribe]}]
|
|
(reify RedisPubSubListener
|
|
(message [_ pattern topic message]
|
|
(when on-message
|
|
(on-message pattern topic message)))
|
|
|
|
(message [_ topic message]
|
|
(when on-message
|
|
(on-message nil topic message)))
|
|
|
|
(psubscribed [_ pattern count]
|
|
(when on-subscribe
|
|
(on-subscribe pattern nil count)))
|
|
|
|
(punsubscribed [_ pattern count]
|
|
(when on-unsubscribe
|
|
(on-unsubscribe pattern nil count)))
|
|
|
|
(subscribed [_ topic count]
|
|
(when on-subscribe
|
|
(on-subscribe nil topic count)))
|
|
|
|
(unsubscribed [_ topic count]
|
|
(when on-unsubscribe
|
|
(on-unsubscribe nil topic count)))))
|
|
|
|
(defn connect
|
|
[cfg & {:as options}]
|
|
(assert (contains? cfg ::mtx/metrics) "missing ::mtx/metrics on provided system")
|
|
(assert (contains? cfg ::client) "missing ::rds/client on provided system")
|
|
|
|
(let [state (::client cfg)
|
|
|
|
cache (::cache state)
|
|
client (::client state)
|
|
timeout (or (some-> (:timeout options) ct/duration)
|
|
(::timeout state))
|
|
|
|
conn (.connect ^RedisClient client
|
|
^RedisCodec default-codec)
|
|
cmd (.sync ^StatefulRedisConnection conn)]
|
|
|
|
(.setTimeout ^StatefulRedisConnection conn ^Duration timeout)
|
|
(->Connection conn cmd timeout cache (::mtx/metrics cfg))))
|
|
|
|
(defn connect-pubsub
|
|
[cfg & {:as options}]
|
|
(let [state (::client cfg)
|
|
client (::client state)
|
|
|
|
timeout (or (some-> (:timeout options) ct/duration)
|
|
(::timeout state))
|
|
conn (.connectPubSub ^RedisClient client
|
|
^RedisCodec default-codec)
|
|
cmd (.sync ^StatefulRedisPubSubConnection conn)]
|
|
|
|
|
|
(.setTimeout ^StatefulRedisPubSubConnection conn
|
|
^Duration timeout)
|
|
(->SubscriptionConnection conn cmd timeout)))
|
|
|
|
(defn get
|
|
[conn key]
|
|
(assert (string? key) "key must be string instance")
|
|
(try
|
|
(-get conn key)
|
|
(catch RedisCommandTimeoutException cause
|
|
(l/err :hint "timeout on get redis key" :key key :cause cause)
|
|
nil)))
|
|
|
|
(defn set
|
|
([conn key val]
|
|
(set conn key val nil))
|
|
([conn key val args]
|
|
(assert (string? key) "key must be string instance")
|
|
(assert (string? val) "val must be string instance")
|
|
(let [args (cond
|
|
(or (instance? SetArgs args)
|
|
(nil? args))
|
|
args
|
|
|
|
(map? args)
|
|
(build-set-args args)
|
|
|
|
:else
|
|
(throw (IllegalArgumentException. "invalid args")))]
|
|
|
|
(try
|
|
(-set conn key val args)
|
|
(catch RedisCommandTimeoutException cause
|
|
(l/err :hint "timeout on set redis key" :key key :cause cause)
|
|
nil)))))
|
|
|
|
(defn del
|
|
[conn key-or-keys]
|
|
(let [keys (if (vector? key-or-keys) key-or-keys [key-or-keys])]
|
|
(assert (every? string? keys) "only string keys allowed")
|
|
(try
|
|
(-del conn keys)
|
|
(catch RedisCommandTimeoutException cause
|
|
(l/err :hint "timeout on del redis key" :key key :cause cause)
|
|
nil))))
|
|
|
|
(defn ping
|
|
[conn]
|
|
(-ping conn))
|
|
|
|
(defn blpop
|
|
[conn key-or-keys timeout]
|
|
(let [keys (if (vector? key-or-keys) key-or-keys [key-or-keys])
|
|
timeout (cond
|
|
(ct/duration? timeout)
|
|
(/ (double (inst-ms timeout)) 1000.0)
|
|
|
|
(double? timeout)
|
|
timeout
|
|
|
|
(int? timeout)
|
|
(/ (double timeout) 1000.0)
|
|
|
|
:else
|
|
0)]
|
|
|
|
(assert (every? string? keys) "only string keys allowed")
|
|
(-blpop conn keys timeout)))
|
|
|
|
(defn rpush
|
|
[conn key elements]
|
|
(assert (string? key) "key must be string instance")
|
|
(assert (every? string? elements) "elements should be all strings")
|
|
(let [elements (vec elements)]
|
|
(-rpush conn key elements)))
|
|
|
|
(defn publish
|
|
[conn topic payload]
|
|
(assert (string? topic) "expected topic to be string")
|
|
(assert (string? payload) "expected message to be a byte array")
|
|
(-publish conn topic payload))
|
|
|
|
(def ^:private schema:script
|
|
[:map {:title "script"}
|
|
[::rscript/name qualified-keyword?]
|
|
[::rscript/path ::sm/text]
|
|
[::rscript/keys {:optional true} [:vector :any]]
|
|
[::rscript/vals {:optional true} [:vector :any]]])
|
|
|
|
(def ^:private valid-script?
|
|
(sm/lazy-validator schema:script))
|
|
|
|
(defn eval
|
|
[conn script]
|
|
(assert (valid-script? script) "expected valid script")
|
|
(-eval conn script))
|
|
|
|
(defn add-listener
|
|
[conn listener]
|
|
(let [listener (cond
|
|
(map? listener)
|
|
(pubsub-listener listener)
|
|
|
|
(instance? RedisPubSubListener listener)
|
|
listener
|
|
|
|
:else
|
|
(throw (IllegalArgumentException. "invalid listener provided")))]
|
|
|
|
(-add-listener conn listener)))
|
|
|
|
(defn subscribe
|
|
[conn topic-or-topics]
|
|
(let [topics (if (vector? topic-or-topics) topic-or-topics [topic-or-topics])]
|
|
(assert (every? string? topics))
|
|
(-subscribe conn topics)))
|
|
|
|
(defn unsubscribe
|
|
[conn topic-or-topics]
|
|
(let [topics (if (vector? topic-or-topics) topic-or-topics [topic-or-topics])]
|
|
(assert (every? string? topics))
|
|
(-unsubscribe conn topics)))
|
|
|
|
(defn set-timeout
|
|
[conn timeout]
|
|
(let [timeout (ct/duration timeout)]
|
|
(-set-timeout conn timeout)))
|
|
|
|
(defn get-timeout
|
|
[conn]
|
|
(-get-timeout conn))
|
|
|
|
(defn reset-timeout
|
|
[conn]
|
|
(-reset-timeout conn))
|
|
|
|
(defn timeout-exception?
|
|
[cause]
|
|
(instance? RedisCommandTimeoutException cause))
|
|
|
|
(defn exception?
|
|
[cause]
|
|
(instance? RedisException cause))
|
|
|
|
(defn get-pooled
|
|
[cfg]
|
|
(let [pool (::pool cfg)]
|
|
(gpool/get pool)))
|
|
|
|
(defn close
|
|
[o]
|
|
(.close ^AutoCloseable o))
|
|
|
|
(defn pool
|
|
[cfg & {:as options}]
|
|
(gpool/create :create-fn (partial connect cfg options)
|
|
:destroy-fn close
|
|
:dispose-fn -reset-timeout))
|
|
|
|
(defn run!
|
|
[cfg f & args]
|
|
(if (gpool/pool? cfg)
|
|
(apply f {::pool cfg} f args)
|
|
(let [pool (::pool cfg)]
|
|
(with-open [^AutoCloseable conn (gpool/get pool)]
|
|
(apply f (assoc cfg ::conn @conn) args)))))
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
;; INITIALIZATION
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
(defmethod ig/expand-key ::client
|
|
[k v]
|
|
{k (-> (d/without-nils v)
|
|
(assoc ::timeout (ct/duration "10s")))})
|
|
|
|
(def ^:private schema:client
|
|
[:map {:title "RedisClient"}
|
|
[::timer [:fn #(instance? HashedWheelTimer %)]]
|
|
[::cache ::sm/atom]
|
|
[::timeout ::ct/duration]
|
|
[::resources [:fn #(instance? DefaultClientResources %)]]])
|
|
|
|
(def check-client
|
|
(sm/check-fn schema:client))
|
|
|
|
(sm/register! ::client schema:client)
|
|
(sm/register!
|
|
{:type ::pool
|
|
:pred gpool/pool?})
|
|
|
|
(def ^:private schema:client-params
|
|
[:map {:title "redis-params"}
|
|
::wrk/netty-io-executor
|
|
::wrk/netty-executor
|
|
[::uri ::sm/uri]
|
|
[::timeout ::ct/duration]])
|
|
|
|
(def ^:private check-client-params
|
|
(sm/check-fn schema:client-params))
|
|
|
|
(defmethod ig/assert-key ::client
|
|
[_ params]
|
|
(check-client-params params))
|
|
|
|
(defmethod ig/init-key ::client
|
|
[_ {:keys [::uri ::wrk/netty-io-executor ::wrk/netty-executor] :as params}]
|
|
|
|
(l/inf :hint "initialize redis client" :uri (str uri))
|
|
|
|
(let [timer (HashedWheelTimer.)
|
|
cache (atom {})
|
|
|
|
resources (.. (DefaultClientResources/builder)
|
|
(eventExecutorGroup ^EventExecutorGroup netty-executor)
|
|
|
|
;; We provide lettuce with a shared event loop
|
|
;; group instance instead of letting lettuce to
|
|
;; create its own
|
|
(eventLoopGroupProvider
|
|
(reify io.lettuce.core.resource.EventLoopGroupProvider
|
|
(allocate [_ _] netty-io-executor)
|
|
(threadPoolSize [_]
|
|
(.executorCount ^NioEventLoopGroup netty-io-executor))
|
|
(release [_ _ _ _ _]
|
|
;; Do nothing
|
|
)
|
|
(shutdown [_ _ _ _]
|
|
;; Do nothing
|
|
)))
|
|
|
|
(timer ^Timer timer)
|
|
(build))
|
|
|
|
redis-uri (RedisURI/create ^String (str uri))
|
|
client (RedisClient/create ^ClientResources resources
|
|
^RedisURI redis-uri)]
|
|
|
|
{::client client
|
|
::cache cache
|
|
::timer timer
|
|
::timeout default-timeout
|
|
::resources resources}))
|
|
|
|
(defmethod ig/halt-key! ::client
|
|
[_ {:keys [::client ::timer ::resources]}]
|
|
(ex/ignoring (.shutdown ^RedisClient client))
|
|
(ex/ignoring (.shutdown ^ClientResources resources))
|
|
(ex/ignoring (.stop ^Timer timer)))
|
|
|
|
(defmethod ig/assert-key ::pool
|
|
[_ {:keys [::client]}]
|
|
(check-client client))
|
|
|
|
(defmethod ig/init-key ::pool
|
|
[_ cfg]
|
|
(pool cfg {:timeout (ct/duration 2000)}))
|
|
|
|
(defmethod ig/halt-key! ::pool
|
|
[_ instance]
|
|
(.close ^java.lang.AutoCloseable instance))
|