mirror of
https://github.com/bytedance/deer-flow.git
synced 2026-04-26 03:38:06 +00:00
- Fix spawn() zombie cell: clean up registry on start() failure
- Fix shutdown(): cancel + await tasks that exceed graceful timeout
- Fix _shutdown(): await mailbox.close() to release backend resources
- Fix escalate directive: stop failing child before propagating to grandparent
- Fix RedisMailbox.put(): wrap Redis errors in try/except, return False on failure
- Fix retry.py: replace assert with proper raise for last_exc
- Add put_batch() to Mailbox abstraction for single-roundtrip bulk enqueue
- Add RedisMailbox.put_batch() with atomic Lua script for bounded queues
- Add MailboxFullError exception type for semantic backpressure handling
- Add redis>=7.4.0 dependency with public PyPI sources in uv.lock

Tests added (31 total, up from 27):
- test_middleware_on_restart_hook: verifies middleware.on_restart() on supervision restart
- test_ask_propagates_actor_exception: ask() re-raises original exception type
- test_ask_propagates_exception_while_supervised: exception propagates; root actor survives
- test_ask_timeout_late_reply_no_exception: late reply after timeout is silent no-op
- test_actor_backpressure.py: MailboxFullError + dead letter on full mailbox
- test_actor_retry.py: ask_with_retry with exponential backoff
- test_mailbox_redis.py: RedisMailbox put/get/batch/close
- bench_actor_redis.py: RedisMailbox throughput benchmarks
269 lines
7.5 KiB
Python
269 lines
7.5 KiB
Python
"""Actor framework benchmarks — throughput, latency, concurrency."""
|
||
|
||
import asyncio
|
||
import time
|
||
|
||
from deerflow.actor import Actor, ActorSystem, Middleware
|
||
|
||
|
||
class NoopActor(Actor):
    """Echo actor: answers every message with the message itself."""

    async def on_receive(self, message):
        """Return ``message`` unchanged so callers measure only framework cost."""
        reply = message
        return reply
|
||
|
||
|
||
class CounterActor(Actor):
    """Actor that keeps a running tally of the "inc" messages it receives."""

    async def on_started(self):
        """Initialise the tally in the start hook (there is no ``__init__``)."""
        self.count = 0

    async def on_receive(self, message):
        """Bump the tally on "inc" and reply with the current tally.

        "get" and any unrecognised message leave the tally untouched and are
        answered with its current value as well.
        """
        if message == "inc":
            self.count += 1
        return self.count
|
||
|
||
|
||
class ChainActor(Actor):
    """Forwards each message to the next actor in a chain.

    The last actor in the chain (``next_ref`` left as ``None``) echoes the
    message back, and that reply travels up through every preceding hop.
    """

    # Reference to the downstream actor; None marks the end of the chain.
    next_ref = None

    async def on_receive(self, message):
        """Relay ``message`` downstream, or echo it when there is no successor."""
        downstream = self.next_ref
        if downstream is None:
            return message
        return await downstream.ask(message)
|
||
|
||
|
||
class ComputeActor(Actor):
    """Simulates CPU work by running an iterative Fibonacci via the executor."""

    async def on_receive(self, message):
        """Compute the ``message``-th Fibonacci number off the actor coroutine.

        ``message`` is used as the iteration count; the computation is handed
        to ``self.context.run_in_executor`` and its result is returned.
        """

        def fib(n):
            # Iterative form: after k steps, ``prev`` holds F(k).
            prev, curr = 0, 1
            for _ in range(n):
                prev, curr = curr, prev + curr
            return prev

        result = await self.context.run_in_executor(fib, message)
        return result
|
||
|
||
|
||
class CountMiddleware(Middleware):
    """Middleware that counts the messages passing through its receive hook."""

    def __init__(self):
        # Number of on_receive invocations seen so far.
        self.count = 0

    async def on_receive(self, ctx, message, next_fn):
        """Record one more message, then delegate to the rest of the pipeline."""
        self.count += 1
        outcome = await next_fn(ctx, message)
        return outcome
|
||
|
||
|
||
def fmt(n):
    """Format a count compactly for benchmark output.

    Values of one million or more are rendered in millions with one decimal
    place ("2.5M"), values of one thousand or more in whole thousands
    ("100K"), and anything smaller via ``str``.

    Args:
        n: The number to format.

    Returns:
        The abbreviated string representation.
    """
    if n >= 1_000_000:
        return f"{n / 1_000_000:.1f}M"
    if n >= 1_000:
        thousands = round(n / 1_000)
        # Values just below one million round up to 1000 thousands; promote
        # them to the "M" form rather than emitting the awkward "1000K".
        if thousands >= 1_000:
            return f"{thousands / 1_000:.1f}M"
        return f"{thousands}K"
    return str(n)
|
||
|
||
|
||
async def bench_tell_throughput(n=100_000):
    """Measure tell (fire-and-forget) throughput.

    Sends ``n`` "inc" messages to one CounterActor with ``tell`` and then
    issues a single ``ask("get")``; the timed window ends when that reply
    arrives. The reply is compared against ``n`` and a warning is printed on
    mismatch.

    Args:
        n: Number of "inc" messages to send. The mailbox is sized ``n + 10``.
    """
    system = ActorSystem("bench")
    try:
        ref = await system.spawn(CounterActor, "counter", mailbox_size=n + 10)

        start = time.perf_counter()
        for _ in range(n):
            await ref.tell("inc")
        # Wait for all messages to be processed (the "get" request is sent
        # after every "inc"; presumably the mailbox is FIFO — confirm).
        count = await ref.ask("get", timeout=30.0)
        elapsed = time.perf_counter() - start
    finally:
        # Always tear the system down, even when the ask above times out.
        await system.shutdown()

    # Reported outside the timed window so console I/O does not skew the rate.
    if count != n:
        print(f" warning: expected {n} processed, got {count}")
    rate = n / elapsed
    print(f" tell throughput: {fmt(n)} msgs in {elapsed:.2f}s = {fmt(int(rate))}/s")
|
||
|
||
|
||
async def bench_ask_throughput(n=50_000):
    """Measure ask (request-response) throughput.

    Issues ``n`` sequential ``ask("ping")`` calls against one NoopActor, each
    awaited before the next is sent, and prints the resulting rate.

    Args:
        n: Number of ask round trips to perform.
    """
    system = ActorSystem("bench")
    try:
        ref = await system.spawn(NoopActor, "echo")

        start = time.perf_counter()
        for _ in range(n):
            await ref.ask("ping")
        elapsed = time.perf_counter() - start
    finally:
        # Always tear the system down, even when an ask raises.
        await system.shutdown()

    rate = n / elapsed
    print(f" ask throughput: {fmt(n)} msgs in {elapsed:.2f}s = {fmt(int(rate))}/s")
|
||
|
||
|
||
async def bench_ask_latency(n=10_000):
    """Measure ask round-trip latency percentiles.

    After 100 untimed warmup asks, times ``n`` sequential ``ask("ping")``
    round trips individually (in microseconds) and prints the p50, p99 and
    p99.9 values picked from the sorted samples.

    Args:
        n: Number of timed round trips. With ``n <= 0`` no samples exist and
            a short notice is printed instead of percentiles.
    """
    system = ActorSystem("bench")
    try:
        ref = await system.spawn(NoopActor, "echo")

        # Warmup
        for _ in range(100):
            await ref.ask("warmup")

        latencies = []
        for _ in range(n):
            t0 = time.perf_counter()
            await ref.ask("ping")
            latencies.append((time.perf_counter() - t0) * 1_000_000)  # microseconds
    finally:
        # Always tear the system down, even when an ask raises.
        await system.shutdown()

    if not latencies:
        # Indexing an empty sample list would raise IndexError.
        print(" ask latency: no samples collected")
        return

    latencies.sort()
    sample_count = len(latencies)
    p50 = latencies[sample_count // 2]
    p99 = latencies[int(sample_count * 0.99)]
    p999 = latencies[int(sample_count * 0.999)]
    print(f" ask latency: p50={p50:.0f}µs p99={p99:.0f}µs p99.9={p999:.0f}µs")
|
||
|
||
|
||
async def bench_concurrent_actors(num_actors=1000, msgs_per_actor=100):
    """Measure throughput with many concurrent actors.

    Spawns ``num_actors`` CounterActors, then concurrently sends each one
    ``msgs_per_actor`` "inc" messages followed by an ``ask("get")``. The sum
    of the replies is compared with the number of messages sent and the
    difference is reported as "loss".

    Args:
        num_actors: Number of CounterActors to spawn.
        msgs_per_actor: Number of "inc" messages sent to each actor; each
            mailbox is sized ``msgs_per_actor + 10``.
    """
    system = ActorSystem("bench")
    try:
        refs = [
            await system.spawn(CounterActor, f"a{i}", mailbox_size=msgs_per_actor + 10)
            for i in range(num_actors)
        ]

        async def send_batch(ref, n):
            """Send ``n`` "inc" messages to ``ref`` and return its reported count."""
            for i in range(n):
                await ref.tell("inc")
                # Yield control every 50 msgs so actor loops can drain
                if i % 50 == 49:
                    await asyncio.sleep(0)
            return await ref.ask("get", timeout=30.0)

        start = time.perf_counter()
        results = await asyncio.gather(*(send_batch(r, msgs_per_actor) for r in refs))
        elapsed = time.perf_counter() - start
    finally:
        # Always tear the system down, even when one of the batches fails.
        await system.shutdown()

    total = num_actors * msgs_per_actor
    delivered = sum(results)
    rate = total / elapsed
    loss = total - delivered
    print(f" {num_actors} actors × {msgs_per_actor} msgs: {fmt(total)} in {elapsed:.2f}s = {fmt(int(rate))}/s (loss: {loss})")
|
||
|
||
|
||
async def bench_actor_chain(depth=100):
    """Measure ask latency through a chain of actors (hop overhead).

    Spawns ``depth`` ChainActors, links each to its successor, sends one
    ``ask("ping")`` to the head and times how long the echoed reply takes to
    come back through every hop.

    Args:
        depth: Number of actors in the chain; must be at least 1.

    Raises:
        ValueError: If ``depth`` is less than 1.
        RuntimeError: If the reply is not the original "ping" payload.
    """
    if depth < 1:
        # An empty chain has no head to ask and no hops to average over.
        raise ValueError(f"depth must be >= 1, got {depth}")

    system = ActorSystem("bench")
    try:
        refs = [await system.spawn(ChainActor, f"c{i}") for i in range(depth)]
        # Link chain: c0 → c1 → ... → c{depth-1}.
        # NOTE(review): this reaches into the private ``_cell`` of each ref;
        # no public way to configure the actor instance is visible here.
        for upstream, downstream in zip(refs, refs[1:]):
            upstream._cell.actor.next_ref = downstream

        start = time.perf_counter()
        result = await refs[0].ask("ping", timeout=30.0)
        elapsed = time.perf_counter() - start
    finally:
        # Always tear the system down, even when the ask times out.
        await system.shutdown()

    # Explicit check instead of ``assert`` so it still runs under ``python -O``.
    if result != "ping":
        raise RuntimeError(f"chain returned {result!r}, expected 'ping'")

    per_hop = elapsed / depth * 1_000_000  # µs
    print(f" chain {depth} hops: {elapsed*1000:.1f}ms total, {per_hop:.0f}µs/hop")
|
||
|
||
|
||
async def bench_middleware_overhead(n=50_000):
    """Measure overhead of middleware pipeline.

    Runs ``n`` sequential asks against a NoopActor without middleware and
    ``n`` against one wrapped in a single CountMiddleware, each in its own
    ActorSystem, and prints the relative difference in elapsed time.

    Args:
        n: Number of ask round trips performed against each actor.
    """
    mw = CountMiddleware()

    system_plain = ActorSystem("plain")
    system_mw = ActorSystem("mw")
    try:
        ref_plain = await system_plain.spawn(NoopActor, "echo")
        ref_mw = await system_mw.spawn(NoopActor, "echo", middlewares=[mw])

        # Plain
        t0 = time.perf_counter()
        for _ in range(n):
            await ref_plain.ask("p")
        plain_elapsed = time.perf_counter() - t0

        # With middleware
        t0 = time.perf_counter()
        for _ in range(n):
            await ref_mw.ask("p")
        mw_elapsed = time.perf_counter() - t0
    finally:
        # Shut both systems down even if a spawn or ask failed; the nested
        # ``finally`` ensures the second shutdown runs if the first raises.
        try:
            await system_plain.shutdown()
        finally:
            await system_mw.shutdown()

    overhead = ((mw_elapsed - plain_elapsed) / plain_elapsed) * 100
    print(f" middleware overhead: {overhead:+.1f}% ({fmt(n)} ask calls, 1 middleware)")
|
||
|
||
|
||
async def bench_executor_parallel(num_tasks=16):
    """Measure thread pool parallelism with CPU work.

    Spawns ``num_tasks`` ComputeActors in a system created with
    ``executor_workers=8``, asks each of them concurrently for an iterative
    Fibonacci of 10_000 steps, and prints the wall-clock time and task rate.

    Args:
        num_tasks: Number of ComputeActors (and concurrent asks) to run.
    """
    system = ActorSystem("bench", executor_workers=8)
    try:
        refs = [await system.spawn(ComputeActor, f"cpu{i}") for i in range(num_tasks)]

        start = time.perf_counter()
        # Only the elapsed time matters; the computed values are discarded.
        await asyncio.gather(*(r.ask(10_000, timeout=30.0) for r in refs))
        elapsed = time.perf_counter() - start
    finally:
        # Always tear the system down, even when an ask times out.
        await system.shutdown()

    print(f" executor parallel: {num_tasks} fib(10K) in {elapsed*1000:.0f}ms ({num_tasks/elapsed:.0f} tasks/s)")
|
||
|
||
|
||
async def bench_spawn_teardown(n=5000):
    """Measure actor spawn + shutdown speed.

    Times spawning ``n`` NoopActors into one system, then separately times
    ``system.shutdown()``, and prints both durations.
    """
    system = ActorSystem("bench")

    spawn_began = time.perf_counter()
    # The refs stay referenced until the function returns, as in the
    # original append loop.
    refs = [await system.spawn(NoopActor, f"a{i}") for i in range(n)]
    spawn_elapsed = time.perf_counter() - spawn_began

    shutdown_began = time.perf_counter()
    await system.shutdown()
    shutdown_elapsed = time.perf_counter() - shutdown_began

    print(f" spawn {n}: {spawn_elapsed*1000:.0f}ms ({n/spawn_elapsed:.0f}/s)")
    print(f" shutdown {n}: {shutdown_elapsed*1000:.0f}ms")
|
||
|
||
|
||
async def main():
    """Run every benchmark in order, printing results grouped under headings."""
    rule = "=" * 60
    # Each section is a heading plus the benchmarks run beneath it, in order.
    sections = (
        ("[Throughput]", (bench_tell_throughput, bench_ask_throughput)),
        ("[Latency]", (bench_ask_latency, bench_actor_chain)),
        ("[Concurrency]", (bench_concurrent_actors, bench_executor_parallel)),
        ("[Overhead]", (bench_middleware_overhead,)),
        ("[Lifecycle]", (bench_spawn_teardown,)),
    )

    print(rule)
    print(" Actor Framework Benchmarks")
    print(rule)
    print()

    for heading, benchmarks in sections:
        print(heading)
        for run_benchmark in benchmarks:
            await run_benchmark()
        print()

    print(rule)
    print(" Done")
    print(rule)
|
||
|
||
|
||
# Run the full benchmark suite only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
|