From 332fb18b34a7d52d19fd066926178d11aa2bde9c Mon Sep 17 00:00:00 2001 From: rayhpeng Date: Mon, 6 Apr 2026 11:09:42 +0800 Subject: [PATCH] refactor(gateway): move sanitize_log_param to app/gateway/utils.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract the log-injection sanitizer from routers/threads.py into a shared utils module and rename to sanitize_log_param (public API). Eliminates the reverse service → router import in services.py. Co-Authored-By: Claude Opus 4.6 (1M context) --- backend/app/gateway/routers/threads.py | 40 ++++++++++++-------------- backend/app/gateway/services.py | 6 ++-- backend/app/gateway/utils.py | 6 ++++ 3 files changed, 27 insertions(+), 25 deletions(-) create mode 100644 backend/app/gateway/utils.py diff --git a/backend/app/gateway/routers/threads.py b/backend/app/gateway/routers/threads.py index 3dbd6393b..59f5e6ea0 100644 --- a/backend/app/gateway/routers/threads.py +++ b/backend/app/gateway/routers/threads.py @@ -21,6 +21,7 @@ from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel, Field from app.gateway.deps import get_checkpointer, get_store +from app.gateway.utils import sanitize_log_param from deerflow.config.paths import Paths, get_paths from deerflow.runtime import serialize_channel_values @@ -35,11 +36,6 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/threads", tags=["threads"]) -def _sanitize_log_param(value: str) -> str: - """Strip control characters to prevent log injection.""" - return value.replace("\n", "").replace("\r", "").replace("\x00", "") - - # --------------------------------------------------------------------------- # Response / request models # --------------------------------------------------------------------------- @@ -141,13 +137,13 @@ def _delete_thread_data(thread_id: str, paths: Paths | None = None) -> ThreadDel raise HTTPException(status_code=422, detail=str(exc)) from exc except FileNotFoundError: # Not critical — thread data may not exist on disk - logger.debug("No local thread data to delete for %s", _sanitize_log_param(thread_id)) + logger.debug("No local thread data to delete for %s", sanitize_log_param(thread_id)) return ThreadDeleteResponse(success=True, message=f"No local data for {thread_id}") except Exception as exc: - logger.exception("Failed to delete thread data for %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to delete thread data for %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to delete local thread data.") from exc - logger.info("Deleted local thread data for %s", _sanitize_log_param(thread_id)) + logger.info("Deleted local thread data for %s", sanitize_log_param(thread_id)) return ThreadDeleteResponse(success=True, message=f"Deleted local thread data for {thread_id}") @@ -236,7 +232,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe try: await store.adelete(THREADS_NS, thread_id) except Exception: - logger.debug("Could not delete store record for thread %s (not critical)", _sanitize_log_param(thread_id)) + logger.debug("Could not delete store record for thread %s (not critical)", sanitize_log_param(thread_id)) # Remove checkpoints (best-effort) checkpointer = getattr(request.app.state, "checkpointer", None) @@ -245,7 +241,7 @@ async def delete_thread_data(thread_id: str, request: Request) -> ThreadDeleteRe if hasattr(checkpointer, "adelete_thread"): await checkpointer.adelete_thread(thread_id) except Exception: - logger.debug("Could not delete checkpoints for thread %s (not critical)", _sanitize_log_param(thread_id)) + logger.debug("Could not delete checkpoints for thread %s (not critical)", sanitize_log_param(thread_id)) return response @@ -289,7 +285,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe }, ) except Exception: - logger.exception("Failed to write thread %s to store", _sanitize_log_param(thread_id)) + logger.exception("Failed to write thread %s to store", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to create thread") # Write an empty checkpoint so state endpoints work immediately @@ -307,7 +303,7 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe } await checkpointer.aput(config, empty_checkpoint(), ckpt_metadata, {}) except Exception: - logger.exception("Failed to create checkpoint for thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to create checkpoint for thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to create thread") # Write thread_meta so the thread appears in /threads/search immediately @@ -322,9 +318,9 @@ async def create_thread(body: ThreadCreateRequest, request: Request) -> ThreadRe metadata=body.metadata, ) except Exception: - logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", _sanitize_log_param(thread_id)) + logger.debug("Failed to upsert thread_meta on create for %s (non-fatal)", sanitize_log_param(thread_id)) - logger.info("Thread created: %s", _sanitize_log_param(thread_id)) + logger.info("Thread created: %s", sanitize_log_param(thread_id)) return ThreadResponse( thread_id=thread_id, status="idle", @@ -391,7 +387,7 @@ async def patch_thread(thread_id: str, body: ThreadPatchRequest, request: Reques try: await _store_put(store, updated) except Exception: - logger.exception("Failed to patch thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to patch thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to update thread") return ThreadResponse( @@ -423,7 +419,7 @@ async def get_thread(thread_id: str, request: Request) -> ThreadResponse: try: checkpoint_tuple = await checkpointer.aget_tuple(config) except Exception: - logger.exception("Failed to get checkpoint for thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to get checkpoint for thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to get thread") if record is None and checkpoint_tuple is None: @@ -471,7 +467,7 @@ async def get_thread_state(thread_id: str, request: Request) -> ThreadStateRespo try: checkpoint_tuple = await checkpointer.aget_tuple(config) except Exception: - logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to get state for thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to get thread state") if checkpoint_tuple is None: @@ -533,7 +529,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re try: checkpoint_tuple = await checkpointer.aget_tuple(read_config) except Exception: - logger.exception("Failed to get state for thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to get state for thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to get thread state") if checkpoint_tuple is None: @@ -567,7 +563,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re try: new_config = await checkpointer.aput(write_config, checkpoint, metadata, {}) except Exception: - logger.exception("Failed to update state for thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to update state for thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to update thread state") new_checkpoint_id: str | None = None @@ -579,7 +575,7 @@ async def update_thread_state(thread_id: str, body: ThreadStateUpdateRequest, re try: await _store_upsert(store, thread_id, values={"title": body.values["title"]}) except Exception: - logger.debug("Failed to sync title to store for thread %s (non-fatal)", _sanitize_log_param(thread_id)) + logger.debug("Failed to sync title to store for thread %s (non-fatal)", sanitize_log_param(thread_id)) return ThreadStateResponse( values=serialize_channel_values(channel_values), @@ -613,7 +609,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request try: all_messages = await event_store.list_messages(thread_id, limit=10_000) except Exception: - logger.warning("Failed to load messages from event store for thread %s", _sanitize_log_param(thread_id), exc_info=True) + logger.warning("Failed to load messages from event store for thread %s", sanitize_log_param(thread_id), exc_info=True) all_messages = [] entries: list[HistoryEntry] = [] @@ -665,7 +661,7 @@ async def get_thread_history(thread_id: str, body: ThreadHistoryRequest, request ) ) except Exception: - logger.exception("Failed to get history for thread %s", _sanitize_log_param(thread_id)) + logger.exception("Failed to get history for thread %s", sanitize_log_param(thread_id)) raise HTTPException(status_code=500, detail="Failed to get thread history") return entries diff --git a/backend/app/gateway/services.py b/backend/app/gateway/services.py index 31833b822..d2f62a4d1 100644 --- a/backend/app/gateway/services.py +++ b/backend/app/gateway/services.py @@ -19,7 +19,7 @@ from fastapi import HTTPException, Request from langchain_core.messages import HumanMessage from app.gateway.deps import get_run_context, get_run_manager, get_run_store, get_stream_bridge -from app.gateway.routers.threads import _sanitize_log_param +from app.gateway.utils import sanitize_log_param from deerflow.runtime import ( END_SENTINEL, HEARTBEAT_SENTINEL, @@ -186,7 +186,7 @@ async def _upsert_thread_in_store(store, thread_id: str, metadata: dict | None) try: await _store_upsert(store, thread_id, metadata=metadata) except Exception: - logger.warning("Failed to upsert thread %s in store (non-fatal)", _sanitize_log_param(thread_id)) + logger.warning("Failed to upsert thread %s in store (non-fatal)", sanitize_log_param(thread_id)) async def _sync_thread_title_after_run( @@ -309,7 +309,7 @@ async def start_run( else: await run_ctx.thread_meta_repo.update_status(thread_id, "running") except Exception: - logger.warning("Failed to upsert thread_meta for %s (non-fatal)", _sanitize_log_param(thread_id)) + logger.warning("Failed to upsert thread_meta for %s (non-fatal)", sanitize_log_param(thread_id)) agent_factory = resolve_agent_factory(body.assistant_id) graph_input = normalize_input(body.input) diff --git a/backend/app/gateway/utils.py b/backend/app/gateway/utils.py new file mode 100644 index 000000000..8368d84fc --- /dev/null +++ b/backend/app/gateway/utils.py @@ -0,0 +1,6 @@ +"""Shared utility helpers for the Gateway layer.""" + + +def sanitize_log_param(value: str) -> str: + """Strip control characters to prevent log injection.""" + return value.replace("\n", "").replace("\r", "").replace("\x00", "")