fix(rebase): remove duplicate definitions and update stale module paths

Rebase left duplicate function blocks in worker.py (triple human_message
write causing 3x user messages in /history), deps.py, and prompt.py.
Also update checkpointer imports from the old deerflow.agents.checkpointer
path to deerflow.runtime.checkpointer, and clean up orphaned feedback
props in the frontend message components.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
rayhpeng 2026-04-12 09:30:39 +08:00
parent 3580897c56
commit 500cdfc8e4
8 changed files with 9 additions and 147 deletions

View File

@ -137,36 +137,6 @@ def get_run_context(request: Request) -> RunContext:
)
async def get_current_user(request: Request) -> str | None:
"""Extract user identity from request.
Phase 2: always returns None (no authentication).
Phase 3: extract user_id from JWT / session / API key header.
"""
return None
get_thread_meta_repo = _require("thread_meta_repo", "Thread metadata store")
def get_run_context(request: Request) -> RunContext:
"""Build a :class:`RunContext` from ``app.state`` singletons.
Returns a *base* context with infrastructure dependencies. Callers that
need per-run fields (e.g. ``follow_up_to_run_id``) should use
``dataclasses.replace(ctx, follow_up_to_run_id=...)`` before passing it
to :func:`run_agent`.
"""
from deerflow.config import get_app_config
return RunContext(
checkpointer=get_checkpointer(request),
store=get_store(request),
event_store=get_run_event_store(request),
run_events_config=getattr(get_app_config(), "run_events", None),
thread_meta_repo=get_thread_meta_repo(request),
)
# ---------------------------------------------------------------------------
# Auth helpers (used by authz.py and auth middleware)

View File

@ -164,30 +164,6 @@ Skip simple one-off tasks.
"""
def _skill_mutability_label(category: str) -> str:
return "[custom, editable]" if category == "custom" else "[built-in]"
def clear_skills_system_prompt_cache() -> None:
_get_cached_skills_prompt_section.cache_clear()
def _build_skill_evolution_section(skill_evolution_enabled: bool) -> str:
if not skill_evolution_enabled:
return ""
return """
## Skill Self-Evolution
After completing a task, consider creating or updating a skill when:
- The task required 5+ tool calls to resolve
- You overcame non-obvious errors or pitfalls
- The user corrected your approach and the corrected version worked
- You discovered a non-trivial, recurring workflow
If you used a skill and encountered issues not covered by it, patch it immediately.
Prefer patch over edit. Before creating a new skill, confirm with the user first.
Skip simple one-off tasks.
"""
def _build_subagent_section(max_concurrent: int) -> str:
"""Build the subagent system prompt section with dynamic concurrency limit.

View File

@ -374,7 +374,7 @@ class DeerFlowClient:
"""
checkpointer = self._checkpointer
if checkpointer is None:
from deerflow.agents.checkpointer.provider import get_checkpointer
from deerflow.runtime.checkpointer.provider import get_checkpointer
checkpointer = get_checkpointer()
@ -429,7 +429,7 @@ class DeerFlowClient:
"""
checkpointer = self._checkpointer
if checkpointer is None:
from deerflow.agents.checkpointer.provider import get_checkpointer
from deerflow.runtime.checkpointer.provider import get_checkpointer
checkpointer = get_checkpointer()

View File

@ -85,64 +85,6 @@ async def run_agent(
pre_run_snapshot: dict[str, Any] | None = None
snapshot_capture_failed = False
# Initialize RunJournal for event capture
journal = None
if event_store is not None:
from deerflow.runtime.journal import RunJournal
journal = RunJournal(
run_id=run_id,
thread_id=thread_id,
event_store=event_store,
track_token_usage=getattr(run_events_config, "track_token_usage", True),
)
# Write human_message event (model_dump format, aligned with checkpoint)
human_msg = _extract_human_message(graph_input)
if human_msg is not None:
msg_metadata = {}
if follow_up_to_run_id:
msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
await event_store.put(
thread_id=thread_id,
run_id=run_id,
event_type="human_message",
category="message",
content=human_msg.model_dump(),
metadata=msg_metadata or None,
)
content = human_msg.content
journal.set_first_human_message(content if isinstance(content, str) else str(content))
# Initialize RunJournal for event capture
journal = None
if event_store is not None:
from deerflow.runtime.journal import RunJournal
journal = RunJournal(
run_id=run_id,
thread_id=thread_id,
event_store=event_store,
track_token_usage=getattr(run_events_config, "track_token_usage", True),
)
# Write human_message event (model_dump format, aligned with checkpoint)
human_msg = _extract_human_message(graph_input)
if human_msg is not None:
msg_metadata = {}
if follow_up_to_run_id:
msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
await event_store.put(
thread_id=thread_id,
run_id=run_id,
event_type="human_message",
category="message",
content=human_msg.model_dump(),
metadata=msg_metadata or None,
)
content = human_msg.content
journal.set_first_human_message(content if isinstance(content, str) else str(content))
journal = None
# Track whether "events" was requested but skipped

View File

@ -178,7 +178,7 @@ class TestAsyncCheckpointer:
@pytest.mark.anyio
async def test_sqlite_creates_parent_dir_via_to_thread(self):
"""Async SQLite setup should move mkdir off the event loop."""
from deerflow.agents.checkpointer.async_provider import make_checkpointer
from deerflow.runtime.checkpointer.async_provider import make_checkpointer
mock_config = MagicMock()
mock_config.checkpointer = CheckpointerConfig(type="sqlite", connection_string="relative/test.db")
@ -195,11 +195,11 @@ class TestAsyncCheckpointer:
mock_module.AsyncSqliteSaver = mock_saver_cls
with (
patch("deerflow.agents.checkpointer.async_provider.get_app_config", return_value=mock_config),
patch("deerflow.runtime.checkpointer.async_provider.get_app_config", return_value=mock_config),
patch.dict(sys.modules, {"langgraph.checkpoint.sqlite.aio": mock_module}),
patch("deerflow.agents.checkpointer.async_provider.asyncio.to_thread", new_callable=AsyncMock) as mock_to_thread,
patch("deerflow.runtime.checkpointer.async_provider.asyncio.to_thread", new_callable=AsyncMock) as mock_to_thread,
patch(
"deerflow.agents.checkpointer.async_provider.resolve_sqlite_conn_str",
"deerflow.runtime.checkpointer.async_provider.resolve_sqlite_conn_str",
return_value="/tmp/resolved/test.db",
),
):

View File

@ -1015,7 +1015,7 @@ class TestThreadQueries:
mock_checkpointer = MagicMock()
mock_checkpointer.list.return_value = []
with patch("deerflow.agents.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer):
with patch("deerflow.runtime.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer):
# No internal checkpointer, should fetch from provider
result = client.list_threads()
@ -1069,7 +1069,7 @@ class TestThreadQueries:
mock_checkpointer = MagicMock()
mock_checkpointer.list.return_value = []
with patch("deerflow.agents.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer):
with patch("deerflow.runtime.checkpointer.provider.get_checkpointer", return_value=mock_checkpointer):
result = client.get_thread("t99")
assert result["thread_id"] == "t99"

View File

@ -39,19 +39,11 @@ export function MessageListItem({
threadId,
message,
isLoading,
threadId,
}: {
className?: string;
threadId?: string;
threadId: string;
message: Message;
isLoading?: boolean;
threadId: string;
// ``feedback`` is ``undefined`` for messages that are not feedback-eligible
// (non-final AI messages, humans, tool results). It is ``null`` for the
// final ai_message of a run that has no rating yet, and a FeedbackData
// object once rated. The button renders whenever the field is present.
feedback?: FeedbackData | null;
runId?: string;
}) {
const isHuman = message.type === "human";
return (
@ -80,13 +72,6 @@ export function MessageListItem({
""
}
/>
{feedback !== undefined && runId && threadId && (
<FeedbackButtons
threadId={threadId}
runId={runId}
initialFeedback={feedback}
/>
)}
</div>
</MessageToolbar>
)}

View File

@ -18,7 +18,6 @@ import { useRehypeSplitWordsIntoSpans } from "@/core/rehype";
import type { Subtask } from "@/core/tasks";
import { useUpdateSubtask } from "@/core/tasks/context";
import type { AgentThreadState } from "@/core/threads";
import { useThreadMessageEnrichment } from "@/core/threads/hooks";
import { cn } from "@/lib/utils";
import { ArtifactFileList } from "../artifacts/artifact-file-list";
@ -48,7 +47,6 @@ export function MessageList({
const rehypePlugins = useRehypeSplitWordsIntoSpans(thread.isLoading);
const updateSubtask = useUpdateSubtask();
const messages = thread.messages;
const { data: enrichment } = useThreadMessageEnrichment(threadId);
if (thread.isThreadLoading && messages.length === 0) {
return <MessageListSkeleton />;
@ -61,21 +59,12 @@ export function MessageList({
{groupMessages(messages, (group) => {
if (group.type === "human" || group.type === "assistant") {
return group.messages.map((msg) => {
// Run id and feedback are sourced from the ``/history``
// enrichment query (see ``useThreadMessageEnrichment``). The
// map is keyed by ``message.id`` so tool_call interleavings
// and multi-run threads map cleanly without positional math.
// ``feedback`` is ``undefined`` for non-eligible messages,
// ``null`` for eligible-but-unrated, and an object once rated.
const entry = msg.id ? enrichment?.get(msg.id) : undefined;
return (
<MessageListItem
key={`${group.id}/${msg.id}`}
threadId={threadId}
message={msg}
isLoading={thread.isLoading}
runId={entry?.run_id}
feedback={entry?.feedback}
/>
);
});