;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC

(ns app.main.repo
  (:require
   [app.common.data :as d]
   [app.common.exceptions :as ex]
   [app.common.logging :as log]
   [app.common.time :as ct]
   [app.common.transit :as t]
   [app.common.uri :as u]
   [app.config :as cf]
   [app.main.data.event :as-alias ev]
   [app.util.http :as http]
   [app.util.sse :as sse]
   [beicon.v2.core :as rx]
   [cuerdas.core :as str]))

(log/set-level! :info)

;; -- Retry helpers -----------------------------------------------------------

(def ^:private retryable-types
  "Error `:type` values treated as transient, i.e. safe to retry when
  the request is idempotent (GET)."
  #{:network               ; js/fetch network-level failure
    :bad-gateway           ; 502
    :service-unavailable   ; 503
    :offline})             ; status 0 (browser offline)

(defn retryable-error?
  "True when the `:type` found in the `ex-data` of `error` is one of
  `retryable-types`; false otherwise (including when `error` carries
  no ex-data)."
  [error]
  (->> error
       ex-data
       :type
       (contains? retryable-types)))

(def default-retry-config
  "Retry settings used when the caller does not override them."
  {:max-retries 3
   :base-delay-ms 1000})

(defn with-retry
  "Invoke `observable-fn` (a zero-arg function returning an Observable)
  and, if the resulting stream fails with a retryable error, invoke it
  again after a delay. At most `:max-retries` retries are performed and
  the delay doubles on every attempt. Errors that are not retryable, or
  that occur once the retries are exhausted, are rethrown unchanged.

  The optional `config` map is merged over `default-retry-config`:

    :max-retries   – maximum number of retries (default 3)
    :base-delay-ms – delay before the first retry, in ms (default 1000)

  The three-arg arity additionally receives `attempt`, the number of
  retries already performed; the shorter arities start it at 0."
  ([observable-fn]
   (with-retry observable-fn default-retry-config))

  ([observable-fn config]
   (with-retry observable-fn config 0))

  ([observable-fn config attempt]
   (let [{:keys [max-retries base-delay-ms]}
         (merge default-retry-config config)

         recover
         (fn [cause]
           (if-not (and (retryable-error? cause)
                        (< attempt max-retries))
             (rx/throw cause)
             ;; (bit-shift-left 1 n) is 2^n, so the wait before each
             ;; successive retry is 1x, 2x, 4x, ... the base delay.
             (let [delay-ms     (* base-delay-ms (bit-shift-left 1 attempt))
                   next-attempt (inc attempt)]
               (log/wrn :hint "retrying request"
                        :attempt next-attempt
                        :delay delay-ms
                        :error (ex-message cause))
               (->> (rx/timer delay-ms)
                    (rx/mapcat
                     (fn [_]
                       (with-retry observable-fn config next-attempt)))))))]

     (rx/catch recover (observable-fn)))))

;; -- Response handling -------------------------------------------------------

(defn handle-response
  "Convert a response map (`:status`, `:body`, `:headers`, `:uri`) into
  an Observable.

  Emits the body for 200 and nil for 204. Every other status yields a
  failing Observable: 502, 503 and 0 are tagged with the types
  `:bad-gateway`, `:service-unavailable` and `:offline`; 413 and a
  Cloudflare-originated 403 get dedicated type/code pairs; a status
  >= 400 with a map body rethrows that body (plus `:uri` and
  `:status`) as ex-data; anything else becomes an `:internal` error."
  [{:keys [status body headers uri]}]
  (let [http-error (fn [data]
                     (rx/throw (ex-info "http error" data)))]
    (condp = status
      ;; We need to send "something" so the streams listening
      ;; downstream can act
      204 (rx/of nil)
      502 (http-error {:type :bad-gateway})
      503 (http-error {:type :service-unavailable})
      0   (http-error {:type :offline})
      200 (rx/of body)
      413 (http-error {:type :validation
                       :code :request-body-too-large})

      ;; None of the exact status matches applied
      (cond
        (and (= status 403)
             (or (= "cloudflare" (get headers "server"))
                 (= "challenge" (get headers "cf-mitigated"))))
        (http-error {:type :authorization
                     :code :challenge-required})

        (and (>= status 400)
             (map? body))
        (http-error (assoc body :uri uri :status status))

        :else
        (rx/throw
         (ex/error :type :internal
                   :code :unable-to-process-repository-response
                   :hint "unable to process repository response"
                   :uri uri
                   :status status
                   :headers headers
                   :data body))))))

(def default-options
  "Built-in request options keyed by command id. `send!` looks up the
  entry for the requested id and merges the caller options over it."
  {:update-file
   {:query-params [:id]}

   :get-raw-file
   {:rename-to :get-file
    :raw-transit? true}

   :create-file-object-thumbnail
   {:query-params [:file-id :object-id :tag]
    :form-data? true}

   :create-file-thumbnail
   {:query-params [:file-id :revn]
    :form-data? true}

   ::sse/export-binfile
   {:stream? true}

   ::sse/clone-template
   {:stream? true}

   ::sse/import-binfile
   {:stream? true}

   ::sse/permanently-delete-team-files
   {:stream? true}

   ::sse/restore-deleted-team-files
   {:stream? true}

   :export-binfile
   {:response-type :blob}

   :retrieve-list-of-builtin-templates
   {:query-params :all}})

(defn- send!
  "Build and perform the HTTP request for command `id` with `params`,
  returning an Observable of the processed response.

  `options` are merged over the `default-options` entry for `id`. The
  request is a GET when `:query-params` is `:all` or the (possibly
  renamed) command name starts with \"get-\"; otherwise it is a POST.
  GET requests are wrapped with `with-retry`; POST requests are issued
  exactly once."
  [id params options]
  (let [{:keys [response-type stream? form-data? raw-transit? query-params rename-to]}
        (merge (get default-options id) options)

        decode-fn     (if raw-transit?
                        http/conditional-error-decode-transit
                        http/conditional-decode-transit)
        id            (or rename-to id)
        nid           (name id)
        get?          (or (= query-params :all)
                          (str/starts-with? nid "get-"))
        method        (if get? :get :post)
        response-type (d/nilv response-type :text)

        request
        {:method method
         :uri (u/join cf/public-uri "api/main/methods/" nid)
         :credentials "include"
         :headers {"accept" "application/transit+json,text/event-stream,*/*"
                   "x-external-session-id" (cf/external-session-id)
                   "x-session-id" (str cf/session-id)
                   "x-event-origin" (::ev/origin (meta params))}
         ;; Only POST requests carry a body
         :body (when-not get?
                 (if form-data?
                   (http/form-data params)
                   (http/transit-data params)))
         ;; GET sends every param in the query string; POST only the
         ;; params selected through `:query-params`, if any
         :query (cond
                  get?         params
                  query-params (select-keys params query-params))
         :response-type (when-not stream? response-type)}

        started-at (ct/tpoint-ms)

        on-response
        (fn [{:keys [headers body] :as response}]
          (log/trc :hint "response received" :id id :elapsed (started-at))
          (let [ctype          (get headers "content-type")
                sse?           (str/starts-with? ctype "text/event-stream")
                decode-started (ct/tpoint-ms)]

            ;; An event stream is only acceptable for commands
            ;; declared with `:stream? true`
            (when (and sse? (not stream?))
              (ex/raise :type :assertion
                        :code :unexpected-response
                        :hint "expected normal response, received sse stream"
                        :uri (:uri response)
                        :status (:status response)))

            (if sse?
              (-> (sse/create-stream body)
                  (sse/read-stream t/decode-str))
              (->> response
                   (http/process-response-type response-type)
                   (rx/map decode-fn)
                   (rx/tap (fn [_]
                             (log/trc :hint "response decoded"
                                      :id id
                                      :elapsed (decode-started))))
                   (rx/mapcat handle-response)))))

        make-request
        (fn []
          (->> (http/fetch request)
               (rx/map http/response->map)
               (rx/mapcat on-response)))]

    (log/trc :hint "make request" :id id)

    ;; Idempotent (GET) requests are automatically retried on
    ;; transient network / server errors. Mutations are never
    ;; retried to avoid unintended side-effects.
    (if get?
      (with-retry make-request)
      (make-request))))

;; Entry point for issuing commands; dispatches on the command id.
(defmulti cmd! (fn [id _params] id))

(defmethod cmd! :default
  [id params]
  (send! id params nil))

(defmethod cmd! :login-with-oidc
  [_ params]
  (let [request {:method :post
                 :uri (u/join cf/public-uri "api/auth/oidc")
                 :credentials "include"
                 :headers {"x-external-session-id" (cf/external-session-id)
                           "x-event-origin" (::ev/origin (meta params))}
                 :query params}]
    (->> (http/send! request)
         (rx/map http/conditional-decode-transit)
         (rx/mapcat handle-response))))

(defn- send-export
  "POST `params` (minus the `:blob?` flag) to the export endpoint. The
  response type is `:blob` when `:blob?` is truthy and `:text`
  otherwise."
  [{:keys [blob?] :as params}]
  (let [request {:method :post
                 :uri (u/join cf/public-uri "api/export")
                 :body (http/transit-data (dissoc params :blob?))
                 :headers {"x-external-session-id" (cf/external-session-id)
                           "x-event-origin" (::ev/origin (meta params))}
                 :credentials "include"
                 :response-type (if blob? :blob :text)}]
    (->> (http/send! request)
         (rx/map http/conditional-decode-transit)
         (rx/mapcat handle-response))))

(defmethod cmd! :export
  [_ params]
  ;; Caller-supplied params take precedence over the defaults
  (send-export (merge {:wait false :blob? false} params)))

;; Commands whose params are sent as multipart form data
(doseq [command [:upload-file-media-object
                 :upload-chunk
                 :update-profile-photo
                 :update-team-photo]]
  (derive command ::multipart-upload))

(defmethod cmd! ::multipart-upload
  [id params]
  (let [request {:method :post
                 :uri (u/join cf/public-uri "api/main/methods/" (name id))
                 :credentials "include"
                 :headers {"x-external-session-id" (cf/external-session-id)
                           "x-event-origin" (::ev/origin (meta params))}
                 :body (http/form-data params)}]
    (->> (http/send! request)
         (rx/map http/conditional-decode-transit)
         (rx/mapcat handle-response))))