penpot/frontend/src/app/main/repo.cljs
Andrey Antukh 6fa440cf92 🎉 Add chunked upload API for large media and binary files
Introduce a purpose-agnostic three-step session-based upload API that
allows uploading large binary blobs (media files and .penpot imports)
without hitting multipart size limits.

Backend:
- Migration 0147: new `upload_session` table (profile_id, total_chunks,
  created_at) with indexes on profile_id and created_at.
- Three new RPC commands in media.clj:
    * `create-upload-session`  – allocates a session row; enforces
      `upload-sessions-per-profile` and `upload-chunks-per-session`
      quota limits (configurable in config.clj, defaults 5 / 20).
    * `upload-chunk`           – stores each slice as a storage object;
      validates chunk index bounds and profile ownership.
    * `assemble-file-media-object` – reassembles chunks via the shared
      `assemble-chunks!` helper and creates the final media object.
- `assemble-chunks!` is a public helper in media.clj shared by both
  `assemble-file-media-object` and `import-binfile`.
- `import-binfile` (binfile.clj): accepts an optional `upload-id` param;
  when provided, materialises the temp file from chunks instead of
  expecting an inline multipart body, removing the 200 MiB body limit
  on .penpot imports.  Schema updated with an `:and` validator requiring
  either `:file` or `:upload-id`.
- quotes.clj: new `upload-sessions-per-profile` quota check.
- Background GC task (`tasks/upload_session_gc.clj`): deletes stalled
  (never-completed) sessions older than 1 hour; scheduled daily at
  midnight via the cron system in main.clj.
- backend/AGENTS.md: document the background-task wiring pattern.

Frontend:
- New `app.main.data.uploads` namespace: generic `upload-blob-chunked`
  helper drives steps 1–2 (create session + upload all chunks with a
  concurrency cap of 2) and emits `{:session-id uuid}` for callers.
- `config.cljs`: expose `upload-chunk-size` (default 25 MiB, overridable
  via `penpotUploadChunkSize` global).
- `workspace/media.cljs`: blobs ≥ chunk-size go through the chunked path
  (`upload-blob-chunked` → `assemble-file-media-object`); smaller blobs
  use the existing direct `upload-file-media-object` path.
  `handle-media-error` simplified; `on-error` callback removed.
- `worker/import.cljs`: new `import-blob-via-upload` helper replaces the
  inline multipart approach for both binfile-v1 and binfile-v3 imports.
- `repo.cljs`: `:upload-chunk` derived as a `::multipart-upload`;
  `form-data?` removed from `import-binfile` (JSON params only).
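
  For illustration only (a sketch, not code from this change; `mib` and
  `chunk-count` are hypothetical names): assuming the defaults above
  apply unchanged (25 MiB chunks, 20 chunks per session), the number of
  chunks a blob needs is a ceiling division, and a single session tops
  out at 20 × 25 = 500 MiB.

```clojure
;; Hypothetical helpers, named here only for the example.
(def mib (* 1024 1024))

(defn chunk-count
  "Ceiling division: how many `chunk-size` slices cover `size` bytes."
  [size chunk-size]
  (quot (+ size chunk-size -1) chunk-size))

(chunk-count (* 60 mib) (* 25 mib)) ;; => 3 (25 + 25 + 10 MiB)
(chunk-count (* 25 mib) (* 25 mib)) ;; => 1 (exactly one full chunk)
```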

Tests:
- Backend (rpc_media_test.clj): happy path, idempotency, permission
  isolation, invalid media type, missing chunks, session-not-found,
  chunk-index out-of-range, and quota-limit scenarios.
- Frontend (uploads_test.cljs): session creation and chunk-count
  correctness for `upload-blob-chunked`.
- Frontend (workspace_media_test.cljs): direct-upload path for small
  blobs, chunked path for large blobs, and chunk-count correctness for
  `process-blobs`.
- `helpers/http.cljs`: shared fetch-mock helpers (`install-fetch-mock!`,
  `make-json-response`, `make-transit-response`, `url->cmd`).

Signed-off-by: Andrey Antukh <niwi@niwi.nz>
2026-04-16 19:43:57 +02:00

;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC

(ns app.main.repo
  (:require
   [app.common.data :as d]
   [app.common.exceptions :as ex]
   [app.common.logging :as log]
   [app.common.time :as ct]
   [app.common.transit :as t]
   [app.common.uri :as u]
   [app.config :as cf]
   [app.main.data.event :as-alias ev]
   [app.util.http :as http]
   [app.util.sse :as sse]
   [beicon.v2.core :as rx]
   [cuerdas.core :as str]))

(log/set-level! :info)

;; -- Retry helpers -----------------------------------------------------------

(def ^:private retryable-types
  "Set of error types that are considered transient and safe to retry
  for idempotent (GET) requests."
  #{:network              ; js/fetch network-level failure
    :bad-gateway          ; 502
    :service-unavailable  ; 503
    :offline})            ; status 0 (browser offline)

(defn retryable-error?
  "Return true when `error` represents a transient failure that is safe
  to retry. Only errors whose `ex-data` `:type` belongs to
  `retryable-types` qualify."
  [error]
  (contains? retryable-types (:type (ex-data error))))
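
;; Illustrative sketch (not part of the original file): how
;; `retryable-error?` classifies errors shaped like the ones that
;; `handle-response` throws. The forms are meant for REPL evaluation
;; and are wrapped in `comment` so they are never executed on load.
(comment
  (retryable-error? (ex-info "http error" {:type :bad-gateway}))
  ;; => true

  (retryable-error? (ex-info "http error" {:type :validation
                                            :code :request-body-too-large}))
  ;; => false

  ;; A plain JS error carries no `ex-data`, so it is never retried.
  (retryable-error? (js/Error. "boom"))
  ;; => false
  )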

(def default-retry-config
  "Default configuration for the retry mechanism on idempotent requests."
  {:max-retries 3
   :base-delay-ms 1000})

(defn with-retry
  "Wrap `observable-fn` (a zero-arg function returning an Observable) so
  that retryable errors are retried up to `:max-retries` times with
  exponential back-off. Non-retryable errors propagate immediately.
  Accepts an optional `config` map with:
    :max-retries   maximum number of retries (default 3)
    :base-delay-ms base delay in ms; doubles each attempt (default 1000)"
  ([observable-fn]
   (with-retry observable-fn default-retry-config))
  ([observable-fn config]
   (with-retry observable-fn config 0))
  ([observable-fn config attempt]
   (let [{:keys [max-retries base-delay-ms]} (merge default-retry-config config)]
     (->> (observable-fn)
          (rx/catch
           (fn [cause]
             (if (and (retryable-error? cause)
                      (< attempt max-retries))
               ;; bit-shift-left 1 N is equivalent to 2^N: shift the bits of the
               ;; number 1 to the left N positions (e.g. 1 -> 2 -> 4 -> 8 -> 16),
               ;; producing exponential backoff delays of 1x, 2x, 4x, 8x, 16x.
               (let [delay-ms (* base-delay-ms (bit-shift-left 1 attempt))]
                 (log/wrn :hint "retrying request"
                          :attempt (inc attempt)
                          :delay delay-ms
                          :error (ex-message cause))
                 (->> (rx/timer delay-ms)
                      (rx/mapcat (fn [_] (with-retry observable-fn config (inc attempt))))))
               (rx/throw cause))))))))
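
;; Illustrative sketch (not part of the original file): the delay
;; schedule that `with-retry` produces with `default-retry-config`.
;; Retries happen while `attempt` is 0, 1 and 2, so only the first
;; three powers of two are ever used with the defaults.
(comment
  (let [{:keys [max-retries base-delay-ms]} default-retry-config]
    (mapv #(* base-delay-ms (bit-shift-left 1 %)) (range max-retries)))
  ;; => [1000 2000 4000]
  )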

;; -- Response handling -------------------------------------------------------

(defn handle-response
  [{:keys [status body headers uri] :as response}]
  (cond
    (= 204 status)
    ;; We need to send "something" so the streams listening downstream can act
    (rx/of nil)

    (= 502 status)
    (rx/throw (ex-info "http error" {:type :bad-gateway}))

    (= 503 status)
    (rx/throw (ex-info "http error" {:type :service-unavailable}))

    (= 0 (:status response))
    (rx/throw (ex-info "http error" {:type :offline}))

    (= 200 status)
    (rx/of body)

    (= 413 status)
    (rx/throw (ex-info "http error"
                       {:type :validation
                        :code :request-body-too-large}))

    (and (= status 403)
         (or (= "cloudflare" (get headers "server"))
             (= "challenge" (get headers "cf-mitigated"))))
    (rx/throw (ex-info "http error"
                       {:type :authorization
                        :code :challenge-required}))

    (and (>= status 400) (map? body))
    (rx/throw (ex-info "http error" (assoc body :uri uri :status status)))

    :else
    (rx/throw
     (ex/error :type :internal
               :code :unable-to-process-repository-response
               :hint "unable to process repository response"
               :uri uri
               :status status
               :headers headers
               :data body))))

(def default-options
  {:update-file {:query-params [:id]}
   :get-raw-file {:rename-to :get-file :raw-transit? true}

   :create-file-object-thumbnail
   {:query-params [:file-id :object-id :tag]
    :form-data? true}

   :create-file-thumbnail
   {:query-params [:file-id :revn]
    :form-data? true}

   ::sse/export-binfile
   {:stream? true}

   ::sse/clone-template
   {:stream? true}

   ::sse/import-binfile
   {:stream? true}

   ::sse/permanently-delete-team-files
   {:stream? true}

   ::sse/restore-deleted-team-files
   {:stream? true}

   :export-binfile {:response-type :blob}
   :retrieve-list-of-builtin-templates {:query-params :all}})

(defn- send!
  [id params options]
  (let [{:keys [response-type
                stream?
                form-data?
                raw-transit?
                query-params
                rename-to]}
        (-> (get default-options id)
            (merge options))

        decode-fn
        (if raw-transit?
          http/conditional-error-decode-transit
          http/conditional-decode-transit)

        id     (or rename-to id)
        nid    (name id)
        method (cond
                 (= query-params :all) :get
                 (str/starts-with? nid "get-") :get
                 :else :post)

        response-type
        (d/nilv response-type :text)

        request
        {:method method
         :uri (u/join cf/public-uri "api/main/methods/" nid)
         :credentials "include"
         :headers {"accept" "application/transit+json,text/event-stream,*/*"
                   "x-external-session-id" (cf/external-session-id)
                   "x-event-origin" (::ev/origin (meta params))}
         :body (when (= method :post)
                 (if form-data?
                   (http/form-data params)
                   (http/transit-data params)))
         :query (if (= method :get)
                  params
                  (if query-params
                    (select-keys params query-params)
                    nil))
         :response-type
         (if stream? nil response-type)}

        tpoint
        (ct/tpoint-ms)]

    (log/trc :hint "make request" :id id)

    (let [make-request
          (fn []
            (->> (http/fetch request)
                 (rx/map http/response->map)
                 (rx/mapcat (fn [{:keys [headers body] :as response}]
                              (log/trc :hint "response received" :id id :elapsed (tpoint))

                              (let [ctype (get headers "content-type")
                                    response-stream? (str/starts-with? ctype "text/event-stream")
                                    tpoint (ct/tpoint-ms)]

                                (when (and response-stream? (not stream?))
                                  (ex/raise :type :assertion
                                            :code :unexpected-response
                                            :hint "expected normal response, received sse stream"
                                            :uri (:uri response)
                                            :status (:status response)))

                                (if response-stream?
                                  (-> (sse/create-stream body)
                                      (sse/read-stream t/decode-str))
                                  (->> response
                                       (http/process-response-type response-type)
                                       (rx/map decode-fn)
                                       (rx/tap (fn [_]
                                                 (log/trc :hint "response decoded" :id id :elapsed (tpoint))))
                                       (rx/mapcat handle-response))))))))]

      ;; Idempotent (GET) requests are automatically retried on
      ;; transient network / server errors. Mutations are never
      ;; retried to avoid unintended side-effects.
      (if (= :get method)
        (with-retry make-request)
        (make-request)))))
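
;; Illustrative sketch (not part of the original file): the rule
;; `send!` uses to pick the HTTP method, pulled out into a local
;; function so it can be tried at a REPL. `method-for` is a
;; hypothetical name; the `cond` mirrors the one inside `send!`.
(comment
  (let [method-for (fn [id query-params]
                     (cond
                       (= query-params :all) :get
                       (str/starts-with? (name id) "get-") :get
                       :else :post))]
    [(method-for :get-file nil)
     (method-for :retrieve-list-of-builtin-templates :all)
     (method-for :update-file [:id])])
  ;; => [:get :get :post]
  )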

(defmulti cmd! (fn [id _] id))

(defmethod cmd! :default
  [id params]
  (send! id params nil))

(defmethod cmd! :login-with-oidc
  [_ params]
  (let [uri (u/join cf/public-uri "api/auth/oidc")]
    (->> (http/send! {:method :post
                      :uri uri
                      :credentials "include"
                      :headers {"x-external-session-id" (cf/external-session-id)
                                "x-event-origin" (::ev/origin (meta params))}
                      :query params})
         (rx/map http/conditional-decode-transit)
         (rx/mapcat handle-response))))

(defn- send-export
  [{:keys [blob?] :as params}]
  (->> (http/send! {:method :post
                    :uri (u/join cf/public-uri "api/export")
                    :body (http/transit-data (dissoc params :blob?))
                    :headers {"x-external-session-id" (cf/external-session-id)
                              "x-event-origin" (::ev/origin (meta params))}
                    :credentials "include"
                    :response-type (if blob? :blob :text)})
       (rx/map http/conditional-decode-transit)
       (rx/mapcat handle-response)))

(defmethod cmd! :export
  [_ params]
  (let [default {:wait false :blob? false}]
    (send-export (merge default params))))

(derive :upload-file-media-object ::multipart-upload)
(derive :upload-chunk ::multipart-upload)
(derive :update-profile-photo ::multipart-upload)
(derive :update-team-photo ::multipart-upload)

(defmethod cmd! ::multipart-upload
  [id params]
  (->> (http/send! {:method :post
                    :uri (u/join cf/public-uri "api/main/methods/" (name id))
                    :credentials "include"
                    :headers {"x-external-session-id" (cf/external-session-id)
                              "x-event-origin" (::ev/origin (meta params))}
                    :body (http/form-data params)})
       (rx/map http/conditional-decode-transit)
       (rx/mapcat handle-response)))