From 10cc6515784591951417c15e74995f9113f8a5d7 Mon Sep 17 00:00:00 2001
From: rayhpeng
Date: Sat, 11 Apr 2026 11:16:22 +0800
Subject: [PATCH] fix(persistence): stream hang when run_events.backend=db

DbRunEventStore._user_id_from_context() returned user.id without
coercing it to str. User.id is a Pydantic UUID, and aiosqlite cannot
bind a raw UUID object to a VARCHAR column, so the INSERT for the
initial human_message event rolled back and the resulting error
escaped the worker task. Because that put() sat outside the worker's
try block, the finally clause that publishes end-of-stream never ran
and the SSE stream hung forever. jsonl mode was unaffected because
json.dumps(default=str) coerces UUID objects transparently.

Fixes:
- db.py: coerce user.id to str at the context-read boundary (matches
  what resolve_user_id already does for the other repositories)
- worker.py: move RunJournal init + human_message put inside the try
  block so any failure flows through the finally/publish_end path
  instead of hanging the subscriber

Defense-in-depth:
- engine.py: add PRAGMA busy_timeout=5000 so checkpointer and event
  store wait for each other on the shared deerflow.db file instead of
  failing immediately under write-lock contention
- journal.py: skip fire-and-forget _flush_sync when a previous flush
  task is still in flight, to avoid piling up concurrent put_batch
  writes on the same SQLAlchemy engine during streaming; flush() now
  waits for pending tasks before draining the buffer
- database_config.py: doc-only update clarifying WAL + busy_timeout
  keep the unified deerflow.db safe for both workloads

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../deerflow/config/database_config.py        |  5 +-
 .../harness/deerflow/persistence/engine.py    |  1 +
 .../deerflow/runtime/events/store/db.py       |  8 +-
 .../harness/deerflow/runtime/journal.py       | 25 ++--
 .../harness/deerflow/runtime/runs/worker.py   | 81 ++++++++++---------
 5 files changed, 76 insertions(+), 44 deletions(-)

diff --git a/backend/packages/harness/deerflow/config/database_config.py b/backend/packages/harness/deerflow/config/database_config.py
index a4160c79f..19fb297b0 100644
--- a/backend/packages/harness/deerflow/config/database_config.py
+++ b/backend/packages/harness/deerflow/config/database_config.py
@@ -7,7 +7,10 @@ configures one backend; the system handles physical separation details.
 SQLite mode: checkpointer and app share a single .db file
 ({sqlite_dir}/deerflow.db) with WAL journal mode enabled on every
 connection. WAL allows concurrent readers and a single writer without
-blocking, making a unified file safe for both workloads.
+blocking, making a unified file safe for both workloads. The
+``busy_timeout`` PRAGMA (set in ``engine.py``) ensures writers wait
+for each other instead of failing immediately when they contend for
+the write lock.
 
 Postgres mode: both use the same database URL but maintain independent
 connection pools with different lifecycles.
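(Reviewer note, not part of the patch: the write-lock contention that busy_timeout addresses is easy to observe with two stdlib sqlite3 connections, which is all aiosqlite wraps. A minimal sketch, assuming nothing about DeerFlow itself; the scratch file and table are made up, isolation_level=None gives explicit transaction control, and timeout=0 disables Python's own connection-level retry so the PRAGMA's effect is visible:

    import sqlite3

    # Writer takes and holds the write lock on a scratch WAL database.
    writer = sqlite3.connect("scratch.db", timeout=0, isolation_level=None)
    writer.execute("PRAGMA journal_mode=WAL;")
    writer.execute("CREATE TABLE IF NOT EXISTS t (x INTEGER)")
    writer.execute("BEGIN IMMEDIATE")
    writer.execute("INSERT INTO t VALUES (1)")

    contender = sqlite3.connect("scratch.db", timeout=0, isolation_level=None)
    try:
        contender.execute("BEGIN IMMEDIATE")  # no busy_timeout: fails at once
    except sqlite3.OperationalError as exc:
        print(exc)  # "database is locked"

    # With the PRAGMA set, SQLite itself retries the lock acquisition for
    # up to 5s instead of raising immediately, so the second writer
    # succeeds as soon as the first one commits.
    contender.execute("PRAGMA busy_timeout=5000;")
    writer.execute("COMMIT")
    contender.execute("BEGIN IMMEDIATE")
    contender.execute("ROLLBACK")

That retry window is exactly what the checkpointer and event store need when they contend for the shared deerflow.db file.)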
diff --git a/backend/packages/harness/deerflow/persistence/engine.py b/backend/packages/harness/deerflow/persistence/engine.py
index 7e374788c..4857c13a0 100644
--- a/backend/packages/harness/deerflow/persistence/engine.py
+++ b/backend/packages/harness/deerflow/persistence/engine.py
@@ -105,6 +105,7 @@ async def init_engine(
             cursor.execute("PRAGMA journal_mode=WAL;")
             cursor.execute("PRAGMA synchronous=NORMAL;")
             cursor.execute("PRAGMA foreign_keys=ON;")
+            cursor.execute("PRAGMA busy_timeout=5000;")
         finally:
             cursor.close()
     elif backend == "postgres":
diff --git a/backend/packages/harness/deerflow/runtime/events/store/db.py b/backend/packages/harness/deerflow/runtime/events/store/db.py
index c88445bab..63328db43 100644
--- a/backend/packages/harness/deerflow/runtime/events/store/db.py
+++ b/backend/packages/harness/deerflow/runtime/events/store/db.py
@@ -62,9 +62,15 @@ class DbRunEventStore(RunEventStore):
         which is the expected case for background worker writes. HTTP
         request writes will have the contextvar set by auth middleware
         and get their user_id stamped automatically.
+
+        Coerces ``user.id`` to ``str`` at the boundary: ``User.id`` is
+        typed as ``UUID`` by the auth layer, but ``run_events.user_id``
+        is ``VARCHAR(64)`` and aiosqlite cannot bind a raw UUID object
+        to a VARCHAR column ("type 'UUID' is not supported") — the
+        INSERT would silently roll back and the worker would hang.
         """
         user = get_current_user()
-        return user.id if user is not None else None
+        return str(user.id) if user is not None else None
 
     async def put(self, *, thread_id, run_id, event_type, category, content="", metadata=None, created_at=None):  # noqa: D401
         """Write a single event — low-frequency path only.
diff --git a/backend/packages/harness/deerflow/runtime/journal.py b/backend/packages/harness/deerflow/runtime/journal.py
index b9aa019ad..ad35f5ff9 100644
--- a/backend/packages/harness/deerflow/runtime/journal.py
+++ b/backend/packages/harness/deerflow/runtime/journal.py
@@ -50,6 +50,7 @@ class RunJournal(BaseCallbackHandler):
 
         # Write buffer
         self._buffer: list[dict] = []
+        self._pending_flush_tasks: set[asyncio.Task[None]] = set()
 
         # Token accumulators
         self._total_input_tokens = 0
@@ -381,6 +382,10 @@ class RunJournal(BaseCallbackHandler):
         """
         if not self._buffer:
             return
+        # Skip if a flush is already in flight — avoids concurrent writes
+        # to the same SQLite file from multiple fire-and-forget tasks.
+        if self._pending_flush_tasks:
+            return
         try:
             loop = asyncio.get_running_loop()
         except RuntimeError:
@@ -389,6 +394,7 @@ class RunJournal(BaseCallbackHandler):
         batch = self._buffer.copy()
         self._buffer.clear()
         task = loop.create_task(self._flush_async(batch))
+        self._pending_flush_tasks.add(task)
         task.add_done_callback(self._on_flush_done)
 
     async def _flush_async(self, batch: list[dict]) -> None:
@@ -404,8 +410,8 @@ class RunJournal(BaseCallbackHandler):
             # Return failed events to buffer for retry on next flush
             self._buffer = batch + self._buffer
 
-    @staticmethod
-    def _on_flush_done(task: asyncio.Task) -> None:
+    def _on_flush_done(self, task: asyncio.Task) -> None:
+        self._pending_flush_tasks.discard(task)
         if task.cancelled():
             return
         exc = task.exception()
@@ -450,10 +456,17 @@ class RunJournal(BaseCallbackHandler):
     async def flush(self) -> None:
         """Force flush remaining buffer.
        Called in worker's finally block."""
-        if self._buffer:
-            batch = self._buffer.copy()
-            self._buffer.clear()
-            await self._store.put_batch(batch)
+        if self._pending_flush_tasks:
+            await asyncio.gather(*tuple(self._pending_flush_tasks), return_exceptions=True)
+
+        while self._buffer:
+            batch = self._buffer[: self._flush_threshold]
+            del self._buffer[: self._flush_threshold]
+            try:
+                await self._store.put_batch(batch)
+            except Exception:
+                self._buffer = batch + self._buffer
+                raise
 
     def get_completion_data(self) -> dict:
         """Return accumulated token and message data for run completion."""
diff --git a/backend/packages/harness/deerflow/runtime/runs/worker.py b/backend/packages/harness/deerflow/runtime/runs/worker.py
index d2463f4de..5431c0e72 100644
--- a/backend/packages/harness/deerflow/runtime/runs/worker.py
+++ b/backend/packages/harness/deerflow/runtime/runs/worker.py
@@ -143,34 +143,7 @@ async def run_agent(
         content = human_msg.content
         journal.set_first_human_message(content if isinstance(content, str) else str(content))
 
-    # Initialize RunJournal for event capture
     journal = None
-    if event_store is not None:
-        from deerflow.runtime.journal import RunJournal
-
-        journal = RunJournal(
-            run_id=run_id,
-            thread_id=thread_id,
-            event_store=event_store,
-            track_token_usage=getattr(run_events_config, "track_token_usage", True),
-        )
-
-        # Write human_message event (model_dump format, aligned with checkpoint)
-        human_msg = _extract_human_message(graph_input)
-        if human_msg is not None:
-            msg_metadata = {}
-            if follow_up_to_run_id:
-                msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
-            await event_store.put(
-                thread_id=thread_id,
-                run_id=run_id,
-                event_type="human_message",
-                category="message",
-                content=human_msg.model_dump(),
-                metadata=msg_metadata or None,
-            )
-            content = human_msg.content
-            journal.set_first_human_message(content if isinstance(content, str) else str(content))
 
     # Track whether "events" was requested but skipped
     if "events" in requested_modes:
@@ -180,6 +153,38 @@
         )
 
     try:
+        # Initialize RunJournal + write human_message event.
+        # These are inside the try block so any exception (e.g. a DB
+        # error writing the event) flows through the except/finally
+        # path that publishes an "end" event to the SSE bridge —
+        # otherwise a failure here would leave the stream hanging
+        # with no terminator.
+        if event_store is not None:
+            from deerflow.runtime.journal import RunJournal
+
+            journal = RunJournal(
+                run_id=run_id,
+                thread_id=thread_id,
+                event_store=event_store,
+                track_token_usage=getattr(run_events_config, "track_token_usage", True),
+            )
+
+            human_msg = _extract_human_message(graph_input)
+            if human_msg is not None:
+                msg_metadata = {}
+                if follow_up_to_run_id:
+                    msg_metadata["follow_up_to_run_id"] = follow_up_to_run_id
+                await event_store.put(
+                    thread_id=thread_id,
+                    run_id=run_id,
+                    event_type="human_message",
+                    category="message",
+                    content=human_msg.model_dump(),
+                    metadata=msg_metadata or None,
+                )
+                content = human_msg.content
+                journal.set_first_human_message(content if isinstance(content, str) else str(content))
+
         # 1. Mark running
        await run_manager.set_status(run_id, RunStatus.running)
 
@@ -363,12 +368,15 @@
         except Exception:
             logger.warning("Failed to flush journal for run %s", run_id, exc_info=True)
 
-        # Persist token usage + convenience fields to RunStore
-        completion = journal.get_completion_data()
-        await run_manager.update_run_completion(run_id, status=record.status.value, **completion)
+        try:
+            # Persist token usage + convenience fields to RunStore
+            completion = journal.get_completion_data()
+            await run_manager.update_run_completion(run_id, status=record.status.value, **completion)
+        except Exception:
+            logger.warning("Failed to persist run completion for %s (non-fatal)", run_id, exc_info=True)
 
         # Sync title from checkpoint to threads_meta.display_name
-        if checkpointer is not None:
+        if checkpointer is not None and thread_store is not None:
             try:
                 ckpt_config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": ""}}
                 ckpt_tuple = await checkpointer.aget_tuple(ckpt_config)
@@ -381,11 +389,12 @@
                 logger.debug("Failed to sync title for thread %s (non-fatal)", thread_id)
 
         # Update threads_meta status based on run outcome
-        try:
-            final_status = "idle" if record.status == RunStatus.success else record.status.value
-            await thread_store.update_status(thread_id, final_status)
-        except Exception:
-            logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id)
+        if thread_store is not None:
+            try:
+                final_status = "idle" if record.status == RunStatus.success else record.status.value
+                await thread_store.update_status(thread_id, final_status)
+            except Exception:
+                logger.debug("Failed to update thread_meta status for %s (non-fatal)", thread_id)
 
         await bridge.publish_end(run_id)
         asyncio.create_task(bridge.cleanup(run_id, delay=60))
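
(Reviewer note, not part of the patch: the root-cause bind failure reproduces with the stdlib sqlite3 module alone, since aiosqlite delegates parameter binding to it. A minimal sketch; the in-memory database and one-column table are illustrative, and the exact exception class and message vary by Python version:

    import sqlite3
    import uuid

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE run_events (user_id VARCHAR(64))")

    try:
        # What the unpatched _user_id_from_context() effectively handed
        # to the driver: a raw UUID object.
        conn.execute("INSERT INTO run_events (user_id) VALUES (?)", (uuid.uuid4(),))
    except (sqlite3.InterfaceError, sqlite3.ProgrammingError) as exc:
        print(exc)  # recent CPython: "type 'UUID' is not supported"

    # The patched code coerces at the boundary, so the bind succeeds:
    conn.execute("INSERT INTO run_events (user_id) VALUES (?)", (str(uuid.uuid4()),))
    conn.commit()

str(uuid) binds cleanly, which is all the db.py one-liner does; jsonl mode never hit this because json.dumps(default=str) performs the same coercion at serialization time.)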