penpot/frontend/src/app/main/repo.cljs
Andrey Antukh 6eccffb8bb
🐛 Fix incorrect handlig of version restore operation (#9041)
- Add session ID tracking to RPC layer (backend and frontend)
- Send session ID header with RPC requests for request correlation
- Rename file-restore to file-restored for consistency
- Extract initialize-file function from initialize-workspace flow
- Improve file restoration initialization with wait-for-persistence
- Extract initialize-version event handler for version restoration
- Fix viewport key generation with file version numbers for proper re-renders
- Update layout item schema and constraints to use internal sizing state
- Add v-sizing state retrieval in layout-size-constraints component
- Refactor file-change notifications stream handling with rx/map
- Fix team-id lookup in restore-version-from-plugins

Improves request traceability across frontend/backend sessions and streamlines
the workspace initialization flow for file restoration scenarios.

Signed-off-by: Andrey Antukh <niwi@niwi.nz>
2026-04-21 19:19:51 +02:00

290 lines
10 KiB
Clojure
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.main.repo
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.logging :as log]
[app.common.time :as ct]
[app.common.transit :as t]
[app.common.uri :as u]
[app.config :as cf]
[app.main.data.event :as-alias ev]
[app.util.http :as http]
[app.util.sse :as sse]
[beicon.v2.core :as rx]
[cuerdas.core :as str]))
(log/set-level! :info)
;; -- Retry helpers -----------------------------------------------------------
(def ^:private retryable-types
"Set of error types that are considered transient and safe to retry
for idempotent (GET) requests."
#{:network ; js/fetch network-level failure
:bad-gateway ; 502
:service-unavailable ; 503
:offline}) ; status 0 (browser offline)
(defn retryable-error?
"Return true when `error` represents a transient failure that is safe
to retry. Only errors whose `ex-data` `:type` belongs to
`retryable-types` qualify."
[error]
(contains? retryable-types (:type (ex-data error))))
(def default-retry-config
"Default configuration for the retry mechanism on idempotent requests."
{:max-retries 3
:base-delay-ms 1000})
(defn with-retry
"Wrap `observable-fn` (a zero-arg function returning an Observable) so
that retryable errors are retried up to `:max-retries` times with
exponential back-off. Non-retryable errors propagate immediately.
Accepts an optional `config` map with:
:max-retries maximum number of retries (default 3)
:base-delay-ms base delay in ms; doubles each attempt (default 1000)"
([observable-fn]
(with-retry observable-fn default-retry-config))
([observable-fn config]
(with-retry observable-fn config 0))
([observable-fn config attempt]
(let [{:keys [max-retries base-delay-ms]} (merge default-retry-config config)]
(->> (observable-fn)
(rx/catch
(fn [cause]
(if (and (retryable-error? cause)
(< attempt max-retries))
;; bit-shift-left 1 N is equivalent to 2^N: shift the bits of the
;; number 1 to the left N positions (e.g. 1 -> 2 -> 4 -> 8 -> 16),
;; producing exponential backoff delays of 1x, 2x, 4x, 8x, 16x.
(let [delay-ms (* base-delay-ms (bit-shift-left 1 attempt))]
(log/wrn :hint "retrying request"
:attempt (inc attempt)
:delay delay-ms
:error (ex-message cause))
(->> (rx/timer delay-ms)
(rx/mapcat (fn [_] (with-retry observable-fn config (inc attempt))))))
(rx/throw cause))))))))
;; -- Response handling -------------------------------------------------------
(defn handle-response
[{:keys [status body headers uri] :as response}]
(cond
(= 204 status)
;; We need to send "something" so the streams listening downstream can act
(rx/of nil)
(= 502 status)
(rx/throw (ex-info "http error" {:type :bad-gateway}))
(= 503 status)
(rx/throw (ex-info "http error" {:type :service-unavailable}))
(= 0 (:status response))
(rx/throw (ex-info "http error" {:type :offline}))
(= 200 status)
(rx/of body)
(= 413 status)
(rx/throw (ex-info "http error"
{:type :validation
:code :request-body-too-large}))
(and (= status 403)
(or (= "cloudflare" (get headers "server"))
(= "challenge" (get headers "cf-mitigated"))))
(rx/throw (ex-info "http error"
{:type :authorization
:code :challenge-required}))
(and (>= status 400) (map? body))
(rx/throw (ex-info "http error" (assoc body :uri uri :status status)))
:else
(rx/throw
(ex/error :type :internal
:code :unable-to-process-repository-response
:hint "unable to process repository response"
:uri uri
:status status
:headers headers
:data body))))
(def default-options
{:update-file {:query-params [:id]}
:get-raw-file {:rename-to :get-file :raw-transit? true}
:create-file-object-thumbnail
{:query-params [:file-id :object-id :tag]
:form-data? true}
:create-file-thumbnail
{:query-params [:file-id :revn]
:form-data? true}
::sse/export-binfile
{:stream? true}
::sse/clone-template
{:stream? true}
::sse/import-binfile
{:stream? true}
::sse/permanently-delete-team-files
{:stream? true}
::sse/restore-deleted-team-files
{:stream? true}
:export-binfile {:response-type :blob}
:retrieve-list-of-builtin-templates {:query-params :all}})
(defn- send!
[id params options]
(let [{:keys [response-type
stream?
form-data?
raw-transit?
query-params
rename-to]}
(-> (get default-options id)
(merge options))
decode-fn
(if raw-transit?
http/conditional-error-decode-transit
http/conditional-decode-transit)
id (or rename-to id)
nid (name id)
method (cond
(= query-params :all) :get
(str/starts-with? nid "get-") :get
:else :post)
response-type
(d/nilv response-type :text)
request
{:method method
:uri (u/join cf/public-uri "api/main/methods/" nid)
:credentials "include"
:headers {"accept" "application/transit+json,text/event-stream,*/*"
"x-external-session-id" (cf/external-session-id)
"x-session-id" (str cf/session-id)
"x-event-origin" (::ev/origin (meta params))}
:body (when (= method :post)
(if form-data?
(http/form-data params)
(http/transit-data params)))
:query (if (= method :get)
params
(if query-params
(select-keys params query-params)
nil))
:response-type
(if stream? nil response-type)}
tpoint
(ct/tpoint-ms)]
(log/trc :hint "make request" :id id)
(let [make-request
(fn []
(->> (http/fetch request)
(rx/map http/response->map)
(rx/mapcat (fn [{:keys [headers body] :as response}]
(log/trc :hint "response received" :id id :elapsed (tpoint))
(let [ctype (get headers "content-type")
response-stream? (str/starts-with? ctype "text/event-stream")
tpoint (ct/tpoint-ms)]
(when (and response-stream? (not stream?))
(ex/raise :type :assertion
:code :unexpected-response
:hint "expected normal response, received sse stream"
:uri (:uri response)
:status (:status response)))
(if response-stream?
(-> (sse/create-stream body)
(sse/read-stream t/decode-str))
(->> response
(http/process-response-type response-type)
(rx/map decode-fn)
(rx/tap (fn [_]
(log/trc :hint "response decoded" :id id :elapsed (tpoint))))
(rx/mapcat handle-response))))))))]
;; Idempotent (GET) requests are automatically retried on
;; transient network / server errors. Mutations are never
;; retried to avoid unintended side-effects.
(if (= :get method)
(with-retry make-request)
(make-request)))))
(defmulti cmd! (fn [id _] id))
(defmethod cmd! :default
[id params]
(send! id params nil))
(defmethod cmd! :login-with-oidc
[_ params]
(let [uri (u/join cf/public-uri "api/auth/oidc")]
(->> (http/send! {:method :post
:uri uri
:credentials "include"
:headers {"x-external-session-id" (cf/external-session-id)
"x-event-origin" (::ev/origin (meta params))}
:query params})
(rx/map http/conditional-decode-transit)
(rx/mapcat handle-response))))
(defn- send-export
[{:keys [blob?] :as params}]
(->> (http/send! {:method :post
:uri (u/join cf/public-uri "api/export")
:body (http/transit-data (dissoc params :blob?))
:headers {"x-external-session-id" (cf/external-session-id)
"x-event-origin" (::ev/origin (meta params))}
:credentials "include"
:response-type (if blob? :blob :text)})
(rx/map http/conditional-decode-transit)
(rx/mapcat handle-response)))
(defmethod cmd! :export
[_ params]
(let [default {:wait false :blob? false}]
(send-export (merge default params))))
(derive :upload-file-media-object ::multipart-upload)
(derive :upload-chunk ::multipart-upload)
(derive :update-profile-photo ::multipart-upload)
(derive :update-team-photo ::multipart-upload)
(defmethod cmd! ::multipart-upload
[id params]
(->> (http/send! {:method :post
:uri (u/join cf/public-uri "api/main/methods/" (name id))
:credentials "include"
:headers {"x-external-session-id" (cf/external-session-id)
"x-event-origin" (::ev/origin (meta params))}
:body (http/form-data params)})
(rx/map http/conditional-decode-transit)
(rx/mapcat handle-response)))