♻️ Rename ::wrk/netty-executor to ::wrk/executor with cached pool

Replace DefaultEventExecutorGroup (fixed Netty thread pool) with a
cached thread pool (px/cached-executor) for general async task
offloading. The cached pool creates threads on demand and reuses
idle ones, which is more appropriate for blocking I/O workloads
(shell commands, message bus, rate limiting, etc.).

Changes:
- Rename ::wrk/netty-executor to ::wrk/executor in worker/executor.clj
- Switch implementation from DefaultEventExecutorGroup to px/cached-executor
- Update all ig/ref wiring in main.clj (msgbus, tmp cleaner, climit, rlimit, rpc)
- Remove ::wrk/netty-executor from redis.clj (let lettuce create its own
  eventExecutorGroup instead of sharing a Netty executor)
- Assert executor is present in shell/exec! to prevent silent nil usage
- Remove executor-threads config (no longer needed for cached pool)

The ::wrk/netty-io-executor (NioEventLoopGroup) remains unchanged as it
handles actual non-blocking network I/O for Redis and S3.

Co-authored-by: mimo-v2.5-pro <mimo-v2.5-pro@penpot.app>
This commit is contained in:
Andrey Antukh 2026-06-16 16:35:57 +00:00
parent c5c90504c5
commit fa89624ae2
5 changed files with 51 additions and 72 deletions

View File

@ -241,7 +241,6 @@
[:assets-path {:optional true} :string]
[:netty-io-threads {:optional true} ::sm/int]
[:executor-threads {:optional true} ::sm/int]
[:nitrate-backend-uri {:optional true} ::sm/uri]

View File

@ -162,8 +162,8 @@
::wrk/netty-io-executor
{:threads (cf/get :netty-io-threads)}
::wrk/netty-executor
{:threads (cf/get :executor-threads)}
::wrk/executor
{}
:app.migrations/migrations
{::db/pool (ig/ref ::db/pool)}
@ -178,9 +178,6 @@
{::rds/uri
(cf/get :redis-uri)
::wrk/netty-executor
(ig/ref ::wrk/netty-executor)
::wrk/netty-io-executor
(ig/ref ::wrk/netty-io-executor)}
@ -189,12 +186,12 @@
::mtx/metrics (ig/ref ::mtx/metrics)}
::mbus/msgbus
{::wrk/executor (ig/ref ::wrk/netty-executor)
{::wrk/executor (ig/ref ::wrk/executor)
::rds/client (ig/ref ::rds/client)
::mtx/metrics (ig/ref ::mtx/metrics)}
:app.storage.tmp/cleaner
{::wrk/executor (ig/ref ::wrk/netty-executor)}
{::wrk/executor (ig/ref ::wrk/executor)}
::sto.gc-deleted/handler
{::db/pool (ig/ref ::db/pool)
@ -308,12 +305,12 @@
::rpc/climit
{::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/executor (ig/ref ::wrk/netty-executor)
::wrk/executor (ig/ref ::wrk/executor)
::climit/config (cf/get :rpc-climit-config)
::climit/enabled (contains? cf/flags :rpc-climit)}
:app.rpc/rlimit
{::wrk/executor (ig/ref ::wrk/netty-executor)
{::wrk/executor (ig/ref ::wrk/executor)
:app.loggers.mattermost/reporter
(ig/ref :app.loggers.mattermost/reporter)
@ -326,7 +323,7 @@
::db/pool (ig/ref ::db/pool)
::rds/pool (ig/ref ::rds/pool)
:app.nitrate/client (ig/ref :app.nitrate/client)
::wrk/executor (ig/ref ::wrk/netty-executor)
::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager)
::ldap/provider (ig/ref ::ldap/provider)
::sto/storage (ig/ref ::sto/storage)
@ -356,7 +353,7 @@
{::http.client/client (ig/ref ::http.client/client)
::db/pool (ig/ref ::db/pool)
::rds/pool (ig/ref ::rds/pool)
::wrk/executor (ig/ref ::wrk/netty-executor)
::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager)
::sto/storage (ig/ref ::sto/storage)
::mtx/metrics (ig/ref ::mtx/metrics)

View File

@ -43,7 +43,6 @@
io.lettuce.core.ScriptOutputType
io.lettuce.core.SetArgs
io.netty.channel.nio.NioEventLoopGroup
io.netty.util.concurrent.EventExecutorGroup
io.netty.util.HashedWheelTimer
io.netty.util.Timer
java.lang.AutoCloseable
@ -527,7 +526,6 @@
(def ^:private schema:client-params
[:map {:title "redis-params"}
::wrk/netty-io-executor
::wrk/netty-executor
[::uri ::sm/uri]
[::timeout ::ct/duration]])
@ -539,7 +537,7 @@
(check-client-params params))
(defmethod ig/init-key ::client
[_ {:keys [::uri ::wrk/netty-io-executor ::wrk/netty-executor] :as params}]
[_ {:keys [::uri ::wrk/netty-io-executor] :as params}]
(l/inf :hint "initialize redis client" :uri (str uri))
@ -547,7 +545,6 @@
cache (atom {})
resources (.. (DefaultClientResources/builder)
(eventExecutorGroup ^EventExecutorGroup netty-executor)
;; We provide lettuce with a shared event loop
;; group instance instead of letting lettuce to

View File

@ -50,37 +50,39 @@
(assert (vector? cmd) "a command parameter should be a vector")
(assert (every? string? cmd) "the command should be a vector of strings")
(let [executor (::wrk/executor system)
builder (ProcessBuilder. ^List cmd)
env-map (.environment ^ProcessBuilder builder)
_ (reduce-kv set-env env-map env)
process (.start builder)]
(let [executor (::wrk/executor system)]
(assert (some? executor) "executor is required, check ::wrk/executor")
(if in
(px/run! executor
(fn []
(with-open [^OutputStream stdin (.getOutputStream ^Process process)]
(io/write stdin in :encoding in-enc))))
(io/close (.getOutputStream ^Process process)))
(let [builder (ProcessBuilder. ^List cmd)
env-map (.environment ^ProcessBuilder builder)
_ (reduce-kv set-env env-map env)
process (.start builder)]
(with-open [stdout (.getInputStream ^Process process)
stderr (.getErrorStream ^Process process)]
(let [out (px/submit! executor (fn [] (try (read-with-enc stdout out-enc)
(catch java.io.IOException _ ""))))
err (px/submit! executor (fn [] (try (read-as-string stderr)
(catch java.io.IOException _ ""))))
ext (if timeout
(let [completed (.waitFor ^Process process (long timeout) TimeUnit/SECONDS)]
(if completed
(.exitValue ^Process process)
(do
(.destroyForcibly ^Process process)
(ex/raise :type :internal
:code :process-timeout
:hint (str "process timed out after " timeout " seconds")
:cmd cmd
:timeout timeout))))
(.waitFor ^Process process))]
{:exit ext
:out @out
:err @err}))))
(if in
(px/run! executor
(fn []
(with-open [^OutputStream stdin (.getOutputStream ^Process process)]
(io/write stdin in :encoding in-enc))))
(io/close (.getOutputStream ^Process process)))
(with-open [stdout (.getInputStream ^Process process)
stderr (.getErrorStream ^Process process)]
(let [out (px/submit! executor (fn [] (try (read-with-enc stdout out-enc)
(catch java.io.IOException _ ""))))
err (px/submit! executor (fn [] (try (read-as-string stderr)
(catch java.io.IOException _ ""))))
ext (if timeout
(let [completed (.waitFor ^Process process (long timeout) TimeUnit/SECONDS)]
(if completed
(.exitValue ^Process process)
(do
(.destroyForcibly ^Process process)
(ex/raise :type :internal
:code :process-timeout
:hint (str "process timed out after " timeout " seconds")
:cmd cmd
:timeout timeout))))
(.waitFor ^Process process))]
{:exit ext
:out @out
:err @err})))))

View File

@ -15,9 +15,7 @@
[promesa.exec :as px])
(:import
io.netty.channel.nio.NioEventLoopGroup
io.netty.util.concurrent.DefaultEventExecutorGroup
java.util.concurrent.ExecutorService
java.util.concurrent.ThreadFactory
java.util.concurrent.TimeUnit))
(set! *warn-on-reflection* true)
@ -36,13 +34,6 @@
{:title "executor"
:description "Instance of NioEventLoopGroup"}})
(sm/register!
{:type ::wrk/netty-executor
:pred #(instance? DefaultEventExecutorGroup %)
:type-properties
{:title "executor"
:description "Instance of DefaultEventExecutorGroup"}})
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; IO Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -58,7 +49,7 @@
nthreads (or threads (mth/round (/ (px/get-available-processors) 2)))
nthreads (max 2 nthreads)]
(l/inf :hint "start netty io executor" :threads nthreads)
(NioEventLoopGroup. (int nthreads) ^ThreadFactory factory)))
(NioEventLoopGroup. (int nthreads) ^java.util.concurrent.ThreadFactory factory)))
(defmethod ig/halt-key! ::wrk/netty-io-executor
[_ instance]
@ -68,22 +59,15 @@
TimeUnit/MILLISECONDS)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; IO Offload Executor
;; Executor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmethod ig/assert-key ::wrk/netty-executor
[_ {:keys [threads]}]
(assert (or (nil? threads) (int? threads))
"expected valid threads value, revisit PENPOT_EXEC_THREADS environment variable"))
(defmethod ig/init-key ::wrk/executor
[_ _]
(let [factory (px/thread-factory :prefix "penpot/exec/")]
(l/inf :hint "start cached executor")
(px/cached-executor :factory factory)))
(defmethod ig/init-key ::wrk/netty-executor
[_ {:keys [threads]}]
(let [factory (px/thread-factory :prefix "penpot/exec/")
nthreads (or threads (mth/round (/ (px/get-available-processors) 2)))
nthreads (max 2 nthreads)]
(l/inf :hint "start default executor" :threads nthreads)
(DefaultEventExecutorGroup. (int nthreads) ^ThreadFactory factory)))
(defmethod ig/halt-key! ::wrk/netty-executor
(defmethod ig/halt-key! ::wrk/executor
[_ instance]
(px/shutdown! instance))