deer-flow/backend/tests/test_mailbox_redis.py
greatmengqi 228a2a66e3 fix(actor): harden lifecycle, supervision, Redis mailbox, and add comprehensive tests
- Fix spawn() zombie cell: clean up registry on start() failure
- Fix shutdown(): cancel + await tasks that exceed graceful timeout
- Fix _shutdown(): await mailbox.close() to release backend resources
- Fix escalate directive: stop failing child before propagating to grandparent
- Fix RedisMailbox.put(): wrap Redis errors in try/except, return False on failure
- Fix retry.py: replace assert with proper raise for last_exc
- Add put_batch() to Mailbox abstraction for single-roundtrip bulk enqueue
- Add RedisMailbox.put_batch() with atomic Lua script for bounded queues
- Add MailboxFullError exception type for semantic backpressure handling
- Add redis>=7.4.0 dependency with public PyPI sources in uv.lock

Tests added (31 total, up from 27):
- test_middleware_on_restart_hook: verifies middleware.on_restart() on supervision restart
- test_ask_propagates_actor_exception: ask() re-raises original exception type
- test_ask_propagates_exception_while_supervised: exception propagates; root actor survives
- test_ask_timeout_late_reply_no_exception: late reply after timeout is silent no-op
- test_actor_backpressure.py: MailboxFullError + dead letter on full mailbox
- test_actor_retry.py: ask_with_retry with exponential backoff
- test_mailbox_redis.py: RedisMailbox put/get/batch/close
- bench_actor_redis.py: RedisMailbox throughput benchmarks
2026-03-31 10:09:05 +08:00

84 lines
2.5 KiB
Python

import asyncio
import pytest
redis = pytest.importorskip("redis.asyncio")
from deerflow.actor.mailbox_redis import RedisMailbox
from deerflow.actor.ref import _Envelope, _Stop
pytestmark = pytest.mark.anyio
async def _make_mailbox(queue_name: str, *, maxlen: int = 0) -> RedisMailbox:
client = redis.Redis(host="127.0.0.1", port=6379, decode_responses=False)
await client.ping()
await client.delete(queue_name)
mailbox = RedisMailbox(client.connection_pool, queue_name, maxlen=maxlen, brpop_timeout=0.2)
return mailbox
async def test_roundtrip_envelope_and_stop():
queue = "deerflow:test:redis-mailbox:roundtrip"
mailbox = await _make_mailbox(queue)
try:
msg = _Envelope(payload={"k": "v"}, correlation_id="c1", reply_to="sysA")
ok = await mailbox.put(msg)
assert ok is True
got = await mailbox.get()
assert isinstance(got, _Envelope)
assert got.payload == {"k": "v"}
assert got.correlation_id == "c1"
assert got.reply_to == "sysA"
ok = await mailbox.put(_Stop())
assert ok is True
stop = await mailbox.get()
assert isinstance(stop, _Stop)
finally:
await mailbox.close()
async def test_bounded_queue_rejects_when_full():
queue = "deerflow:test:redis-mailbox:bounded"
mailbox = await _make_mailbox(queue, maxlen=1)
try:
assert await mailbox.put(_Envelope("m1")) is True
assert await mailbox.put(_Envelope("m2")) is False
finally:
await mailbox.close()
async def test_put_nowait_and_get_nowait_contract():
queue = "deerflow:test:redis-mailbox:nowait"
mailbox = await _make_mailbox(queue)
try:
assert mailbox.put_nowait(_Envelope("x")) is False
with pytest.raises(Exception, match="does not support synchronous get_nowait"):
mailbox.get_nowait()
finally:
await mailbox.close()
async def test_system_enqueue_fallback_with_async_mailbox():
from deerflow.actor import Actor, ActorSystem
class EchoActor(Actor):
async def on_receive(self, message):
return message
queue = "deerflow:test:redis-mailbox:system-fallback"
mailbox = await _make_mailbox(queue)
system = ActorSystem("redis-test")
ref = await system.spawn(EchoActor, "echo", mailbox=mailbox)
try:
# This exercises _ActorCell.enqueue fallback path:
# put_nowait() -> False, then await put() -> True
result = await ref.ask("hello", timeout=3.0)
assert result == "hello"
finally:
await system.shutdown()