六大编排模式 · LangGraph/CrewAI/AutoGen · MCP+A2A · 可观测性工程
团队在 Cursor 里跑通了单 Agent Demo,上线后却要面对并行研究、工具隔离、人工审核闸门与共享 Token 预算。单一 Agent 会撞上上下文窗口、样样通样样松、零并行、单点故障(SPOF)四道墙。本文面向 AI 工程师与技术负责人:拆解多Agent 协作系统(MAS)的六大编排模式、LangGraph vs CrewAI vs AutoGen 选型矩阵、MCP + A2A 双层协议栈、生产级工程实践(PostgresSaver、HITL interrupt、熔断器)、MAST 可观测性资料,以及决策树与 2026 趋势展望。
2024 至 2025 年,AI Agent 从实验室走向生产。许多团队很快发现:把所有任务塞给一个 LLM Agent,系统会在规模化时崩溃。问题不在模型不够聪明,而在结构性限制。
上下文窗口瓶颈:复杂任务的中间结果会把上下文塞满,后續推理质量骤降,Agent 忘记十轮前设定的约束。
专业能力稀釋:一个 Agent 既要做信息检索、又要寫程式、又要做决策审核,樣樣都做但樣樣不精,指令互相干擾提高幻觉率。
串行执行低效:所有子任务顺序执行,总耗时是每步耗时之和,无法并行;独立子任务(同时爬三个站、跑三套测试)白白浪費墙鐘时间。
单点故障风险:一旦这个 Agent 出问题,整个流程全部停摆;没有隔离域供重试或回滾。
成本归因不透明:财务无法回答哪一步烧掉了 Token;没有 per-agent 预算,一个话多的研究 Agent 就能耗尽月度上限。
根据 MLflow 2026 年报告,Google 内部 Agent Bake-Off 实验显示:采用分布式多Agent架构后,处理时间从 1 小时降至 10 分钟,提升超过 6 倍。AdaptOrch(2026 学术论文)进一步证明:在多Agent 系统中,编排拓扑的选择对系统性能的影响比底层模型的选择更大,在 SWE-bench 等基准测试中,正确的拓扑选择可带来 12–23% 的性能提升。
拓扑大于模型。AdaptOrch 证明编排结构比模型选型多解释 12–23% 的结果变异——先画好图,再升级 GPT 方案。
多Agent 协作系统(Multi-Agent System,MAS)是指由多个独立的 AI Agent 组成的系统,这些 Agent 通过明确的通信协议和编排机制协作,完成单一 Agent 无法高效处理的复杂任务。
| 特征 | 描述 |
|---|---|
| 角色專一 | 只负责一个明确定義的子任务(检索、推理、生成、验证等) |
| 工具访问 | 擁有完成自身任务所需的特定工具集 |
| 状态隔离 | 维护自己的上下文和内存,不污染其他 Agent |
| 可替换性 | 可以独立升级、替换,不影响整体系统 |
| 拓扑 | 控制流 | 优点 | 风险 |
|---|---|---|---|
| 集中式(Centralized) | 单一 Orchestrator 路由所有消息 | 可审计、可控、政策执行严格 | Orchestrator 上下文膨胀;路由器 SPOF |
| 分布式(Decentralized) | Agent 点对点直接通信,无中央协调者 | 高弹性、低延迟、容错好 | 调试难、非确定性高、终止条件难保证 |
| 层级式(Hierarchical) | Supervisor 委派给 Worker,Worker 向上回报 | 企业工作流、多层审批、两者平衡 | Supervisor 提示复杂;延迟堆叠 |
2026 年大多数生产堆栈预设采用层级式,并以薄型集中式路由器处理鉴权与预算执行——混合第一与第三种拓扑的优点。
这六种模式覆盖生产中 95% 以上的多Agent 系统场景。模式可组合——客服堆栈可能用 Supervisor 扇出并行研究 Agent,再流水线合成给 Writer。
Agent A 的输出直接作为 Agent B 的输入,严格线性执行。适用于步骤间有严格依赖、流程固定、不需动态路由的场景(文章创作、代码审查、合规审核)。
from langgraph.graph import StateGraph, START, END
from typing import TypedDict
class PipelineState(TypedDict):
query: str
retrieved_docs: str
analysis: str
final_report: str
def retrieval_agent(state: PipelineState):
docs = search_knowledge_base(state["query"])
return {"retrieved_docs": docs}
def analysis_agent(state: PipelineState):
result = llm.invoke(f"分析以下内容:{state['retrieved_docs']}")
return {"analysis": result.content}
def writer_agent(state: PipelineState):
report = llm.invoke(f"根据分析撰写报告:{state['analysis']}")
return {"final_report": report.content}
builder = StateGraph(PipelineState)
builder.add_node("retriever", retrieval_agent)
builder.add_node("analyzer", analysis_agent)
builder.add_node("writer", writer_agent)
builder.add_edge(START, "retriever")
builder.add_edge("retriever", "analyzer")
builder.add_edge("analyzer", "writer")
builder.add_edge("writer", END)
pipeline = builder.compile()
| 优点 | 缺点 |
|---|---|
| 实现简单、易于调试、行为可预测、适合合规审计 | 总耗时 = 各步耗时之和;单步失败整体阻塞;无法处理动态分支 |
多个 Agent 同时处理独立子任务,最后由汇聚节点合并结果。总耗时 = max(T1, T2, ..., Tn) 而非加总。LangGraph 的 Send API 实现真正的并行 fan-out。
from langgraph.types import Send
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
import operator
class ResearchState(TypedDict):
query: str
research_results: Annotated[list, operator.add]
final_synthesis: str
def supervisor(state: ResearchState):
subtasks = [
{"query": state["query"], "source": "academic"},
{"query": state["query"], "source": "industry"},
{"query": state["query"], "source": "news"},
]
return [Send("research_worker", task) for task in subtasks]
def research_worker(state: dict):
result = search_by_source(state["query"], state["source"])
return {"research_results": [result]}
def synthesizer(state: ResearchState):
combined = "\n".join(state["research_results"])
synthesis = llm.invoke(f"综合以下研究结果:{combined}")
return {"final_synthesis": synthesis.content}
builder = StateGraph(ResearchState)
builder.add_node("research_worker", research_worker)
builder.add_node("synthesizer", synthesizer)
builder.add_conditional_edges(START, supervisor, ["research_worker"])
builder.add_edge("research_worker", "synthesizer")
builder.add_edge("synthesizer", END)
graph = builder.compile()
主管 Agent 负责意图识别、任务拆解和路由决策,将子任务分配给专业 Worker Agent。加入关键字快速通道:高信心意图用 regex 匹配,跳过 LLM 路由调用,响应延迟 <1ms。
KEYWORD_ROUTING = {
"代码": "code_agent",
"code": "code_agent",
"搜索": "search_agent",
"查询": "search_agent",
"资料": "data_agent",
}
def supervisor_with_fast_path(state):
query = state["query"].lower()
for keyword, agent_name in KEYWORD_ROUTING.items():
if keyword in query:
return {"next": agent_name}
routing_prompt = f"""
用户请求:{state['query']}
可用 Agent:code_agent, search_agent, data_agent
请返回最合适的 Agent 名称,只返回名称。
"""
decision = llm.invoke(routing_prompt)
return {"next": decision.content.strip()}
Agent 之间点对点直接传递任务,没有中央协调者,依靠终止规则(轮数、共识、超时)停止。适合多轮协商和辩论;非确定性高,生产环境慎用。
import autogen
reviewer_1 = autogen.AssistantAgent(
name="SecurityReviewer",
system_message="你是一位安全专家,专注於代码中的安全漏洞。"
)
reviewer_2 = autogen.AssistantAgent(
name="PerformanceReviewer",
system_message="你是一位性能专家,专注於代码的效率和資源使用。"
)
human_proxy = autogen.UserProxyAgent(
name="CodeAuthor",
human_input_mode="NEVER",
max_consecutive_auto_reply=2,
is_termination_msg=lambda x: "APPROVED" in x.get("content", "")
)
groupchat = autogen.GroupChat(
agents=[human_proxy, reviewer_1, reviewer_2],
messages=[],
max_round=6
)
manager = autogen.GroupChatManager(groupchat=groupchat)
所有 Agent 共享一个结构化工作空间(黑板),Agent 在满足自身前提条件时主动读写黑板,无需显式排程。适合长时间异步任务(小时级甚至天级)、异构服务协作、工作流条件复杂难以预定路由的场景。
在同一系统中组合多种模式,通常是「主管模式 + 流水线」的组合。典型架构:Intent Router → 简单查询直接回答;复杂报告 → Supervisor → 并行研究扇出 + 质量保障流水线 → 人工审核 → 发布。
| 模式 | 并发性 | 可调试性 | 典型框架 |
|---|---|---|---|
| Sequential Pipeline | 低 | 高 | LangGraph、CrewAI sequential |
| Fan-out / Fan-in | 高 | 中 | LangGraph Send |
| Supervisor-Worker | 中 | 高 | LangGraph、CrewAI hierarchical |
| Swarm | 中 | 低 | AutoGen、Swarm SDK |
| Blackboard | 中 | 中 | Custom + 共享存储 |
| Hybrid | 可变 | 中 | LangGraph(最常见) |
三者均在 2026 年有生产用户,但优化方向不同。依拓扑选框架,而非品牌偏好。
| 维度 | LangGraph | CrewAI | AutoGen(微软) |
|---|---|---|---|
| 架构范式 | 状态机图 | 角色制团队 | 对话式多Agent |
| 程式语言 | Python / JS/TS | Python | Python / .NET |
| 学习曲线 | 较陡 | 平缓 | 中等 |
| 状态管理 | 原生支持(PostgresSaver) | 需自实现 | 有限支持 |
| Human-in-the-Loop | 原生 interrupt() | 需自实现 | UserProxyAgent 模式 |
| 可观测性 | LangSmith(商业) | 有限 | Azure Monitor |
| 生产就绪度 | 最高 | 中等 | 高 |
| 快速原型 | 中等 | 最快 | 高 |
| Azure 整合 | 中等 | 较弱 | 最强 |
| 适合场景 | 复杂有状态工作流 | 角色制内容流水线 | 对话式协作、研究实验 |
需要持久检查点 + HITL 审批闸门? → LangGraph(合规、金融、医疗)。
需要 1–2 天出可读角色 YAML 原型? → CrewAI(内容生成、研究报告)。
需要 Agent 多轮辩论和迭代推理? → AutoGen(微软/Azure 技术栈)。
需要图控制 + 对话 handoff 并存? → LangGraph Orchestrator 包装 AutoGen Worker。
2026 年,多Agent系统的通信协议已标准化为两层互补架构,两者均已纳入 Linux Foundation Agentic AI Foundation 管理。MCP 负责垂直整合(Agent ↔ 工具/数据);A2A 负责横向编排(Agent ↔ Agent)。
| 层级 | 协议 | 连接对象 | 类比 |
|---|---|---|---|
| 垂直层 | MCP(Model Context Protocol) | Agent ↔ 工具、数据库、API | AI 工具整合的 USB-C |
| 横向层 | A2A(Agent-to-Agent) | Agent ↔ Agent 任务委派 | 服务网格的 HTTP |
MCP 由 Anthropic 主导,统一 Agent 访问外部工具的接口。每个 Agent 内部通过 tools/list 发现工具;A2A 则通过 Agent Card 发现并委派任务给其他 Agent。
from mcp.server import Server
from mcp.types import Tool, TextContent
app = Server("data-agent-mcp")
@app.list_tools()
async def list_tools():
return [
Tool(
name="query_customer_db",
description="查询客戶数据库,支持按 ID、姓名、Email 检索",
inputSchema={
"type": "object",
"properties": {
"field": {"type": "string", "enum": ["id", "name", "email"]},
"value": {"type": "string"}
},
"required": ["field", "value"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: dict):
if name == "query_customer_db":
result = db.query(arguments["field"], arguments["value"])
return [TextContent(type="text", text=str(result))]
{
"name": "ResearchAgent",
"version": "1.0",
"description": "专业信息检索与摘要 Agent",
"url": "https://research-agent.internal/a2a",
"capabilities": {
"streaming": true,
"async": true
},
"skills": [
{
"id": "web_research",
"name": "网络信息检索",
"description": "从国际网络检索并摘要最新信息",
"tags": ["research", "summarization", "web"]
},
{
"id": "academic_search",
"name": "学术文獻检索",
"description": "检索 arXiv、Google Scholar 等学术数据库"
}
]
}
import httpx
async def discover_and_delegate(agent_url: str, task: str):
card_response = await httpx.get(f"{agent_url}/.well-known/agent.json")
agent_card = card_response.json()
available_skills = [s["id"] for s in agent_card["skills"]]
if "web_research" not in available_skills:
raise ValueError(f"Agent {agent_card['name']} 不支持 web_research 技能")
payload = {
"jsonrpc": "2.0",
"method": "message/send",
"id": "task-001",
"params": {
"message": {
"role": "user",
"parts": [{"type": "text", "text": task}]
}
}
}
response = await httpx.post(agent_card["url"], json=payload)
return response.json()
垂直层详见 MCP 协议深度解读;本节聚焦 Agent 之间的横向委派模式。
Demo 用内存状态;生产需要宕机恢复、高风险操作人工审批、成本上限。四个原语覆盖大多数团队在自建基础设施前的需求。
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string("postgresql://user:pass@localhost/agentdb") as checkpointer:
graph = builder.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "user-session-12345"}}
result = graph.invoke({"query": "分析 Q2 财报"}, config)
from langgraph.types import interrupt
def high_risk_action_agent(state):
proposed_action = plan_action(state)
human_decision = interrupt({
"proposed_action": proposed_action,
"risk_level": "HIGH",
"message": "此操作将修改生产数据库,请确认是否执行"
})
if human_decision["approved"]:
return execute_action(proposed_action)
else:
return {"status": "cancelled", "reason": human_decision.get("reason")}
import time
from functools import wraps
class CircuitBreaker:
def __init__(self, failure_threshold=5, recovery_timeout=60):
self.failure_count = 0
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.state = "CLOSED"
self.last_failure_time = None
def __call__(self, func):
@wraps(func)
async def wrapper(*args, **kwargs):
if self.state == "OPEN":
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = "HALF_OPEN"
else:
raise Exception("Circuit breaker OPEN - Agent 暂时不可用")
try:
result = await func(*args, **kwargs)
if self.state == "HALF_OPEN":
self.state = "CLOSED"
self.failure_count = 0
return result
except Exception:
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.state = "OPEN"
raise
return wrapper
@CircuitBreaker(failure_threshold=3, recovery_timeout=30)
async def call_external_agent(task):
return await agent_client.send(task)
class TokenBudgetManager:
def __init__(self, total_budget: int = 100_000):
self.total_budget = total_budget
self.used_tokens = 0
self.agent_usage = {}
def check_budget(self, agent_name: str, estimated_tokens: int) -> bool:
remaining = self.total_budget - self.used_tokens
if estimated_tokens > remaining:
raise BudgetExceededException(
f"Agent {agent_name} 请求 {estimated_tokens} tokens,"
f"但剩余预算仅 {remaining} tokens"
)
return True
def record_usage(self, agent_name: str, actual_tokens: int):
self.used_tokens += actual_tokens
self.agent_usage[agent_name] = self.agent_usage.get(agent_name, 0) + actual_tokens
MAX_ITERATIONS = 25
class ProductionGuardrails:
def __init__(self, budget: TokenBudgetManager, breaker: CircuitBreaker):
self.budget = budget
self.breaker = breaker
self.iterations = 0
def before_step(self, agent_id: str, est_tokens: int):
self.iterations += 1
if self.iterations > MAX_ITERATIONS:
raise RunawayLoopError()
self.budget.charge(agent_id, est_tokens)
self.breaker.check()
先在纸上画拓扑图:标注同步边、并行分支与 HITL interrupt 点,再写 LangGraph 节点。
接入 PostgresSaver:检查点指向托管 Postgres;验证 kill 进程后可从断点恢复。
按 Agent 注册 MCP 工具:每个 Agent 最小权限工具子集;禁止共享一个巨型 tool list。
添加 interrupt 节点:部署、删除、支付、PII 导出等高险工具必须经人工审批。
启用 TokenBudgetManager + CircuitBreaker:设置 per-agent 日预算;80% 消耗率时告警。
先上可观测性再上功能:每步 Agent 调用包裹 OpenTelemetry span;加第 7 个 Agent 前先建好 MONITORING_METRICS 看板。
建议:做混沌演练——在图执行中途 kill Worker、重启,确认 PostgresSaver 从上次检查点恢复且无重复副作用。
MAST 研究团队分析了 1,642 个多Agent 执行追踪,故障分布高度可预测——多数是设计问题,而非模型智商不足。
| 故障类型 | 占比 | 说明 |
|---|---|---|
| 系统设计问题 | 41.77% | 步骤重复、工具选择错误、上下文溢出、缺少终止条件 |
| Agent 间不对齐 | 36.94% | 交接时上下文遗失、一个 Agent 的幻觉成为下一个的「事实」 |
| 任务验证失败 | 21.30% | 过早终止、不完整验证、任务看似完成实则未完成 |
更令人担忧的是:57% 的组织已有 Agent 在生产环境运行,但仅 8% 完成了 LLM 可观测性的实施。大量错误以 HTTP 200 返回,监控面板全绿,系统实际输出的却是错误结果。
from opentelemetry import trace
import uuid
tracer = trace.get_tracer("multi-agent-system")
def traced_agent_call(agent_name: str, task: dict, correlation_id: str = None):
if not correlation_id:
correlation_id = str(uuid.uuid4())
with tracer.start_as_current_span(f"agent.{agent_name}") as span:
span.set_attribute("agent.name", agent_name)
span.set_attribute("correlation.id", correlation_id)
span.set_attribute("task.type", task.get("type", "unknown"))
try:
result = agent_registry[agent_name].run(task)
span.set_attribute("agent.tokens_used", result.get("tokens", 0))
span.set_attribute("agent.status", "success")
return result
except Exception as e:
span.set_attribute("agent.status", "error")
span.set_attribute("error.message", str(e))
raise
| 指标 | 意义 |
|---|---|
| task_success_rate | 端到端任务完成率(目标:>85%) |
| e2e_latency_p95 | P95 端到端延迟(目标:<30s) |
| total_cost_per_task | 每次任务平均 Token 成本 |
| agent_error_rate | 各 Agent 错误率(目标:<5%) |
| handoff_error_rate | A2A 结构不匹配与消息遗失 |
| output_quality_score | LLM-as-Judge 或人工标注的输出质量评分 |
MONITORING_METRICS = {
"task_success_rate": "端到端任务完成率(目标:>85%)",
"e2e_latency_p95": "P95 端到端延迟(目标:<30s)",
"total_cost_per_task": "每次任务平均 Token 成本",
"agent_error_rate": "各 Agent 错误率(目标:<5%)",
"agent_retry_count": "重试次数(高重试 = 需要调查)",
"tool_call_budget_usage": "工具调用次数/预算比",
"output_quality_score": "输出质量评分",
"goal_alignment_score": "目标一致性评分",
"hallucination_rate": "幻觉检测率",
}
import json
def evaluate_agent_output(original_task: str, agent_output: str) -> dict:
evaluation_prompt = f"""
你是一位严格的质量评审专家。请评估以下 AI Agent 的输出质量。
原始任务:{original_task}
Agent 输出:{agent_output}
请从完成度、准确性、相关性、幻觉检测四个维度评分(1-5 分)。
以 JSON 格式返回:
{{"completeness": x, "accuracy": x, "relevance": x, "hallucination_detected": true/false, "comments": "..."}}
"""
evaluation = llm.invoke(evaluation_prompt)
return json.loads(evaluation.content)
Agent A 产生幻觉(错误的「事实」),错误结果传给 Agent B、C,整个系统输出基于错误前提——而所有 HTTP 状态码都是 200。
def validate_agent_output(output: dict, schema: dict) -> bool:
jsonschema.validate(output, schema)
if output.get("confidence_score", 1.0) < 0.7:
raise LowConfidenceError(f"Agent 输出置信度过低: {output['confidence_score']}")
required_fields = schema.get("required", [])
missing = [f for f in required_fields if not output.get(f)]
if missing:
raise MissingFieldsError(f"输出缺少必填字段: {missing}")
return True
Agent 进入重试循环或反复调用工具,Token 费用在几分钟内暴涨至预期的百倍。
MAX_ITERATIONS = 10
MAX_TOOL_CALLS_PER_AGENT = 20
MAX_TOTAL_TOKENS = 50_000
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["high_cost_tool"]
)
为了使用多Agent 而使用多Agent,把简单的两步 LLM 链拆成 8 个 Agent,调试难度指数级上升。生产系统最佳 Agent 数量通常是 3–8 个——只有当有具体证据(并行需求、上下文溢出、子 Agent 需独立升级)时才增加数量。
内部 Demo 效果很好,上线后面对真实用户的边缘输入就频繁失败。生产环境必须从第一天就包装边界防护。
class ProductionGuardrails:
def validate_input(self, user_input: str) -> str:
if len(user_input) > 10000:
raise InputTooLongError("输入超过 10000 字符限制")
injection_patterns = ["ignore previous instructions", "forget everything"]
for pattern in injection_patterns:
if pattern.lower() in user_input.lower():
raise PromptInjectionError("侦测到潜在的提示注入攻击")
return user_input.strip()
def validate_output(self, output: str) -> str:
output = self.pii_filter.redact(output)
if self.content_classifier.is_harmful(output):
raise HarmfulContentError("输出包含有害内容")
return output
Fan-in 在所有 Send 分支完成前就执行,导致结果不完整或重复执行。LangGraph 边上加 defer=True,让汇聚节点等待全部并行 Worker。
graph.add_edge("fan_out", "fan_in", defer=True)
警告:最昂贵的错误是用加 Agent 来修 prompt 问题。先调整专家 Agent 提示与交接 schema,再考虑新增节点。
在选框架和画拓扑之前,用以下决策树快速定位模式。答案没有唯一正解,但顺序问对问题能避免 80% 的过度设计。
你的任务是否有明确的线性依赖步骤?
├─ 是 → 子任务是否可以并行执行?
│ ├─ 否 → 【顺序流水线】
│ └─ 是 → 【并行扇出 + 顺序流水线 混合】
│
└─ 否 → 是否有一个 Agent 具有决策权威?
├─ 是 → 规模是否足够大需要子团队?
│ ├─ 否 → 【Supervisor-Worker 层级模式】
│ └─ 是 → 【层级式(Supervisors of Supervisors)】
│
└─ 否 → 任务是否是长时间异步的?
├─ 是 → 【黑板架构】
└─ 否 → Agent 数量是否 ≤ 5?
├─ 是 → 【Swarm(注意设定终止条件)】
└─ 否 → 【考虑重新拆分为层级模式】
子任务彼此独立? 是 → 并行扇出。否 → 继续。
顺序严格? 是 → 顺序流水线。否 → 继续。
需要涌现式对话? 是 → Swarm / AutoGen。否 → Supervisor-Worker。
需要宕机安全恢复? 是 → LangGraph + PostgresSaver。否 → CrewAI 快速路径。
跨团队 Agent 发现? 是 → 发布 Agent Card + A2A。仅工具 → 每 Agent 各自 MCP。
笔记本适合跑 Agent Demo,却在合盖休眠、缺少 macOS 原生工具链与长期程序守护上削弱 7×24 体验;纯 Linux 云主机能跑无状态 API Worker 与远端 HTTP+SSE MCP,却难以承载依赖 Keychain、Xcode 或 Apple 生态的 MCP 工具。对需要把多Agent 编排图当「常驻基础设施」、让 LangGraph 检查点与 MCP 会话跨周累积复利的技术团队,VpsMesh Mac Mini M4 云端租赁把 uptime、远端 KVM 与可预期月租打包成生产级 OpEx。方案见 Mac Mini M4 租赁价格,部署与运维问题见 帮助中心,在线下单见 订购页。
大多数生产系统落在 3 到 8 个专用 Agent 之间。少于 3 个通常不值得编排开销;超过 8 个往往意味着过度工程化,除非有清晰的领域边界和 per-agent 可观测性。建议从 Supervisor + 2 个 Worker 起步,量测 tokens_per_success,仅在某个 Agent 上下文持续溢出时再拆分。
MCP 是垂直层:每个 Agent 通过 tools/list 与 JSON Schema 描述符连接工具和数据。A2A 是水平层:Agent 通过 Agent Card 发现对等节点并委派子任务。每个 Agent 内部用 MCP;Agent 之间用 A2A。垂直层详见 MCP 协议深度解读;委派模式见本文第 05 节。
不必须。无状态 LangGraph Worker 与远端 HTTP+SSE MCP 可部署在 Linux 云主机。当 Agent 依赖 macOS 工具链、Xcode 构建、Keychain 机密,或需要不间断的检查点会话时,租用 Mac Mini M4 比对抗笔记本休眠周期更省心。建议先租 1 个月验证检查点延迟与 Token 消耗。价格:Mac Mini M4 租赁价格。部署:帮助中心。下单:订购页。