deer-flow/backend/tests/test_run_journal.py
rayhpeng 185f5649dd feat(persistence): add unified persistence layer with event store, token tracking, and feedback (#1930)
* feat(persistence): add SQLAlchemy 2.0 async ORM scaffold

Introduce a unified database configuration (DatabaseConfig) that
controls both the LangGraph checkpointer and the DeerFlow application
persistence layer from a single `database:` config section.

New modules:
- deerflow.config.database_config — Pydantic config with memory/sqlite/postgres backends
- deerflow.persistence — async engine lifecycle, DeclarativeBase with to_dict mixin, Alembic skeleton
- deerflow.runtime.runs.store — RunStore ABC + MemoryRunStore implementation

Gateway integration initializes/tears down the persistence engine in
the existing langgraph_runtime() context manager. Legacy checkpointer
config is preserved for backward compatibility.
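
A minimal sketch of what the config model might look like (field names and
defaults are inferred from later commits in this PR; illustrative only):

    from typing import Literal
    from pydantic import BaseModel

    class DatabaseConfig(BaseModel):
        # "memory" (default), "sqlite", or "postgres"
        backend: Literal["memory", "sqlite", "postgres"] = "memory"
        sqlite_dir: str = ".deer-flow/data"   # where SQLite files live
        url: str | None = None                # e.g. DATABASE_URL for postgres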

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(persistence): add RunEventStore ABC + MemoryRunEventStore

Phase 2-A prerequisite for event storage: adds the unified run event
stream interface (RunEventStore) with an in-memory implementation,
RunEventsConfig, gateway integration, and comprehensive tests (27 cases).
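
A rough sketch of the interface, with signatures inferred from how the tests
in this file call it (not the authoritative definition):

    from abc import ABC, abstractmethod

    class RunEventStore(ABC):
        @abstractmethod
        async def put(self, *, thread_id, run_id, event_type, category,
                      content, metadata=None) -> dict: ...

        @abstractmethod
        async def put_batch(self, events) -> list[dict]: ...

        @abstractmethod
        async def list_events(self, thread_id, run_id) -> list[dict]: ...

        @abstractmethod
        async def list_messages(self, thread_id) -> list[dict]: ...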

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(persistence): add ORM models, repositories, DB/JSONL event stores, RunJournal, and API endpoints

Phase 2-B: run persistence + event storage + token tracking.

- ORM models: RunRow (with token fields), ThreadMetaRow, RunEventRow
- RunRepository implements RunStore ABC via SQLAlchemy ORM
- ThreadMetaRepository with owner access control
- DbRunEventStore with trace content truncation and cursor pagination
- JsonlRunEventStore with per-run files and seq recovery from disk
- RunJournal (BaseCallbackHandler) captures LLM/tool/lifecycle events,
  accumulates token usage by caller type, buffers and flushes to store
- RunManager now accepts optional RunStore for persistent backing
- Worker creates RunJournal, writes human_message, injects callbacks
- Gateway deps use factory functions (RunRepository when DB available)
- New endpoints: messages, run messages, run events, token-usage
- ThreadCreateRequest gains assistant_id field
- 92 tests pass (33 new), zero regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(persistence): add user feedback + follow-up run association

Phase 2-C: feedback and follow-up tracking.

- FeedbackRow ORM model (rating +1/-1, optional message_id, comment)
- FeedbackRepository with CRUD, list_by_run/thread, aggregate stats
- Feedback API endpoints: create, list, stats, delete
- follow_up_to_run_id in RunCreateRequest (explicit or auto-detected
  from latest successful run on the thread)
- Worker writes follow_up_to_run_id into human_message event metadata
- Gateway deps: feedback_repo factory + getter
- 17 new tests (14 FeedbackRepository + 3 follow-up association)
- 109 total tests pass, zero regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test+config: comprehensive Phase 2 test coverage + deprecate checkpointer config

- config.example.yaml: deprecate standalone checkpointer section, activate
  unified database:sqlite as default (drives both checkpointer + app data)
- New: test_thread_meta_repo.py (14 tests) — full ThreadMetaRepository coverage
  including check_access owner logic, list_by_owner pagination
- Extended test_run_repository.py (+4 tests) — completion preserves fields,
  list ordering desc, limit, owner_none returns all
- Extended test_run_journal.py (+8 tests) — on_chain_error, track_tokens=false,
  middleware no ai_message, unknown caller tokens, convenience fields,
  tool_error, non-summarization custom event
- Extended test_run_event_store.py (+7 tests) — DB batch seq continuity,
  make_run_event_store factory (memory/db/jsonl/fallback/unknown)
- Extended test_phase2b_integration.py (+4 tests) — create_or_reject persists,
  follow-up metadata, summarization in history, full DB-backed lifecycle
- Fixed DB integration test to use proper fake objects (not MagicMock)
  for JSON-serializable metadata
- 157 total Phase 2 tests pass, zero regressions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* config: move default sqlite_dir to .deer-flow/data

Keep SQLite databases alongside other DeerFlow-managed data
(threads, memory) under the .deer-flow/ directory instead of a
top-level ./data folder.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(persistence): remove UTFJSON, use engine-level json_serializer + datetime.now()

- Replace the custom UTFJSON type with standard sqlalchemy.JSON in all ORM
  models. Add a json_serializer based on json.dumps(..., ensure_ascii=False)
  to all create_async_engine calls so non-ASCII text (Chinese etc.) is stored
  as-is in both SQLite and Postgres (sketch below).
- Change ORM datetime defaults from datetime.now(UTC) to datetime.now(),
  remove UTC imports.
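
The engine wiring is roughly (a sketch; the partial avoids calling json.dumps
eagerly):

    import json
    from functools import partial
    from sqlalchemy.ext.asyncio import create_async_engine

    engine = create_async_engine(
        url,
        # store non-ASCII text as-is instead of \uXXXX escapes
        json_serializer=partial(json.dumps, ensure_ascii=False),
    )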

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(gateway): simplify deps.py with getter factory + inline repos

- Replace 6 identical getter functions with a _require() factory (sketch below).
- Inline 3 _make_*_repo() factories into langgraph_runtime(), call
  get_session_factory() once instead of 3 times.
- Add thread_meta upsert in start_run (services.py).
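
A sketch of the getter-factory pattern (names and error handling are
illustrative, not the exact deps.py code):

    from fastapi import HTTPException, Request

    def _require(attr: str):
        def getter(request: Request):
            value = getattr(request.app.state, attr, None)
            if value is None:
                raise HTTPException(status_code=503, detail=f"{attr} is not configured")
            return value
        return getter

    get_run_store = _require("run_store")
    get_event_store = _require("event_store")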

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(docker): add UV_EXTRAS build arg for optional dependencies

Support installing optional dependency groups (e.g. postgres) at
Docker build time via UV_EXTRAS build arg:
  UV_EXTRAS=postgres docker compose build

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(journal): fix flush, token tracking, and consolidate tests

RunJournal fixes:
- _flush_sync: retain events in the buffer when no event loop is running,
  instead of dropping them; the worker's finally block flushes via async flush().
- on_llm_end: add tool_calls filter and caller=="lead_agent" guard for
  ai_message events; mark message IDs for dedup with record_llm_usage.
- worker.py: persist completion data (tokens, message count) to RunStore
  in finally block.

Model factory:
- Auto-inject stream_usage=True for BaseChatOpenAI subclasses with
  custom api_base, so usage_metadata is populated in streaming responses.
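
Conceptually (a hedged sketch, not the factory's exact code; custom_api_base
stands in for whatever the factory checks):

    from langchain_openai.chat_models.base import BaseChatOpenAI

    if isinstance(model, BaseChatOpenAI) and custom_api_base and not model.stream_usage:
        # ask the provider to emit usage chunks during streaming
        model.stream_usage = True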

Test consolidation:
- Delete test_phase2b_integration.py (redundant with existing tests).
- Move DB-backed lifecycle test into test_run_journal.py.
- Add tests for stream_usage injection in test_model_factory.py.
- Clean up executor/task_tool dead journal references.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): widen content type to str|dict in all store backends

Allow event content to be a dict (for structured OpenAI-format messages)
in addition to plain strings. Dict values are JSON-serialized for the DB
backend and deserialized on read; memory and JSONL backends handle dicts
natively. Trace truncation now serializes dicts to JSON before measuring.
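
The DB write/read path is roughly (a sketch; content_is_dict is the metadata
flag introduced in the follow-up fix below):

    if isinstance(content, dict):
        stored = json.dumps(content, ensure_ascii=False)
        metadata = {**(metadata or {}), "content_is_dict": True}
    else:
        stored = content
    # on read: json.loads(stored) only when metadata.get("content_is_dict")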

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(events): use metadata flag instead of heuristic for dict content detection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(converters): add LangChain-to-OpenAI message format converters

Pure functions langchain_to_openai_message, langchain_to_openai_completion,
langchain_messages_to_openai, and _infer_finish_reason for converting
LangChain BaseMessage objects to OpenAI Chat Completions format, used by
RunJournal for event storage. 15 unit tests added.
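
Expected usage is roughly (a sketch based on the function names above and the
assistant-message format described later in this PR):

    from langchain_core.messages import AIMessage

    msg = AIMessage(content="Hi there")
    langchain_to_openai_message(msg)
    # -> {"role": "assistant", "content": "Hi there"}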

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(converters): handle empty list content as null, clean up test

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): human_message content uses OpenAI user message format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(events): ai_message uses OpenAI format, add ai_tool_call message event

- ai_message content now uses {"role": "assistant", "content": "..."} format
- New ai_tool_call message event emitted when lead_agent LLM responds with tool_calls
- ai_tool_call uses langchain_to_openai_message converter for consistent format
- Both events include finish_reason in metadata ("stop" or "tool_calls")

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): add tool_result message event with OpenAI tool message format

Cache tool_call_id from on_tool_start keyed by run_id as fallback for on_tool_end,
then emit a tool_result message event (role=tool, tool_call_id, content) after each
successful tool completion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(events): summary content uses OpenAI system message format

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): replace llm_start/llm_end with llm_request/llm_response in OpenAI format

Add on_chat_model_start to capture structured prompt messages as llm_request events.
Replace llm_end trace events with llm_response using OpenAI Chat Completions format.
Track llm_call_index to pair request/response events.
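
The stored contents follow OpenAI shapes, paired by call index (shapes taken
from the tests in this file; illustrative):

    # llm_request content
    {"model": "gpt-4o", "messages": [{"role": "system", "content": "..."}, ...]}
    # llm_response content
    {"choices": [{"message": {"role": "assistant", "content": "..."}}], "usage": {"prompt_tokens": ...}}
    # both events carry the same metadata["llm_call_index"] value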

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(events): add record_middleware method for middleware trace events

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test(events): add full run sequence integration test for OpenAI content format

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* feat(events): align message events with checkpoint format and add middleware tag injection

- Message events (ai_message, ai_tool_call, tool_result, human_message) now use
  BaseMessage.model_dump() format, matching LangGraph checkpoint values.messages
- on_tool_end extracts tool_call_id/name/status from ToolMessage objects
- on_tool_error now emits tool_result message events with error status
- record_middleware uses middleware:{tag} event_type and middleware category
- Summarization custom events use middleware:summarize category
- TitleMiddleware injects middleware:title tag via get_config() inheritance
- SummarizationMiddleware model bound with middleware:summarize tag
- Worker writes human_message using HumanMessage.model_dump()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(threads): switch search endpoint to threads_meta table and sync title

- POST /api/threads/search now queries threads_meta table directly,
  removing the two-phase Store + Checkpointer scan approach
- Add ThreadMetaRepository.search() with metadata/status filters
- Add ThreadMetaRepository.update_display_name() for title sync
- Worker syncs checkpoint title to threads_meta.display_name on run completion
- Map display_name to values.title in search response for API compatibility

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(threads): history endpoint reads messages from event store

- POST /api/threads/{thread_id}/history now combines two data sources:
  checkpointer for checkpoint_id, metadata, title, thread_data;
  event store for messages (complete history, not truncated by summarization)
- Strip internal LangGraph metadata keys from response
- Remove full channel_values serialization in favor of selective fields

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: remove duplicate optional-dependencies header in pyproject.toml

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(middleware): pass tagged config to TitleMiddleware ainvoke call

Without the config, the middleware:title tag was not injected,
causing the LLM response to be recorded as a lead_agent ai_message
in run_events.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: resolve merge conflict in .env.example

Keep both DATABASE_URL (from persistence-scaffold) and WECOM
credentials (from main) after the merge.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(persistence): address review feedback on PR #1851

- Fix naive datetime.now() → datetime.now(UTC) in all ORM models
- Fix seq race condition in DbRunEventStore.put() with FOR UPDATE
  and a UNIQUE(thread_id, seq) constraint (sketch below)
- Encapsulate _store access in RunManager.update_run_completion()
- Deduplicate _store.put() logic in RunManager via _persist_to_store()
- Add update_run_completion to RunStore ABC + MemoryRunStore
- Wire follow_up_to_run_id through the full create path
- Add error recovery to RunJournal._flush_sync() lost-event scenario
- Add migration note for search_threads breaking change
- Fix test_checkpointer_none_fix mock to set database=None
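
The seq allocation in put() is conceptually (a sketch, not the exact query):

    from sqlalchemy import select

    stmt = (
        select(RunEventRow.seq)
        .where(RunEventRow.thread_id == thread_id)
        .order_by(RunEventRow.seq.desc())
        .limit(1)
        .with_for_update()  # serialize writers; UNIQUE(thread_id, seq) backstops races
    )
    next_seq = ((await session.execute(stmt)).scalar() or 0) + 1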

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* chore: update uv.lock

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(persistence): address 22 review comments from CodeQL, Copilot, and Code Quality

Bug fixes:
- Sanitize log params to prevent log injection (CodeQL)
- Reset threads_meta.status to idle/error when run completes
- Attach messages only to latest checkpoint in /history response
- Write threads_meta on POST /threads so new threads appear in search

Lint fixes:
- Remove unused imports (journal.py, migrations/env.py, test_converters.py)
- Convert lambda to named function (engine.py, Ruff E731)
- Remove unused logger definitions in repos (Ruff F841)
- Add logging to JSONL decode errors and empty except blocks
- Separate assert side-effects in tests (CodeQL)
- Remove unused local variables in tests (Ruff F841)
- Fix max_trace_content truncation to use byte length, not char length

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: apply ruff format to persistence and runtime files

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Potential fix for pull request finding 'Statement has no effect'

Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>

* refactor(runtime): introduce RunContext to reduce run_agent parameter bloat

Extract checkpointer, store, event_store, run_events_config, thread_meta_repo,
and follow_up_to_run_id into a frozen RunContext dataclass. Add get_run_context()
in deps.py to build the base context from app.state singletons. start_run() uses
dataclasses.replace() to enrich per-run fields before passing ctx to run_agent.
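
A sketch of the shape (field list from this commit; types are illustrative):

    import dataclasses
    from dataclasses import dataclass
    from typing import Any

    @dataclass(frozen=True)
    class RunContext:
        checkpointer: Any = None
        store: Any = None
        event_store: Any = None
        run_events_config: Any = None
        thread_meta_repo: Any = None
        follow_up_to_run_id: str | None = None

    # start_run() enriches per-run fields without mutating the base context
    ctx = dataclasses.replace(base_ctx, follow_up_to_run_id=prev_run_id)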

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(gateway): move sanitize_log_param to app/gateway/utils.py

Extract the log-injection sanitizer from routers/threads.py into a shared
utils module and rename to sanitize_log_param (public API). Eliminates the
reverse service → router import in services.py.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* perf: use SQL aggregation for feedback stats and thread token usage

Replace Python-side counting in FeedbackRepository.aggregate_by_run with
a single SELECT COUNT/SUM query. Add RunStore.aggregate_tokens_by_thread
abstract method with SQL GROUP BY implementation in RunRepository and
Python fallback in MemoryRunStore. Simplify the thread_token_usage
endpoint to delegate to the new method, eliminating the limit=10000
truncation risk.
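
The feedback aggregate becomes a single query of roughly this shape (column
names are illustrative):

    from sqlalchemy import case, func, select

    stmt = select(
        func.count(FeedbackRow.id),
        func.sum(case((FeedbackRow.rating > 0, 1), else_=0)),
        func.sum(case((FeedbackRow.rating < 0, 1), else_=0)),
    ).where(FeedbackRow.run_id == run_id)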

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* docs: annotate DbRunEventStore.put() as low-frequency path

Add docstring clarifying that put() opens a per-call transaction with
FOR UPDATE and should only be used for infrequent writes (currently
just the initial human_message event). High-throughput callers should
use put_batch() instead.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(threads): fall back to Store search when ThreadMetaRepository is unavailable

When database.backend=memory (default) or no SQL session factory is
configured, search_threads now queries the LangGraph Store instead of
returning 503. Returns empty list if neither Store nor repo is available.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(persistence): introduce ThreadMetaStore ABC for backend-agnostic thread metadata

Add ThreadMetaStore abstract base class with create/get/search/update/delete
interface. ThreadMetaRepository (SQL) now inherits from it. New
MemoryThreadMetaStore wraps LangGraph BaseStore for memory-mode deployments.

deps.py now always provides a non-None thread_meta_repo, eliminating all
`if thread_meta_repo is not None` guards in services.py, worker.py, and
routers/threads.py. search_threads no longer needs a Store fallback branch.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(history): read messages from checkpointer instead of RunEventStore

The /history endpoint now reads messages directly from the
checkpointer's channel_values (the authoritative source) instead of
querying RunEventStore.list_messages(). The RunEventStore API is
preserved for other consumers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(persistence): address new Copilot review comments

- feedback.py: validate thread_id/run_id before deleting feedback
- jsonl.py: add path traversal protection with ID validation
- run_repo.py: parse `before` to datetime for PostgreSQL compat
- thread_meta_repo.py: fix pagination when metadata filter is active
- database_config.py: use resolve_path for sqlite_dir consistency

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Implement skill self-evolution and skill_manage flow (#1874)

* chore: ignore .worktrees directory

* Add skill_manage self-evolution flow

* Fix CI regressions for skill_manage

* Address PR review feedback for skill evolution

* fix(skill-evolution): preserve history on delete

* fix(skill-evolution): tighten scanner fallbacks

* docs: add skill_manage e2e evidence screenshot

* fix(skill-manage): avoid blocking fs ops in session runtime

---------

Co-authored-by: Willem Jiang <willem.jiang@gmail.com>

* fix(config): resolve sqlite_dir relative to CWD, not Paths.base_dir

resolve_path() resolves relative to Paths.base_dir (.deer-flow),
which double-nested the path to .deer-flow/.deer-flow/data/app.db.
Use Path.resolve() (CWD-relative) instead.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Feature/feishu receive file (#1608)

* feat(feishu): add channel file materialization hook for inbound messages

- Introduce Channel.receive_file(msg, thread_id) as a base method for file materialization; the default is a no-op.
- Implement FeishuChannel.receive_file to download files/images from Feishu messages, save to sandbox, and inject virtual paths into msg.text.
- Update ChannelManager to call receive_file for any channel if msg.files is present, enabling downstream model access to user-uploaded files.
- No impact on Slack/Telegram or other channels (they inherit the default no-op).

* style(backend): format code with ruff for lint compliance

- Auto-formatted packages/harness/deerflow/agents/factory.py and tests/test_create_deerflow_agent.py using `ruff format`
- Ensured both files conform to project linting standards
- Fixes CI lint check failures caused by code style issues

* fix(feishu): handle file write operation asynchronously to prevent blocking

* fix(feishu): rename GetMessageResourceRequest to _GetMessageResourceRequest and remove redundant code

* test(feishu): add tests for receive_file method and placeholder replacement

* fix(manager): remove unnecessary type casting for channel retrieval

* fix(feishu): update logging messages to reflect resource handling instead of image

* fix(feishu): sanitize filename by replacing invalid characters in file uploads

* fix(feishu): improve filename sanitization and reorder image key handling in message processing

* fix(feishu): add thread lock to prevent filename conflicts during file downloads

* fix(test): correct bad merge in test_feishu_parser.py

* chore: run ruff and apply formatting cleanup
fix(feishu): preserve rich-text attachment order and improve fallback filename handling

* fix(docker): restore gateway env vars and fix langgraph empty arg issue (#1915)

Two production docker-compose.yaml bugs prevent `make up` from working:

1. Gateway missing DEER_FLOW_CONFIG_PATH and DEER_FLOW_EXTENSIONS_CONFIG_PATH
   environment overrides. Added in fb2d99f (#1836) but accidentally reverted
   by ca2fb95 (#1847). Without them, gateway reads host paths from .env via
   env_file, causing FileNotFoundError inside the container.

2. Langgraph command fails when LANGGRAPH_ALLOW_BLOCKING is unset (default).
   Empty $${allow_blocking} inserts a bare space between flags, causing
   ' --no-reload' to be parsed as an unexpected extra argument. Fix by building
   the args string first and conditionally appending --allow-blocking.

Co-authored-by: cooper <cooperfu@tencent.com>

* fix(frontend): resolve invalid HTML nesting and tabnabbing vulnerabilities (#1904)

* fix(frontend): resolve invalid HTML nesting and tabnabbing vulnerabilities

Fix `<button>` inside `<a>` invalid HTML in artifact components and add
missing `noopener,noreferrer` to `window.open` calls to prevent reverse
tabnabbing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix(frontend): address Copilot review on tabnabbing and double-tab-open

Remove redundant parent onClick on web_fetch ChainOfThoughtStep to
prevent opening two tabs on link click, and explicitly null out
window.opener after window.open() for defensive tabnabbing hardening.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>

* refactor(persistence): organize entities into per-entity directories

Restructure the persistence layer from horizontal "models/ + repositories/"
split into vertical entity-aligned directories. Each entity (thread_meta,
run, feedback) now owns its ORM model, abstract interface (where applicable),
and concrete implementations under a single directory with an aggregating
__init__.py for one-line imports.

Layout:
  persistence/thread_meta/{base,model,sql,memory}.py
  persistence/run/{model,sql}.py
  persistence/feedback/{model,sql}.py

models/__init__.py is kept as a facade so Alembic autogenerate continues to
discover all ORM tables via Base.metadata. RunEventRow remains under
models/run_event.py because its storage implementation lives in
runtime/events/store/db.py and has no matching repository directory.

The repositories/ directory is removed entirely. All call sites in
gateway/deps.py and tests are updated to import from the new entity
packages, e.g.:

    from deerflow.persistence.thread_meta import ThreadMetaRepository
    from deerflow.persistence.run import RunRepository
    from deerflow.persistence.feedback import FeedbackRepository

Full test suite passes (1690 passed, 14 skipped).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(gateway): sync thread rename and delete through ThreadMetaStore

The POST /threads/{id}/state endpoint previously synced title changes
only to the LangGraph Store via _store_upsert. In sqlite mode the search
endpoint reads from the ThreadMetaRepository SQL table, so renames never
appeared in /threads/search until the next agent run completed (worker.py
syncs title from checkpoint to thread_meta in its finally block).

Likewise the DELETE /threads/{id} endpoint cleaned up the filesystem,
Store, and checkpointer but left the threads_meta row orphaned in sqlite,
so deleted threads kept appearing in /threads/search.

Fix both endpoints by routing through the ThreadMetaStore abstraction
which already has the correct sqlite/memory implementations wired up by
deps.py. The rename path now calls update_display_name() and the delete
path calls delete() — both work uniformly across backends.

Verified end-to-end with curl in gateway mode against sqlite backend.
Existing test suite (1690 passed) and focused router/repo tests pass.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor(gateway): route all thread metadata access through ThreadMetaStore

Following the rename/delete bug fix in PR1, migrate the remaining direct
LangGraph Store reads/writes in the threads router and services to the
ThreadMetaStore abstraction so that the sqlite and memory backends behave
identically and the legacy dual-write paths can be removed.

Migrated endpoints (threads.py):
- create_thread: idempotency check + write now use thread_meta_repo.get/create
  instead of dual-writing the LangGraph Store and the SQL row.
- get_thread: reads from thread_meta_repo.get; the checkpoint-only fallback
  for legacy threads is preserved.
- patch_thread: replaced _store_get/_store_put with thread_meta_repo.update_metadata.
- delete_thread_data: dropped the legacy store.adelete; thread_meta_repo.delete
  already covers it.

Removed dead code (services.py):
- _upsert_thread_in_store — redundant with the immediately following
  thread_meta_repo.create() call.
- _sync_thread_title_after_run — worker.py's finally block already syncs
  the title via thread_meta_repo.update_display_name() after each run.

Removed dead code (threads.py):
- _store_get / _store_put / _store_upsert helpers (no remaining callers).
- THREADS_NS constant.
- get_store import (router no longer touches the LangGraph Store directly).

New abstract method:
- ThreadMetaStore.update_metadata(thread_id, metadata) merges metadata into
  the thread's metadata field. Implemented in both ThreadMetaRepository (SQL,
  read-modify-write inside one session) and MemoryThreadMetaStore. Three new
  unit tests cover merge / empty / nonexistent behaviour.

Net change: -134 lines. Full test suite: 1693 passed, 14 skipped.
Verified end-to-end with curl in gateway mode against sqlite backend
(create / patch / get / rename / search / delete).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com>
Co-authored-by: DanielWalnut <45447813+hetaoBackend@users.noreply.github.com>
Co-authored-by: Willem Jiang <willem.jiang@gmail.com>
Co-authored-by: JilongSun <965640067@qq.com>
Co-authored-by: jie <49781832+stan-fu@users.noreply.github.com>
Co-authored-by: cooper <cooperfu@tencent.com>
Co-authored-by: yangzheli <43645580+yangzheli@users.noreply.github.com>
2026-04-11 11:23:39 +08:00


"""Tests for RunJournal callback handler.
Uses MemoryRunEventStore as the backend for direct event inspection.
"""
import asyncio
from unittest.mock import MagicMock
from uuid import uuid4
import pytest
from deerflow.runtime.events.store.memory import MemoryRunEventStore
from deerflow.runtime.journal import RunJournal
@pytest.fixture
def journal_setup():
store = MemoryRunEventStore()
j = RunJournal("r1", "t1", store, flush_threshold=100)
return j, store


def _make_llm_response(content="Hello", usage=None, tool_calls=None, additional_kwargs=None):
    """Create a mock LLM response with a message.

    model_dump() returns checkpoint-aligned format matching real AIMessage.
    """
    msg = MagicMock()
    msg.type = "ai"
    msg.content = content
    msg.id = f"msg-{id(msg)}"
    msg.tool_calls = tool_calls or []
    msg.invalid_tool_calls = []
    msg.response_metadata = {"model_name": "test-model"}
    msg.usage_metadata = usage
    msg.additional_kwargs = additional_kwargs or {}
    msg.name = None
    # model_dump returns checkpoint-aligned format
    msg.model_dump.return_value = {
        "content": content,
        "additional_kwargs": additional_kwargs or {},
        "response_metadata": {"model_name": "test-model"},
        "type": "ai",
        "name": None,
        "id": msg.id,
        "tool_calls": tool_calls or [],
        "invalid_tool_calls": [],
        "usage_metadata": usage,
    }
    gen = MagicMock()
    gen.message = msg
    response = MagicMock()
    response.generations = [[gen]]
    return response


class TestLlmCallbacks:
    @pytest.mark.anyio
    async def test_on_llm_end_produces_trace_event(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_start({}, [], run_id=run_id, tags=["lead_agent"])
        j.on_llm_end(_make_llm_response("Hi"), run_id=run_id, tags=["lead_agent"])
        await j.flush()
        events = await store.list_events("t1", "r1")
        trace_events = [e for e in events if e["event_type"] == "llm_response"]
        assert len(trace_events) == 1
        assert trace_events[0]["category"] == "trace"

    @pytest.mark.anyio
    async def test_on_llm_end_lead_agent_produces_ai_message(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_start({}, [], run_id=run_id, tags=["lead_agent"])
        j.on_llm_end(_make_llm_response("Answer"), run_id=run_id, tags=["lead_agent"])
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["event_type"] == "ai_message"
        # Content is checkpoint-aligned model_dump format
        assert messages[0]["content"]["type"] == "ai"
        assert messages[0]["content"]["content"] == "Answer"

    @pytest.mark.anyio
    async def test_on_llm_end_with_tool_calls_produces_ai_tool_call(self, journal_setup):
        """LLM response with pending tool_calls should produce ai_tool_call event."""
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_end(
            _make_llm_response("Let me search", tool_calls=[{"id": "call_1", "name": "search", "args": {}}]),
            run_id=run_id,
            tags=["lead_agent"],
        )
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["event_type"] == "ai_tool_call"

    @pytest.mark.anyio
    async def test_on_llm_end_subagent_no_ai_message(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_start({}, [], run_id=run_id, tags=["subagent:research"])
        j.on_llm_end(_make_llm_response("Sub answer"), run_id=run_id, tags=["subagent:research"])
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 0

    @pytest.mark.anyio
    async def test_token_accumulation(self, journal_setup):
        j, store = journal_setup
        usage1 = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
        usage2 = {"input_tokens": 20, "output_tokens": 10, "total_tokens": 30}
        j.on_llm_end(_make_llm_response("A", usage=usage1), run_id=uuid4(), tags=["lead_agent"])
        j.on_llm_end(_make_llm_response("B", usage=usage2), run_id=uuid4(), tags=["lead_agent"])
        assert j._total_input_tokens == 30
        assert j._total_output_tokens == 15
        assert j._total_tokens == 45
        assert j._llm_call_count == 2

    @pytest.mark.anyio
    async def test_total_tokens_computed_from_input_output(self, journal_setup):
        """If total_tokens is 0, it should be computed from input + output."""
        j, store = journal_setup
        j.on_llm_end(
            _make_llm_response("Hi", usage={"input_tokens": 100, "output_tokens": 50, "total_tokens": 0}),
            run_id=uuid4(),
            tags=["lead_agent"],
        )
        assert j._total_tokens == 150
        assert j._lead_agent_tokens == 150

    @pytest.mark.anyio
    async def test_caller_token_classification(self, journal_setup):
        j, store = journal_setup
        usage = {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}
        j.on_llm_end(_make_llm_response("A", usage=usage), run_id=uuid4(), tags=["lead_agent"])
        j.on_llm_end(_make_llm_response("B", usage=usage), run_id=uuid4(), tags=["subagent:research"])
        j.on_llm_end(_make_llm_response("C", usage=usage), run_id=uuid4(), tags=["middleware:summarization"])
        assert j._lead_agent_tokens == 15
        assert j._subagent_tokens == 15
        assert j._middleware_tokens == 15

    @pytest.mark.anyio
    async def test_usage_metadata_none_no_crash(self, journal_setup):
        j, store = journal_setup
        j.on_llm_end(_make_llm_response("No usage", usage=None), run_id=uuid4(), tags=["lead_agent"])
        await j.flush()

    @pytest.mark.anyio
    async def test_latency_tracking(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_start({}, [], run_id=run_id, tags=["lead_agent"])
        j.on_llm_end(_make_llm_response("Fast"), run_id=run_id, tags=["lead_agent"])
        await j.flush()
        events = await store.list_events("t1", "r1")
        llm_resp = [e for e in events if e["event_type"] == "llm_response"][0]
        assert "latency_ms" in llm_resp["metadata"]
        assert llm_resp["metadata"]["latency_ms"] is not None


class TestLifecycleCallbacks:
    @pytest.mark.anyio
    async def test_chain_start_end_produce_lifecycle_events(self, journal_setup):
        j, store = journal_setup
        j.on_chain_start({}, {}, run_id=uuid4(), parent_run_id=None)
        j.on_chain_end({}, run_id=uuid4(), parent_run_id=None)
        await asyncio.sleep(0.05)
        await j.flush()
        events = await store.list_events("t1", "r1")
        types = [e["event_type"] for e in events if e["category"] == "lifecycle"]
        assert "run_start" in types
        assert "run_end" in types

    @pytest.mark.anyio
    async def test_nested_chain_ignored(self, journal_setup):
        j, store = journal_setup
        parent_id = uuid4()
        j.on_chain_start({}, {}, run_id=uuid4(), parent_run_id=parent_id)
        j.on_chain_end({}, run_id=uuid4(), parent_run_id=parent_id)
        await j.flush()
        events = await store.list_events("t1", "r1")
        lifecycle = [e for e in events if e["category"] == "lifecycle"]
        assert len(lifecycle) == 0


class TestToolCallbacks:
    @pytest.mark.anyio
    async def test_tool_start_end_produce_trace(self, journal_setup):
        j, store = journal_setup
        j.on_tool_start({"name": "web_search"}, "query", run_id=uuid4())
        j.on_tool_end("results", run_id=uuid4(), name="web_search")
        await j.flush()
        events = await store.list_events("t1", "r1")
        trace_types = {e["event_type"] for e in events if e["category"] == "trace"}
        assert "tool_start" in trace_types
        assert "tool_end" in trace_types

    @pytest.mark.anyio
    async def test_on_tool_error(self, journal_setup):
        j, store = journal_setup
        j.on_tool_error(TimeoutError("timeout"), run_id=uuid4(), name="web_fetch")
        await j.flush()
        events = await store.list_events("t1", "r1")
        assert any(e["event_type"] == "tool_error" for e in events)


class TestCustomEvents:
    @pytest.mark.anyio
    async def test_summarization_event(self, journal_setup):
        j, store = journal_setup
        j.on_custom_event(
            "summarization",
            {"summary": "Context was summarized.", "replaced_count": 5, "replaced_message_ids": ["a", "b"]},
            run_id=uuid4(),
        )
        await j.flush()
        events = await store.list_events("t1", "r1")
        trace = [e for e in events if e["event_type"] == "summarization"]
        assert len(trace) == 1
        # Summarization goes to middleware category, not message
        mw_events = [e for e in events if e["event_type"] == "middleware:summarize"]
        assert len(mw_events) == 1
        assert mw_events[0]["category"] == "middleware"
        assert mw_events[0]["content"] == {"role": "system", "content": "Context was summarized."}
        # No message events from summarization
        messages = await store.list_messages("t1")
        assert len(messages) == 0

    @pytest.mark.anyio
    async def test_non_summarization_custom_event(self, journal_setup):
        j, store = journal_setup
        j.on_custom_event("task_running", {"task_id": "t1", "status": "running"}, run_id=uuid4())
        await j.flush()
        events = await store.list_events("t1", "r1")
        assert any(e["event_type"] == "task_running" for e in events)


class TestBufferFlush:
    @pytest.mark.anyio
    async def test_flush_threshold(self, journal_setup):
        j, store = journal_setup
        j._flush_threshold = 3
        j.on_tool_start({"name": "a"}, "x", run_id=uuid4())
        j.on_tool_start({"name": "b"}, "x", run_id=uuid4())
        assert len(j._buffer) == 2
        j.on_tool_start({"name": "c"}, "x", run_id=uuid4())
        await asyncio.sleep(0.1)
        events = await store.list_events("t1", "r1")
        assert len(events) >= 3

    @pytest.mark.anyio
    async def test_events_retained_when_no_loop(self, journal_setup):
        """Events buffered in a sync (no-loop) context should survive
        until the async flush() in the finally block."""
        j, store = journal_setup
        j._flush_threshold = 1
        original = asyncio.get_running_loop

        def no_loop():
            raise RuntimeError("no running event loop")

        asyncio.get_running_loop = no_loop
        try:
            j._put(event_type="llm_response", category="trace", content="test")
        finally:
            asyncio.get_running_loop = original
        assert len(j._buffer) == 1
        await j.flush()
        events = await store.list_events("t1", "r1")
        assert any(e["event_type"] == "llm_response" for e in events)


class TestIdentifyCaller:
    def test_lead_agent_tag(self, journal_setup):
        j, _ = journal_setup
        assert j._identify_caller({"tags": ["lead_agent"]}) == "lead_agent"

    def test_subagent_tag(self, journal_setup):
        j, _ = journal_setup
        assert j._identify_caller({"tags": ["subagent:research"]}) == "subagent:research"

    def test_middleware_tag(self, journal_setup):
        j, _ = journal_setup
        assert j._identify_caller({"tags": ["middleware:summarization"]}) == "middleware:summarization"

    def test_no_tags_returns_lead_agent(self, journal_setup):
        j, _ = journal_setup
        assert j._identify_caller({"tags": []}) == "lead_agent"
        assert j._identify_caller({}) == "lead_agent"


class TestChainErrorCallback:
    @pytest.mark.anyio
    async def test_on_chain_error_writes_run_error(self, journal_setup):
        j, store = journal_setup
        j.on_chain_error(ValueError("boom"), run_id=uuid4(), parent_run_id=None)
        await asyncio.sleep(0.05)
        await j.flush()
        events = await store.list_events("t1", "r1")
        error_events = [e for e in events if e["event_type"] == "run_error"]
        assert len(error_events) == 1
        assert "boom" in error_events[0]["content"]
        assert error_events[0]["metadata"]["error_type"] == "ValueError"


class TestTokenTrackingDisabled:
    @pytest.mark.anyio
    async def test_track_token_usage_false(self):
        store = MemoryRunEventStore()
        j = RunJournal("r1", "t1", store, track_token_usage=False, flush_threshold=100)
        j.on_llm_end(
            _make_llm_response("X", usage={"input_tokens": 50, "output_tokens": 50, "total_tokens": 100}),
            run_id=uuid4(),
            tags=["lead_agent"],
        )
        data = j.get_completion_data()
        assert data["total_tokens"] == 0
        assert data["llm_call_count"] == 0


class TestConvenienceFields:
    @pytest.mark.anyio
    async def test_last_ai_message_tracks_latest(self, journal_setup):
        j, store = journal_setup
        j.on_llm_end(_make_llm_response("First"), run_id=uuid4(), tags=["lead_agent"])
        j.on_llm_end(_make_llm_response("Second"), run_id=uuid4(), tags=["lead_agent"])
        data = j.get_completion_data()
        assert data["last_ai_message"] == "Second"
        assert data["message_count"] == 2

    @pytest.mark.anyio
    async def test_first_human_message_via_set(self, journal_setup):
        j, _ = journal_setup
        j.set_first_human_message("What is AI?")
        data = j.get_completion_data()
        assert data["first_human_message"] == "What is AI?"

    @pytest.mark.anyio
    async def test_get_completion_data(self, journal_setup):
        j, _ = journal_setup
        j._total_tokens = 100
        j._msg_count = 5
        data = j.get_completion_data()
        assert data["total_tokens"] == 100
        assert data["message_count"] == 5


class TestUnknownCallerTokens:
    @pytest.mark.anyio
    async def test_unknown_caller_tokens_go_to_lead(self, journal_setup):
        j, store = journal_setup
        j.on_llm_end(
            _make_llm_response("X", usage={"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}),
            run_id=uuid4(),
            tags=[],
        )
        assert j._lead_agent_tokens == 15


# ---------------------------------------------------------------------------
# SQLite-backed end-to-end test
# ---------------------------------------------------------------------------
class TestDbBackedLifecycle:
    @pytest.mark.anyio
    async def test_full_lifecycle_with_sqlite(self, tmp_path):
        """Full lifecycle with SQLite-backed RunRepository + DbRunEventStore."""
        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
        from deerflow.persistence.run import RunRepository
        from deerflow.runtime.events.store.db import DbRunEventStore
        from deerflow.runtime.runs.manager import RunManager

        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
        sf = get_session_factory()
        run_store = RunRepository(sf)
        event_store = DbRunEventStore(sf)
        mgr = RunManager(store=run_store)

        # Create run
        record = await mgr.create("t1", "lead_agent")
        run_id = record.run_id

        # Write human_message (checkpoint-aligned format)
        from langchain_core.messages import HumanMessage

        human_msg = HumanMessage(content="Hello DB")
        await event_store.put(
            thread_id="t1",
            run_id=run_id,
            event_type="human_message",
            category="message",
            content=human_msg.model_dump(),
        )

        # Simulate journal
        journal = RunJournal(run_id, "t1", event_store, flush_threshold=100)
        journal.set_first_human_message("Hello DB")
        journal.on_chain_start({}, {}, run_id=uuid4(), parent_run_id=None)
        llm_rid = uuid4()
        journal.on_llm_start({"name": "test"}, [], run_id=llm_rid, tags=["lead_agent"])
        journal.on_llm_end(
            _make_llm_response("DB response", usage={"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}),
            run_id=llm_rid,
            tags=["lead_agent"],
        )
        journal.on_chain_end({}, run_id=uuid4(), parent_run_id=None)
        await asyncio.sleep(0.05)
        await journal.flush()

        # Verify run persisted
        row = await run_store.get(run_id)
        assert row is not None
        assert row["status"] == "pending"

        # Update completion
        completion = journal.get_completion_data()
        await run_store.update_run_completion(run_id, status="success", **completion)
        row = await run_store.get(run_id)
        assert row["status"] == "success"
        assert row["total_tokens"] == 15

        # Verify messages from DB (checkpoint-aligned format)
        messages = await event_store.list_messages("t1")
        assert len(messages) == 2
        assert messages[0]["event_type"] == "human_message"
        assert messages[0]["content"]["type"] == "human"
        assert messages[1]["event_type"] == "ai_message"
        assert messages[1]["content"]["type"] == "ai"
        assert messages[1]["content"]["content"] == "DB response"

        # Verify events from DB
        events = await event_store.list_events("t1", run_id)
        event_types = {e["event_type"] for e in events}
        assert "run_start" in event_types
        assert "llm_response" in event_types
        assert "run_end" in event_types

        await close_engine()


class TestDictContentFlag:
    """Verify that content_is_dict metadata flag controls deserialization."""

    @pytest.mark.anyio
    async def test_db_store_str_starting_with_brace_not_deserialized(self, tmp_path):
        """Plain string content starting with { should NOT be deserialized."""
        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
        from deerflow.runtime.events.store.db import DbRunEventStore

        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
        sf = get_session_factory()
        store = DbRunEventStore(sf)
        await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="tool_end",
            category="trace",
            content="{not json, just a string}",
        )
        events = await store.list_events("t1", "r1")
        assert events[0]["content"] == "{not json, just a string}"
        assert isinstance(events[0]["content"], str)
        await close_engine()

    @pytest.mark.anyio
    async def test_db_store_str_starting_with_bracket_not_deserialized(self, tmp_path):
        """Plain string content like '[1, 2, 3]' should NOT be deserialized."""
        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
        from deerflow.runtime.events.store.db import DbRunEventStore

        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
        sf = get_session_factory()
        store = DbRunEventStore(sf)
        await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="tool_end",
            category="trace",
            content="[1, 2, 3]",
        )
        events = await store.list_events("t1", "r1")
        assert events[0]["content"] == "[1, 2, 3]"
        assert isinstance(events[0]["content"], str)
        await close_engine()


class TestDictContent:
    """Verify that store backends accept str | dict content."""

    @pytest.mark.anyio
    async def test_memory_store_dict_content(self):
        store = MemoryRunEventStore()
        record = await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="ai_message",
            category="message",
            content={"role": "assistant", "content": "Hello"},
        )
        assert record["content"] == {"role": "assistant", "content": "Hello"}
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"] == {"role": "assistant", "content": "Hello"}

    @pytest.mark.anyio
    async def test_memory_store_str_content_unchanged(self):
        store = MemoryRunEventStore()
        record = await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="ai_message",
            category="message",
            content="plain string",
        )
        assert record["content"] == "plain string"
        assert isinstance(record["content"], str)

    @pytest.mark.anyio
    async def test_db_store_dict_content_roundtrip(self, tmp_path):
        """Dict content survives DB roundtrip (JSON serialize on write, deserialize on read)."""
        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
        from deerflow.runtime.events.store.db import DbRunEventStore

        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
        sf = get_session_factory()
        store = DbRunEventStore(sf)
        nested = {"role": "assistant", "content": "Hi", "metadata": {"model": "gpt-4", "tokens": [1, 2, 3]}}
        record = await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="ai_message",
            category="message",
            content=nested,
        )
        assert record["content"] == nested
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"] == nested
        await close_engine()

    @pytest.mark.anyio
    async def test_db_store_trace_dict_truncation(self, tmp_path):
        """Large dict trace content is truncated with metadata flag."""
        from deerflow.persistence.engine import close_engine, get_session_factory, init_engine
        from deerflow.runtime.events.store.db import DbRunEventStore

        url = f"sqlite+aiosqlite:///{tmp_path / 'test.db'}"
        await init_engine("sqlite", url=url, sqlite_dir=str(tmp_path))
        sf = get_session_factory()
        store = DbRunEventStore(sf, max_trace_content=100)
        large_dict = {"role": "assistant", "content": "x" * 200}
        record = await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="llm_end",
            category="trace",
            content=large_dict,
        )
        assert record["metadata"].get("content_truncated") is True
        # Content should be a truncated string (serialized JSON was too long)
        assert isinstance(record["content"], str)
        assert len(record["content"]) <= 100
        await close_engine()


class TestCheckpointAlignedHumanMessage:
    @pytest.mark.anyio
    async def test_human_message_checkpoint_format(self):
        """human_message content uses model_dump() checkpoint format."""
        from langchain_core.messages import HumanMessage

        store = MemoryRunEventStore()
        human_msg = HumanMessage(content="What is AI?")
        await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="human_message",
            category="message",
            content=human_msg.model_dump(),
            metadata={"message_id": "msg_001"},
        )
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"]["type"] == "human"
        assert messages[0]["content"]["content"] == "What is AI?"


class TestCheckpointAlignedMessageFormat:
    @pytest.mark.anyio
    async def test_ai_message_checkpoint_format(self, journal_setup):
        """ai_message content should be checkpoint-aligned model_dump dict."""
        j, store = journal_setup
        j.on_llm_end(_make_llm_response("Answer"), run_id=uuid4(), tags=["lead_agent"])
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"]["type"] == "ai"
        assert messages[0]["content"]["content"] == "Answer"
        assert "response_metadata" in messages[0]["content"]
        assert "additional_kwargs" in messages[0]["content"]

    @pytest.mark.anyio
    async def test_ai_tool_call_event(self, journal_setup):
        """LLM response with tool_calls should produce ai_tool_call with model_dump content."""
        j, store = journal_setup
        tool_calls = [{"id": "call_1", "name": "search", "args": {"query": "test"}}]
        j.on_llm_end(
            _make_llm_response("Let me search", tool_calls=tool_calls),
            run_id=uuid4(),
            tags=["lead_agent"],
        )
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["event_type"] == "ai_tool_call"
        assert messages[0]["content"]["type"] == "ai"
        assert messages[0]["content"]["content"] == "Let me search"
        assert len(messages[0]["content"]["tool_calls"]) == 1
        tc = messages[0]["content"]["tool_calls"][0]
        assert tc["id"] == "call_1"
        assert tc["name"] == "search"

    @pytest.mark.anyio
    async def test_ai_tool_call_only_from_lead_agent(self, journal_setup):
        """ai_tool_call should only be emitted for lead_agent, not subagents."""
        j, store = journal_setup
        tool_calls = [{"id": "call_1", "name": "search", "args": {}}]
        j.on_llm_end(
            _make_llm_response("searching", tool_calls=tool_calls),
            run_id=uuid4(),
            tags=["subagent:research"],
        )
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 0


class TestToolResultMessage:
    @pytest.mark.anyio
    async def test_tool_end_produces_tool_result_message(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_tool_start({"name": "web_search"}, '{"query": "test"}', run_id=run_id, tool_call_id="call_abc")
        j.on_tool_end("search results here", run_id=run_id, name="web_search", tool_call_id="call_abc")
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["event_type"] == "tool_result"
        # Content is checkpoint-aligned model_dump format
        assert messages[0]["content"]["type"] == "tool"
        assert messages[0]["content"]["tool_call_id"] == "call_abc"
        assert messages[0]["content"]["content"] == "search results here"
        assert messages[0]["content"]["name"] == "web_search"

    @pytest.mark.anyio
    async def test_tool_result_missing_tool_call_id(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_tool_start({"name": "bash"}, "ls", run_id=run_id)
        j.on_tool_end("file1.txt", run_id=run_id, name="bash")
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"]["type"] == "tool"

    @pytest.mark.anyio
    async def test_tool_end_extracts_from_tool_message_object(self, journal_setup):
        """When LangChain passes a ToolMessage object as output, extract fields from it."""
        from langchain_core.messages import ToolMessage

        j, store = journal_setup
        run_id = uuid4()
        tool_msg = ToolMessage(
            content="search results",
            tool_call_id="call_from_obj",
            name="web_search",
            status="success",
        )
        j.on_tool_end(tool_msg, run_id=run_id)
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"]["type"] == "tool"
        assert messages[0]["content"]["tool_call_id"] == "call_from_obj"
        assert messages[0]["content"]["content"] == "search results"
        assert messages[0]["content"]["name"] == "web_search"
        assert messages[0]["metadata"]["tool_name"] == "web_search"
        assert messages[0]["metadata"]["status"] == "success"
        events = await store.list_events("t1", "r1")
        tool_end = [e for e in events if e["event_type"] == "tool_end"][0]
        assert tool_end["metadata"]["tool_call_id"] == "call_from_obj"
        assert tool_end["metadata"]["tool_name"] == "web_search"

    @pytest.mark.anyio
    async def test_tool_message_object_overrides_kwargs(self, journal_setup):
        """ToolMessage object fields take priority over kwargs."""
        from langchain_core.messages import ToolMessage

        j, store = journal_setup
        run_id = uuid4()
        tool_msg = ToolMessage(
            content="result",
            tool_call_id="call_obj",
            name="tool_a",
            status="success",
        )
        # Pass different values in kwargs; the ToolMessage should win
        j.on_tool_end(tool_msg, run_id=run_id, name="tool_b", tool_call_id="call_kwarg")
        await j.flush()
        messages = await store.list_messages("t1")
        assert messages[0]["content"]["tool_call_id"] == "call_obj"
        assert messages[0]["content"]["name"] == "tool_a"
        assert messages[0]["metadata"]["tool_name"] == "tool_a"

    @pytest.mark.anyio
    async def test_tool_message_error_status(self, journal_setup):
        """ToolMessage with status='error' propagates status to metadata."""
        from langchain_core.messages import ToolMessage

        j, store = journal_setup
        run_id = uuid4()
        tool_msg = ToolMessage(
            content="something went wrong",
            tool_call_id="call_err",
            name="web_fetch",
            status="error",
        )
        j.on_tool_end(tool_msg, run_id=run_id)
        await j.flush()
        events = await store.list_events("t1", "r1")
        tool_end = [e for e in events if e["event_type"] == "tool_end"][0]
        assert tool_end["metadata"]["status"] == "error"
        messages = await store.list_messages("t1")
        assert messages[0]["content"]["status"] == "error"
        assert messages[0]["metadata"]["status"] == "error"

    @pytest.mark.anyio
    async def test_tool_message_fallback_to_cache(self, journal_setup):
        """If ToolMessage has empty tool_call_id, fall back to cache from on_tool_start."""
        from langchain_core.messages import ToolMessage

        j, store = journal_setup
        run_id = uuid4()
        j.on_tool_start({"name": "bash"}, "ls", run_id=run_id, tool_call_id="call_cached")
        tool_msg = ToolMessage(
            content="file list",
            tool_call_id="",
            name="bash",
        )
        j.on_tool_end(tool_msg, run_id=run_id)
        await j.flush()
        messages = await store.list_messages("t1")
        assert messages[0]["content"]["tool_call_id"] == "call_cached"

    @pytest.mark.anyio
    async def test_tool_error_produces_tool_result_message(self, journal_setup):
        j, store = journal_setup
        j.on_tool_error(TimeoutError("timeout"), run_id=uuid4(), name="web_fetch", tool_call_id="call_1")
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["event_type"] == "tool_result"
        assert messages[0]["content"]["type"] == "tool"
        assert messages[0]["content"]["tool_call_id"] == "call_1"
        assert "timeout" in messages[0]["content"]["content"]
        assert messages[0]["content"]["status"] == "error"
        assert messages[0]["metadata"]["status"] == "error"

    @pytest.mark.anyio
    async def test_tool_error_uses_cached_tool_call_id(self, journal_setup):
        """on_tool_error should fall back to cached tool_call_id from on_tool_start."""
        j, store = journal_setup
        run_id = uuid4()
        j.on_tool_start({"name": "web_fetch"}, "url", run_id=run_id, tool_call_id="call_cached")
        j.on_tool_error(TimeoutError("timeout"), run_id=run_id, name="web_fetch")
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 1
        assert messages[0]["content"]["tool_call_id"] == "call_cached"


def _make_base_messages():
    """Create mock LangChain BaseMessages for on_chat_model_start."""
    sys_msg = MagicMock()
    sys_msg.content = "You are helpful."
    sys_msg.type = "system"
    sys_msg.tool_calls = []
    sys_msg.tool_call_id = None
    user_msg = MagicMock()
    user_msg.content = "Hello"
    user_msg.type = "human"
    user_msg.tool_calls = []
    user_msg.tool_call_id = None
    return [sys_msg, user_msg]


class TestLlmRequestResponse:
    @pytest.mark.anyio
    async def test_llm_request_event(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        messages = _make_base_messages()
        j.on_chat_model_start({"name": "gpt-4o"}, [messages], run_id=run_id, tags=["lead_agent"])
        await j.flush()
        events = await store.list_events("t1", "r1")
        req_events = [e for e in events if e["event_type"] == "llm_request"]
        assert len(req_events) == 1
        content = req_events[0]["content"]
        assert content["model"] == "gpt-4o"
        assert len(content["messages"]) == 2
        assert content["messages"][0]["role"] == "system"
        assert content["messages"][1]["role"] == "user"

    @pytest.mark.anyio
    async def test_llm_response_event(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_start({}, [], run_id=run_id, tags=["lead_agent"])
        j.on_llm_end(
            _make_llm_response("Answer", usage={"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}),
            run_id=run_id,
            tags=["lead_agent"],
        )
        await j.flush()
        events = await store.list_events("t1", "r1")
        assert not any(e["event_type"] == "llm_end" for e in events)
        resp_events = [e for e in events if e["event_type"] == "llm_response"]
        assert len(resp_events) == 1
        content = resp_events[0]["content"]
        assert "choices" in content
        assert content["choices"][0]["message"]["role"] == "assistant"
        assert content["choices"][0]["message"]["content"] == "Answer"
        assert content["usage"]["prompt_tokens"] == 10

    @pytest.mark.anyio
    async def test_llm_request_response_paired(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        messages = _make_base_messages()
        j.on_chat_model_start({"name": "gpt-4o"}, [messages], run_id=run_id, tags=["lead_agent"])
        j.on_llm_end(
            _make_llm_response("Hi", usage={"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}),
            run_id=run_id,
            tags=["lead_agent"],
        )
        await j.flush()
        events = await store.list_events("t1", "r1")
        req = [e for e in events if e["event_type"] == "llm_request"][0]
        resp = [e for e in events if e["event_type"] == "llm_response"][0]
        assert req["metadata"]["llm_call_index"] == resp["metadata"]["llm_call_index"]

    @pytest.mark.anyio
    async def test_no_llm_start_event(self, journal_setup):
        j, store = journal_setup
        run_id = uuid4()
        j.on_llm_start({"name": "test"}, [], run_id=run_id, tags=["lead_agent"])
        await j.flush()
        events = await store.list_events("t1", "r1")
        assert not any(e["event_type"] == "llm_start" for e in events)


class TestMiddlewareEvents:
    @pytest.mark.anyio
    async def test_record_middleware_uses_middleware_category(self, journal_setup):
        j, store = journal_setup
        j.record_middleware(
            "title",
            name="TitleMiddleware",
            hook="after_model",
            action="generate_title",
            changes={"title": "Test Title", "thread_id": "t1"},
        )
        await j.flush()
        events = await store.list_events("t1", "r1")
        mw_events = [e for e in events if e["event_type"] == "middleware:title"]
        assert len(mw_events) == 1
        assert mw_events[0]["category"] == "middleware"
        assert mw_events[0]["content"]["name"] == "TitleMiddleware"
        assert mw_events[0]["content"]["hook"] == "after_model"
        assert mw_events[0]["content"]["action"] == "generate_title"
        assert mw_events[0]["content"]["changes"]["title"] == "Test Title"

    @pytest.mark.anyio
    async def test_middleware_events_not_in_messages(self, journal_setup):
        """Middleware events should not appear in list_messages()."""
        j, store = journal_setup
        j.record_middleware(
            "title",
            name="TitleMiddleware",
            hook="after_model",
            action="generate_title",
            changes={"title": "Test"},
        )
        await j.flush()
        messages = await store.list_messages("t1")
        assert len(messages) == 0

    @pytest.mark.anyio
    async def test_middleware_tag_variants(self, journal_setup):
        """Different middleware tags produce distinct event_types."""
        j, store = journal_setup
        j.record_middleware("title", name="TitleMiddleware", hook="after_model", action="generate_title", changes={})
        j.record_middleware("guardrail", name="GuardrailMiddleware", hook="before_tool", action="deny", changes={})
        await j.flush()
        events = await store.list_events("t1", "r1")
        event_types = {e["event_type"] for e in events}
        assert "middleware:title" in event_types
        assert "middleware:guardrail" in event_types


class TestFullRunSequence:
    @pytest.mark.anyio
    async def test_complete_run_event_sequence(self):
        """Simulate a full run: user -> LLM -> tool_call -> tool_result -> LLM -> final reply.

        All message events use checkpoint-aligned model_dump format.
        """
        from langchain_core.messages import HumanMessage

        store = MemoryRunEventStore()
        j = RunJournal("r1", "t1", store, flush_threshold=100)

        # 1. Human message (written by worker, using model_dump format)
        human_msg = HumanMessage(content="Search for quantum computing")
        await store.put(
            thread_id="t1",
            run_id="r1",
            event_type="human_message",
            category="message",
            content=human_msg.model_dump(),
        )
        j.set_first_human_message("Search for quantum computing")

        # 2. Run start
        j.on_chain_start({}, {}, run_id=uuid4(), parent_run_id=None)

        # 3. First LLM call -> tool_calls
        llm1_id = uuid4()
        sys_msg = MagicMock(content="You are helpful.", type="system", tool_calls=[], tool_call_id=None)
        user_msg = MagicMock(content="Search for quantum computing", type="human", tool_calls=[], tool_call_id=None)
        j.on_chat_model_start({"name": "gpt-4o"}, [[sys_msg, user_msg]], run_id=llm1_id, tags=["lead_agent"])
        j.on_llm_end(
            _make_llm_response(
                "Let me search",
                tool_calls=[{"id": "call_1", "name": "web_search", "args": {"query": "quantum computing"}}],
                usage={"input_tokens": 100, "output_tokens": 20, "total_tokens": 120},
            ),
            run_id=llm1_id,
            tags=["lead_agent"],
        )

        # 4. Tool execution
        tool_id = uuid4()
        j.on_tool_start({"name": "web_search"}, '{"query": "quantum computing"}', run_id=tool_id, tool_call_id="call_1")
        j.on_tool_end("Quantum computing results...", run_id=tool_id, name="web_search", tool_call_id="call_1")

        # 5. Middleware: title generation
        j.record_middleware(
            "title",
            name="TitleMiddleware",
            hook="after_model",
            action="generate_title",
            changes={"title": "Quantum Computing"},
        )

        # 6. Second LLM call -> final reply
        llm2_id = uuid4()
        j.on_chat_model_start({"name": "gpt-4o"}, [[sys_msg, user_msg]], run_id=llm2_id, tags=["lead_agent"])
        j.on_llm_end(
            _make_llm_response(
                "Here are the results about quantum computing...",
                usage={"input_tokens": 200, "output_tokens": 100, "total_tokens": 300},
            ),
            run_id=llm2_id,
            tags=["lead_agent"],
        )

        # 7. Run end
        j.on_chain_end({}, run_id=uuid4(), parent_run_id=None)
        await asyncio.sleep(0.05)
        await j.flush()

        # Verify message sequence
        messages = await store.list_messages("t1")
        msg_types = [m["event_type"] for m in messages]
        assert msg_types == ["human_message", "ai_tool_call", "tool_result", "ai_message"]

        # Verify checkpoint-aligned format: all messages use "type" not "role"
        assert messages[0]["content"]["type"] == "human"
        assert messages[0]["content"]["content"] == "Search for quantum computing"
        assert messages[1]["content"]["type"] == "ai"
        assert "tool_calls" in messages[1]["content"]
        assert messages[2]["content"]["type"] == "tool"
        assert messages[2]["content"]["tool_call_id"] == "call_1"
        assert messages[3]["content"]["type"] == "ai"
        assert messages[3]["content"]["content"] == "Here are the results about quantum computing..."

        # Verify trace events
        events = await store.list_events("t1", "r1")
        trace_types = [e["event_type"] for e in events if e["category"] == "trace"]
        assert "llm_request" in trace_types
        assert "llm_response" in trace_types
        assert "tool_start" in trace_types
        assert "tool_end" in trace_types
        assert "llm_start" not in trace_types
        assert "llm_end" not in trace_types

        # Verify middleware events are in their own category
        mw_events = [e for e in events if e["category"] == "middleware"]
        assert len(mw_events) == 1
        assert mw_events[0]["event_type"] == "middleware:title"

        # Verify token accumulation
        data = j.get_completion_data()
        assert data["total_tokens"] == 420  # 120 + 300
        assert data["llm_call_count"] == 2
        assert data["lead_agent_tokens"] == 420
        assert data["message_count"] == 1  # only the final ai_message counts
        assert data["last_ai_message"] == "Here are the results about quantum computing..."

        # Verify all message contents are checkpoint-aligned dicts with "type" field
        for m in messages:
            assert isinstance(m["content"], dict)
            assert "type" in m["content"]