From 8760937439e2722203f7d759414b667f20bbb285 Mon Sep 17 00:00:00 2001 From: DanielWalnut <45447813+hetaoBackend@users.noreply.github.com> Date: Tue, 14 Apr 2026 16:41:54 +0800 Subject: [PATCH] fix(memory): use asyncio.to_thread for blocking file I/O in aupdate_memory (#2220) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix(memory): use asyncio.to_thread for blocking file I/O in aupdate_memory `_finalize_update` performs synchronous blocking operations (os.mkdir, file open/write/rename/stat) that were called directly from the async `aupdate_memory` method, causing `BlockingError` from blockbuster when running under an ASGI server. Wrap the call with `asyncio.to_thread` to offload all blocking I/O to a thread pool. Co-Authored-By: Claude Sonnet 4.6 * fix(memory): use unique temp filename to prevent concurrent write collision `file_path.with_suffix(".tmp")` produces a fixed path — concurrent saves for the same agent (now possible after wrapping _finalize_update in asyncio.to_thread) would clobber the same temp file. Use a UUID-suffixed temp file so each write is isolated. Co-Authored-By: Claude Sonnet 4.6 * fix(memory): also offload _prepare_update_prompt to thread pool FileMemoryStorage.load() inside _prepare_update_prompt performs synchronous stat() and file read, blocking the event loop just like _finalize_update did. Wrap _prepare_update_prompt in asyncio.to_thread for the same reason. The async path now has no blocking file I/O on the event loop: to_thread(_prepare_update_prompt) → await model.ainvoke() → to_thread(_finalize_update) Co-Authored-By: Claude Sonnet 4.6 --------- Co-authored-by: Claude Sonnet 4.6 --- backend/packages/harness/deerflow/agents/memory/storage.py | 3 ++- backend/packages/harness/deerflow/agents/memory/updater.py | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/backend/packages/harness/deerflow/agents/memory/storage.py b/backend/packages/harness/deerflow/agents/memory/storage.py index 3d57d059b..0934e3ca2 100644 --- a/backend/packages/harness/deerflow/agents/memory/storage.py +++ b/backend/packages/harness/deerflow/agents/memory/storage.py @@ -4,6 +4,7 @@ import abc import json import logging import threading +import uuid from datetime import UTC, datetime from pathlib import Path from typing import Any @@ -144,7 +145,7 @@ class FileMemoryStorage(MemoryStorage): file_path.parent.mkdir(parents=True, exist_ok=True) memory_data["lastUpdated"] = utc_now_iso_z() - temp_path = file_path.with_suffix(".tmp") + temp_path = file_path.with_suffix(f".{uuid.uuid4().hex}.tmp") with open(temp_path, "w", encoding="utf-8") as f: json.dump(memory_data, f, indent=2, ensure_ascii=False) diff --git a/backend/packages/harness/deerflow/agents/memory/updater.py b/backend/packages/harness/deerflow/agents/memory/updater.py index 1ef32fb60..e70eeb102 100644 --- a/backend/packages/harness/deerflow/agents/memory/updater.py +++ b/backend/packages/harness/deerflow/agents/memory/updater.py @@ -394,7 +394,8 @@ class MemoryUpdater: ) -> bool: """Update memory asynchronously based on conversation messages.""" try: - prepared = self._prepare_update_prompt( + prepared = await asyncio.to_thread( + self._prepare_update_prompt, messages=messages, agent_name=agent_name, correction_detected=correction_detected, @@ -406,7 +407,8 @@ class MemoryUpdater: current_memory, prompt = prepared model = self._get_model() response = await model.ainvoke(prompt) - return self._finalize_update( + return await asyncio.to_thread( + self._finalize_update, current_memory=current_memory, response_content=response.content, thread_id=thread_id,