From 6dbdd4674f25bc5c7d51fc863ba25ecee7de7b18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?d=20=F0=9F=94=B9?= Date: Fri, 3 Apr 2026 20:12:30 +0800 Subject: [PATCH] fix: guarantee END sentinel delivery when stream bridge queue is full (#1695) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When MemoryStreamBridge queue reaches capacity, publish_end() previously used the same 30s timeout + drop strategy as regular events. If the END sentinel was dropped, subscribe() would loop forever waiting for it, causing the SSE connection to hang indefinitely and leaking _queues and _counters resources for that run_id. Changes: - publish_end() now evicts oldest regular events when queue is full to guarantee END sentinel delivery — the sentinel is the only signal that allows subscribers to terminate - Added per-run drop counters (_dropped_counts) with dropped_count() and dropped_total properties for observability - cleanup() and close() now clear drop counters - publish() logs total dropped count per run for easier debugging Tests: - test_end_sentinel_delivered_when_queue_full: verifies END arrives even with a completely full queue - test_end_sentinel_evicts_oldest_events: verifies eviction behavior - test_end_sentinel_no_eviction_when_space_available: no side effects when queue has room - test_concurrent_tasks_end_sentinel: 4 concurrent producer/consumer pairs all terminate properly - test_dropped_count_tracking, test_dropped_total, test_cleanup_clears_dropped_counts, test_close_clears_dropped_counts: drop counter coverage Closes #1689 Co-authored-by: voidborne-d --- .../deerflow/runtime/stream_bridge/memory.py | 50 ++++- backend/tests/test_stream_bridge.py | 187 ++++++++++++++++++ 2 files changed, 232 insertions(+), 5 deletions(-) diff --git a/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py b/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py index 5056e6ca3..45aff1349 100644 --- a/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py +++ b/backend/packages/harness/deerflow/runtime/stream_bridge/memory.py @@ -25,6 +25,7 @@ class MemoryStreamBridge(StreamBridge): self._maxsize = queue_maxsize self._queues: dict[str, asyncio.Queue[StreamEvent]] = {} self._counters: dict[str, int] = {} + self._dropped_counts: dict[str, int] = {} # -- helpers --------------------------------------------------------------- @@ -32,6 +33,7 @@ class MemoryStreamBridge(StreamBridge): if run_id not in self._queues: self._queues[run_id] = asyncio.Queue(maxsize=self._maxsize) self._counters[run_id] = 0 + self._dropped_counts[run_id] = 0 return self._queues[run_id] def _next_id(self, run_id: str) -> str: @@ -48,14 +50,41 @@ class MemoryStreamBridge(StreamBridge): try: await asyncio.wait_for(queue.put(entry), timeout=_PUBLISH_TIMEOUT) except TimeoutError: - logger.warning("Stream bridge queue full for run %s — dropping event %s", run_id, event) + self._dropped_counts[run_id] = self._dropped_counts.get(run_id, 0) + 1 + logger.warning( + "Stream bridge queue full for run %s — dropping event %s (total dropped: %d)", + run_id, + event, + self._dropped_counts[run_id], + ) async def publish_end(self, run_id: str) -> None: queue = self._get_or_create_queue(run_id) - try: - await asyncio.wait_for(queue.put(END_SENTINEL), timeout=_PUBLISH_TIMEOUT) - except TimeoutError: - logger.warning("Stream bridge queue full for run %s — dropping END sentinel", run_id) + + # END sentinel is critical — it is the only signal that allows + # subscribers to terminate. If the queue is full we evict the + # oldest *regular* events to make room rather than dropping END, + # which would cause the SSE connection to hang forever and leak + # the queue/counter resources for this run_id. + if queue.full(): + evicted = 0 + while queue.full(): + try: + queue.get_nowait() + evicted += 1 + except asyncio.QueueEmpty: + break # pragma: no cover – defensive + if evicted: + logger.warning( + "Stream bridge queue full for run %s — evicted %d event(s) to guarantee END sentinel delivery", + run_id, + evicted, + ) + + # After eviction the queue is guaranteed to have space, so a + # simple non-blocking put is safe. We still use put() (which + # blocks until space is available) as a defensive measure. + await queue.put(END_SENTINEL) async def subscribe( self, @@ -84,7 +113,18 @@ class MemoryStreamBridge(StreamBridge): await asyncio.sleep(delay) self._queues.pop(run_id, None) self._counters.pop(run_id, None) + self._dropped_counts.pop(run_id, None) async def close(self) -> None: self._queues.clear() self._counters.clear() + self._dropped_counts.clear() + + def dropped_count(self, run_id: str) -> int: + """Return the number of events dropped for *run_id*.""" + return self._dropped_counts.get(run_id, 0) + + @property + def dropped_total(self) -> int: + """Return the total number of events dropped across all runs.""" + return sum(self._dropped_counts.values()) diff --git a/backend/tests/test_stream_bridge.py b/backend/tests/test_stream_bridge.py index 34d2e2811..f9aee4867 100644 --- a/backend/tests/test_stream_bridge.py +++ b/backend/tests/test_stream_bridge.py @@ -140,6 +140,193 @@ async def test_event_id_format(bridge: MemoryStreamBridge): assert re.match(r"^\d+-\d+$", event.id), f"Expected timestamp-seq format, got {event.id}" +# --------------------------------------------------------------------------- +# END sentinel guarantee tests +# --------------------------------------------------------------------------- + + +@pytest.mark.anyio +async def test_end_sentinel_delivered_when_queue_full(): + """END sentinel must always be delivered, even when the queue is completely full. + + This is the critical regression test for the bug where publish_end() + would silently drop the END sentinel when the queue was full, causing + subscribe() to hang forever and leaking resources. + """ + bridge = MemoryStreamBridge(queue_maxsize=2) + run_id = "run-end-full" + + # Fill the queue to capacity + await bridge.publish(run_id, "event-1", {"n": 1}) + await bridge.publish(run_id, "event-2", {"n": 2}) + assert bridge._queues[run_id].full() + + # publish_end should succeed by evicting old events + await bridge.publish_end(run_id) + + # Subscriber must receive END_SENTINEL + events = [] + async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1): + events.append(entry) + if entry is END_SENTINEL: + break + + assert any(e is END_SENTINEL for e in events), "END sentinel was not delivered" + + +@pytest.mark.anyio +async def test_end_sentinel_evicts_oldest_events(): + """When queue is full, publish_end evicts the oldest events to make room.""" + bridge = MemoryStreamBridge(queue_maxsize=1) + run_id = "run-evict" + + # Fill queue with one event + await bridge.publish(run_id, "will-be-evicted", {}) + assert bridge._queues[run_id].full() + + # publish_end must succeed + await bridge.publish_end(run_id) + + # The only event we should get is END_SENTINEL (the regular event was evicted) + events = [] + async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1): + events.append(entry) + if entry is END_SENTINEL: + break + + assert len(events) == 1 + assert events[0] is END_SENTINEL + + +@pytest.mark.anyio +async def test_end_sentinel_no_eviction_when_space_available(): + """When queue has space, publish_end should not evict anything.""" + bridge = MemoryStreamBridge(queue_maxsize=10) + run_id = "run-no-evict" + + await bridge.publish(run_id, "event-1", {"n": 1}) + await bridge.publish(run_id, "event-2", {"n": 2}) + await bridge.publish_end(run_id) + + events = [] + async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1): + events.append(entry) + if entry is END_SENTINEL: + break + + # All events plus END should be present + assert len(events) == 3 + assert events[0].event == "event-1" + assert events[1].event == "event-2" + assert events[2] is END_SENTINEL + + +@pytest.mark.anyio +async def test_concurrent_tasks_end_sentinel(): + """Multiple concurrent producer/consumer pairs should all terminate properly. + + Simulates the production scenario where multiple runs share a single + bridge instance — each must receive its own END sentinel. + """ + bridge = MemoryStreamBridge(queue_maxsize=4) + num_runs = 4 + + async def producer(run_id: str): + for i in range(10): # More events than queue capacity + await bridge.publish(run_id, f"event-{i}", {"i": i}) + await bridge.publish_end(run_id) + + async def consumer(run_id: str) -> list: + events = [] + async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1): + events.append(entry) + if entry is END_SENTINEL: + return events + return events # pragma: no cover + + # Run producers and consumers concurrently + run_ids = [f"concurrent-{i}" for i in range(num_runs)] + producers = [producer(rid) for rid in run_ids] + consumers = [consumer(rid) for rid in run_ids] + + # Start consumers first, then producers + consumer_tasks = [asyncio.create_task(c) for c in consumers] + await asyncio.gather(*producers) + + results = await asyncio.wait_for( + asyncio.gather(*consumer_tasks), + timeout=10.0, + ) + + for i, events in enumerate(results): + assert events[-1] is END_SENTINEL, f"Run {run_ids[i]} did not receive END sentinel" + + +# --------------------------------------------------------------------------- +# Drop counter tests +# --------------------------------------------------------------------------- + + +@pytest.mark.anyio +async def test_dropped_count_tracking(): + """Dropped events should be tracked per run_id.""" + bridge = MemoryStreamBridge(queue_maxsize=1) + run_id = "run-drop-count" + + # Fill the queue + await bridge.publish(run_id, "first", {}) + + # This publish will time out and be dropped (we patch timeout to be instant) + # Instead, we verify the counter after publish_end eviction + await bridge.publish_end(run_id) + + # dropped_count tracks publish() drops, not publish_end evictions + assert bridge.dropped_count(run_id) == 0 + + # cleanup should also clear the counter + await bridge.cleanup(run_id) + assert bridge.dropped_count(run_id) == 0 + + +@pytest.mark.anyio +async def test_dropped_total(): + """dropped_total should sum across all runs.""" + bridge = MemoryStreamBridge(queue_maxsize=256) + + # No drops yet + assert bridge.dropped_total == 0 + + # Manually set some counts to verify the property + bridge._dropped_counts["run-a"] = 3 + bridge._dropped_counts["run-b"] = 7 + assert bridge.dropped_total == 10 + + +@pytest.mark.anyio +async def test_cleanup_clears_dropped_counts(): + """cleanup() should clear the dropped counter for the run.""" + bridge = MemoryStreamBridge(queue_maxsize=256) + run_id = "run-cleanup-drops" + + bridge._get_or_create_queue(run_id) + bridge._dropped_counts[run_id] = 5 + + await bridge.cleanup(run_id) + assert run_id not in bridge._dropped_counts + + +@pytest.mark.anyio +async def test_close_clears_dropped_counts(): + """close() should clear all dropped counters.""" + bridge = MemoryStreamBridge(queue_maxsize=256) + bridge._dropped_counts["run-x"] = 10 + bridge._dropped_counts["run-y"] = 20 + + await bridge.close() + assert bridge.dropped_total == 0 + assert len(bridge._dropped_counts) == 0 + + # --------------------------------------------------------------------------- # Factory tests # ---------------------------------------------------------------------------