From 500cdfc8e4317cbd1bbfdc3c1c25e37a9b41dc09 Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Sun, 12 Apr 2026 09:30:39 +0800 Subject: [PATCH] fix(rebase): remove duplicate definitions and update stale module paths Rebase left duplicate function blocks in worker.py (triple human_message write causing 3x user messages in /history), deps.py, and prompt.py. Also update checkpointer imports from the old deerflow.agents.checkpointer path to deerflow.runtime.checkpointer, and clean up orphaned feedback props in the frontend message components. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/deps.py | 30 ---------- .../deerflow/agents/lead_agent/prompt.py | 24 -------- backend/packages/harness/deerflow/client.py | 4 +- .../harness/deerflow/runtime/runs/worker.py | 58 ------------------- backend/tests/test_checkpointer.py | 8 +-- backend/tests/test_client.py | 4 +- .../workspace/messages/message-list-item.tsx | 17 +----- .../workspace/messages/message-list.tsx | 11 ---- 8 files changed, 9 insertions(+), 147 deletions(-) diff --git a/backend/app/gateway/deps.py b/backend/app/gateway/deps.py index afdfea15e..f4fdad473 100644 --- a/backend/app/gateway/deps.py +++ b/backend/app/gateway/deps.py @@ -137,36 +137,6 @@ def get_run_context(request: Request) -> RunContext: ) -async def get_current_user(request: Request) -> str | None: - """Extract user identity from request. - - Phase 2: always returns None (no authentication). - Phase 3: extract user_id from JWT / session / API key header. - """ - return None - - -get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store") - - -def get_run_context(request: Request) -> RunContext: - """Build a :class:`RunContext` from ``app.state`` singletons. - - Returns a *base* context with infrastructure dependencies. Callers that - need per-run fields (e.g. ``follow_up_to_run_id``) should use - ``dataclasses.replace(ctx, follow_up_to_run_id=...)`` before passing it - to :func:`run_agent`. - """ - from deerflow.config import get_app_config - - return RunContext( - checkpointer=get_checkpointer(request), - store=get_store(request), - event_store=get_run_event_store(request), - run_events_config=getattr(get_app_config(), "run_events", None), - thread_meta_repo=get_thread_meta_repo(request), - ) - # --------------------------------------------------------------------------- # Auth helpers (used by authz.py and auth middleware) diff --git a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py index 983ae873c..71af2e653 100644 --- a/backend/packages/harness/deerflow/agents/lead_agent/prompt.py +++ b/backend/packages/harness/deerflow/agents/lead_agent/prompt.py @@ -164,30 +164,6 @@ Skip simple one-off tasks. """ -def _skill_mutability_label(category: str) -> str: - return "[custom, editable]" if category == "custom" else "[built-in]" - - -def clear_skills_system_prompt_cache() -> None: - _get_cached_skills_prompt_section.cache_clear() - - -def _build_skill_evolution_section(skill_evolution_enabled: bool) -> str: - if not skill_evolution_enabled: - return "" - return """ -## Skill Self-Evolution -After completing a task, consider creating or updating a skill when: -- The task required 5+ tool calls to resolve -- You overcame non-obvious errors or pitfalls -- The user corrected your approach and the corrected version worked -- You discovered a non-trivial, recurring workflow -If you used a skill and encountered issues not covered by it, patch it immediately. -Prefer patch over edit. Before creating a new skill, confirm with the user first. -Skip simple one-off tasks. -""" - - def _build_subagent_section(max_concurrent: int) -> str: """Build the subagent system prompt section with dynamic concurrency limit. diff --git a/backend/packages/harness/deerflow/client.py b/backend/packages/harness/deerflow/client.py index 996625a22..950fdb085 100644 --- a/backend/packages/harness/deerflow/client.py +++ b/backend/packages/harness/deerflow/client.py @@ -374,7 +374,7 @@ class DeerFlowClient: """ checkpointer = self._checkpointer if checkpointer is None: - from deerflow.agents.checkpointer.provider import get_checkpointer + from deerflow.runtime.checkpointer.provider import get_checkpointer checkpointer = get_checkpointer() @@ -429,7 +429,7 @@ class DeerFlowClient: """ checkpointer = self._checkpointer if checkpointer is None: - from deerflow.agents.checkpointer.provider import get_checkpointer + from deerflow.runtime.checkpointer.provider import get_checkpointer checkpointer = get_checkpointer() diff --git a/backend/packages/harness/deerflow/runtime/runs/worker.py b/backend/packages/harness/deerflow/runtime/runs/worker.py index 5431c0e72..74581a275 100644 --- a/backend/packages/harness/deerflow/runtime/runs/worker.py +++ b/backend/packages/harness/deerflow/runtime/runs/worker.py @@ -85,64 +85,6 @@ async def run_agent( pre_run_snapshot: dict[str, Any] | None = None snapshot_capture_failed = False - # Initialize RunJournal for event capture - journal = None - if event_store is not None: - from deerflow.runtime.journal import RunJournal - - journal = RunJournal( - run_id=run_id, - thread_id=thread_id, - event_store=event_store, - track_token_usage=getattr(run_events_config, "track_token_usage", True), - ) - - # Write human_message event (model_dump format, aligned with checkpoint) - human_msg = _extract_human_message(graph_input) - if human_msg is not None: - msg_metadata = {} - if follow_up_to_run_id: - msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id - await event_store.put( - thread_id=thread_id, - run_id=run_id, - event_type="human_message", - category="message", - content=human_msg.model_dump(), - metadata=msg_metadata or None, - ) - content = human_msg.content - journal.set_first_human_message(content if isinstance(content, str) else str(content)) - - # Initialize RunJournal for event capture - journal = None - if event_store is not None: - from deerflow.runtime.journal import RunJournal - - journal = RunJournal( - run_id=run_id, - thread_id=thread_id, - event_store=event_store, - track_token_usage=getattr(run_events_config, "track_token_usage", True), - ) - - # Write human_message event (model_dump format, aligned with checkpoint) - human_msg = _extract_human_message(graph_input) - if human_msg is not None: - msg_metadata = {} - if follow_up_to_run_id: - msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id - await event_store.put( - thread_id=thread_id, - run_id=run_id, - event_type="human_message", - category="message", - content=human_msg.model_dump(), - metadata=msg_metadata or None, - ) - content = human_msg.content - journal.set_first_human_message(content if isinstance(content, str) else str(content)) - journal = None # Track whether "events" was requested but skipped diff --git a/backend/tests/test_checkpointer.py b/backend/tests/test_checkpointer.py index 81ac1fd9a..58f57237e 100644 --- a/backend/tests/test_checkpointer.py +++ b/backend/tests/test_checkpointer.py @@ -178,7 +178,7 @@ class TestAsyncCheckpointer: @pytest.mark.anyio async def test_sqlite_creates_parent_dir_via_to_thread(self): """Async SQLite setup should move mkdir off the event loop.""" - from deerflow.agents.checkpointer.async_provider import make_checkpointer + from deerflow.runtime.checkpointer.async_provider import make_checkpointer mock_config = MagicMock() mock_config.checkpointer = CheckpointerConfig(type="sqlite", connection_string="relative/test.db") @@ -195,11 +195,11 @@ class TestAsyncCheckpointer: mock_module.AsyncSqliteSaver = mock_saver_cls with ( - patch("deerflow.agents.checkpointer.async_provider.get_app_config", return_value=mock_config), + patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config), patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}), - patch("deerflow.agents.checkpointer.async_provider.asyncio.to_thread", new_callable=AsyncMock) as mock_to_thread, + patch("deerflow.runtime.checkpointer.async_provider.asyncio.to_thread", new_callable=AsyncMock) as mock_to_thread, patch( - "deerflow.agents.checkpointer.async_provider.resolve_sqlite_conn_str", + "deerflow.runtime.checkpointer.async_provider.resolve_sqlite_conn_str", return_value="/tmp/resolved/test.db", ), ): diff --git a/backend/tests/test_client.py b/backend/tests/test_client.py index 579b13a47..a9b854e8e 100644 --- a/backend/tests/test_client.py +++ b/backend/tests/test_client.py @@ -1015,7 +1015,7 @@ class TestThreadQueries: mock_checkpointer = MagicMock() mock_checkpointer.list.return_value = [] - with patch("deerflow.agents.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer): + with patch("deerflow.runtime.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer): # No internal checkpointer, should fetch from provider result = client.list_threads() @@ -1069,7 +1069,7 @@ class TestThreadQueries: mock_checkpointer = MagicMock() mock_checkpointer.list.return_value = [] - with patch("deerflow.agents.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer): + with patch("deerflow.runtime.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer): result = client.get_thread("t99") assert result["thread_id"] == "t99" diff --git a/frontend/src/components/workspace/messages/message-list-item.tsx b/frontend/src/components/workspace/messages/message-list-item.tsx index 6c3dd48f0..c9583f7a5 100644 --- a/frontend/src/components/workspace/messages/message-list-item.tsx +++ b/frontend/src/components/workspace/messages/message-list-item.tsx @@ -39,19 +39,11 @@ export function MessageListItem({ threadId, message, isLoading, - threadId, }: { className?: string; - threadId?: string; + threadId: string; message: Message; isLoading?: boolean; - threadId: string; - // ``feedback`` is ``undefined`` for messages that are not feedback-eligible - // (non-final AI messages, humans, tool results). It is ``null`` for the - // final ai_message of a run that has no rating yet, and a FeedbackData - // object once rated. The button renders whenever the field is present. - feedback?: FeedbackData | null; - runId?: string; }) { const isHuman = message.type === "human"; return ( @@ -80,13 +72,6 @@ export function MessageListItem({ "" } /> - {feedback !== undefined && runId && threadId && ( - - )} )} diff --git a/frontend/src/components/workspace/messages/message-list.tsx b/frontend/src/components/workspace/messages/message-list.tsx index 9ef917e8d..cd167f8e4 100644 --- a/frontend/src/components/workspace/messages/message-list.tsx +++ b/frontend/src/components/workspace/messages/message-list.tsx @@ -18,7 +18,6 @@ import { useRehypeSplitWordsIntoSpans } from "@/core/rehype"; import type { Subtask } from "@/core/tasks"; import { useUpdateSubtask } from "@/core/tasks/context"; import type { AgentThreadState } from "@/core/threads"; -import { useThreadMessageEnrichment } from "@/core/threads/hooks"; import { cn } from "@/lib/utils"; import { ArtifactFileList } from "../artifacts/artifact-file-list"; @@ -48,7 +47,6 @@ export function MessageList({ const rehypePlugins = useRehypeSplitWordsIntoSpans(thread.isLoading); const updateSubtask = useUpdateSubtask(); const messages = thread.messages; - const { data: enrichment } = useThreadMessageEnrichment(threadId); if (thread.isThreadLoading && messages.length === 0) { return ; @@ -61,21 +59,12 @@ export function MessageList({ {groupMessages(messages, (group) => { if (group.type === "human" || group.type === "assistant") { return group.messages.map((msg) => { - // Run id and feedback are sourced from the ``/history`` - // enrichment query (see ``useThreadMessageEnrichment``). The - // map is keyed by ``message.id`` so tool_call interleavings - // and multi-run threads map cleanly without positional math. - // ``feedback`` is ``undefined`` for non-eligible messages, - // ``null`` for eligible-but-unrated, and an object once rated. - const entry = msg.id ? enrichment?.get(msg.id) : undefined; return ( ); });