diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index 5ea7f6751..b36bfa4ce 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -80,7 +80,7 @@ async def langgraph_runtime(app: FastAPI) -> AsyncGenerator[None, None]: # --------------------------------------------------------------------------- -# Getters -- called by routers per-request +# Getters – called by routers per-request # --------------------------------------------------------------------------- @@ -113,6 +113,37 @@ def get_store(request: Request): get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store") +def get_run_context(request: Request) -> RunContext: + """Build a :class:`RunContext` from ``app.state`` singletons. + + Returns a *base* context with infrastructure dependencies. Callers that + need per-run fields (e.g. ``follow_up_to_run_id``) should use + ``dataclasses.replace(ctx, follow_up_to_run_id=...)`` before passing it + to :func:`run_agent`. + """ + from deerflow.config import get_app_config + + return RunContext( + checkpointer=get_checkpointer(request), + store=get_store(request), + event_store=get_run_event_store(request), + run_events_config=getattr(get_app_config(), "run_events", None), + thread_meta_repo=get_thread_meta_repo(request), + ) + + +async def get_current_user(request: Request) -> str | None: + """Extract user identity from request. + + Phase 2: always returns None (no authentication). + Phase 3: extract user_id from JWT / session / API key header. + """ + return None + + +get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store") + + def get_run_context(request: Request) -> RunContext: """Build a :class:`RunContext` from ``app.state`` singletons. diff --git a/backend/app/gateway/routers/feedback.py b/backend/app/gateway/routers/feedback.py index 2bf631d01..579b29a9e 100644 --- a/backend/app/gateway/routers/feedback.py +++ b/backend/app/gateway/routers/feedback.py @@ -12,7 +12,6 @@ from typing import Any from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field -from app.gateway.authz import require_permission from app.gateway.deps import get_current_user, get_feedback_repo, get_run_store logger = logging.getLogger(__name__) @@ -54,7 +53,6 @@ class FeedbackStatsResponse(BaseModel): @router.post("/{thread_id}/runs/{run_id}/feedback", response_model=FeedbackResponse) -@require_permission("threads", "write", owner_check=True, require_existing=True) async def create_feedback( thread_id: str, run_id: str, @@ -87,7 +85,6 @@ async def create_feedback( @router.get("/{thread_id}/runs/{run_id}/feedback", response_model=list[FeedbackResponse]) -@require_permission("threads", "read", owner_check=True) async def list_feedback( thread_id: str, run_id: str, @@ -99,7 +96,6 @@ async def list_feedback( @router.get("/{thread_id}/runs/{run_id}/feedback/stats", response_model=FeedbackStatsResponse) -@require_permission("threads", "read", owner_check=True) async def feedback_stats( thread_id: str, run_id: str, @@ -111,7 +107,6 @@ async def feedback_stats( @router.delete("/{thread_id}/runs/{run_id}/feedback/{feedback_id}") -@require_permission("threads", "delete", owner_check=True, require_existing=True) async def delete_feedback( thread_id: str, run_id: str, diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py index 71af2e653..983ae873c 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py @@ -164,6 +164,30 @@ Skip simple one-off tasks. """ +def _skill_mutability_label(category: str) -> str: + return "[custom, editable]" if category == "custom" else "[built-in]" + + +def clear_skills_system_prompt_cache() -> None: + _get_cached_skills_prompt_section.cache_clear() + + +def _build_skill_evolution_section(skill_evolution_enabled: bool) -> str: + if not skill_evolution_enabled: + return "" + return """ +## Skill Self-Evolution +After completing a task, consider creating or updating a skill when: +- The task required 5+ tool calls to resolve +- You overcame non-obvious errors or pitfalls +- The user corrected your approach and the corrected version worked +- You discovered a non-trivial, recurring workflow +If you used a skill and encountered issues not covered by it, patch it immediately. +Prefer patch over edit. Before creating a new skill, confirm with the user first. +Skip simple one-off tasks. +""" + + def _build_subagent_section(max_concurrent: int) -> str: """Build the subagent system prompt section with dynamic concurrency limit. diff --git a/backend/packages/harness/deerflow/persistence/feedback/sql.py b/backend/packages/harness/deerflow/persistence/feedback/sql.py index 903124953..eae2f9997 100644 --- a/backend/packages/harness/deerflow/persistence/feedback/sql.py +++ b/backend/packages/harness/deerflow/persistence/feedback/sql.py @@ -12,7 +12,6 @@ from sqlalchemy import case, func, select from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.feedback.model import FeedbackRow -from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_owner_id class FeedbackRepository: @@ -33,19 +32,18 @@ class FeedbackRepository: run_id: str, thread_id: str, rating: int, - owner_id: str | None | _AutoSentinel = AUTO, + owner_id: str | None = None, message_id: str | None = None, comment: str | None = None, ) -> dict: """Create a feedback record. rating must be +1 or -1.""" if rating not in (1, -1): raise ValueError(f"rating must be +1 or -1, got {rating}") - resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.create") row = FeedbackRow( feedback_id=str(uuid.uuid4()), run_id=run_id, thread_id=thread_id, - owner_id=resolved_owner_id, + owner_id=owner_id, message_id=message_id, rating=rating, comment=comment, @@ -57,67 +55,28 @@ class FeedbackRepository: await session.refresh(row) return self._row_to_dict(row) - async def get( - self, - feedback_id: str, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> dict | None: - resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.get") + async def get(self, feedback_id: str) -> dict | None: async with self._sf() as session: row = await session.get(FeedbackRow, feedback_id) - if row is None: - return None - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return None - return self._row_to_dict(row) + return self._row_to_dict(row) if row else None - async def list_by_run( - self, - thread_id: str, - run_id: str, - *, - limit: int = 100, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> list[dict]: - resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.list_by_run") - stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id, FeedbackRow.run_id == run_id) - if resolved_owner_id is not None: - stmt = stmt.where(FeedbackRow.owner_id == resolved_owner_id) - stmt = stmt.order_by(FeedbackRow.created_at.asc()).limit(limit) + async def list_by_run(self, thread_id: str, run_id: str, *, limit: int = 100) -> list[dict]: + stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id, FeedbackRow.run_id == run_id).order_by(FeedbackRow.created_at.asc()).limit(limit) async with self._sf() as session: result = await session.execute(stmt) return [self._row_to_dict(r) for r in result.scalars()] - async def list_by_thread( - self, - thread_id: str, - *, - limit: int = 100, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> list[dict]: - resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.list_by_thread") - stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id) - if resolved_owner_id is not None: - stmt = stmt.where(FeedbackRow.owner_id == resolved_owner_id) - stmt = stmt.order_by(FeedbackRow.created_at.asc()).limit(limit) + async def list_by_thread(self, thread_id: str, *, limit: int = 100) -> list[dict]: + stmt = select(FeedbackRow).where(FeedbackRow.thread_id == thread_id).order_by(FeedbackRow.created_at.asc()).limit(limit) async with self._sf() as session: result = await session.execute(stmt) return [self._row_to_dict(r) for r in result.scalars()] - async def delete( - self, - feedback_id: str, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> bool: - resolved_owner_id = resolve_owner_id(owner_id, method_name="FeedbackRepository.delete") + async def delete(self, feedback_id: str) -> bool: async with self._sf() as session: row = await session.get(FeedbackRow, feedback_id) if row is None: return False - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return False await session.delete(row) await session.commit() return True diff --git a/backend/packages/harness/deerflow/persistence/run/sql.py b/backend/packages/harness/deerflow/persistence/run/sql.py index 5d8656509..fac88d968 100644 --- a/backend/packages/harness/deerflow/persistence/run/sql.py +++ b/backend/packages/harness/deerflow/persistence/run/sql.py @@ -16,7 +16,6 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.run.model import RunRow from deerflow.runtime.runs.store.base import RunStore -from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_owner_id class RunRepository(RunStore): @@ -69,7 +68,7 @@ class RunRepository(RunStore): *, thread_id, assistant_id=None, - owner_id: str | None | _AutoSentinel = AUTO, + owner_id=None, status="pending", multitask_strategy="reject", metadata=None, @@ -78,13 +77,12 @@ class RunRepository(RunStore): created_at=None, follow_up_to_run_id=None, ): - resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.put") now = datetime.now(UTC) row = RunRow( run_id=run_id, thread_id=thread_id, assistant_id=assistant_id, - owner_id=resolved_owner_id, + owner_id=owner_id, status=status, multitask_strategy=multitask_strategy, metadata_json=self._safe_json(metadata) or {}, @@ -98,32 +96,15 @@ class RunRepository(RunStore): session.add(row) await session.commit() - async def get( - self, - run_id, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ): - resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.get") + async def get(self, run_id): async with self._sf() as session: row = await session.get(RunRow, run_id) - if row is None: - return None - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return None - return self._row_to_dict(row) + return self._row_to_dict(row) if row else None - async def list_by_thread( - self, - thread_id, - *, - owner_id: str | None | _AutoSentinel = AUTO, - limit=100, - ): - resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.list_by_thread") + async def list_by_thread(self, thread_id, *, owner_id=None, limit=100): stmt = select(RunRow).where(RunRow.thread_id == thread_id) - if resolved_owner_id is not None: - stmt = stmt.where(RunRow.owner_id == resolved_owner_id) + if owner_id is not None: + stmt = stmt.where(RunRow.owner_id == owner_id) stmt = stmt.order_by(RunRow.created_at.desc()).limit(limit) async with self._sf() as session: result = await session.execute(stmt) @@ -137,21 +118,12 @@ class RunRepository(RunStore): await session.execute(update(RunRow).where(RunRow.run_id == run_id).values(**values)) await session.commit() - async def delete( - self, - run_id, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ): - resolved_owner_id = resolve_owner_id(owner_id, method_name="RunRepository.delete") + async def delete(self, run_id): async with self._sf() as session: row = await session.get(RunRow, run_id) - if row is None: - return - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return - await session.delete(row) - await session.commit() + if row is not None: + await session.delete(row) + await session.commit() async def list_pending(self, *, before=None): if before is None: diff --git a/backend/packages/harness/deerflow/persistence/thread_meta/sql.py b/backend/packages/harness/deerflow/persistence/thread_meta/sql.py index 5a149e5d6..86c73030e 100644 --- a/backend/packages/harness/deerflow/persistence/thread_meta/sql.py +++ b/backend/packages/harness/deerflow/persistence/thread_meta/sql.py @@ -10,7 +10,6 @@ from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker from deerflow.persistence.thread_meta.base import ThreadMetaStore from deerflow.persistence.thread_meta.model import ThreadMetaRow -from deerflow.runtime.user_context import AUTO, _AutoSentinel, resolve_owner_id class ThreadMetaRepository(ThreadMetaStore): @@ -32,18 +31,15 @@ class ThreadMetaRepository(ThreadMetaStore): thread_id: str, *, assistant_id: str | None = None, - owner_id: str | None | _AutoSentinel = AUTO, + owner_id: str | None = None, display_name: str | None = None, metadata: dict | None = None, ) -> dict: - # Auto-resolve owner_id from contextvar when AUTO; explicit None - # creates an orphan row (used by migration scripts). - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.create") now = datetime.now(UTC) row = ThreadMetaRow( thread_id=thread_id, assistant_id=assistant_id, - owner_id=resolved_owner_id, + owner_id=owner_id, display_name=display_name, metadata_json=metadata or {}, created_at=now, @@ -55,21 +51,10 @@ class ThreadMetaRepository(ThreadMetaStore): await session.refresh(row) return self._row_to_dict(row) - async def get( - self, - thread_id: str, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> dict | None: - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.get") + async def get(self, thread_id: str) -> dict | None: async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) - if row is None: - return None - # Enforce owner filter unless explicitly bypassed (owner_id=None). - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return None - return self._row_to_dict(row) + return self._row_to_dict(row) if row else None async def list_by_owner(self, owner_id: str, *, limit: int = 100, offset: int = 0) -> list[dict]: stmt = select(ThreadMetaRow).where(ThreadMetaRow.owner_id == owner_id).order_by(ThreadMetaRow.updated_at.desc()).limit(limit).offset(offset) @@ -77,32 +62,16 @@ class ThreadMetaRepository(ThreadMetaStore): result = await session.execute(stmt) return [self._row_to_dict(r) for r in result.scalars()] - async def check_access(self, thread_id: str, owner_id: str, *, require_existing: bool = False) -> bool: - """Check if ``owner_id`` has access to ``thread_id``. + async def check_access(self, thread_id: str, owner_id: str) -> bool: + """Check if owner_id has access to thread_id. - Two modes — one row, two distinct semantics depending on what - the caller is about to do: - - - ``require_existing=False`` (default, permissive): - Returns True for: row missing (untracked legacy thread), - ``row.owner_id`` is None (shared / pre-auth data), - or ``row.owner_id == owner_id``. Use for **read-style** - decorators where treating an untracked thread as accessible - preserves backward-compat. - - - ``require_existing=True`` (strict): - Returns True **only** when the row exists AND - (``row.owner_id == owner_id`` OR ``row.owner_id is None``). - Use for **destructive / mutating** decorators (DELETE, PATCH, - state-update) so a thread that has *already been deleted* - cannot be re-targeted by any caller — closing the - delete-idempotence cross-user gap where the row vanishing - made every other user appear to "own" it. + Returns True if: row doesn't exist (untracked thread), owner_id + is None on the row (shared thread), or owner_id matches. """ async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) if row is None: - return not require_existing + return True if row.owner_id is None: return True return row.owner_id == owner_id @@ -114,17 +83,9 @@ class ThreadMetaRepository(ThreadMetaStore): status: str | None = None, limit: int = 100, offset: int = 0, - owner_id: str | None | _AutoSentinel = AUTO, ) -> list[dict]: - """Search threads with optional metadata and status filters. - - Owner filter is enforced by default: caller must be in a user - context. Pass ``owner_id=None`` to bypass (migration/CLI). - """ - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.search") + """Search threads with optional metadata and status filters.""" stmt = select(ThreadMetaRow).order_by(ThreadMetaRow.updated_at.desc()) - if resolved_owner_id is not None: - stmt = stmt.where(ThreadMetaRow.owner_id == resolved_owner_id) if status: stmt = stmt.where(ThreadMetaRow.status == status) @@ -144,80 +105,36 @@ class ThreadMetaRepository(ThreadMetaStore): result = await session.execute(stmt) return [self._row_to_dict(r) for r in result.scalars()] - async def _check_ownership(self, session: AsyncSession, thread_id: str, resolved_owner_id: str | None) -> bool: - """Return True if the row exists and is owned (or filter bypassed).""" - if resolved_owner_id is None: - return True # explicit bypass - row = await session.get(ThreadMetaRow, thread_id) - return row is not None and row.owner_id == resolved_owner_id - - async def update_display_name( - self, - thread_id: str, - display_name: str, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> None: + async def update_display_name(self, thread_id: str, display_name: str) -> None: """Update the display_name (title) for a thread.""" - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.update_display_name") async with self._sf() as session: - if not await self._check_ownership(session, thread_id, resolved_owner_id): - return await session.execute(update(ThreadMetaRow).where(ThreadMetaRow.thread_id == thread_id).values(display_name=display_name, updated_at=datetime.now(UTC))) await session.commit() - async def update_status( - self, - thread_id: str, - status: str, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> None: - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.update_status") + async def update_status(self, thread_id: str, status: str) -> None: async with self._sf() as session: - if not await self._check_ownership(session, thread_id, resolved_owner_id): - return await session.execute(update(ThreadMetaRow).where(ThreadMetaRow.thread_id == thread_id).values(status=status, updated_at=datetime.now(UTC))) await session.commit() - async def update_metadata( - self, - thread_id: str, - metadata: dict, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> None: + async def update_metadata(self, thread_id: str, metadata: dict) -> None: """Merge ``metadata`` into ``metadata_json``. Read-modify-write inside a single session/transaction so concurrent - callers see consistent state. No-op if the row does not exist or - the owner_id check fails. + callers see consistent state. No-op if the row does not exist. """ - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.update_metadata") async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) if row is None: return - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return merged = dict(row.metadata_json or {}) merged.update(metadata) row.metadata_json = merged row.updated_at = datetime.now(UTC) await session.commit() - async def delete( - self, - thread_id: str, - *, - owner_id: str | None | _AutoSentinel = AUTO, - ) -> None: - resolved_owner_id = resolve_owner_id(owner_id, method_name="ThreadMetaRepository.delete") + async def delete(self, thread_id: str) -> None: async with self._sf() as session: row = await session.get(ThreadMetaRow, thread_id) - if row is None: - return - if resolved_owner_id is not None and row.owner_id != resolved_owner_id: - return - await session.delete(row) - await session.commit() + if row is not None: + await session.delete(row) + await session.commit() diff --git a/backend/packages/harness/deerflow/runtime/runs/worker.py b/backend/packages/harness/deerflow/runtime/runs/worker.py index efa306b0b..9335dc2a9 100644 --- a/backend/packages/harness/deerflow/runtime/runs/worker.py +++ b/backend/packages/harness/deerflow/runtime/runs/worker.py @@ -143,6 +143,35 @@ async def run_agent( content = human_msg.content journal.set_first_human_message(content if isinstance(content, str) else str(content)) + # Initialize RunJournal for event capture + journal = None + if event_store is not None: + from deerflow.runtime.journal import RunJournal + + journal = RunJournal( + run_id=run_id, + thread_id=thread_id, + event_store=event_store, + track_token_usage=getattr(run_events_config, "track_token_usage", True), + ) + + # Write human_message event (model_dump format, aligned with checkpoint) + human_msg = _extract_human_message(graph_input) + if human_msg is not None: + msg_metadata = {} + if follow_up_to_run_id: + msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id + await event_store.put( + thread_id=thread_id, + run_id=run_id, + event_type="human_message", + category="message", + content=human_msg.model_dump(), + metadata=msg_metadata or None, + ) + content = human_msg.content + journal.set_first_human_message(content if isinstance(content, str) else str(content)) + # Track whether "events" was requested but skipped if "events" in requested_modes: logger.info(