mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-26 03:38:06 +00:00
- Fix spawn() zombie cell: clean up registry on start() failure
- Fix shutdown(): cancel + await tasks that exceed graceful timeout
- Fix _shutdown(): await mailbox.close() to release backend resources
- Fix escalate directive: stop failing child before propagating to grandparent
- Fix RedisMailbox.put(): wrap Redis errors in try/except, return False on failure
- Fix retry.py: replace assert with proper raise for last_exc
- Add put_batch() to Mailbox abstraction for single-roundtrip bulk enqueue
- Add RedisMailbox.put_batch() with atomic Lua script for bounded queues
- Add MailboxFullError exception type for semantic backpressure handling
- Add redis>=7.4.0 dependency with public PyPI sources in uv.lock

Tests added (31 total, up from 27):
- test_middleware_on_restart_hook: verifies middleware.on_restart() on supervision restart
- test_ask_propagates_actor_exception: ask() re-raises original exception type
- test_ask_propagates_exception_while_supervised: exception propagates; root actor survives
- test_ask_timeout_late_reply_no_exception: late reply after timeout is silent no-op
- test_actor_backpressure.py: MailboxFullError + dead letter on full mailbox
- test_actor_retry.py: ask_with_retry with exponential backoff
- test_mailbox_redis.py: RedisMailbox put/get/batch/close
- bench_actor_redis.py: RedisMailbox throughput benchmarks
84 lines
2.5 KiB
Python
84 lines
2.5 KiB
Python
import asyncio
|
|
|
|
import pytest
|
|
|
|
redis = pytest.importorskip("redis.asyncio")
|
|
|
|
from deerflow.actor.mailbox_redis import RedisMailbox
|
|
from deerflow.actor.ref import _Envelope, _Stop
|
|
|
|
|
|
pytestmark = pytest.mark.anyio
|
|
|
|
|
|
async def _make_mailbox(queue_name: str, *, maxlen: int = 0) -> RedisMailbox:
    """Build a ``RedisMailbox`` on a freshly emptied queue of a local Redis.

    Connects to Redis at 127.0.0.1:6379, checks that the server answers
    ``PING`` and deletes ``queue_name`` so that entries left behind by an
    earlier run cannot influence the current test.

    Args:
        queue_name: Redis key used as the mailbox queue.
        maxlen: Forwarded unchanged to ``RedisMailbox`` as ``maxlen``.

    Returns:
        A mailbox built on the probe client's connection pool. The tests in
        this module release it via ``mailbox.close()`` or by shutting down
        the actor system that owns it.

    Raises:
        Exception: Whatever ``ping()`` or ``delete()`` raised (for example
            when no Redis server is listening); the probe client is closed
            before the error is re-raised.
    """
    client = redis.Redis(host="127.0.0.1", port=6379, decode_responses=False)
    try:
        await client.ping()
        await client.delete(queue_name)
    except Exception:
        # Setup failed, so no mailbox will ever be handed this pool; close
        # the client here so its connections are not left dangling.
        await client.aclose()
        raise
    return RedisMailbox(client.connection_pool, queue_name, maxlen=maxlen, brpop_timeout=0.2)
|
|
|
|
|
|
async def test_roundtrip_envelope_and_stop():
    """An ``_Envelope`` and then a ``_Stop`` are put and read back via ``get()``.

    The envelope's ``payload``, ``correlation_id`` and ``reply_to`` must match
    what was enqueued, and the ``_Stop`` must come back as a ``_Stop``.
    """
    mailbox = await _make_mailbox("deerflow:test:redis-mailbox:roundtrip")
    try:
        envelope = _Envelope(payload={"k": "v"}, correlation_id="c1", reply_to="sysA")
        assert await mailbox.put(envelope) is True

        received = await mailbox.get()
        assert isinstance(received, _Envelope)
        assert received.payload == {"k": "v"}
        assert received.correlation_id == "c1"
        assert received.reply_to == "sysA"

        # The stop marker travels through the same put()/get() pair.
        assert await mailbox.put(_Stop()) is True
        assert isinstance(await mailbox.get(), _Stop)
    finally:
        await mailbox.close()
|
|
|
|
|
|
async def test_bounded_queue_rejects_when_full():
    """With ``maxlen=1`` the first ``put()`` returns True and the second False."""
    mailbox = await _make_mailbox("deerflow:test:redis-mailbox:bounded", maxlen=1)
    try:
        first_accepted = await mailbox.put(_Envelope("m1"))
        assert first_accepted is True

        # Nothing has been consumed, so the queue is still at its limit.
        second_accepted = await mailbox.put(_Envelope("m2"))
        assert second_accepted is False
    finally:
        await mailbox.close()
|
|
|
|
|
|
async def test_put_nowait_and_get_nowait_contract():
    """Check the synchronous entry points of ``RedisMailbox``.

    ``put_nowait()`` is expected to return False, and ``get_nowait()`` is
    expected to raise with a message saying synchronous access is unsupported.
    """
    mailbox = await _make_mailbox("deerflow:test:redis-mailbox:nowait")
    try:
        accepted = mailbox.put_nowait(_Envelope("x"))
        assert accepted is False

        with pytest.raises(Exception, match="does not support synchronous get_nowait"):
            mailbox.get_nowait()
    finally:
        await mailbox.close()
|
|
|
|
|
|
async def test_system_enqueue_fallback_with_async_mailbox():
    """An actor spawned on a ``RedisMailbox`` answers ``ask()`` with its input."""
    from deerflow.actor import Actor, ActorSystem

    class EchoActor(Actor):
        """Replies to every message with the message itself."""

        async def on_receive(self, message):
            return message

    mailbox = await _make_mailbox("deerflow:test:redis-mailbox:system-fallback")
    system = ActorSystem("redis-test")
    ref = await system.spawn(EchoActor, "echo", mailbox=mailbox)
    try:
        # Intended to drive the _ActorCell.enqueue fallback path:
        # put_nowait() reports False, after which the awaited put() succeeds.
        reply = await ref.ask("hello", timeout=3.0)
        assert reply == "hello"
    finally:
        await system.shutdown()
|