diff --git a/backend/app/gateway/routers/feedback.py b/backend/app/gateway/routers/feedback.py index 579b29a9e..2bf631d01 100644 --- a/backend/app/gateway/routers/feedback.py +++ b/backend/app/gateway/routers/feedback.py @@ -12,6 +12,7 @@ from typing import Any from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field +from app.gateway.authz import require_permission from app.gateway.deps import get_current_user, get_feedback_repo, get_run_store logger = logging.getLogger(__name__) @@ -53,6 +54,7 @@ class FeedbackStatsResponse(BaseModel): @router.post("/{thread_id}/runs/{run_id}/feedback", response_model=FeedbackResponse) +@require_permission("threads", "write", owner_check=True, require_existing=True) async def create_feedback( thread_id: str, run_id: str, @@ -85,6 +87,7 @@ async def create_feedback( @router.get("/{thread_id}/runs/{run_id}/feedback", response_model=list[FeedbackResponse]) +@require_permission("threads", "read", owner_check=True) async def list_feedback( thread_id: str, run_id: str, @@ -96,6 +99,7 @@ async def list_feedback( @router.get("/{thread_id}/runs/{run_id}/feedback/stats", response_model=FeedbackStatsResponse) +@require_permission("threads", "read", owner_check=True) async def feedback_stats( thread_id: str, run_id: str, @@ -107,6 +111,7 @@ async def feedback_stats( @router.delete("/{thread_id}/runs/{run_id}/feedback/{feedback_id}") +@require_permission("threads", "delete", owner_check=True, require_existing=True) async def delete_feedback( thread_id: str, run_id: str, diff --git a/backend/packages/harness/deerflow/persistence/feedback/sql.py b/backend/packages/harness/deerflow/persistence/feedback/sql.py index eae2f9997..903124953 100644 --- a/backend/packages/harness/deerflow/persistence/feedback/sql.py +++ b/backend/packages/harness/deerflow/persistence/feedback/sql.py @@ -12,6 +12,7 @@ from sqlalchemy import case, func, select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.feedback.model import FeedbackRow +from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_owner_id class FeedbackRepository: @@ -32,18 +33,19 @@ class FeedbackRepository: run_id: str, thread_id: str, rating: int, - owner_id: str | None = None, + owner_id: str | None | _AutoSentinel = AUTO, message_id: str | None = None, comment: str | None = None, ) -> dict: """Create a feedback record. rating must be +1 or -1.""" if rating not in (1, -1): raise ValueError(f"rating must be +1 or -1, got {rating}") + resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.create") row = FeedbackRow( feedback_id=str(uuid.uuid4()), run_id=run_id, thread_id=thread_id, - owner_id=owner_id, + owner_id=resolved_owner_id, message_id=message_id, rating=rating, comment=comment, @@ -55,27 +57,66 @@ class FeedbackRepository: await session.refresh(row) return self._row_to_dict(row) - async def get(self, feedback_id: str) -> dict | None: - async with self._sf() as session: - row = await session.get(FeedbackRow, feedback_id) - return self._row_to_dict(row) if row else None - - async def list_by_run(self, thread_id: str, run_id: str, *, limit: int = 100) -> list[dict]: - stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id, FeedbackRow.run_id == run_id).order_by(FeedbackRow.created_at.asc()).limit(limit) - async with self._sf() as session: - result = await session.execute(stmt) - return [self._row_to_dict(r) for r in result.scalars()] - - async def list_by_thread(self, thread_id: str, *, limit: int = 100) -> list[dict]: - stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id).order_by(FeedbackRow.created_at.asc()).limit(limit) - async with self._sf() as session: - result = await session.execute(stmt) - return [self._row_to_dict(r) for r in result.scalars()] - - async def delete(self, feedback_id: str) -> bool: + async def get( + self, + feedback_id: str, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> dict | None: + resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.get") async with self._sf() as session: row = await session.get(FeedbackRow, feedback_id) if row is None: + return None + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: + return None + return self._row_to_dict(row) + + async def list_by_run( + self, + thread_id: str, + run_id: str, + *, + limit: int = 100, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> list[dict]: + resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.list_by_run") + stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id, FeedbackRow.run_id == run_id) + if resolved_owner_id is not None: + stmt = stmt.where(FeedbackRow.owner_id == resolved_owner_id) + stmt = stmt.order_by(FeedbackRow.created_at.asc()).limit(limit) + async with self._sf() as session: + result = await session.execute(stmt) + return [self._row_to_dict(r) for r in result.scalars()] + + async def list_by_thread( + self, + thread_id: str, + *, + limit: int = 100, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> list[dict]: + resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.list_by_thread") + stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id) + if resolved_owner_id is not None: + stmt = stmt.where(FeedbackRow.owner_id == resolved_owner_id) + stmt = stmt.order_by(FeedbackRow.created_at.asc()).limit(limit) + async with self._sf() as session: + result = await session.execute(stmt) + return [self._row_to_dict(r) for r in result.scalars()] + + async def delete( + self, + feedback_id: str, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> bool: + resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.delete") + async with self._sf() as session: + row = await session.get(FeedbackRow, feedback_id) + if row is None: + return False + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: return False await session.delete(row) await session.commit() diff --git a/backend/packages/harness/deerflow/persistence/run/sql.py b/backend/packages/harness/deerflow/persistence/run/sql.py index fac88d968..5d8656509 100644 --- a/backend/packages/harness/deerflow/persistence/run/sql.py +++ b/backend/packages/harness/deerflow/persistence/run/sql.py @@ -16,6 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.run.model import RunRow from deerflow.runtime.runs.store.base import RunStore +from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_owner_id class RunRepository(RunStore): @@ -68,7 +69,7 @@ class RunRepository(RunStore): *, thread_id, assistant_id=None, - owner_id=None, + owner_id: str | None | _AutoSentinel = AUTO, status="pending", multitask_strategy="reject", metadata=None, @@ -77,12 +78,13 @@ class RunRepository(RunStore): created_at=None, follow_up_to_run_id=None, ): + resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.put") now = datetime.now(UTC) row = RunRow( run_id=run_id, thread_id=thread_id, assistant_id=assistant_id, - owner_id=owner_id, + owner_id=resolved_owner_id, status=status, multitask_strategy=multitask_strategy, metadata_json=self._safe_json(metadata) or {}, @@ -96,15 +98,32 @@ class RunRepository(RunStore): session.add(row) await session.commit() - async def get(self, run_id): + async def get( + self, + run_id, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ): + resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.get") async with self._sf() as session: row = await session.get(RunRow, run_id) - return self._row_to_dict(row) if row else None + if row is None: + return None + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: + return None + return self._row_to_dict(row) - async def list_by_thread(self, thread_id, *, owner_id=None, limit=100): + async def list_by_thread( + self, + thread_id, + *, + owner_id: str | None | _AutoSentinel = AUTO, + limit=100, + ): + resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.list_by_thread") stmt = select(RunRow).where(RunRow.thread_id == thread_id) - if owner_id is not None: - stmt = stmt.where(RunRow.owner_id == owner_id) + if resolved_owner_id is not None: + stmt = stmt.where(RunRow.owner_id == resolved_owner_id) stmt = stmt.order_by(RunRow.created_at.desc()).limit(limit) async with self._sf() as session: result = await session.execute(stmt) @@ -118,12 +137,21 @@ class RunRepository(RunStore): await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values)) await session.commit() - async def delete(self, run_id): + async def delete( + self, + run_id, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ): + resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.delete") async with self._sf() as session: row = await session.get(RunRow, run_id) - if row is not None: - await session.delete(row) - await session.commit() + if row is None: + return + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: + return + await session.delete(row) + await session.commit() async def list_pending(self, *, before=None): if before is None: diff --git a/backend/packages/harness/deerflow/persistence/thread_meta/sql.py b/backend/packages/harness/deerflow/persistence/thread_meta/sql.py index 86c73030e..5a149e5d6 100644 --- a/backend/packages/harness/deerflow/persistence/thread_meta/sql.py +++ b/backend/packages/harness/deerflow/persistence/thread_meta/sql.py @@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.thread_meta.base import ThreadMetaStore from deerflow.persistence.thread_meta.model import ThreadMetaRow +from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_owner_id class ThreadMetaRepository(ThreadMetaStore): @@ -31,15 +32,18 @@ class ThreadMetaRepository(ThreadMetaStore): thread_id: str, *, assistant_id: str | None = None, - owner_id: str | None = None, + owner_id: str | None | _AutoSentinel = AUTO, display_name: str | None = None, metadata: dict | None = None, ) -> dict: + # Auto-resolve owner_id from contextvar when AUTO; explicit None + # creates an orphan row (used by migration scripts). + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.create") now = datetime.now(UTC) row = ThreadMetaRow( thread_id=thread_id, assistant_id=assistant_id, - owner_id=owner_id, + owner_id=resolved_owner_id, display_name=display_name, metadata_json=metadata or {}, created_at=now, @@ -51,10 +55,21 @@ class ThreadMetaRepository(ThreadMetaStore): await session.refresh(row) return self._row_to_dict(row) - async def get(self, thread_id: str) -> dict | None: + async def get( + self, + thread_id: str, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> dict | None: + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.get") async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) - return self._row_to_dict(row) if row else None + if row is None: + return None + # Enforce owner filter unless explicitly bypassed (owner_id=None). + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: + return None + return self._row_to_dict(row) async def list_by_owner(self, owner_id: str, *, limit: int = 100, offset: int = 0) -> list[dict]: stmt = select(ThreadMetaRow).where(ThreadMetaRow.owner_id == owner_id).order_by(ThreadMetaRow.updated_at.desc()).limit(limit).offset(offset) @@ -62,16 +77,32 @@ class ThreadMetaRepository(ThreadMetaStore): result = await session.execute(stmt) return [self._row_to_dict(r) for r in result.scalars()] - async def check_access(self, thread_id: str, owner_id: str) -> bool: - """Check if owner_id has access to thread_id. + async def check_access(self, thread_id: str, owner_id: str, *, require_existing: bool = False) -> bool: + """Check if ``owner_id`` has access to ``thread_id``. - Returns True if: row doesn't exist (untracked thread), owner_id - is None on the row (shared thread), or owner_id matches. + Two modes — one row, two distinct semantics depending on what + the caller is about to do: + + - ``require_existing=False`` (default, permissive): + Returns True for: row missing (untracked legacy thread), + ``row.owner_id`` is None (shared / pre-auth data), + or ``row.owner_id == owner_id``. Use for **read-style** + decorators where treating an untracked thread as accessible + preserves backward-compat. + + - ``require_existing=True`` (strict): + Returns True **only** when the row exists AND + (``row.owner_id == owner_id`` OR ``row.owner_id is None``). + Use for **destructive / mutating** decorators (DELETE, PATCH, + state-update) so a thread that has *already been deleted* + cannot be re-targeted by any caller — closing the + delete-idempotence cross-user gap where the row vanishing + made every other user appear to "own" it. """ async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) if row is None: - return True + return not require_existing if row.owner_id is None: return True return row.owner_id == owner_id @@ -83,9 +114,17 @@ class ThreadMetaRepository(ThreadMetaStore): status: str | None = None, limit: int = 100, offset: int = 0, + owner_id: str | None | _AutoSentinel = AUTO, ) -> list[dict]: - """Search threads with optional metadata and status filters.""" + """Search threads with optional metadata and status filters. + + Owner filter is enforced by default: caller must be in a user + context. Pass ``owner_id=None`` to bypass (migration/CLI). + """ + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.search") stmt = select(ThreadMetaRow).order_by(ThreadMetaRow.updated_at.desc()) + if resolved_owner_id is not None: + stmt = stmt.where(ThreadMetaRow.owner_id == resolved_owner_id) if status: stmt = stmt.where(ThreadMetaRow.status == status) @@ -105,36 +144,80 @@ class ThreadMetaRepository(ThreadMetaStore): result = await session.execute(stmt) return [self._row_to_dict(r) for r in result.scalars()] - async def update_display_name(self, thread_id: str, display_name: str) -> None: + async def _check_ownership(self, session: AsyncSession, thread_id: str, resolved_owner_id: str | None) -> bool: + """Return True if the row exists and is owned (or filter bypassed).""" + if resolved_owner_id is None: + return True # explicit bypass + row = await session.get(ThreadMetaRow, thread_id) + return row is not None and row.owner_id == resolved_owner_id + + async def update_display_name( + self, + thread_id: str, + display_name: str, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> None: """Update the display_name (title) for a thread.""" + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.update_display_name") async with self._sf() as session: + if not await self._check_ownership(session, thread_id, resolved_owner_id): + return await session.execute(update(ThreadMetaRow).where(ThreadMetaRow.thread_id == thread_id).values(display_name=display_name, updated_at=datetime.now(UTC))) await session.commit() - async def update_status(self, thread_id: str, status: str) -> None: + async def update_status( + self, + thread_id: str, + status: str, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> None: + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.update_status") async with self._sf() as session: + if not await self._check_ownership(session, thread_id, resolved_owner_id): + return await session.execute(update(ThreadMetaRow).where(ThreadMetaRow.thread_id == thread_id).values(status=status, updated_at=datetime.now(UTC))) await session.commit() - async def update_metadata(self, thread_id: str, metadata: dict) -> None: + async def update_metadata( + self, + thread_id: str, + metadata: dict, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> None: """Merge ``metadata`` into ``metadata_json``. Read-modify-write inside a single session/transaction so concurrent - callers see consistent state. No-op if the row does not exist. + callers see consistent state. No-op if the row does not exist or + the owner_id check fails. """ + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.update_metadata") async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) if row is None: return + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: + return merged = dict(row.metadata_json or {}) merged.update(metadata) row.metadata_json = merged row.updated_at = datetime.now(UTC) await session.commit() - async def delete(self, thread_id: str) -> None: + async def delete( + self, + thread_id: str, + *, + owner_id: str | None | _AutoSentinel = AUTO, + ) -> None: + resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.delete") async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) - if row is not None: - await session.delete(row) - await session.commit() + if row is None: + return + if resolved_owner_id is not None and row.owner_id != resolved_owner_id: + return + await session.delete(row) + await session.commit()