Multi-Agent Systems
TL;DR
Multi-agent systems coordinate multiple LLM agents on tasks that exceed one context window or one line of investigation. The pattern that won in production is orchestrator–workers: a lead agent decomposes the task and spawns parallel subagents whose findings return compressed, while the orchestrator owns all writes. The decision rule is read-vs-write: parallelize reading (research, search, audit) freely; keep writing (code, documents, decisions) single-threaded, because context cannot be cheaply shared and conflicting partial views produce incoherent output. Expect multi-agent runs to cost an order of magnitude more tokens than chat — justified only when task value is high and the work genuinely parallelizes. Peer-to-peer, debate, and blackboard architectures remain useful as evaluation and research structures, not as production defaults.
When Multi-Agent Actually Wins (and When It Doesn't)
Two influential, opposing field reports frame the design space:
- Anthropic's research system (orchestrator + parallel search subagents) beat a single-agent baseline by ~90% on breadth-first research queries — but consumed ~15× the tokens of a chat interaction. The economics only close when task value is high and the work is parallelizable reading.
- Cognition's "Don't Build Multi-Agents" argues most multi-agent failures are context failures: each agent sees a slice, makes locally-reasonable but globally-conflicting decisions, and no message protocol fully transmits the unwritten context behind a decision. Their conclusion: prefer one continuous agent with full context — use compaction for length, not delegation.
Both are right, about different workloads:
| Workload | Verdict | Why |
|---|---|---|
| Breadth-first research, multi-source search | Parallelize | Subtasks independent; results merge by summarization; errors don't compound across branches |
| Large-scale audit/review (security pass over 200 files) | Parallelize | Read-only; findings are a union |
| Writing code in one repo | Single agent (+ read-only subagents) | Write-write conflicts in semantics, not just git; shared context is the product |
| One document, one decision, one design | Single agent | Decomposition costs more context than it saves |
| Independent deliverables (3 services, 3 repos) | Parallel single agents | Not really multi-agent — partition by ownership, integrate via contracts |
The synthesis used by current coding and research harnesses: one orchestrator owns the goal, the plan, and every write; subagents are context firewalls for expensive reads, returning ≤2K-token distillations instead of transcripts. Subagent value is context isolation first, wall-clock parallelism second — see Harness Engineering.
Cost model to internalize: every agent re-reads its own growing transcript, so N agents ≠ N× one agent's cost — coordination overhead, duplicated context, and synthesis turns push real-world multiples to 10–20× single-agent token spend. Budget at design time, not after the invoice.
The Problem with Single Agents
Single agents hit fundamental limits when facing complex, multi-faceted tasks:
Multi-Agent Architecture Patterns
Pattern 1: Hierarchical (Supervisor) Architecture
A supervisor agent coordinates specialized worker agents:
from typing import Literal
from pydantic import BaseModel
import asyncio
class TaskAssignment(BaseModel):
"""Supervisor's decision on task routing."""
agent: Literal["researcher", "coder", "reviewer", "done"]
task_description: str
context: str
priority: int = 1
class SupervisorAgent:
"""Coordinates specialized worker agents."""
def __init__(self, llm_client):
self.llm = llm_client
self.workers = {
"researcher": ResearchAgent(llm_client),
"coder": CodeAgent(llm_client),
"reviewer": ReviewAgent(llm_client),
}
self.conversation_history = []
self.task_results = {}
async def run(self, user_request: str, max_iterations: int = 10) -> str:
"""Main orchestration loop."""
self.conversation_history.append({
"role": "user",
"content": user_request
})
for iteration in range(max_iterations):
# Supervisor decides next action
assignment = await self._decide_next_action()
if assignment.agent == "done":
return await self._synthesize_final_response()
# Delegate to appropriate worker
worker = self.workers[assignment.agent]
result = await worker.execute(
task=assignment.task_description,
context=assignment.context,
previous_results=self.task_results
)
# Track results
self.task_results[f"{assignment.agent}_{iteration}"] = result
self.conversation_history.append({
"role": "assistant",
"content": f"[{assignment.agent}]: {result.summary}"
})
return await self._synthesize_final_response()
async def _decide_next_action(self) -> TaskAssignment:
"""Supervisor LLM decides which agent to invoke next."""
system_prompt = """You are a supervisor coordinating a team of agents:
- researcher: Gathers information, searches web, analyzes documents
- coder: Writes, debugs, and tests code
- reviewer: Reviews code for quality, security, best practices
- done: All tasks complete, ready to deliver final response
Based on the conversation and completed tasks, decide the next action.
Return a JSON with: agent, task_description, context, priority"""
response = await self.llm.generate(
system=system_prompt,
messages=self.conversation_history,
response_format=TaskAssignment
)
return response
async def _synthesize_final_response(self) -> str:
"""Combine all worker results into coherent response."""
synthesis_prompt = f"""Synthesize the following agent outputs into
a coherent final response:
{self.task_results}
Original request: {self.conversation_history[0]['content']}
"""
return await self.llm.generate(synthesis_prompt)
class ResearchAgent:
"""Specialized agent for research tasks."""
def __init__(self, llm_client):
self.llm = llm_client
self.tools = [WebSearchTool(), DocumentAnalyzer(), CitationManager()]
async def execute(self, task: str, context: str, previous_results: dict):
system_prompt = """You are a research specialist. Your capabilities:
- Search the web for current information
- Analyze and summarize documents
- Provide properly cited sources
Be thorough but concise. Always cite sources."""
# ReAct loop for research
return await self._react_loop(system_prompt, task, context)Pattern 2: Peer-to-Peer Collaboration
Agents communicate directly without central coordination:
Each agent can: broadcast messages to all, send direct messages, request help, vote on decisions
import asyncio
from dataclasses import dataclass
from typing import Optional
from enum import Enum
class MessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
BROADCAST = "broadcast"
VOTE_REQUEST = "vote_request"
VOTE = "vote"
@dataclass
class AgentMessage:
sender: str
recipient: Optional[str] # None = broadcast
message_type: MessageType
content: dict
correlation_id: str
class MessageBus:
"""Central message broker for agent communication."""
def __init__(self):
self.subscribers: dict[str, asyncio.Queue] = {}
self.message_history: list[AgentMessage] = []
def register(self, agent_id: str) -> asyncio.Queue:
"""Register an agent to receive messages."""
queue = asyncio.Queue()
self.subscribers[agent_id] = queue
return queue
async def publish(self, message: AgentMessage):
"""Publish a message to appropriate recipients."""
self.message_history.append(message)
if message.recipient is None:
# Broadcast to all except sender
for agent_id, queue in self.subscribers.items():
if agent_id != message.sender:
await queue.put(message)
else:
# Direct message
if message.recipient in self.subscribers:
await self.subscribers[message.recipient].put(message)
class PeerAgent:
"""Base class for peer-to-peer agents."""
def __init__(self, agent_id: str, role: str, llm_client, message_bus: MessageBus):
self.agent_id = agent_id
self.role = role
self.llm = llm_client
self.bus = message_bus
self.inbox = message_bus.register(agent_id)
self.pending_requests: dict[str, asyncio.Future] = {}
async def run(self):
"""Main agent loop - process incoming messages."""
while True:
message = await self.inbox.get()
asyncio.create_task(self._handle_message(message))
async def _handle_message(self, message: AgentMessage):
"""Process incoming message based on type."""
if message.message_type == MessageType.REQUEST:
response = await self._process_request(message)
await self.send(
recipient=message.sender,
message_type=MessageType.RESPONSE,
content=response,
correlation_id=message.correlation_id
)
elif message.message_type == MessageType.RESPONSE:
if message.correlation_id in self.pending_requests:
self.pending_requests[message.correlation_id].set_result(message)
elif message.message_type == MessageType.BROADCAST:
await self._handle_broadcast(message)
async def request(self, recipient: str, content: dict, timeout: float = 30) -> dict:
"""Send request and wait for response."""
correlation_id = str(uuid.uuid4())
future = asyncio.Future()
self.pending_requests[correlation_id] = future
await self.send(recipient, MessageType.REQUEST, content, correlation_id)
try:
response = await asyncio.wait_for(future, timeout)
return response.content
finally:
del self.pending_requests[correlation_id]
async def broadcast(self, content: dict):
"""Broadcast message to all agents."""
await self.send(None, MessageType.BROADCAST, content, str(uuid.uuid4()))
class PlannerAgent(PeerAgent):
"""Plans and coordinates task execution."""
async def _process_request(self, message: AgentMessage) -> dict:
if message.content.get("action") == "create_plan":
plan = await self._create_plan(message.content["task"])
# Broadcast plan for feedback
await self.broadcast({
"action": "plan_proposal",
"plan": plan,
"request_feedback": True
})
return {"status": "plan_created", "plan": plan}
async def _create_plan(self, task: str) -> list[dict]:
response = await self.llm.generate(
system="You are a planning specialist. Break down tasks into steps.",
prompt=f"Create a plan for: {task}"
)
return response
class CriticAgent(PeerAgent):
"""Reviews and critiques other agents' outputs."""
async def _handle_broadcast(self, message: AgentMessage):
if message.content.get("action") == "plan_proposal":
critique = await self._critique_plan(message.content["plan"])
# Send critique back to planner
await self.send(
recipient=message.sender,
message_type=MessageType.RESPONSE,
content={"critique": critique},
correlation_id=message.correlation_id
)Pattern 3: Debate / Adversarial Pattern
Multiple agents debate to improve output quality:
class DebateSystem:
"""Implements adversarial debate between agents."""
def __init__(self, llm_client, num_rounds: int = 3):
self.llm = llm_client
self.num_rounds = num_rounds
self.proposer = DebateAgent("proposer", "advocate", llm_client)
self.challenger = DebateAgent("challenger", "critic", llm_client)
self.judge = JudgeAgent(llm_client)
async def debate(self, topic: str) -> dict:
"""Run a full debate on a topic."""
debate_history = []
# Initial proposal
proposal = await self.proposer.make_argument(
topic=topic,
role="proposer",
history=[]
)
debate_history.append({"agent": "proposer", "argument": proposal})
# Debate rounds
for round_num in range(self.num_rounds):
# Challenger responds
challenge = await self.challenger.make_argument(
topic=topic,
role="challenger",
history=debate_history
)
debate_history.append({"agent": "challenger", "argument": challenge})
# Proposer responds
response = await self.proposer.make_argument(
topic=topic,
role="proposer",
history=debate_history
)
debate_history.append({"agent": "proposer", "argument": response})
# Judge decides
verdict = await self.judge.evaluate(topic, debate_history)
return {
"debate_history": debate_history,
"verdict": verdict,
"final_answer": verdict["synthesized_answer"]
}
class DebateAgent:
"""Agent that participates in debates."""
ROLE_PROMPTS = {
"proposer": """You are advocating for your position.
Make strong arguments, address counterpoints, and defend your stance.
Be persuasive but intellectually honest.""",
"challenger": """You are challenging the proposal.
Find weaknesses, ask probing questions, propose alternatives.
Be critical but constructive."""
}
async def make_argument(self, topic: str, role: str, history: list) -> str:
formatted_history = "\n".join([
f"{h['agent']}: {h['argument']}" for h in history
])
response = await self.llm.generate(
system=self.ROLE_PROMPTS[role],
prompt=f"""Topic: {topic}
Debate so far:
{formatted_history}
Make your next argument (2-3 paragraphs):"""
)
return response
class JudgeAgent:
"""Evaluates debate and synthesizes final answer."""
async def evaluate(self, topic: str, history: list) -> dict:
formatted = "\n".join([f"{h['agent']}: {h['argument']}" for h in history])
response = await self.llm.generate(
system="""You are an impartial judge evaluating a debate.
Consider the strength of arguments, evidence provided, and logical consistency.
Synthesize the best elements from both sides into a final answer.""",
prompt=f"""Topic: {topic}
Full debate:
{formatted}
Provide:
1. Analysis of each side's strongest points
2. Weaknesses in each position
3. Your synthesized final answer combining the best insights""",
response_format=JudgeVerdict
)
return responsePattern 4: Assembly Line (Pipeline)
Sequential processing with specialized stages:
Each stage: has specific responsibility, can reject/return to previous stage, adds metadata for next stage
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
@dataclass
class PipelineContext:
"""Carries data through the pipeline."""
original_input: str
current_data: any
stage_outputs: dict = field(default_factory=dict)
metadata: dict = field(default_factory=dict)
errors: list = field(default_factory=list)
class PipelineStage(ABC):
"""Base class for pipeline stages."""
@abstractmethod
async def process(self, context: PipelineContext) -> PipelineContext:
pass
@abstractmethod
def can_process(self, context: PipelineContext) -> bool:
pass
class CodeGenerationPipeline:
"""Assembly line for code generation."""
def __init__(self, llm_client):
self.stages = [
RequirementsAgent(llm_client),
ArchitectureAgent(llm_client),
ImplementationAgent(llm_client),
TestingAgent(llm_client),
ReviewAgent(llm_client),
]
async def run(self, user_request: str) -> PipelineContext:
context = PipelineContext(
original_input=user_request,
current_data=user_request
)
for stage in self.stages:
if not stage.can_process(context):
context.errors.append(f"Stage {stage.__class__.__name__} cannot process")
break
context = await stage.process(context)
context.stage_outputs[stage.__class__.__name__] = context.current_data
# Check for stage failures
if context.metadata.get("stage_failed"):
# Optionally retry or route to error handling
break
return context
class RequirementsAgent(PipelineStage):
"""Extracts and clarifies requirements."""
def __init__(self, llm_client):
self.llm = llm_client
def can_process(self, context: PipelineContext) -> bool:
return isinstance(context.current_data, str)
async def process(self, context: PipelineContext) -> PipelineContext:
response = await self.llm.generate(
system="""Extract structured requirements from the user request.
Identify: functional requirements, non-functional requirements,
constraints, and ambiguities that need clarification.""",
prompt=context.current_data,
response_format=Requirements
)
context.current_data = response
context.metadata["requirements_extracted"] = True
return context
class ArchitectureAgent(PipelineStage):
"""Designs system architecture based on requirements."""
async def process(self, context: PipelineContext) -> PipelineContext:
requirements = context.current_data
response = await self.llm.generate(
system="""Design a system architecture based on requirements.
Include: components, interfaces, data flow, and technology choices.""",
prompt=f"Requirements: {requirements}",
response_format=Architecture
)
context.current_data = response
context.metadata["architecture_designed"] = True
return contextCommunication Protocols
Interoperability Standards
Two protocols cover the integration surface, and they're complementary, not competing:
- MCP (Model Context Protocol) — connects an agent to tools and context: one server wraps a system (GitHub, Postgres, a browser), any MCP-capable agent uses it. Vertical integration: agent ↔ capabilities.
- A2A (Agent2Agent) — connects agents to agents across vendors and frameworks: capability discovery via Agent Cards, task lifecycle management, opaque execution (agents collaborate without exposing internals). Horizontal integration: agent ↔ agent, now under the Linux Foundation.
Inside a single product, you rarely need either between your own agents — a function call and a shared data structure beat a network protocol. Reach for A2A at organizational boundaries (another team's agent, another vendor's), the same way you'd reach for an API contract instead of a shared database. And regardless of transport, the hard problem remains semantic: a subagent that receives task description without the orchestrator's constraints and rejected-alternatives will make locally-sensible, globally-wrong choices. Pass decisions and the reasons for them.
Structured Message Format
from pydantic import BaseModel
from datetime import datetime
from typing import Optional, Any
class AgentMessage(BaseModel):
"""Standardized message format for agent communication."""
# Routing
message_id: str
correlation_id: Optional[str] = None # For request-response pairing
sender_id: str
recipient_id: Optional[str] = None # None = broadcast
# Timing
timestamp: datetime
ttl_seconds: Optional[int] = None
# Content
message_type: str # "request", "response", "event", "command"
action: str # Specific action requested
payload: dict[str, Any]
# Context
conversation_id: str
parent_message_id: Optional[str] = None
# Metadata
priority: int = 5 # 1-10, higher = more urgent
require_ack: bool = False
class ConversationProtocol:
"""Manages structured conversations between agents."""
def __init__(self):
self.conversations: dict[str, list[AgentMessage]] = {}
def start_conversation(self, initiator: str, topic: str) -> str:
"""Start a new conversation thread."""
conv_id = str(uuid.uuid4())
self.conversations[conv_id] = []
return conv_id
def add_message(self, message: AgentMessage):
"""Add message to conversation history."""
if message.conversation_id in self.conversations:
self.conversations[message.conversation_id].append(message)
def get_context(self, conversation_id: str, max_messages: int = 10) -> list:
"""Get recent conversation context."""
return self.conversations.get(conversation_id, [])[-max_messages:]Shared Memory / Blackboard Pattern
from typing import Optional
import asyncio
from datetime import datetime
class Blackboard:
"""Shared memory space for multi-agent collaboration."""
def __init__(self):
self._state: dict[str, Any] = {}
self._locks: dict[str, asyncio.Lock] = {}
self._subscribers: dict[str, list[callable]] = {}
self._history: list[dict] = []
async def read(self, key: str) -> Optional[Any]:
"""Read value from blackboard."""
return self._state.get(key)
async def write(self, key: str, value: Any, writer_id: str):
"""Write value to blackboard with locking."""
if key not in self._locks:
self._locks[key] = asyncio.Lock()
async with self._locks[key]:
old_value = self._state.get(key)
self._state[key] = value
# Record history
self._history.append({
"timestamp": datetime.now(),
"key": key,
"old_value": old_value,
"new_value": value,
"writer": writer_id
})
# Notify subscribers
await self._notify(key, value, writer_id)
def subscribe(self, key: str, callback: callable):
"""Subscribe to changes on a key."""
if key not in self._subscribers:
self._subscribers[key] = []
self._subscribers[key].append(callback)
async def _notify(self, key: str, value: Any, writer_id: str):
"""Notify all subscribers of a change."""
for callback in self._subscribers.get(key, []):
await callback(key, value, writer_id)
class BlackboardAgent:
"""Agent that collaborates via blackboard."""
def __init__(self, agent_id: str, blackboard: Blackboard, llm_client):
self.agent_id = agent_id
self.blackboard = blackboard
self.llm = llm_client
# Subscribe to relevant keys
blackboard.subscribe("problem_state", self.on_state_change)
blackboard.subscribe("help_requests", self.on_help_request)
async def on_state_change(self, key: str, value: Any, writer_id: str):
"""React to problem state changes."""
if writer_id == self.agent_id:
return # Ignore own writes
# Check if we can contribute
contribution = await self._evaluate_contribution(value)
if contribution:
current = await self.blackboard.read("partial_solutions") or []
current.append({
"agent": self.agent_id,
"contribution": contribution
})
await self.blackboard.write("partial_solutions", current, self.agent_id)
async def _evaluate_contribution(self, state: dict) -> Optional[dict]:
"""Determine if agent can contribute to current state."""
response = await self.llm.generate(
system=f"You are a specialist in {self.specialty}. "
"Evaluate if you can contribute to solving this problem.",
prompt=f"Current state: {state}\n\nCan you contribute? If so, what?"
)
return response if response.get("can_contribute") else NoneState Management
Distributed State with Event Sourcing
from dataclasses import dataclass
from typing import List
from datetime import datetime
import json
@dataclass
class AgentEvent:
"""Immutable event representing a state change."""
event_id: str
agent_id: str
event_type: str
payload: dict
timestamp: datetime
version: int
class EventStore:
"""Persistent store for agent events."""
def __init__(self, storage_backend):
self.storage = storage_backend
self.subscribers: list[callable] = []
async def append(self, event: AgentEvent):
"""Append event to store."""
await self.storage.save(event)
# Notify subscribers
for subscriber in self.subscribers:
await subscriber(event)
async def get_events(
self,
agent_id: str = None,
event_type: str = None,
since: datetime = None
) -> List[AgentEvent]:
"""Query events with filters."""
return await self.storage.query(agent_id, event_type, since)
async def rebuild_state(self, agent_id: str) -> dict:
"""Rebuild agent state from events."""
events = await self.get_events(agent_id=agent_id)
state = {}
for event in events:
state = self._apply_event(state, event)
return state
def _apply_event(self, state: dict, event: AgentEvent) -> dict:
"""Apply event to state (reducer pattern)."""
if event.event_type == "task_started":
state["current_task"] = event.payload["task"]
state["status"] = "working"
elif event.event_type == "task_completed":
state["completed_tasks"] = state.get("completed_tasks", [])
state["completed_tasks"].append(event.payload)
state["status"] = "idle"
elif event.event_type == "error_occurred":
state["last_error"] = event.payload
state["status"] = "error"
return state
class StatefulAgent:
"""Agent with event-sourced state management."""
def __init__(self, agent_id: str, event_store: EventStore, llm_client):
self.agent_id = agent_id
self.event_store = event_store
self.llm = llm_client
self._state = None
async def initialize(self):
"""Rebuild state from event history."""
self._state = await self.event_store.rebuild_state(self.agent_id)
async def execute_task(self, task: dict):
"""Execute task with state tracking."""
# Emit start event
await self._emit_event("task_started", {"task": task})
try:
result = await self._do_work(task)
await self._emit_event("task_completed", {
"task": task,
"result": result
})
return result
except Exception as e:
await self._emit_event("error_occurred", {
"task": task,
"error": str(e)
})
raise
async def _emit_event(self, event_type: str, payload: dict):
"""Emit and persist an event."""
event = AgentEvent(
event_id=str(uuid.uuid4()),
agent_id=self.agent_id,
event_type=event_type,
payload=payload,
timestamp=datetime.now(),
version=len(await self.event_store.get_events(self.agent_id)) + 1
)
await self.event_store.append(event)
# Update local state
self._state = self.event_store._apply_event(self._state, event)Conflict Resolution
Voting and Consensus
from enum import Enum
from collections import Counter
class VotingStrategy(Enum):
MAJORITY = "majority"
UNANIMOUS = "unanimous"
WEIGHTED = "weighted"
RANKED_CHOICE = "ranked_choice"
class ConflictResolver:
"""Resolves conflicts between agent outputs."""
def __init__(self, strategy: VotingStrategy = VotingStrategy.MAJORITY):
self.strategy = strategy
async def resolve(
self,
question: str,
agent_responses: dict[str, Any],
weights: dict[str, float] = None
) -> dict:
"""Resolve conflicting agent responses."""
if self.strategy == VotingStrategy.MAJORITY:
return self._majority_vote(agent_responses)
elif self.strategy == VotingStrategy.WEIGHTED:
return self._weighted_vote(agent_responses, weights)
elif self.strategy == VotingStrategy.RANKED_CHOICE:
return await self._ranked_choice(agent_responses)
def _majority_vote(self, responses: dict[str, Any]) -> dict:
"""Simple majority voting."""
# Normalize responses for comparison
normalized = {k: self._normalize(v) for k, v in responses.items()}
# Count votes
vote_counts = Counter(normalized.values())
winner = vote_counts.most_common(1)[0]
# Find original response
for agent_id, response in responses.items():
if self._normalize(response) == winner[0]:
return {
"decision": response,
"method": "majority_vote",
"vote_count": winner[1],
"total_votes": len(responses),
"winning_agent": agent_id
}
def _weighted_vote(
self,
responses: dict[str, Any],
weights: dict[str, float]
) -> dict:
"""Weighted voting based on agent expertise/reliability."""
scores = {}
for agent_id, response in responses.items():
normalized = self._normalize(response)
weight = weights.get(agent_id, 1.0)
if normalized not in scores:
scores[normalized] = {"score": 0, "response": response, "voters": []}
scores[normalized]["score"] += weight
scores[normalized]["voters"].append(agent_id)
# Find highest scored response
winner = max(scores.values(), key=lambda x: x["score"])
return {
"decision": winner["response"],
"method": "weighted_vote",
"score": winner["score"],
"voters": winner["voters"]
}
async def _ranked_choice(self, responses: dict[str, Any]) -> dict:
"""Ranked choice voting with elimination rounds."""
# Each agent ranks all responses (including their own)
# Simulate by having an LLM rank them
pass
def _normalize(self, response: Any) -> str:
"""Normalize response for comparison."""
if isinstance(response, dict):
return json.dumps(response, sort_keys=True)
return str(response).lower().strip()
class ConsensusBuilder:
"""Builds consensus through iterative refinement."""
def __init__(self, llm_client, max_rounds: int = 3):
self.llm = llm_client
self.max_rounds = max_rounds
async def build_consensus(
self,
topic: str,
agent_positions: dict[str, str]
) -> dict:
"""Iteratively build consensus among agents."""
positions = agent_positions.copy()
for round_num in range(self.max_rounds):
# Check if consensus reached
if self._is_consensus(positions):
return {
"consensus": True,
"rounds": round_num + 1,
"final_position": list(positions.values())[0]
}
# Each agent considers others' positions
new_positions = {}
for agent_id, position in positions.items():
others = {k: v for k, v in positions.items() if k != agent_id}
revised = await self._revise_position(
agent_id, position, others, topic
)
new_positions[agent_id] = revised
positions = new_positions
# No full consensus - find common ground
return {
"consensus": False,
"rounds": self.max_rounds,
"final_positions": positions,
"common_ground": await self._find_common_ground(positions, topic)
}
async def _revise_position(
self,
agent_id: str,
position: str,
others: dict,
topic: str
) -> str:
"""Agent revises position considering others' views."""
response = await self.llm.generate(
system=f"You are agent {agent_id}. Consider other perspectives "
"and revise your position if you find merit in their arguments. "
"Maintain your position if you believe you are correct.",
prompt=f"""Topic: {topic}
Your current position: {position}
Other agents' positions:
{json.dumps(others, indent=2)}
Provide your revised position (or reaffirm your current one):"""
)
return responseReal-World Example: Software Development Team
class SoftwareDevTeam:
"""Multi-agent system simulating a software development team."""
def __init__(self, llm_client):
self.llm = llm_client
self.message_bus = MessageBus()
self.blackboard = Blackboard()
self.event_store = EventStore(InMemoryStorage())
# Create specialized agents
self.agents = {
"product_manager": ProductManagerAgent(
"pm", self.message_bus, self.blackboard, llm_client
),
"architect": ArchitectAgent(
"arch", self.message_bus, self.blackboard, llm_client
),
"developer": DeveloperAgent(
"dev", self.message_bus, self.blackboard, llm_client
),
"qa": QAAgent(
"qa", self.message_bus, self.blackboard, llm_client
),
"tech_writer": TechWriterAgent(
"docs", self.message_bus, self.blackboard, llm_client
),
}
self.supervisor = TeamLeadAgent(
"lead", self.agents, self.message_bus, llm_client
)
async def develop_feature(self, feature_request: str) -> dict:
"""Complete feature development lifecycle."""
# Phase 1: Requirements
await self.blackboard.write(
"feature_request",
feature_request,
"system"
)
requirements = await self.supervisor.delegate(
agent="product_manager",
task="analyze_requirements",
input=feature_request
)
await self.blackboard.write("requirements", requirements, "pm")
# Phase 2: Architecture
architecture = await self.supervisor.delegate(
agent="architect",
task="design_architecture",
input=requirements
)
# Review architecture with debate
debate = DebateSystem(self.llm)
refined_arch = await debate.debate(
topic=f"Review architecture: {architecture}"
)
await self.blackboard.write("architecture", refined_arch, "arch")
# Phase 3: Implementation
implementation = await self.supervisor.delegate(
agent="developer",
task="implement",
input={"requirements": requirements, "architecture": refined_arch}
)
# Phase 4: Testing
test_results = await self.supervisor.delegate(
agent="qa",
task="test",
input=implementation
)
# Iterate if tests fail
while not test_results["all_passed"]:
fixes = await self.supervisor.delegate(
agent="developer",
task="fix_bugs",
input=test_results["failures"]
)
test_results = await self.supervisor.delegate(
agent="qa",
task="test",
input=fixes
)
# Phase 5: Documentation
docs = await self.supervisor.delegate(
agent="tech_writer",
task="document",
input={"code": implementation, "architecture": refined_arch}
)
return {
"requirements": requirements,
"architecture": refined_arch,
"implementation": implementation,
"tests": test_results,
"documentation": docs
}Trade-offs
| Approach | Pros | Cons |
|---|---|---|
| Hierarchical | Clear control, easier debugging | Single point of failure, bottleneck |
| Peer-to-Peer | Resilient, flexible | Complex coordination, potential chaos |
| Debate | Higher quality outputs | Slow, expensive (multiple LLM calls) |
| Pipeline | Simple, predictable | Rigid, no backtracking |
| Blackboard | Flexible collaboration | Race conditions, complexity |
When to Use Multi-Agent Systems
Good fit:
- Complex tasks requiring diverse expertise
- Tasks benefiting from multiple perspectives
- Long-running workflows with distinct phases
- Scenarios requiring checks and balances
Poor fit:
- Simple, single-step tasks
- Latency-critical applications
- Cost-sensitive environments
- Tasks requiring strong consistency
References
- How We Built Our Multi-Agent Research System - Anthropic; orchestrator–workers in production, with the token economics
- Don't Build Multi-Agents - Cognition; the context-sharing counterargument
- Agent2Agent (A2A) Protocol - cross-vendor agent interop, Linux Foundation
- Model Context Protocol - agent ↔ tools/context standard
- AutoGen: Enabling Next-Gen LLM Applications - Microsoft Research
- MetaGPT: Multi-Agent Framework
- Multi-Agent Debate Improves LLM Reasoning
- LangGraph Documentation / OpenAI Agents SDK - orchestration frameworks