From 74dc663c23014fc5880b4ac85d9a0ecf05ba7a75 Mon Sep 17 00:00:00 2001
From: rayhpeng
Date: Sat, 4 Apr 2026 08:56:13 +0800
Subject: [PATCH] fix(events): use metadata flag instead of heuristic for dict content detection

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../deerflow/runtime/events/store/db.py | 18 +++++--
 backend/tests/test_run_journal.py       | 52 +++++++++++++++++++
 2 files changed, 65 insertions(+), 5 deletions(-)

diff --git a/backend/packages/harness/deerflow/runtime/events/store/db.py b/backend/packages/harness/deerflow/runtime/events/store/db.py
index 5e4808794..ce798813f 100644
--- a/backend/packages/harness/deerflow/runtime/events/store/db.py
+++ b/backend/packages/harness/deerflow/runtime/events/store/db.py
@@ -30,10 +30,10 @@ class DbRunEventStore(RunEventStore):
             d["created_at"] = val.isoformat()
         d.pop("id", None)
         # Restore dict content that was JSON-serialized on write
-        content = d.get("content", "")
-        if isinstance(content, str) and content and content[0] in ("{", "["):
+        raw = d.get("content", "")
+        if isinstance(raw, str) and d.get("metadata", {}).get("content_is_dict"):
             try:
-                d["content"] = json.loads(content)
+                d["content"] = json.loads(raw)
             except (json.JSONDecodeError, ValueError):
                 pass
         return d
@@ -48,7 +48,11 @@ class DbRunEventStore(RunEventStore):
 
     async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None):
         content, metadata = self._truncate_trace(category, content, metadata)
-        db_content = json.dumps(content, default=str, ensure_ascii=False) if isinstance(content, dict) else content
+        if isinstance(content, dict):
+            db_content = json.dumps(content, default=str, ensure_ascii=False)
+            metadata = {**(metadata or {}), "content_is_dict": True}
+        else:
+            db_content = content
         async with self._sf() as session:
             max_seq = await session.scalar(select(func.max(RunEventRow.seq)).where(RunEventRow.thread_id == thread_id))
             seq = (max_seq or 0) + 1
@@ -82,7 +86,11 @@ class DbRunEventStore(RunEventStore):
                 category = e.get("category", "trace")
                 metadata = e.get("metadata")
                 content, metadata = self._truncate_trace(category, content, metadata)
-                db_content = json.dumps(content, default=str, ensure_ascii=False) if isinstance(content, dict) else content
+                if isinstance(content, dict):
+                    db_content = json.dumps(content, default=str, ensure_ascii=False)
+                    metadata = {**(metadata or {}), "content_is_dict": True}
+                else:
+                    db_content = content
                 row = RunEventRow(
                     thread_id=e["thread_id"],
                     run_id=e["run_id"],
diff --git a/backend/tests/test_run_journal.py b/backend/tests/test_run_journal.py
index b87350cac..8200f6f39 100644
--- a/backend/tests/test_run_journal.py
+++ b/backend/tests/test_run_journal.py
@@ -419,6 +419,58 @@ class TestDbBackedLifecycle:
         await close_engine()
 
 
+class TestDictContentFlag:
+    """Verify that content_is_dict metadata flag controls deserialization."""
+
+    @pytest.mark.anyio
+    async def test_db_store_str_starting_with_brace_not_deserialized(self, tmp_path):
+        """Plain string content starting with { should NOT be deserialized."""
+        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
+        from deerflow.runtime.events.store.db import DbRunEventStore
+
+        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
+        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
+        sf = get_session_factory()
+        store = DbRunEventStore(sf)
+
+        record = await store.put(
+            thread_id="t1",
+            run_id="r1",
+            event_type="tool_end",
+            category="trace",
+            content="{not json, just a string}",
+        )
+        events = await store.list_events("t1", "r1")
+        assert events[0]["content"] == "{not json, just a string}"
+        assert isinstance(events[0]["content"], str)
+
+        await close_engine()
+
+    @pytest.mark.anyio
+    async def test_db_store_str_starting_with_bracket_not_deserialized(self, tmp_path):
+        """Plain string content like '[1, 2, 3]' should NOT be deserialized."""
+        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
+        from deerflow.runtime.events.store.db import DbRunEventStore
+
+        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
+        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
+        sf = get_session_factory()
+        store = DbRunEventStore(sf)
+
+        record = await store.put(
+            thread_id="t1",
+            run_id="r1",
+            event_type="tool_end",
+            category="trace",
+            content="[1, 2, 3]",
+        )
+        events = await store.list_events("t1", "r1")
+        assert events[0]["content"] == "[1, 2, 3]"
+        assert isinstance(events[0]["content"], str)
+
+        await close_engine()
+
+
 class TestDictContent:
     """Verify that store backends accept str | dict content."""
 