mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-26 03:38:06 +00:00
- Fix spawn() zombie cell: clean up registry on start() failure
- Fix shutdown(): cancel + await tasks that exceed graceful timeout
- Fix _shutdown(): await mailbox.close() to release backend resources
- Fix escalate directive: stop failing child before propagating to grandparent
- Fix RedisMailbox.put(): wrap Redis errors in try/except, return False on failure
- Fix retry.py: replace assert with proper raise for last_exc
- Add put_batch() to Mailbox abstraction for single-roundtrip bulk enqueue
- Add RedisMailbox.put_batch() with atomic Lua script for bounded queues
- Add MailboxFullError exception type for semantic backpressure handling
- Add redis>=7.4.0 dependency with public PyPI sources in uv.lock

Tests added (31 total, up from 27):
- test_middleware_on_restart_hook: verifies middleware.on_restart() on supervision restart
- test_ask_propagates_actor_exception: ask() re-raises original exception type
- test_ask_propagates_exception_while_supervised: exception propagates; root actor survives
- test_ask_timeout_late_reply_no_exception: late reply after timeout is silent no-op
- test_actor_backpressure.py: MailboxFullError + dead letter on full mailbox
- test_actor_retry.py: ask_with_retry with exponential backoff
- test_mailbox_redis.py: RedisMailbox put/get/batch/close
- bench_actor_redis.py: RedisMailbox throughput benchmarks
90 lines
2.2 KiB
Python
90 lines
2.2 KiB
Python
import asyncio
|
|
|
|
import pytest
|
|
|
|
from deerflow.actor import Actor, ActorSystem, MailboxFullError
|
|
from deerflow.actor.mailbox import BACKPRESSURE_BLOCK, BACKPRESSURE_DROP_NEW, BACKPRESSURE_FAIL, MemoryMailbox
|
|
|
|
|
|
class SlowActor(Actor):
    """Test actor that counts 'inc' messages, pausing briefly on each one.

    The pause makes the actor slower than its senders so the backpressure
    tests in this module can fill its mailbox.
    """

    async def on_started(self):
        """Reset the processed-message counter to zero."""
        self.count = 0

    async def on_receive(self, message):
        """Handle one message.

        'inc' sleeps for 0.01 s, bumps the counter and returns None.
        'get' returns the current counter value.
        Any other message is ignored and returns None.
        """
        if message == 'inc':
            # Simulated work: keeps the actor busy while senders keep sending.
            await asyncio.sleep(0.01)
            self.count += 1
            return None

        return self.count if message == 'get' else None
|
|
|
|
|
|
@pytest.mark.anyio
async def test_memory_mailbox_drop_new_policy_drops_tell_to_dead_letters():
    """Under BACKPRESSURE_DROP_NEW, overfilling the mailbox loses some tells.

    Twenty 'inc' messages are sent back-to-back to a SlowActor whose mailbox
    is built as ``MemoryMailbox(1, ...)`` (presumably a capacity of one --
    confirm against MemoryMailbox). The test then expects fewer than twenty
    increments to have been processed and at least one entry in
    ``system.dead_letters``.
    """
    system = ActorSystem('bp')
    ref = await system.spawn(
        SlowActor,
        'slow',
        mailbox=MemoryMailbox(1, backpressure_policy=BACKPRESSURE_DROP_NEW),
    )

    try:
        # Overfill quickly
        for _ in range(20):
            await ref.tell('inc')

        # Give the actor time to work through whatever was accepted.
        await asyncio.sleep(0.4)
        count = await ref.ask('get', timeout=2.0)
    finally:
        # Always stop the system, even if tell/ask raised, so a failing
        # test does not leave the actor running behind it.
        await system.shutdown()

    # Some messages should be dropped under drop_new
    assert count < 20
    assert len(system.dead_letters) > 0
|
|
|
|
|
|
@pytest.mark.anyio
async def test_memory_mailbox_fail_policy_rejects_ask_when_full():
    """Under BACKPRESSURE_FAIL, an ask against a full mailbox is rejected.

    One 'inc' is sent with tell first, then up to thirty short-timeout asks
    are attempted. The test passes as soon as one of them raises
    MailboxFullError; asks that merely time out are retried.
    """
    system = ActorSystem('bp')
    ref = await system.spawn(
        SlowActor,
        'slow',
        mailbox=MemoryMailbox(1, backpressure_policy=BACKPRESSURE_FAIL),
    )

    got_reject = False
    try:
        # Fill queue with tell first
        await ref.tell('inc')

        # Then ask may be rejected when queue still full
        for _ in range(30):
            try:
                await ref.ask('inc', timeout=0.02)
            except MailboxFullError:
                got_reject = True
                break
            except asyncio.TimeoutError:
                # A timeout is not the rejection being tested; try again.
                pass
    finally:
        # Always stop the system, even on an unexpected exception, so a
        # failing test does not leave the actor running behind it.
        await system.shutdown()

    assert got_reject
|
|
|
|
|
|
@pytest.mark.anyio
async def test_memory_mailbox_block_policy_eventually_accepts():
    """Under BACKPRESSURE_BLOCK, every tell is eventually processed.

    Ten 'inc' messages are sent to a SlowActor whose mailbox is built as
    ``MemoryMailbox(1, ...)`` (presumably a capacity of one -- confirm
    against MemoryMailbox). After a short wait the counter must be exactly
    ten, i.e. nothing was dropped.
    """
    system = ActorSystem('bp')
    ref = await system.spawn(
        SlowActor,
        'slow',
        mailbox=MemoryMailbox(1, backpressure_policy=BACKPRESSURE_BLOCK),
    )

    try:
        for _ in range(10):
            await ref.tell('inc')

        # Let the actor finish anything still queued.
        await asyncio.sleep(0.25)
        count = await ref.ask('get', timeout=2.0)
    finally:
        # Always stop the system, even if tell/ask raised, so a failing
        # test does not leave the actor running behind it.
        await system.shutdown()

    # Block policy should avoid dropping on tell path
    assert count == 10
|