From b0ccc228b42b3f04987280b31c078e06ccbd6945 Mon Sep 17 00:00:00 2001 From: Shu Yao Date: Thu, 5 Feb 2026 18:11:06 +0800 Subject: [PATCH] fix: websocket disconnect problem upgrade: allow set not log_output to avoid log every node output --- entity/configs/node/node.py | 12 ++++++++++++ frontend/src/pages/LaunchView.vue | 9 ++++++++- server/services/websocket_manager.py | 25 +++++++++++++++++++++++-- workflow/graph.py | 2 +- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/entity/configs/node/node.py b/entity/configs/node/node.py index e8e201bd..fec4fab5 100755 --- a/entity/configs/node/node.py +++ b/entity/configs/node/node.py @@ -65,6 +65,7 @@ class Node(BaseConfig): type: str description: str | None = None # keep_context: bool = False + log_output: bool = True context_window: int = 0 vars: Dict[str, Any] = field(default_factory=dict) config: BaseConfig | None = None @@ -118,6 +119,15 @@ class Node(BaseConfig): description="Number of context messages accessible during node execution. 0 means clear all context except messages with keep_message=True, -1 means unlimited, other values represent the number of context messages to keep besides those with keep_message=True.", # advance=True, ), + "log_output": ConfigFieldSpec( + name="log_output", + display_name="Log Output", + type_hint="bool", + required=False, + default=True, + advance=True, + description="Whether to log this node's output content. Set to false to avoid logging outputs.", + ), "config": ConfigFieldSpec( name="config", display_name="Node Configuration", @@ -170,6 +180,7 @@ class Node(BaseConfig): description = optional_str(mapping, "description", path) # keep_context = bool(mapping.get("keep_context", False)) + log_output = bool(mapping.get("log_output", True)) context_window = int(mapping.get("context_window", 0)) input_value = ensure_list(mapping.get("input")) output_value = ensure_list(mapping.get("output")) @@ -206,6 +217,7 @@ class Node(BaseConfig): id=node_id, type=node_type, description=description, + log_output=log_output, input=input_messages, output=formatted_output, # keep_context=keep_context, diff --git a/frontend/src/pages/LaunchView.vue b/frontend/src/pages/LaunchView.vue index 31399f04..a8683a54 100755 --- a/frontend/src/pages/LaunchView.vue +++ b/frontend/src/pages/LaunchView.vue @@ -844,6 +844,13 @@ const handleClickOutside = (event) => { // Add a dialogue entry const addDialogue = (name, message) => { + if (message === null || message === undefined) { + return + } + const text = typeof message === 'string' ? message : String(message) + if (!text.trim()) { + return + } let avatar if (nameToSpriteMap.value.has(name)) { avatar = nameToSpriteMap.value.get(name) @@ -857,7 +864,7 @@ const addDialogue = (name, message) => { chatMessages.value.push({ type: 'dialogue', name: name, - text: message, + text: text, avatar: avatar, isRight: isRight, timestamp: Date.now() diff --git a/server/services/websocket_manager.py b/server/services/websocket_manager.py index 049030e6..78cc5142 100755 --- a/server/services/websocket_manager.py +++ b/server/services/websocket_manager.py @@ -49,6 +49,8 @@ class WebSocketManager: ): self.active_connections: Dict[str, WebSocket] = {} self.connection_timestamps: Dict[str, float] = {} + self.send_locks: Dict[str, asyncio.Lock] = {} + self.loop: asyncio.AbstractEventLoop | None = None self.session_store = session_store or WorkflowSessionStore() self.session_controller = session_controller or SessionExecutionController(self.session_store) self.attachment_service = attachment_service or AttachmentService() @@ -65,10 +67,16 @@ class WebSocketManager: async def connect(self, websocket: WebSocket, session_id: Optional[str] = None) -> str: await websocket.accept() + if self.loop is None: + try: + self.loop = asyncio.get_running_loop() + except RuntimeError: + self.loop = None if not session_id: session_id = str(uuid.uuid4()) self.active_connections[session_id] = websocket self.connection_timestamps[session_id] = time.time() + self.send_locks[session_id] = asyncio.Lock() logging.info("WebSocket connected: %s", session_id) await self.send_message( session_id, @@ -90,6 +98,8 @@ class WebSocketManager: del self.active_connections[session_id] if session_id in self.connection_timestamps: del self.connection_timestamps[session_id] + if session_id in self.send_locks: + del self.send_locks[session_id] self.session_controller.cleanup_session(session_id) remaining_session = self.session_store.get_session(session_id) if remaining_session and remaining_session.executor is None: @@ -101,7 +111,12 @@ class WebSocketManager: if session_id in self.active_connections: websocket = self.active_connections[session_id] try: - await websocket.send_text(_encode_ws_message(message)) + lock = self.send_locks.get(session_id) + if lock is None: + await websocket.send_text(_encode_ws_message(message)) + else: + async with lock: + await websocket.send_text(_encode_ws_message(message)) except Exception as exc: traceback.print_exc() logging.error("Failed to send message to %s: %s", session_id, exc) @@ -115,7 +130,13 @@ class WebSocketManager: else: asyncio.run(self.send_message(session_id, message)) except RuntimeError: - asyncio.run(self.send_message(session_id, message)) + if self.loop and self.loop.is_running(): + asyncio.run_coroutine_threadsafe( + self.send_message(session_id, message), + self.loop, + ) + else: + asyncio.run(self.send_message(session_id, message)) async def broadcast(self, message: Dict[str, Any]) -> None: for session_id in list(self.active_connections.keys()): diff --git a/workflow/graph.py b/workflow/graph.py index cf3e23eb..35675ba9 100755 --- a/workflow/graph.py +++ b/workflow/graph.py @@ -626,7 +626,7 @@ class GraphExecutor: output_role = "none" output_source = None - self.log_manager.record_node_end(node.id, output_text, { + self.log_manager.record_node_end(node.id, output_text if node.log_output else "", { "output_size": len(output_text), "output_count": len(output_messages), "output_role": output_role,