diff --git a/backend/AGENTS.md b/backend/AGENTS.md index b4ac2ac1dd..913540f808 100644 --- a/backend/AGENTS.md +++ b/backend/AGENTS.md @@ -83,7 +83,52 @@ are config maps with `::ig/ref` for dependencies. Components implement `ig/init-key` / `ig/halt-key!`. -### Database Access +### Connecting to the Database + +Two PostgreSQL databases are used in this environment: + +| Database | Purpose | Connection string | +|---------------|--------------------|----------------------------------------------------| +| `penpot` | Development / app | `postgresql://penpot:penpot@postgres/penpot` | +| `penpot_test` | Test suite | `postgresql://penpot:penpot@postgres/penpot_test` | + +**Interactive psql session:** + +```bash +# development DB +psql "postgresql://penpot:penpot@postgres/penpot" + +# test DB +psql "postgresql://penpot:penpot@postgres/penpot_test" +``` + +**One-shot query (non-interactive):** + +```bash +psql "postgresql://penpot:penpot@postgres/penpot" -c "SELECT id, name FROM team LIMIT 5;" +``` + +**Useful psql meta-commands:** + +``` +\dt -- list all tables +\d -- describe a table (columns, types, constraints) +\di -- list indexes +\q -- quit +``` + +> **Migrations table:** Applied migrations are tracked in the `migrations` table +> with columns `module`, `step`, and `created_at`. When renaming a migration +> logical name, update this table in both databases to match the new name; +> otherwise the runner will attempt to re-apply the migration on next startup. + +```bash +# Example: fix a renamed migration entry in the test DB +psql "postgresql://penpot:penpot@postgres/penpot_test" \ + -c "UPDATE migrations SET step = 'new-name' WHERE step = 'old-name';" +``` + +### Database Access (Clojure) `app.db` wraps next.jdbc. Queries use a SQL builder that auto-converts kebab-case ↔ snake_case. @@ -146,3 +191,69 @@ optimized implementations: `src/app/config.clj` reads `PENPOT_*` environment variables, validated with Malli. Access anywhere via `(cf/get :smtp-host)`. Feature flags: `(cf/flags :enable-smtp)`. + + +### Background Tasks + +Background tasks live in `src/app/tasks/`. Each task is an Integrant component +that exposes a `::handler` key and follows this three-method pattern: + +```clojure +(defmethod ig/assert-key ::handler ;; validate config at startup + [_ params] + (assert (db/pool? (::db/pool params)) "expected a valid database pool")) + +(defmethod ig/expand-key ::handler ;; inject defaults before init + [k v] + {k (assoc v ::my-option default-value)}) + +(defmethod ig/init-key ::handler ;; return the task fn + [_ cfg] + (fn [_task] ;; receives the task row from the worker + (db/tx-run! cfg (fn [{:keys [::db/conn]}] + ;; … do work … + )))) +``` + +**Wiring a new task** requires two changes in `src/app/main.clj`: + +1. **Handler config** – add an entry in `system-config` with the dependencies: + +```clojure +:app.tasks.my-task/handler +{::db/pool (ig/ref ::db/pool)} +``` + +2. **Registry + cron** – register the handler name and schedule it: + +```clojure +;; in ::wrk/registry ::wrk/tasks map: +:my-task (ig/ref :app.tasks.my-task/handler) + +;; in worker-config ::wrk/cron ::wrk/entries vector: +{:cron #penpot/cron "0 0 0 * * ?" ;; daily at midnight + :task :my-task} +``` + +**Useful cron patterns** (Quartz format — six fields: s m h dom mon dow): + +| Expression | Meaning | +|------------------------------|--------------------| +| `"0 0 0 * * ?"` | Daily at midnight | +| `"0 0 */6 * * ?"` | Every 6 hours | +| `"0 */5 * * * ?"` | Every 5 minutes | + +**Time helpers** (`app.common.time`): + +```clojure +(ct/now) ;; current instant +(ct/duration {:hours 1}) ;; java.time.Duration +(ct/minus (ct/now) some-duration) ;; subtract duration from instant +``` + +`db/interval` converts a `Duration` (or millis / string) to a PostgreSQL +interval object suitable for use in SQL queries: + +```clojure +(db/interval (ct/duration {:hours 1})) ;; → PGInterval "3600.0 seconds" +``` diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index e9462e2c85..246d440c73 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -82,7 +82,10 @@ :initial-project-skey "initial-project" ;; time to avoid email sending after profile modification - :email-verify-threshold "15m"}) + :email-verify-threshold "15m" + + :quotes-upload-sessions-per-profile 5 + :quotes-upload-chunks-per-session 20}) (def schema:config (do #_sm/optional-keys @@ -154,6 +157,8 @@ [:quotes-snapshots-per-team {:optional true} ::sm/int] [:quotes-team-access-requests-per-team {:optional true} ::sm/int] [:quotes-team-access-requests-per-requester {:optional true} ::sm/int] + [:quotes-upload-sessions-per-profile {:optional true} ::sm/int] + [:quotes-upload-chunks-per-session {:optional true} ::sm/int] [:auth-token-cookie-name {:optional true} :string] [:auth-token-cookie-max-age {:optional true} ::ct/duration] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 383578531e..a1501e3ca0 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -388,6 +388,7 @@ :offload-file-data (ig/ref :app.tasks.offload-file-data/handler) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) + :upload-session-gc (ig/ref :app.tasks.upload-session-gc/handler) :storage-gc-deleted (ig/ref ::sto.gc-deleted/handler) :storage-gc-touched (ig/ref ::sto.gc-touched/handler) :session-gc (ig/ref ::session.tasks/gc) @@ -423,6 +424,9 @@ :app.tasks.tasks-gc/handler {::db/pool (ig/ref ::db/pool)} + :app.tasks.upload-session-gc/handler + {::db/pool (ig/ref ::db/pool)} + :app.tasks.objects-gc/handler {::db/pool (ig/ref ::db/pool) ::sto/storage (ig/ref ::sto/storage)} @@ -544,6 +548,9 @@ {:cron #penpot/cron "0 0 0 * * ?" ;; daily :task :tasks-gc} + {:cron #penpot/cron "0 0 0 * * ?" ;; daily + :task :upload-session-gc} + {:cron #penpot/cron "0 0 2 * * ?" ;; daily :task :file-gc-scheduler} diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 3464902128..be9dc4bace 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -469,7 +469,10 @@ :fn (mg/resource "app/migrations/sql/0145-mod-audit-log-table.sql")} {:name "0146-mod-access-token-table" - :fn (mg/resource "app/migrations/sql/0146-mod-access-token-table.sql")}]) + :fn (mg/resource "app/migrations/sql/0146-mod-access-token-table.sql")} + + {:name "0147-add-upload-session-table" + :fn (mg/resource "app/migrations/sql/0147-add-upload-session-table.sql")}]) (defn apply-migrations! [pool name migrations] diff --git a/backend/src/app/migrations/sql/0147-add-upload-session-table.sql b/backend/src/app/migrations/sql/0147-add-upload-session-table.sql new file mode 100644 index 0000000000..eda1964785 --- /dev/null +++ b/backend/src/app/migrations/sql/0147-add-upload-session-table.sql @@ -0,0 +1,14 @@ +CREATE TABLE upload_session ( + id uuid PRIMARY KEY, + + created_at timestamptz NOT NULL DEFAULT now(), + + profile_id uuid NOT NULL REFERENCES profile(id) ON DELETE CASCADE, + total_chunks integer NOT NULL +); + +CREATE INDEX upload_session__profile_id__idx + ON upload_session(profile_id); + +CREATE INDEX upload_session__created_at__idx + ON upload_session(created_at); diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index 743303c6a2..adb942bec8 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -22,6 +22,7 @@ [app.media :as media] [app.rpc :as-alias rpc] [app.rpc.commands.files :as files] + [app.rpc.commands.media :as media-cmd] [app.rpc.commands.projects :as projects] [app.rpc.commands.teams :as teams] [app.rpc.doc :as-alias doc] @@ -80,20 +81,33 @@ ;; --- Command: import-binfile (defn- import-binfile - [{:keys [::db/pool] :as cfg} {:keys [profile-id project-id version name file]}] - (let [team (teams/get-team pool - :profile-id profile-id - :project-id project-id) - cfg (-> cfg - (assoc ::bfc/features (cfeat/get-team-enabled-features cf/flags team)) - (assoc ::bfc/project-id project-id) - (assoc ::bfc/profile-id profile-id) - (assoc ::bfc/name name) - (assoc ::bfc/input (:path file))) + [{:keys [::db/pool] :as cfg} {:keys [profile-id project-id version name file upload-id]}] + (let [team + (teams/get-team pool + :profile-id profile-id + :project-id project-id) - result (case (int version) - 1 (bf.v1/import-files! cfg) - 3 (bf.v3/import-files! cfg))] + cfg + (-> cfg + (assoc ::bfc/features (cfeat/get-team-enabled-features cf/flags team)) + (assoc ::bfc/project-id project-id) + (assoc ::bfc/profile-id profile-id) + (assoc ::bfc/name name)) + + input-path (:path file) + owned? (some? upload-id) + + cfg + (assoc cfg ::bfc/input input-path) + + result + (try + (case (int version) + 1 (bf.v1/import-files! cfg) + 3 (bf.v3/import-files! cfg)) + (finally + (when owned? + (fs/delete input-path))))] (db/update! pool :project {:modified-at (ct/now)} @@ -103,13 +117,18 @@ result)) (def ^:private schema:import-binfile - [:map {:title "import-binfile"} - [:name [:or [:string {:max 250}] - [:map-of ::sm/uuid [:string {:max 250}]]]] - [:project-id ::sm/uuid] - [:file-id {:optional true} ::sm/uuid] - [:version {:optional true} ::sm/int] - [:file media/schema:upload]]) + [:and + [:map {:title "import-binfile"} + [:name [:or [:string {:max 250}] + [:map-of ::sm/uuid [:string {:max 250}]]]] + [:project-id ::sm/uuid] + [:file-id {:optional true} ::sm/uuid] + [:version {:optional true} ::sm/int] + [:file {:optional true} media/schema:upload] + [:upload-id {:optional true} ::sm/uuid]] + [:fn {:error/message "one of :file or :upload-id is required"} + (fn [{:keys [file upload-id]}] + (or (some? file) (some? upload-id)))]]) (sv/defmethod ::import-binfile "Import a penpot file in a binary format. If `file-id` is provided, @@ -117,28 +136,40 @@ The in-place imports are only supported for binfile-v3 and when a .penpot file only contains one penpot file. + + The file content may be provided either as a multipart `file` upload + or as an `upload-id` referencing a completed chunked-upload session, + which allows importing files larger than the multipart size limit. " {::doc/added "1.15" ::doc/changes ["1.20" "Add file-id param for in-place import" - "1.20" "Set default version to 3"] + "1.20" "Set default version to 3" + "2.15" "Add upload-id param for chunked upload support"] ::webhooks/event? true ::sse/stream? true ::sm/params schema:import-binfile} - [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id project-id version file-id file] :as params}] + [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id project-id version file-id upload-id] :as params}] (projects/check-edition-permissions! pool profile-id project-id) - (let [version (or version 3) - params (-> params - (assoc :profile-id profile-id) - (assoc :version version)) + (let [version (or version 3) + params (-> params + (assoc :profile-id profile-id) + (assoc :version version)) - cfg (cond-> cfg - (uuid? file-id) - (assoc ::bfc/file-id file-id)) + cfg (cond-> cfg + (uuid? file-id) + (assoc ::bfc/file-id file-id)) - manifest (case (int version) - 1 nil - 3 (bf.v3/get-manifest (:path file)))] + params + (if (some? upload-id) + (let [file (db/tx-run! cfg media-cmd/assemble-chunks upload-id)] + (assoc params :file file)) + params) + + manifest + (case (int version) + 1 nil + 3 (bf.v3/get-manifest (-> params :file :path)))] (with-meta (sse/response (partial import-binfile cfg params)) diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index 80e49b6366..5bea17d379 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -7,9 +7,11 @@ (ns app.rpc.commands.media (:require [app.common.data :as d] + [app.common.exceptions :as ex] [app.common.schema :as sm] [app.common.time :as ct] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.loggers.audit :as-alias audit] [app.media :as media] @@ -17,8 +19,13 @@ [app.rpc.climit :as climit] [app.rpc.commands.files :as files] [app.rpc.doc :as-alias doc] + [app.rpc.quotes :as quotes] [app.storage :as sto] - [app.util.services :as sv])) + [app.storage.tmp :as tmp] + [app.util.services :as sv] + [datoteka.io :as io]) + (:import + java.io.OutputStream)) (def thumbnail-options {:width 100 @@ -236,3 +243,182 @@ :width (:width mobj) :height (:height mobj) :mtype (:mtype mobj)}))) + +;; --- Chunked Upload: Create an upload session + +(def ^:private schema:create-upload-session + [:map {:title "create-upload-session"} + [:total-chunks ::sm/int]]) + +(def ^:private schema:create-upload-session-result + [:map {:title "create-upload-session-result"} + [:session-id ::sm/uuid]]) + +(sv/defmethod ::create-upload-session + {::doc/added "2.16" + ::sm/params schema:create-upload-session + ::sm/result schema:create-upload-session-result} + [{:keys [::db/pool] :as cfg} + {:keys [::rpc/profile-id total-chunks]}] + + (let [max-chunks (cf/get :quotes-upload-chunks-per-session)] + (when (> total-chunks max-chunks) + (ex/raise :type :restriction + :code :max-quote-reached + :target "upload-chunks-per-session" + :quote max-chunks + :count total-chunks))) + + (quotes/check! cfg {::quotes/id ::quotes/upload-sessions-per-profile + ::quotes/profile-id profile-id}) + + (let [session-id (uuid/next)] + (db/insert! pool :upload-session + {:id session-id + :profile-id profile-id + :total-chunks total-chunks}) + {:session-id session-id})) + +;; --- Chunked Upload: Upload a single chunk + +(def ^:private schema:upload-chunk + [:map {:title "upload-chunk"} + [:session-id ::sm/uuid] + [:index ::sm/int] + [:content media/schema:upload]]) + +(def ^:private schema:upload-chunk-result + [:map {:title "upload-chunk-result"} + [:session-id ::sm/uuid] + [:index ::sm/int]]) + +(sv/defmethod ::upload-chunk + {::doc/added "2.16" + ::sm/params schema:upload-chunk + ::sm/result schema:upload-chunk-result} + [{:keys [::db/pool] :as cfg} + {:keys [::rpc/profile-id session-id index content] :as _params}] + (let [session (db/get pool :upload-session {:id session-id :profile-id profile-id})] + (when (or (neg? index) (>= index (:total-chunks session))) + (ex/raise :type :validation + :code :invalid-chunk-index + :hint "chunk index is out of range for this session" + :session-id session-id + :total-chunks (:total-chunks session) + :index index))) + + (let [storage (sto/resolve cfg) + data (sto/content (:path content))] + (sto/put-object! storage + {::sto/content data + ::sto/deduplicate? false + ::sto/touch true + :content-type (:mtype content) + :bucket "tempfile" + :upload-id (str session-id) + :chunk-index index})) + + {:session-id session-id + :index index}) + +;; --- Chunked Upload: shared helpers + +(def ^:private sql:get-upload-chunks + "SELECT id, size, (metadata->>'~:chunk-index')::integer AS chunk_index + FROM storage_object + WHERE (metadata->>'~:upload-id') = ?::text + AND deleted_at IS NULL + ORDER BY (metadata->>'~:chunk-index')::integer ASC") + +(defn- get-upload-chunks + [conn session-id] + (db/exec! conn [sql:get-upload-chunks (str session-id)])) + +(defn- concat-chunks + "Reads all chunk storage objects in order and writes them to a single + temporary file on the local filesystem. Returns a path to that file." + [storage chunks] + (let [tmp (tmp/tempfile :prefix "penpot.chunked-upload.")] + (with-open [^OutputStream out (io/output-stream tmp)] + (doseq [{:keys [id]} chunks] + (let [sobj (sto/get-object storage id) + bytes (sto/get-object-bytes storage sobj)] + (.write out ^bytes bytes)))) + tmp)) + +(defn assemble-chunks + "Validates that all expected chunks are present for `session-id` and + concatenates them into a single temporary file. Returns a map + conforming to `media/schema:upload` with `:filename`, `:path` and + `:size`. + + Raises a :validation/:missing-chunks error when the number of stored + chunks does not match `:total-chunks` recorded in the session row. + Deletes the session row from `upload_session` on success." + [{:keys [::db/conn] :as cfg} session-id] + (let [session (db/get conn :upload-session {:id session-id}) + chunks (get-upload-chunks conn session-id)] + + (when (not= (count chunks) (:total-chunks session)) + (ex/raise :type :validation + :code :missing-chunks + :hint "number of stored chunks does not match expected total" + :session-id session-id + :expected (:total-chunks session) + :found (count chunks))) + + (let [storage (sto/resolve cfg ::db/reuse-conn true) + path (concat-chunks storage chunks) + size (reduce #(+ %1 (:size %2)) 0 chunks)] + + (db/delete! conn :upload-session {:id session-id}) + + {:filename "upload" + :path path + :size size}))) + +;; --- Chunked Upload: Assemble all chunks into a final media object + +(def ^:private schema:assemble-file-media-object + [:map {:title "assemble-file-media-object"} + [:session-id ::sm/uuid] + [:file-id ::sm/uuid] + [:is-local ::sm/boolean] + [:name [:string {:max 250}]] + [:mtype :string] + [:id {:optional true} ::sm/uuid]]) + +(sv/defmethod ::assemble-file-media-object + {::doc/added "2.16" + ::sm/params schema:assemble-file-media-object + ::climit/id [[:process-image/by-profile ::rpc/profile-id] + [:process-image/global]]} + [{:keys [::db/pool] :as cfg} + {:keys [::rpc/profile-id session-id file-id is-local name mtype id] :as params}] + (files/check-edition-permissions! pool profile-id file-id) + + (db/tx-run! cfg + (fn [{:keys [::db/conn] :as cfg}] + (let [{:keys [path size]} (assemble-chunks cfg session-id) + content {:filename "upload" + :size size + :path path + :mtype mtype} + _ (media/validate-media-type! content) + mobj (create-file-media-object cfg (assoc params + :id (or id (uuid/next)) + :content content))] + + (db/update! conn :file + {:modified-at (ct/now) + :has-media-trimmed false} + {:id file-id} + {::db/return-keys false}) + + (with-meta mobj + {::audit/replace-props + {:name name + :file-id file-id + :is-local is-local + :mtype mtype}}))))) + diff --git a/backend/src/app/rpc/quotes.clj b/backend/src/app/rpc/quotes.clj index d5903744b3..1fe00e62d9 100644 --- a/backend/src/app/rpc/quotes.clj +++ b/backend/src/app/rpc/quotes.clj @@ -522,6 +522,30 @@ (assoc ::count-sql [sql:get-team-access-requests-per-requester profile-id]) (generic-check!))) +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; QUOTE: UPLOAD-SESSIONS-PER-PROFILE +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def ^:private schema:upload-sessions-per-profile + [:map [::profile-id ::sm/uuid]]) + +(def ^:private valid-upload-sessions-per-profile-quote? + (sm/lazy-validator schema:upload-sessions-per-profile)) + +(def ^:private sql:get-upload-sessions-per-profile + "SELECT count(*) AS total + FROM upload_session + WHERE profile_id = ?") + +(defmethod check-quote ::upload-sessions-per-profile + [{:keys [::profile-id ::target] :as quote}] + (assert (valid-upload-sessions-per-profile-quote? quote) "invalid quote parameters") + (-> quote + (assoc ::default (cf/get :quotes-upload-sessions-per-profile Integer/MAX_VALUE)) + (assoc ::quote-sql [sql:get-quotes-1 target profile-id]) + (assoc ::count-sql [sql:get-upload-sessions-per-profile profile-id]) + (generic-check!))) + ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; QUOTE: DEFAULT ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/backend/src/app/storage/gc_touched.clj b/backend/src/app/storage/gc_touched.clj index 33158600c2..f00140d04e 100644 --- a/backend/src/app/storage/gc_touched.clj +++ b/backend/src/app/storage/gc_touched.clj @@ -149,7 +149,7 @@ :status "delete" :bucket bucket) (recur to-freeze (conj to-delete id) (rest objects)))) - (let [deletion-delay (if (= bucket "tempfile") + (let [deletion-delay (if (= "tempfile" bucket) (ct/duration {:hours 2}) (cf/get-deletion-delay))] (some->> (seq to-freeze) (mark-freeze-in-bulk! conn)) diff --git a/backend/src/app/tasks/upload_session_gc.clj b/backend/src/app/tasks/upload_session_gc.clj new file mode 100644 index 0000000000..c733bbd64e --- /dev/null +++ b/backend/src/app/tasks/upload_session_gc.clj @@ -0,0 +1,41 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.tasks.upload-session-gc + "A maintenance task that deletes stalled (incomplete) upload sessions. + + An upload session is considered stalled when it was created more than + `max-age` ago without being completed (i.e. the session row still + exists because `assemble-chunks` was never called to clean it up). + The default max-age is 1 hour." + (:require + [app.common.logging :as l] + [app.common.time :as ct] + [app.db :as db] + [integrant.core :as ig])) + +(def ^:private sql:delete-stalled-sessions + "DELETE FROM upload_session + WHERE created_at < ?::timestamptz") + +(defmethod ig/assert-key ::handler + [_ params] + (assert (db/pool? (::db/pool params)) "expected a valid database pool")) + +(defmethod ig/expand-key ::handler + [k v] + {k (merge {::max-age (ct/duration {:hours 1})} v)}) + +(defmethod ig/init-key ::handler + [_ {:keys [::max-age] :as cfg}] + (fn [_] + (db/tx-run! cfg + (fn [{:keys [::db/conn]}] + (let [threshold (ct/minus (ct/now) max-age) + result (-> (db/exec-one! conn [sql:delete-stalled-sessions threshold]) + (db/get-update-count))] + (l/debug :hint "task finished" :deleted result) + {:deleted result}))))) diff --git a/backend/test/backend_tests/rpc_media_test.clj b/backend/test/backend_tests/rpc_media_test.clj index 79df6d38b4..070a105a1b 100644 --- a/backend/test/backend_tests/rpc_media_test.clj +++ b/backend/test/backend_tests/rpc_media_test.clj @@ -6,9 +6,7 @@ (ns backend-tests.rpc-media-test (:require - [app.common.time :as ct] [app.common.uuid :as uuid] - [app.db :as db] [app.http.client :as http] [app.media :as media] [app.rpc :as-alias rpc] @@ -16,7 +14,10 @@ [backend-tests.helpers :as th] [clojure.test :as t] [datoteka.fs :as fs] - [mockery.core :refer [with-mocks]])) + [datoteka.io :as io] + [mockery.core :refer [with-mocks]]) + (:import + java.io.RandomAccessFile)) (t/use-fixtures :once th/state-init) (t/use-fixtures :each th/database-reset) @@ -260,7 +261,7 @@ :is-shared false}) _ (th/db-update! :file - {:deleted-at (ct/now)} + {:deleted-at (app.common.time/now)} {:id (:id file)}) mfile {:filename "sample.jpg" @@ -378,3 +379,325 @@ (t/is (some? err)) (t/is (= :validation (:type (ex-data err)))) (t/is (= :unable-to-download-image (:code (ex-data err)))))))) + +;; -------------------------------------------------------------------- +;; Helpers for chunked-upload tests +;; -------------------------------------------------------------------- + +(defn- split-file-into-chunks + "Splits the file at `path` into byte-array chunks of at most + `chunk-size` bytes. Returns a vector of byte arrays." + [path chunk-size] + (let [file (RandomAccessFile. (str path) "r") + length (.length file)] + (try + (loop [offset 0 chunks []] + (if (>= offset length) + chunks + (let [remaining (- length offset) + size (min chunk-size remaining) + buf (byte-array size)] + (.seek file offset) + (.readFully file buf) + (recur (+ offset size) (conj chunks buf))))) + (finally + (.close file))))) + +(defn- make-chunk-mfile + "Writes `data` (byte array) to a tempfile and returns a map + compatible with `media/schema:upload`." + [data mtype] + (let [tmp (fs/create-tempfile :dir "/tmp/penpot" :prefix "test-chunk-")] + (io/write* tmp data) + {:filename "chunk" + :path tmp + :mtype mtype + :size (alength data)})) + +;; -------------------------------------------------------------------- +;; Chunked-upload tests +;; -------------------------------------------------------------------- + +(defn- create-session! + "Creates an upload session for `prof` with `total-chunks`. Returns the session-id UUID." + [prof total-chunks] + (let [out (th/command! {::th/type :create-upload-session + ::rpc/profile-id (:id prof) + :total-chunks total-chunks})] + (t/is (nil? (:error out))) + (:session-id (:result out)))) + +(t/deftest chunked-upload-happy-path + (let [prof (th/create-profile* 1) + _ (th/create-project* 1 {:profile-id (:id prof) + :team-id (:default-team-id prof)}) + file (th/create-file* 1 {:profile-id (:id prof) + :project-id (:default-project-id prof) + :is-shared false}) + source-path (th/tempfile "backend_tests/test_files/sample.jpg") + chunks (split-file-into-chunks source-path 110000) ; ~107 KB each + mtype "image/jpeg" + total-size (reduce + (map alength chunks)) + session-id (create-session! prof (count chunks))] + + (t/is (= 3 (count chunks))) + + ;; --- 1. Upload chunks --- + (doseq [[idx chunk-data] (map-indexed vector chunks)] + (let [mfile (make-chunk-mfile chunk-data mtype) + out (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof) + :session-id session-id + :index idx + :content mfile})] + (t/is (nil? (:error out))) + (t/is (= session-id (:session-id (:result out)))) + (t/is (= idx (:index (:result out)))))) + + ;; --- 2. Assemble --- + (let [assemble-out (th/command! {::th/type :assemble-file-media-object + ::rpc/profile-id (:id prof) + :session-id session-id + :file-id (:id file) + :is-local true + :name "assembled-image" + :mtype mtype})] + + (t/is (nil? (:error assemble-out))) + (let [{:keys [media-id thumbnail-id] :as result} (:result assemble-out)] + (t/is (= (:id file) (:file-id result))) + (t/is (= 800 (:width result))) + (t/is (= 800 (:height result))) + (t/is (= mtype (:mtype result))) + (t/is (uuid? media-id)) + (t/is (uuid? thumbnail-id)) + + (let [storage (:app.storage/storage th/*system*) + mobj1 (sto/get-object storage media-id) + mobj2 (sto/get-object storage thumbnail-id)] + (t/is (sto/object? mobj1)) + (t/is (sto/object? mobj2)) + (t/is (= total-size (:size mobj1)))))))) + +(t/deftest chunked-upload-idempotency + (let [prof (th/create-profile* 1) + _ (th/create-project* 1 {:profile-id (:id prof) + :team-id (:default-team-id prof)}) + file (th/create-file* 1 {:profile-id (:id prof) + :project-id (:default-project-id prof) + :is-shared false}) + media-id (uuid/next) + source-path (th/tempfile "backend_tests/test_files/sample.jpg") + chunks (split-file-into-chunks source-path 312043) ; single chunk = whole file + mtype "image/jpeg" + mfile (make-chunk-mfile (first chunks) mtype) + session-id (create-session! prof 1)] + + (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof) + :session-id session-id + :index 0 + :content mfile}) + + ;; First assemble succeeds; session row is deleted afterwards + (let [out1 (th/command! {::th/type :assemble-file-media-object + ::rpc/profile-id (:id prof) + :session-id session-id + :file-id (:id file) + :is-local true + :name "sample" + :mtype mtype + :id media-id})] + (t/is (nil? (:error out1))) + (t/is (= media-id (:id (:result out1))))) + + ;; Second assemble with the same session-id must fail because the + ;; session row has been deleted after the first assembly + (let [out2 (th/command! {::th/type :assemble-file-media-object + ::rpc/profile-id (:id prof) + :session-id session-id + :file-id (:id file) + :is-local true + :name "sample" + :mtype mtype + :id media-id})] + (t/is (some? (:error out2))) + (t/is (= :not-found (-> out2 :error ex-data :type))) + (t/is (= :object-not-found (-> out2 :error ex-data :code)))))) + +(t/deftest chunked-upload-no-permission + ;; A second profile must not be able to upload chunks into a session + ;; that belongs to another profile: the DB lookup includes profile-id, + ;; so the session will not be found. + (let [prof1 (th/create-profile* 1) + prof2 (th/create-profile* 2) + session-id (create-session! prof1 1) + source-path (th/tempfile "backend_tests/test_files/sample.jpg") + mfile {:filename "sample.jpg" + :path source-path + :mtype "image/jpeg" + :size 312043} + + ;; prof2 tries to upload a chunk into prof1's session + out (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof2) + :session-id session-id + :index 0 + :content mfile})] + + (t/is (some? (:error out))) + (t/is (= :not-found (-> out :error ex-data :type))))) + +(t/deftest chunked-upload-invalid-media-type + (let [prof (th/create-profile* 1) + _ (th/create-project* 1 {:profile-id (:id prof) + :team-id (:default-team-id prof)}) + file (th/create-file* 1 {:profile-id (:id prof) + :project-id (:default-project-id prof) + :is-shared false}) + session-id (create-session! prof 1) + source-path (th/tempfile "backend_tests/test_files/sample.jpg") + mfile {:filename "sample.jpg" + :path source-path + :mtype "image/jpeg" + :size 312043}] + + (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof) + :session-id session-id + :index 0 + :content mfile}) + + ;; Assemble with a wrong mtype should fail validation + (let [out (th/command! {::th/type :assemble-file-media-object + ::rpc/profile-id (:id prof) + :session-id session-id + :file-id (:id file) + :is-local true + :name "bad-type" + :mtype "application/octet-stream"})] + (t/is (some? (:error out))) + (t/is (= :validation (-> out :error ex-data :type)))))) + +(t/deftest chunked-upload-missing-chunks + (let [prof (th/create-profile* 1) + _ (th/create-project* 1 {:profile-id (:id prof) + :team-id (:default-team-id prof)}) + file (th/create-file* 1 {:profile-id (:id prof) + :project-id (:default-project-id prof) + :is-shared false}) + ;; Session expects 3 chunks + session-id (create-session! prof 3) + source-path (th/tempfile "backend_tests/test_files/sample.jpg") + mfile {:filename "sample.jpg" + :path source-path + :mtype "image/jpeg" + :size 312043}] + + ;; Upload only 1 chunk + (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof) + :session-id session-id + :index 0 + :content mfile}) + + ;; Assemble: session says 3 expected, only 1 stored → :missing-chunks + (let [out (th/command! {::th/type :assemble-file-media-object + ::rpc/profile-id (:id prof) + :session-id session-id + :file-id (:id file) + :is-local true + :name "incomplete" + :mtype "image/jpeg"})] + (t/is (some? (:error out))) + (t/is (= :validation (-> out :error ex-data :type))) + (t/is (= :missing-chunks (-> out :error ex-data :code)))))) + +(t/deftest chunked-upload-session-not-found + (let [prof (th/create-profile* 1) + _ (th/create-project* 1 {:profile-id (:id prof) + :team-id (:default-team-id prof)}) + file (th/create-file* 1 {:profile-id (:id prof) + :project-id (:default-project-id prof) + :is-shared false}) + bogus-id (uuid/next)] + + ;; Assemble with a session-id that was never created + (let [out (th/command! {::th/type :assemble-file-media-object + ::rpc/profile-id (:id prof) + :session-id bogus-id + :file-id (:id file) + :is-local true + :name "ghost" + :mtype "image/jpeg"})] + (t/is (some? (:error out))) + (t/is (= :not-found (-> out :error ex-data :type))) + (t/is (= :object-not-found (-> out :error ex-data :code)))))) + +(t/deftest chunked-upload-over-chunk-limit + ;; Verify that requesting more chunks than the configured maximum + ;; (quotes-upload-chunks-per-session) raises a :restriction error. + (with-mocks [mock {:target 'app.config/get + :return (th/config-get-mock + {:quotes-upload-chunks-per-session 3})}] + (let [prof (th/create-profile* 1) + out (th/command! {::th/type :create-upload-session + ::rpc/profile-id (:id prof) + :total-chunks 4})] + + (t/is (some? (:error out))) + (t/is (= :restriction (-> out :error ex-data :type))) + (t/is (= :max-quote-reached (-> out :error ex-data :code))) + (t/is (= "upload-chunks-per-session" (-> out :error ex-data :target)))))) + +(t/deftest chunked-upload-invalid-chunk-index + ;; Both a negative index and an index >= total-chunks must be + ;; rejected with a :validation / :invalid-chunk-index error. + (let [prof (th/create-profile* 1) + session-id (create-session! prof 2) + source-path (th/tempfile "backend_tests/test_files/sample.jpg") + mfile {:filename "sample.jpg" + :path source-path + :mtype "image/jpeg" + :size 312043}] + + ;; index == total-chunks (out of range) + (let [out (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof) + :session-id session-id + :index 2 + :content mfile})] + (t/is (some? (:error out))) + (t/is (= :validation (-> out :error ex-data :type))) + (t/is (= :invalid-chunk-index (-> out :error ex-data :code)))) + + ;; negative index + (let [out (th/command! {::th/type :upload-chunk + ::rpc/profile-id (:id prof) + :session-id session-id + :index -1 + :content mfile})] + (t/is (some? (:error out))) + (t/is (= :validation (-> out :error ex-data :type))) + (t/is (= :invalid-chunk-index (-> out :error ex-data :code)))))) + +(t/deftest chunked-upload-sessions-per-profile-quota + ;; With the session limit set to 2, creating a third session for the + ;; same profile must fail with :restriction / :max-quote-reached. + ;; The :quotes flag is already enabled by the test fixture. + (with-mocks [mock {:target 'app.config/get + :return (th/config-get-mock + {:quotes-upload-sessions-per-profile 2})}] + (let [prof (th/create-profile* 1)] + + ;; First two sessions succeed + (create-session! prof 1) + (create-session! prof 1) + + ;; Third session must be rejected + (let [out (th/command! {::th/type :create-upload-session + ::rpc/profile-id (:id prof) + :total-chunks 1})] + (t/is (some? (:error out))) + (t/is (= :restriction (-> out :error ex-data :type))) + (t/is (= :max-quote-reached (-> out :error ex-data :code))))))) diff --git a/frontend/src/app/config.cljs b/frontend/src/app/config.cljs index 5059820ede..efda9a9356 100644 --- a/frontend/src/app/config.cljs +++ b/frontend/src/app/config.cljs @@ -160,6 +160,7 @@ (def plugins-list-uri (obj/get global "penpotPluginsListUri" "https://penpot.app/penpothub/plugins")) (def plugins-whitelist (into #{} (obj/get global "penpotPluginsWhitelist" []))) (def templates-uri (obj/get global "penpotTemplatesUri" "https://penpot.github.io/penpot-files/")) +(def upload-chunk-size (obj/get global "penpotUploadChunkSize" (* 1024 1024 25))) ;; 25 MiB ;; We set the current parsed flags under common for make ;; it available for common code without the need to pass diff --git a/frontend/src/app/main/data/uploads.cljs b/frontend/src/app/main/data/uploads.cljs new file mode 100644 index 0000000000..06e87f02d9 --- /dev/null +++ b/frontend/src/app/main/data/uploads.cljs @@ -0,0 +1,70 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns app.main.data.uploads + "Generic chunked-upload helpers. + + Provides a purpose-agnostic three-step session API that can be used + by any feature that needs to upload large binary blobs: + + 1. create-upload-session – obtain a session-id + 2. upload-chunk – upload each slice (max-parallel-chunk-uploads in-flight) + 3. caller-specific step – e.g. assemble-file-media-object or import-binfile + + `upload-blob-chunked` drives steps 1 and 2 and emits the completed + `{:session-id …}` map so that the caller can proceed with its own + step 3." + (:require + [app.common.data.macros :as dm] + [app.common.uuid :as uuid] + [app.config :as cf] + [app.main.repo :as rp] + [beicon.v2.core :as rx])) + +;; Size of each upload chunk in bytes. Reads the penpotUploadChunkSize global +;; variable at startup; defaults to 25 MiB (overridden in production). +(def ^:private chunk-size cf/upload-chunk-size) + +(def ^:private max-parallel-chunk-uploads + "Maximum number of chunk upload requests that may be in-flight at the + same time within a single chunked upload session." + 2) + +(defn upload-blob-chunked + "Uploads `blob` via the three-step chunked session API. + + Steps performed: + 1. Creates an upload session (`create-upload-session`). + 2. Slices `blob` and uploads every chunk (`upload-chunk`), + with at most `max-parallel-chunk-uploads` concurrent requests. + + Returns an observable that emits exactly one map: + `{:session-id }` + + The caller is responsible for the final step (assemble / import)." + [blob] + (let [total-size (.-size blob) + total-chunks (js/Math.ceil (/ total-size chunk-size))] + (->> (rp/cmd! :create-upload-session + {:total-chunks total-chunks}) + (rx/mapcat + (fn [{raw-session-id :session-id}] + (let [session-id (cond-> raw-session-id + (string? raw-session-id) uuid/uuid) + chunk-uploads + (->> (range total-chunks) + (map (fn [idx] + (let [start (* idx chunk-size) + end (min (+ start chunk-size) total-size) + chunk (.slice blob start end)] + (rp/cmd! :upload-chunk + {:session-id session-id + :index idx + :content (list chunk (dm/str "chunk-" idx))})))))] + (->> (rx/from chunk-uploads) + (rx/merge-all max-parallel-chunk-uploads) + (rx/last) + (rx/map (fn [_] {:session-id session-id}))))))))) diff --git a/frontend/src/app/main/data/workspace/media.cljs b/frontend/src/app/main/data/workspace/media.cljs index 0d1e1c6e32..bcffef8378 100644 --- a/frontend/src/app/main/data/workspace/media.cljs +++ b/frontend/src/app/main/data/workspace/media.cljs @@ -24,6 +24,7 @@ [app.main.data.helpers :as dsh] [app.main.data.media :as dmm] [app.main.data.notifications :as ntf] + [app.main.data.uploads :as uploads] [app.main.data.workspace.shapes :as dwsh] [app.main.data.workspace.svg-upload :as svg] [app.main.repo :as rp] @@ -103,6 +104,26 @@ :url url :is-local true})) +;; Size of each upload chunk in bytes — read from config directly, +;; same source used by the uploads namespace. +(def ^:private chunk-size cf/upload-chunk-size) + +(defn- upload-blob-chunked + "Uploads `blob` to `file-id` as a chunked media object using the + three-step session API. Returns an observable that emits the + assembled file-media-object map." + [{:keys [file-id name is-local blob]}] + (let [mtype (.-type blob)] + (->> (uploads/upload-blob-chunked blob) + (rx/mapcat + (fn [{:keys [session-id]}] + (rp/cmd! :assemble-file-media-object + {:session-id session-id + :file-id file-id + :is-local is-local + :name name + :mtype mtype})))))) + (defn process-uris [{:keys [file-id local? name uris mtype on-image on-svg]}] (letfn [(svg-url? [url] @@ -143,12 +164,18 @@ (and (not force-media) (= (.-type blob) "image/svg+xml"))) - (prepare-blob [blob] - (let [name (or name (if (dmm/file? blob) (media/strip-image-extension (.-name blob)) "blob"))] - {:file-id file-id - :name name - :is-local local? - :content blob})) + (upload-blob [blob] + (let [params {:file-id file-id + :name (or name (if (dmm/file? blob) (media/strip-image-extension (.-name blob)) "blob")) + :is-local local? + :blob blob}] + (if (>= (.-size blob) chunk-size) + (upload-blob-chunked params) + (rp/cmd! :upload-file-media-object + {:file-id file-id + :name (:name params) + :is-local local? + :content blob})))) (extract-content [blob] (let [name (or name (.-name blob))] @@ -159,8 +186,7 @@ (->> (rx/from blobs) (rx/map dmm/validate-file) (rx/filter (comp not svg-blob?)) - (rx/map prepare-blob) - (rx/mapcat #(rp/cmd! :upload-file-media-object %)) + (rx/mapcat upload-blob) (rx/tap on-image)) (->> (rx/from blobs) @@ -170,9 +196,10 @@ (rx/merge-map svg->clj) (rx/tap on-svg))))) -(defn handle-media-error [error on-error] - (if (ex/ex-info? error) - (handle-media-error (ex-data error) on-error) +(defn handle-media-error + [cause] + (ex/print-throwable cause) + (let [error (ex-data cause)] (cond (= (:code error) :invalid-svg-file) (rx/of (ntf/error (tr "errors.media-type-not-allowed"))) @@ -195,13 +222,8 @@ (= (:code error) :unable-to-optimize) (rx/of (ntf/error (:hint error))) - (fn? on-error) - (on-error error) - :else - (do - (.error js/console "ERROR" error) - (rx/of (ntf/error (tr "errors.cannot-upload"))))))) + (rx/of (ntf/error (tr "errors.cannot-upload")))))) (def ^:private @@ -215,7 +237,7 @@ [:mtype {:optional true} :string]]) (defn- process-media-objects - [{:keys [uris on-error] :as params}] + [{:keys [uris] :as params}] (dm/assert! (and (sm/check schema:process-media-objects params) (or (contains? params :blobs) @@ -238,7 +260,7 @@ ;; Every stream has its own sideeffect. We need to ignore the result (rx/ignore) - (rx/catch #(handle-media-error % on-error)) + (rx/catch handle-media-error) (rx/finalize #(st/emit! (ntf/hide :tag :media-loading)))))))) (defn upload-media-workspace @@ -278,8 +300,6 @@ (rx/tap on-upload-success) (rx/catch handle-media-error)))))) -;; --- Upload File Media objects - (defn create-shapes-svg "Convert svg elements into penpot shapes." [file-id objects pos svg-data] diff --git a/frontend/src/app/main/repo.cljs b/frontend/src/app/main/repo.cljs index 6f264e5d02..1e6dd417a6 100644 --- a/frontend/src/app/main/repo.cljs +++ b/frontend/src/app/main/repo.cljs @@ -139,8 +139,7 @@ {:stream? true} ::sse/import-binfile - {:stream? true - :form-data? true} + {:stream? true} ::sse/permanently-delete-team-files {:stream? true} @@ -273,6 +272,7 @@ (send-export (merge default params)))) (derive :upload-file-media-object ::multipart-upload) +(derive :upload-chunk ::multipart-upload) (derive :update-profile-photo ::multipart-upload) (derive :update-team-photo ::multipart-upload) diff --git a/frontend/src/app/worker/import.cljs b/frontend/src/app/worker/import.cljs index a191b9466f..20c314f012 100644 --- a/frontend/src/app/worker/import.cljs +++ b/frontend/src/app/worker/import.cljs @@ -11,6 +11,7 @@ [app.common.logging :as log] [app.common.schema :as sm] [app.common.uuid :as uuid] + [app.main.data.uploads :as uploads] [app.main.repo :as rp] [app.util.http :as http] [app.util.i18n :as i18n :refer [tr]] @@ -129,6 +130,23 @@ (->> (rx/from files) (rx/merge-map analyze-file))) +(defn- import-blob-via-upload + "Fetches `uri` as a Blob, uploads it using the generic chunked-upload + session API and calls `import-binfile` with the resulting upload-id. + Returns an observable of SSE events from the import stream." + [uri {:keys [name version project-id]}] + (->> (slurp-uri uri :blob) + (rx/mapcat + (fn [blob] + (->> (uploads/upload-blob-chunked blob) + (rx/mapcat + (fn [{:keys [session-id]}] + (rp/cmd! ::sse/import-binfile + {:name name + :upload-id session-id + :version version + :project-id project-id})))))))) + (defmethod impl/handler :import-files [{:keys [project-id files]}] (let [binfile-v1 (filter #(= :binfile-v1 (:type %)) files) @@ -138,31 +156,22 @@ (->> (rx/from binfile-v1) (rx/merge-map (fn [data] - (->> (http/send! - {:uri (:uri data) - :response-type :blob - :method :get}) - (rx/map :body) - (rx/mapcat - (fn [file] - (->> (rp/cmd! ::sse/import-binfile - {:name (str/replace (:name data) #".penpot$" "") - :file file - :version 1 - :project-id project-id}) - (rx/tap (fn [event] - (let [payload (sse/get-payload event) - type (sse/get-type event)] - (if (= type "progress") - (log/dbg :hint "import-binfile: progress" - :section (:section payload) - :name (:name payload)) - (log/dbg :hint "import-binfile: end"))))) - (rx/filter sse/end-of-stream?) - (rx/map (fn [_] - {:status :finish - :file-id (:file-id data)}))))) - + (->> (import-blob-via-upload (:uri data) + {:name (str/replace (:name data) #".penpot$" "") + :version 1 + :project-id project-id}) + (rx/tap (fn [event] + (let [payload (sse/get-payload event) + type (sse/get-type event)] + (if (= type "progress") + (log/dbg :hint "import-binfile: progress" + :section (:section payload) + :name (:name payload)) + (log/dbg :hint "import-binfile: end"))))) + (rx/filter sse/end-of-stream?) + (rx/map (fn [_] + {:status :finish + :file-id (:file-id data)})) (rx/catch (fn [cause] (log/error :hint "unexpected error on import process" @@ -179,29 +188,24 @@ (rx/mapcat identity) (rx/merge-map (fn [[uri entries]] - (->> (slurp-uri uri :blob) - (rx/mapcat (fn [content] - ;; FIXME: implement the naming and filtering - (->> (rp/cmd! ::sse/import-binfile - {:name (-> entries first :name) - :file content - :version 3 - :project-id project-id}) - (rx/tap (fn [event] - (let [payload (sse/get-payload event) - type (sse/get-type event)] - (if (= type "progress") - (log/dbg :hint "import-binfile: progress" - :section (:section payload) - :name (:name payload)) - (log/dbg :hint "import-binfile: end"))))) - (rx/filter sse/end-of-stream?) - (rx/mapcat (fn [_] - (->> (rx/from entries) - (rx/map (fn [entry] - {:status :finish - :file-id (:file-id entry)})))))))) - + (->> (import-blob-via-upload uri + {:name (-> entries first :name) + :version 3 + :project-id project-id}) + (rx/tap (fn [event] + (let [payload (sse/get-payload event) + type (sse/get-type event)] + (if (= type "progress") + (log/dbg :hint "import-binfile: progress" + :section (:section payload) + :name (:name payload)) + (log/dbg :hint "import-binfile: end"))))) + (rx/filter sse/end-of-stream?) + (rx/mapcat (fn [_] + (->> (rx/from entries) + (rx/map (fn [entry] + {:status :finish + :file-id (:file-id entry)}))))) (rx/catch (fn [cause] (log/error :hint "unexpected error on import process" @@ -213,5 +217,3 @@ {:status :error :error (ex-message cause) :file-id (:file-id entry)})))))))))))) - - diff --git a/frontend/test/frontend_tests/data/uploads_test.cljs b/frontend/test/frontend_tests/data/uploads_test.cljs new file mode 100644 index 0000000000..1512fcb90b --- /dev/null +++ b/frontend/test/frontend_tests/data/uploads_test.cljs @@ -0,0 +1,117 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns frontend-tests.data.uploads-test + "Integration tests for the generic chunked-upload logic in + app.main.data.uploads." + (:require + [app.common.uuid :as uuid] + [app.config :as cf] + [app.main.data.uploads :as uploads] + [beicon.v2.core :as rx] + [cljs.test :as t :include-macros true] + [frontend-tests.helpers.http :as http])) + +;; --------------------------------------------------------------------------- +;; Local helpers +;; --------------------------------------------------------------------------- + +(defn- make-blob + "Creates a JS Blob of exactly `size` bytes." + [size] + (let [buf (js/Uint8Array. size)] + (js/Blob. #js [buf] #js {:type "application/octet-stream"}))) + +;; --------------------------------------------------------------------------- +;; upload-blob-chunked tests +;; --------------------------------------------------------------------------- + +(t/deftest upload-blob-chunked-creates-session-and-uploads-chunks + (t/testing "upload-blob-chunked calls create-upload-session then upload-chunk for each slice" + (t/async done + (let [session-id (uuid/next) + chunk-size cf/upload-chunk-size + ;; Exactly two full chunks + blob-size (* 2 chunk-size) + blob (make-blob blob-size) + calls (atom []) + + fetch-mock + (fn [url _opts] + (let [cmd (http/url->cmd url)] + (swap! calls conj cmd) + (js/Promise.resolve + (case cmd + :create-upload-session + (http/make-transit-response + {:session-id session-id}) + + :upload-chunk + (http/make-transit-response + {:session-id session-id :index 0}) + + (http/make-json-response + {:error (str "unexpected cmd: " cmd)}))))) + + orig (http/install-fetch-mock! fetch-mock)] + + (->> (uploads/upload-blob-chunked blob) + (rx/subs! + (fn [{:keys [session-id]}] + (t/is (uuid? session-id))) + (fn [err] + (t/is false (str "unexpected error: " (ex-message err))) + (done)) + (fn [] + (http/restore-fetch! orig) + (let [cmd-seq @calls] + ;; First call must create the session + (t/is (= :create-upload-session (first cmd-seq))) + ;; Two chunk uploads + (t/is (= 2 (count (filter #(= :upload-chunk %) cmd-seq)))) + ;; No assemble call here — that's the caller's responsibility + (t/is (not (some #(= :assemble-file-media-object %) cmd-seq)))) + (done)))))))) + +(t/deftest upload-blob-chunked-chunk-count-matches-blob + (t/testing "number of upload-chunk calls equals ceil(blob-size / chunk-size)" + (t/async done + (let [session-id (uuid/next) + chunk-size cf/upload-chunk-size + ;; Three chunks: 2 full + 1 partial + blob-size (+ (* 2 chunk-size) 1) + blob (make-blob blob-size) + chunk-calls (atom 0) + + fetch-mock + (fn [url _opts] + (let [cmd (http/url->cmd url)] + (js/Promise.resolve + (case cmd + :create-upload-session + (http/make-transit-response + {:session-id session-id}) + + :upload-chunk + (do (swap! chunk-calls inc) + (http/make-transit-response + {:session-id session-id :index 0})) + + (http/make-json-response + {:error (str "unexpected cmd: " cmd)}))))) + + orig (http/install-fetch-mock! fetch-mock)] + + (->> (uploads/upload-blob-chunked blob) + (rx/subs! + (fn [_] nil) + (fn [err] + (t/is false (str "unexpected error: " (ex-message err))) + (done)) + (fn [] + (http/restore-fetch! orig) + (t/is (= 3 @chunk-calls)) + (done)))))))) diff --git a/frontend/test/frontend_tests/data/workspace_media_test.cljs b/frontend/test/frontend_tests/data/workspace_media_test.cljs new file mode 100644 index 0000000000..915adb203b --- /dev/null +++ b/frontend/test/frontend_tests/data/workspace_media_test.cljs @@ -0,0 +1,189 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns frontend-tests.data.workspace-media-test + "Integration tests for the chunked-upload logic in + app.main.data.workspace.media." + (:require + [app.common.uuid :as uuid] + [app.config :as cf] + [app.main.data.workspace.media :as media] + [beicon.v2.core :as rx] + [cljs.test :as t :include-macros true] + [frontend-tests.helpers.http :as http])) + +;; --------------------------------------------------------------------------- +;; Local helpers +;; --------------------------------------------------------------------------- + +(defn- make-blob + "Creates a JS Blob of exactly `size` bytes with the given `mtype`." + [size mtype] + (let [buf (js/Uint8Array. size)] + (js/Blob. #js [buf] #js {:type mtype}))) + +;; --------------------------------------------------------------------------- +;; Small-file path: direct upload (no chunking) +;; --------------------------------------------------------------------------- + +(t/deftest small-file-uses-direct-upload + (t/testing "blobs below chunk-size use :upload-file-media-object directly" + (t/async done + (let [file-id (uuid/next) + ;; One byte below the threshold so the blob takes the direct path + blob-size (dec cf/upload-chunk-size) + blob (make-blob blob-size "image/jpeg") + calls (atom []) + + fetch-mock + (fn [url _opts] + (let [cmd (http/url->cmd url)] + (swap! calls conj cmd) + (js/Promise.resolve + (http/make-json-response + {:id (str (uuid/next)) + :name "img" + :width 100 + :height 100 + :mtype "image/jpeg" + :file-id (str file-id)})))) + + orig (http/install-fetch-mock! fetch-mock)] + + (->> (media/process-blobs + {:file-id file-id + :local? true + :blobs [blob] + :on-image (fn [_] nil) + :on-svg (fn [_] nil)}) + (rx/subs! + (fn [_] nil) + (fn [err] + (t/is false (str "unexpected error: " (ex-message err))) + (done)) + (fn [] + (http/restore-fetch! orig) + ;; Should call :upload-file-media-object, NOT the chunked API + (t/is (= 1 (count @calls))) + (t/is (= :upload-file-media-object (first @calls))) + (done)))))))) + +;; --------------------------------------------------------------------------- +;; Large-file path: chunked upload via uploads namespace +;; --------------------------------------------------------------------------- + +(t/deftest large-file-uses-chunked-upload + (t/testing "blobs at or above chunk-size use the three-step session API" + (t/async done + (let [file-id (uuid/next) + session-id (uuid/next) + chunk-size cf/upload-chunk-size + ;; Exactly two full chunks + blob-size (* 2 chunk-size) + blob (make-blob blob-size "image/jpeg") + calls (atom []) + + fetch-mock + (fn [url _opts] + (let [cmd (http/url->cmd url)] + (swap! calls conj cmd) + (js/Promise.resolve + (http/make-json-response + (case cmd + :create-upload-session + {:session-id (str session-id)} + + :upload-chunk + {:session-id (str session-id) :index 0} + + :assemble-file-media-object + {:id (str (uuid/next)) + :name "img" + :width 100 + :height 100 + :mtype "image/jpeg" + :file-id (str file-id)} + + ;; Default: return an error response + {:error (str "unexpected cmd: " cmd)}))))) + + orig (http/install-fetch-mock! fetch-mock)] + + (->> (media/process-blobs + {:file-id file-id + :local? true + :blobs [blob] + :on-image (fn [_] nil) + :on-svg (fn [_] nil)}) + (rx/subs! + (fn [_] nil) + (fn [err] + (t/is false (str "unexpected error: " (ex-message err))) + (done)) + (fn [] + (http/restore-fetch! orig) + (let [cmd-seq @calls] + ;; First call must create the session + (t/is (= :create-upload-session (first cmd-seq))) + ;; Two chunk uploads + (t/is (= 2 (count (filter #(= :upload-chunk %) cmd-seq)))) + ;; Last call must assemble + (t/is (= :assemble-file-media-object (last cmd-seq))) + ;; Direct upload must NOT be called + (t/is (not (some #(= :upload-file-media-object %) cmd-seq)))) + (done)))))))) + +(t/deftest chunked-upload-chunk-count-matches-blob + (t/testing "number of chunk upload calls equals ceil(blob-size / chunk-size)" + (t/async done + (let [file-id (uuid/next) + session-id (uuid/next) + chunk-size cf/upload-chunk-size + ;; Three chunks: 2 full + 1 partial + blob-size (+ (* 2 chunk-size) 1) + blob (make-blob blob-size "image/jpeg") + chunk-calls (atom 0) + + fetch-mock + (fn [url _opts] + (let [cmd (http/url->cmd url)] + (js/Promise.resolve + (http/make-json-response + (case cmd + :create-upload-session + {:session-id (str session-id)} + + :upload-chunk + (do (swap! chunk-calls inc) + {:session-id (str session-id) :index 0}) + + :assemble-file-media-object + {:id (str (uuid/next)) + :name "img" + :width 100 + :height 100 + :mtype "image/jpeg" + :file-id (str file-id)} + + {:error (str "unexpected cmd: " cmd)}))))) + + orig (http/install-fetch-mock! fetch-mock)] + + (->> (media/process-blobs + {:file-id file-id + :local? true + :blobs [blob] + :on-image (fn [_] nil) + :on-svg (fn [_] nil)}) + (rx/subs! + (fn [_] nil) + (fn [err] + (t/is false (str "unexpected error: " (ex-message err))) + (done)) + (fn [] + (http/restore-fetch! orig) + (t/is (= 3 @chunk-calls)) + (done)))))))) diff --git a/frontend/test/frontend_tests/helpers/http.cljs b/frontend/test/frontend_tests/helpers/http.cljs new file mode 100644 index 0000000000..28895f4049 --- /dev/null +++ b/frontend/test/frontend_tests/helpers/http.cljs @@ -0,0 +1,61 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) KALEIDOS INC + +(ns frontend-tests.helpers.http + "Helpers for intercepting and mocking the global `fetch` function in + ClojureScript tests. The underlying HTTP layer (`app.util.http`) calls + `(js/fetch url params)` directly, so replacing `globalThis.fetch` is the + correct interception point." + (:require + [app.common.transit :as t] + [clojure.string :as str])) + +(defn install-fetch-mock! + "Replaces the global `js/fetch` with `handler-fn`. + + `handler-fn` is called with `[url opts]` where `url` is a plain string + such as `\"http://localhost/api/main/methods/some-cmd\"`. It must return + a JS Promise that resolves to a fetch Response object. + + Returns the previous `globalThis.fetch` value so callers can restore it + with [[restore-fetch!]]." + [handler-fn] + (let [prev (.-fetch js/globalThis)] + (set! (.-fetch js/globalThis) handler-fn) + prev)) + +(defn restore-fetch! + "Restores `globalThis.fetch` to `orig` (the value returned by + [[install-fetch-mock!]])." + [orig] + (set! (.-fetch js/globalThis) orig)) + +(defn make-json-response + "Creates a minimal fetch `Response` that returns `body-clj` serialised as + plain JSON with HTTP status 200." + [body-clj] + (let [json-str (.stringify js/JSON (clj->js body-clj)) + headers (js/Headers. #js {"content-type" "application/json"})] + (js/Response. json-str #js {:status 200 :headers headers}))) + +(defn make-transit-response + "Creates a minimal fetch `Response` that returns `body-clj` serialised as + Transit+JSON with HTTP status 200. Use this helper when the code under + test inspects typed values (UUIDs, keywords, etc.) from the response body, + since the HTTP layer only decodes transit+json content automatically." + [body-clj] + (let [transit-str (t/encode-str body-clj {:type :json-verbose}) + headers (js/Headers. #js {"content-type" "application/transit+json"})] + (js/Response. transit-str #js {:status 200 :headers headers}))) + +(defn url->cmd + "Extracts the RPC command keyword from a URL string. + + Example: `\"http://…/api/main/methods/create-upload-session\"` + → `:create-upload-session`." + [url] + (when (string? url) + (keyword (last (str/split url #"/"))))) diff --git a/frontend/test/frontend_tests/runner.cljs b/frontend/test/frontend_tests/runner.cljs index 003e68264c..ff7a1f0699 100644 --- a/frontend/test/frontend_tests/runner.cljs +++ b/frontend/test/frontend_tests/runner.cljs @@ -3,8 +3,10 @@ [cljs.test :as t] [frontend-tests.basic-shapes-test] [frontend-tests.data.repo-test] + [frontend-tests.data.uploads-test] [frontend-tests.data.viewer-test] [frontend-tests.data.workspace-colors-test] + [frontend-tests.data.workspace-media-test] [frontend-tests.data.workspace-texts-test] [frontend-tests.data.workspace-thumbnails-test] [frontend-tests.helpers-shapes-test] @@ -43,8 +45,10 @@ 'frontend-tests.basic-shapes-test 'frontend-tests.data.repo-test 'frontend-tests.main-errors-test + 'frontend-tests.data.uploads-test 'frontend-tests.data.viewer-test 'frontend-tests.data.workspace-colors-test + 'frontend-tests.data.workspace-media-test 'frontend-tests.data.workspace-texts-test 'frontend-tests.data.workspace-thumbnails-test 'frontend-tests.helpers-shapes-test