mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-25 11:18:22 +00:00
fix: guarantee END sentinel delivery when stream bridge queue is full (#1695)
When MemoryStreamBridge queue reaches capacity, publish_end() previously used the same 30s timeout + drop strategy as regular events. If the END sentinel was dropped, subscribe() would loop forever waiting for it, causing the SSE connection to hang indefinitely and leaking _queues and _counters resources for that run_id. Changes: - publish_end() now evicts oldest regular events when queue is full to guarantee END sentinel delivery — the sentinel is the only signal that allows subscribers to terminate - Added per-run drop counters (_dropped_counts) with dropped_count() and dropped_total properties for observability - cleanup() and close() now clear drop counters - publish() logs total dropped count per run for easier debugging Tests: - test_end_sentinel_delivered_when_queue_full: verifies END arrives even with a completely full queue - test_end_sentinel_evicts_oldest_events: verifies eviction behavior - test_end_sentinel_no_eviction_when_space_available: no side effects when queue has room - test_concurrent_tasks_end_sentinel: 4 concurrent producer/consumer pairs all terminate properly - test_dropped_count_tracking, test_dropped_total, test_cleanup_clears_dropped_counts, test_close_clears_dropped_counts: drop counter coverage Closes #1689 Co-authored-by: voidborne-d <voidborne-d@users.noreply.github.com>
This commit is contained in:
parent
83039fa22c
commit
6dbdd4674f
@ -25,6 +25,7 @@ class MemoryStreamBridge(StreamBridge):
|
||||
self._maxsize = queue_maxsize
|
||||
self._queues: dict[str, asyncio.Queue[StreamEvent]] = {}
|
||||
self._counters: dict[str, int] = {}
|
||||
self._dropped_counts: dict[str, int] = {}
|
||||
|
||||
# -- helpers ---------------------------------------------------------------
|
||||
|
||||
@ -32,6 +33,7 @@ class MemoryStreamBridge(StreamBridge):
|
||||
if run_id not in self._queues:
|
||||
self._queues[run_id] = asyncio.Queue(maxsize=self._maxsize)
|
||||
self._counters[run_id] = 0
|
||||
self._dropped_counts[run_id] = 0
|
||||
return self._queues[run_id]
|
||||
|
||||
def _next_id(self, run_id: str) -> str:
|
||||
@ -48,14 +50,41 @@ class MemoryStreamBridge(StreamBridge):
|
||||
try:
|
||||
await asyncio.wait_for(queue.put(entry), timeout=_PUBLISH_TIMEOUT)
|
||||
except TimeoutError:
|
||||
logger.warning("Stream bridge queue full for run %s — dropping event %s", run_id, event)
|
||||
self._dropped_counts[run_id] = self._dropped_counts.get(run_id, 0) + 1
|
||||
logger.warning(
|
||||
"Stream bridge queue full for run %s — dropping event %s (total dropped: %d)",
|
||||
run_id,
|
||||
event,
|
||||
self._dropped_counts[run_id],
|
||||
)
|
||||
|
||||
async def publish_end(self, run_id: str) -> None:
|
||||
queue = self._get_or_create_queue(run_id)
|
||||
try:
|
||||
await asyncio.wait_for(queue.put(END_SENTINEL), timeout=_PUBLISH_TIMEOUT)
|
||||
except TimeoutError:
|
||||
logger.warning("Stream bridge queue full for run %s — dropping END sentinel", run_id)
|
||||
|
||||
# END sentinel is critical — it is the only signal that allows
|
||||
# subscribers to terminate. If the queue is full we evict the
|
||||
# oldest *regular* events to make room rather than dropping END,
|
||||
# which would cause the SSE connection to hang forever and leak
|
||||
# the queue/counter resources for this run_id.
|
||||
if queue.full():
|
||||
evicted = 0
|
||||
while queue.full():
|
||||
try:
|
||||
queue.get_nowait()
|
||||
evicted += 1
|
||||
except asyncio.QueueEmpty:
|
||||
break # pragma: no cover – defensive
|
||||
if evicted:
|
||||
logger.warning(
|
||||
"Stream bridge queue full for run %s — evicted %d event(s) to guarantee END sentinel delivery",
|
||||
run_id,
|
||||
evicted,
|
||||
)
|
||||
|
||||
# After eviction the queue is guaranteed to have space, so a
|
||||
# simple non-blocking put is safe. We still use put() (which
|
||||
# blocks until space is available) as a defensive measure.
|
||||
await queue.put(END_SENTINEL)
|
||||
|
||||
async def subscribe(
|
||||
self,
|
||||
@ -84,7 +113,18 @@ class MemoryStreamBridge(StreamBridge):
|
||||
await asyncio.sleep(delay)
|
||||
self._queues.pop(run_id, None)
|
||||
self._counters.pop(run_id, None)
|
||||
self._dropped_counts.pop(run_id, None)
|
||||
|
||||
async def close(self) -> None:
|
||||
self._queues.clear()
|
||||
self._counters.clear()
|
||||
self._dropped_counts.clear()
|
||||
|
||||
def dropped_count(self, run_id: str) -> int:
|
||||
"""Return the number of events dropped for *run_id*."""
|
||||
return self._dropped_counts.get(run_id, 0)
|
||||
|
||||
@property
|
||||
def dropped_total(self) -> int:
|
||||
"""Return the total number of events dropped across all runs."""
|
||||
return sum(self._dropped_counts.values())
|
||||
|
||||
@ -140,6 +140,193 @@ async def test_event_id_format(bridge: MemoryStreamBridge):
|
||||
assert re.match(r"^\d+-\d+$", event.id), f"Expected timestamp-seq format, got {event.id}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# END sentinel guarantee tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_end_sentinel_delivered_when_queue_full():
|
||||
"""END sentinel must always be delivered, even when the queue is completely full.
|
||||
|
||||
This is the critical regression test for the bug where publish_end()
|
||||
would silently drop the END sentinel when the queue was full, causing
|
||||
subscribe() to hang forever and leaking resources.
|
||||
"""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=2)
|
||||
run_id = "run-end-full"
|
||||
|
||||
# Fill the queue to capacity
|
||||
await bridge.publish(run_id, "event-1", {"n": 1})
|
||||
await bridge.publish(run_id, "event-2", {"n": 2})
|
||||
assert bridge._queues[run_id].full()
|
||||
|
||||
# publish_end should succeed by evicting old events
|
||||
await bridge.publish_end(run_id)
|
||||
|
||||
# Subscriber must receive END_SENTINEL
|
||||
events = []
|
||||
async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1):
|
||||
events.append(entry)
|
||||
if entry is END_SENTINEL:
|
||||
break
|
||||
|
||||
assert any(e is END_SENTINEL for e in events), "END sentinel was not delivered"
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_end_sentinel_evicts_oldest_events():
|
||||
"""When queue is full, publish_end evicts the oldest events to make room."""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=1)
|
||||
run_id = "run-evict"
|
||||
|
||||
# Fill queue with one event
|
||||
await bridge.publish(run_id, "will-be-evicted", {})
|
||||
assert bridge._queues[run_id].full()
|
||||
|
||||
# publish_end must succeed
|
||||
await bridge.publish_end(run_id)
|
||||
|
||||
# The only event we should get is END_SENTINEL (the regular event was evicted)
|
||||
events = []
|
||||
async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1):
|
||||
events.append(entry)
|
||||
if entry is END_SENTINEL:
|
||||
break
|
||||
|
||||
assert len(events) == 1
|
||||
assert events[0] is END_SENTINEL
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_end_sentinel_no_eviction_when_space_available():
|
||||
"""When queue has space, publish_end should not evict anything."""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=10)
|
||||
run_id = "run-no-evict"
|
||||
|
||||
await bridge.publish(run_id, "event-1", {"n": 1})
|
||||
await bridge.publish(run_id, "event-2", {"n": 2})
|
||||
await bridge.publish_end(run_id)
|
||||
|
||||
events = []
|
||||
async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1):
|
||||
events.append(entry)
|
||||
if entry is END_SENTINEL:
|
||||
break
|
||||
|
||||
# All events plus END should be present
|
||||
assert len(events) == 3
|
||||
assert events[0].event == "event-1"
|
||||
assert events[1].event == "event-2"
|
||||
assert events[2] is END_SENTINEL
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_concurrent_tasks_end_sentinel():
|
||||
"""Multiple concurrent producer/consumer pairs should all terminate properly.
|
||||
|
||||
Simulates the production scenario where multiple runs share a single
|
||||
bridge instance — each must receive its own END sentinel.
|
||||
"""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=4)
|
||||
num_runs = 4
|
||||
|
||||
async def producer(run_id: str):
|
||||
for i in range(10): # More events than queue capacity
|
||||
await bridge.publish(run_id, f"event-{i}", {"i": i})
|
||||
await bridge.publish_end(run_id)
|
||||
|
||||
async def consumer(run_id: str) -> list:
|
||||
events = []
|
||||
async for entry in bridge.subscribe(run_id, heartbeat_interval=0.1):
|
||||
events.append(entry)
|
||||
if entry is END_SENTINEL:
|
||||
return events
|
||||
return events # pragma: no cover
|
||||
|
||||
# Run producers and consumers concurrently
|
||||
run_ids = [f"concurrent-{i}" for i in range(num_runs)]
|
||||
producers = [producer(rid) for rid in run_ids]
|
||||
consumers = [consumer(rid) for rid in run_ids]
|
||||
|
||||
# Start consumers first, then producers
|
||||
consumer_tasks = [asyncio.create_task(c) for c in consumers]
|
||||
await asyncio.gather(*producers)
|
||||
|
||||
results = await asyncio.wait_for(
|
||||
asyncio.gather(*consumer_tasks),
|
||||
timeout=10.0,
|
||||
)
|
||||
|
||||
for i, events in enumerate(results):
|
||||
assert events[-1] is END_SENTINEL, f"Run {run_ids[i]} did not receive END sentinel"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Drop counter tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_dropped_count_tracking():
|
||||
"""Dropped events should be tracked per run_id."""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=1)
|
||||
run_id = "run-drop-count"
|
||||
|
||||
# Fill the queue
|
||||
await bridge.publish(run_id, "first", {})
|
||||
|
||||
# This publish will time out and be dropped (we patch timeout to be instant)
|
||||
# Instead, we verify the counter after publish_end eviction
|
||||
await bridge.publish_end(run_id)
|
||||
|
||||
# dropped_count tracks publish() drops, not publish_end evictions
|
||||
assert bridge.dropped_count(run_id) == 0
|
||||
|
||||
# cleanup should also clear the counter
|
||||
await bridge.cleanup(run_id)
|
||||
assert bridge.dropped_count(run_id) == 0
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_dropped_total():
|
||||
"""dropped_total should sum across all runs."""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=256)
|
||||
|
||||
# No drops yet
|
||||
assert bridge.dropped_total == 0
|
||||
|
||||
# Manually set some counts to verify the property
|
||||
bridge._dropped_counts["run-a"] = 3
|
||||
bridge._dropped_counts["run-b"] = 7
|
||||
assert bridge.dropped_total == 10
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_cleanup_clears_dropped_counts():
|
||||
"""cleanup() should clear the dropped counter for the run."""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=256)
|
||||
run_id = "run-cleanup-drops"
|
||||
|
||||
bridge._get_or_create_queue(run_id)
|
||||
bridge._dropped_counts[run_id] = 5
|
||||
|
||||
await bridge.cleanup(run_id)
|
||||
assert run_id not in bridge._dropped_counts
|
||||
|
||||
|
||||
@pytest.mark.anyio
|
||||
async def test_close_clears_dropped_counts():
|
||||
"""close() should clear all dropped counters."""
|
||||
bridge = MemoryStreamBridge(queue_maxsize=256)
|
||||
bridge._dropped_counts["run-x"] = 10
|
||||
bridge._dropped_counts["run-y"] = 20
|
||||
|
||||
await bridge.close()
|
||||
assert bridge.dropped_total == 0
|
||||
assert len(bridge._dropped_counts) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Factory tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user