diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 3b04c92b49..7195988761 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -241,7 +241,6 @@ [:assets-path {:optional true} :string] [:netty-io-threads {:optional true} ::sm/int] - [:executor-threads {:optional true} ::sm/int] [:nitrate-backend-uri {:optional true} ::sm/uri] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index fe7522d817..00f840e63f 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -162,8 +162,8 @@ ::wrk/netty-io-executor {:threads (cf/get :netty-io-threads)} - ::wrk/netty-executor - {:threads (cf/get :executor-threads)} + ::wrk/executor + {} :app.migrations/migrations {::db/pool (ig/ref ::db/pool)} @@ -178,9 +178,6 @@ {::rds/uri (cf/get :redis-uri) - ::wrk/netty-executor - (ig/ref ::wrk/netty-executor) - ::wrk/netty-io-executor (ig/ref ::wrk/netty-io-executor)} @@ -189,12 +186,12 @@ ::mtx/metrics (ig/ref ::mtx/metrics)} ::mbus/msgbus - {::wrk/executor (ig/ref ::wrk/netty-executor) + {::wrk/executor (ig/ref ::wrk/executor) ::rds/client (ig/ref ::rds/client) ::mtx/metrics (ig/ref ::mtx/metrics)} :app.storage.tmp/cleaner - {::wrk/executor (ig/ref ::wrk/netty-executor)} + {::wrk/executor (ig/ref ::wrk/executor)} ::sto.gc-deleted/handler {::db/pool (ig/ref ::db/pool) @@ -308,12 +305,12 @@ ::rpc/climit {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/executor (ig/ref ::wrk/netty-executor) + ::wrk/executor (ig/ref ::wrk/executor) ::climit/config (cf/get :rpc-climit-config) ::climit/enabled (contains? cf/flags :rpc-climit)} :app.rpc/rlimit - {::wrk/executor (ig/ref ::wrk/netty-executor) + {::wrk/executor (ig/ref ::wrk/executor) :app.loggers.mattermost/reporter (ig/ref :app.loggers.mattermost/reporter) @@ -326,7 +323,7 @@ ::db/pool (ig/ref ::db/pool) ::rds/pool (ig/ref ::rds/pool) :app.nitrate/client (ig/ref :app.nitrate/client) - ::wrk/executor (ig/ref ::wrk/netty-executor) + ::wrk/executor (ig/ref ::wrk/executor) ::session/manager (ig/ref ::session/manager) ::ldap/provider (ig/ref ::ldap/provider) ::sto/storage (ig/ref ::sto/storage) @@ -356,7 +353,7 @@ {::http.client/client (ig/ref ::http.client/client) ::db/pool (ig/ref ::db/pool) ::rds/pool (ig/ref ::rds/pool) - ::wrk/executor (ig/ref ::wrk/netty-executor) + ::wrk/executor (ig/ref ::wrk/executor) ::session/manager (ig/ref ::session/manager) ::sto/storage (ig/ref ::sto/storage) ::mtx/metrics (ig/ref ::mtx/metrics) diff --git a/backend/src/app/redis.clj b/backend/src/app/redis.clj index 6abd23f5f3..be9e331dda 100644 --- a/backend/src/app/redis.clj +++ b/backend/src/app/redis.clj @@ -43,7 +43,6 @@ io.lettuce.core.ScriptOutputType io.lettuce.core.SetArgs io.netty.channel.nio.NioEventLoopGroup - io.netty.util.concurrent.EventExecutorGroup io.netty.util.HashedWheelTimer io.netty.util.Timer java.lang.AutoCloseable @@ -527,7 +526,6 @@ (def ^:private schema:client-params [:map {:title "redis-params"} ::wrk/netty-io-executor - ::wrk/netty-executor [::uri ::sm/uri] [::timeout ::ct/duration]]) @@ -539,7 +537,7 @@ (check-client-params params)) (defmethod ig/init-key ::client - [_ {:keys [::uri ::wrk/netty-io-executor ::wrk/netty-executor] :as params}] + [_ {:keys [::uri ::wrk/netty-io-executor] :as params}] (l/inf :hint "initialize redis client" :uri (str uri)) @@ -547,7 +545,6 @@ cache (atom {}) resources (.. (DefaultClientResources/builder) - (eventExecutorGroup ^EventExecutorGroup netty-executor) ;; We provide lettuce with a shared event loop ;; group instance instead of letting lettuce to diff --git a/backend/src/app/util/shell.clj b/backend/src/app/util/shell.clj index 3ceecca17d..9502208a7f 100644 --- a/backend/src/app/util/shell.clj +++ b/backend/src/app/util/shell.clj @@ -50,37 +50,39 @@ (assert (vector? cmd) "a command parameter should be a vector") (assert (every? string? cmd) "the command should be a vector of strings") - (let [executor (::wrk/executor system) - builder (ProcessBuilder. ^List cmd) - env-map (.environment ^ProcessBuilder builder) - _ (reduce-kv set-env env-map env) - process (.start builder)] + (let [executor (::wrk/executor system)] + (assert (some? executor) "executor is required, check ::wrk/executor") - (if in - (px/run! executor - (fn [] - (with-open [^OutputStream stdin (.getOutputStream ^Process process)] - (io/write stdin in :encoding in-enc)))) - (io/close (.getOutputStream ^Process process))) + (let [builder (ProcessBuilder. ^List cmd) + env-map (.environment ^ProcessBuilder builder) + _ (reduce-kv set-env env-map env) + process (.start builder)] - (with-open [stdout (.getInputStream ^Process process) - stderr (.getErrorStream ^Process process)] - (let [out (px/submit! executor (fn [] (try (read-with-enc stdout out-enc) - (catch java.io.IOException _ "")))) - err (px/submit! executor (fn [] (try (read-as-string stderr) - (catch java.io.IOException _ "")))) - ext (if timeout - (let [completed (.waitFor ^Process process (long timeout) TimeUnit/SECONDS)] - (if completed - (.exitValue ^Process process) - (do - (.destroyForcibly ^Process process) - (ex/raise :type :internal - :code :process-timeout - :hint (str "process timed out after " timeout " seconds") - :cmd cmd - :timeout timeout)))) - (.waitFor ^Process process))] - {:exit ext - :out @out - :err @err})))) + (if in + (px/run! executor + (fn [] + (with-open [^OutputStream stdin (.getOutputStream ^Process process)] + (io/write stdin in :encoding in-enc)))) + (io/close (.getOutputStream ^Process process))) + + (with-open [stdout (.getInputStream ^Process process) + stderr (.getErrorStream ^Process process)] + (let [out (px/submit! executor (fn [] (try (read-with-enc stdout out-enc) + (catch java.io.IOException _ "")))) + err (px/submit! executor (fn [] (try (read-as-string stderr) + (catch java.io.IOException _ "")))) + ext (if timeout + (let [completed (.waitFor ^Process process (long timeout) TimeUnit/SECONDS)] + (if completed + (.exitValue ^Process process) + (do + (.destroyForcibly ^Process process) + (ex/raise :type :internal + :code :process-timeout + :hint (str "process timed out after " timeout " seconds") + :cmd cmd + :timeout timeout)))) + (.waitFor ^Process process))] + {:exit ext + :out @out + :err @err}))))) diff --git a/backend/src/app/worker/executor.clj b/backend/src/app/worker/executor.clj index 008d0576a9..b536e2709e 100644 --- a/backend/src/app/worker/executor.clj +++ b/backend/src/app/worker/executor.clj @@ -15,9 +15,7 @@ [promesa.exec :as px]) (:import io.netty.channel.nio.NioEventLoopGroup - io.netty.util.concurrent.DefaultEventExecutorGroup java.util.concurrent.ExecutorService - java.util.concurrent.ThreadFactory java.util.concurrent.TimeUnit)) (set! *warn-on-reflection* true) @@ -36,13 +34,6 @@ {:title "executor" :description "Instance of NioEventLoopGroup"}}) -(sm/register! - {:type ::wrk/netty-executor - :pred #(instance? DefaultEventExecutorGroup %) - :type-properties - {:title "executor" - :description "Instance of DefaultEventExecutorGroup"}}) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; IO Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -58,7 +49,7 @@ nthreads (or threads (mth/round (/ (px/get-available-processors) 2))) nthreads (max 2 nthreads)] (l/inf :hint "start netty io executor" :threads nthreads) - (NioEventLoopGroup. (int nthreads) ^ThreadFactory factory))) + (NioEventLoopGroup. (int nthreads) ^java.util.concurrent.ThreadFactory factory))) (defmethod ig/halt-key! ::wrk/netty-io-executor [_ instance] @@ -68,22 +59,15 @@ TimeUnit/MILLISECONDS))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; IO Offload Executor +;; Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defmethod ig/assert-key ::wrk/netty-executor - [_ {:keys [threads]}] - (assert (or (nil? threads) (int? threads)) - "expected valid threads value, revisit PENPOT_EXEC_THREADS environment variable")) +(defmethod ig/init-key ::wrk/executor + [_ _] + (let [factory (px/thread-factory :prefix "penpot/exec/")] + (l/inf :hint "start cached executor") + (px/cached-executor :factory factory))) -(defmethod ig/init-key ::wrk/netty-executor - [_ {:keys [threads]}] - (let [factory (px/thread-factory :prefix "penpot/exec/") - nthreads (or threads (mth/round (/ (px/get-available-processors) 2))) - nthreads (max 2 nthreads)] - (l/inf :hint "start default executor" :threads nthreads) - (DefaultEventExecutorGroup. (int nthreads) ^ThreadFactory factory))) - -(defmethod ig/halt-key! ::wrk/netty-executor +(defmethod ig/halt-key! ::wrk/executor [_ instance] (px/shutdown! instance))