AI智能体系统的架构设计与模式应用
1、常见工作流
Anthropic 的研究突出了工作流(Workflow)和智能体(Agent)之间的一个关 键区别,即工作流指 LLM 和相关工具通过显式预定义的代码路径进行编排的系统。 这意味着操作序列、工具或 LLM 调用的条件以及整体任务执行流程都由开发者预先确定。相反,Anthropic 概念中的真正智能体具有更高程度的自主性。它们指的是由 LLM 动态指导自身流程的系统,能够实时决策工具的使用及实现目标所需的步骤。这类智能体始终掌控任务的执行过程,会根据输入和环境反馈调整方法,而非严格遵循预设路径。Anthropic 所识别和阐明的这些工作流模式,为在 LangGraph 中构建复杂的 AI 智能体系统提供了宝贵的框架。工作流和智能体之间的主要差异如图所示。
![image-20251130133200556]()
- 在 LangGraph 框架中,工作流通过其状态图架构得以优雅实现,使开发者能够以可视化与编程两种方式定义系统各组件间的信息流和控制流。借助 LangGraph, 开发者可使用持久化、流式传输、调试等强大功能,同时保持智能体系统结构的清晰与可管理性。本节将要探讨的工作流,基于 Anthropic 识别的常见模式,代表了使用 LangGraph 构建智能体系统的有效策略。这些模式构成了一套工具包,能满足各类 AI 应用程序的需求,为任务分解、复杂性管理及 LLM 输出质量提升提供了多元策略。
1.1 工作流的基础构建模块:增强型LLM
在深入研究特定的工作流模式之前,特别重要的是了解构建这些智能体系统所依据的基础构建模块:增强型 LLM(如图所示)。正如 Anthropic 在其对智能体构建模式的分析中强调的那样 —— 现代 LLM,尤其是那些为智能体系统相关任务设计的 LLM,不仅仅是独立的模型,更能通过一系列增强功能与世界互动,并执行超越简单文本生成的复杂任务。这些增强功能对于启用我们即将讨论的工作流和智能体至关重要。其中,核心增强功能通常如下所述。
- 检索(Retrieval):此功能允许 LLM 访问和整合外部信息,例如知识库、数据库或互联网。检索增强功能确保 LLM 的响应不限于其预训练数据,而是可以基于最新的和相关的信息。这对于需要事实准确性或访问特定数据的任务至关重要。
- 工具(Tool):使用工具可以使 LLM 能够与外部系统交互并在现实世界中执行操作。工具可以是从简单的函数(例如,计算器或日期检索器)到复杂的 API(可以与数据库交互、发送电子邮件或控制外部设备)。使用工具的能力使 LLM 能够超越被动信息处理,成为能够完成任务的主动智能体。
- 记忆(Memory):记忆机制允许 LLM 保留和利用来自过去交互或工作流步骤的信息。记忆的范围,从短期的对话记忆(有助于在正在进行的对话中保持上下文)到长期的记忆系统(使智能体能够随着时间的推移学习和适应)。 记忆对于构建有状态的智能体系统至关重要,因为这些系统需要维护上下文并建立在过去的经验之上。
这些增强功能与核心 LLM 协同工作,构成了构建复杂工作流和智能体的基础。 我们在探索每种工作流模式时,请记住,它们都以不同的方式利用这些增强功能来实 现特定功能。在 LangGraph 中,这些增强功能很容易与 LLM 集成,使开发者能够轻松构建可以检索信息、使用工具和维护记忆的智能体系统。以下工作流模式演示了如何编排这些增强型 LLM 功能以创建有效的智能体系统。
1.2 提示链
提示链(Prompt Chain)是一种基本的工作流模式,专注于将复杂任务分解为一 系列更简单、相互关联的步骤。在这种工作流中,利用增强型 LLM 的核心功能,一个 LLM 调用的输出成为后续调用的输入,从而创建一系列处理阶段。这种顺序性质允许整个调用链以更受控和审慎的方式解决问题,尤其是在任务本身包含多个方面的时候。提示链的一个关键方面是在中间阶段引入程序化检查的能力,通常称为“门控” (Gate)。门控充当质量控制存档点,确保流程保持在正轨上,并且中间输出符合预定义的标准。例如,在增强型 LLM 生成文档的初始草稿后,可以实施门控来检查特定的关键字或文体元素,然后继续进行下一个阶段。
提示链的主要优势在于,它能够通过简化每个增强型 LLM 调用来提高准确性。 通过将复杂任务分解为更小、更易于管理的子任务,每个增强型 LLM 调用都会获得更集中且更明确的提示。这可以提高每个步骤的输出的可靠性和质量,最终有助于获得更强大和更准确的结果。提示链特别适合将一个任务自然而清晰地划分为顺序子任务的情况。比如,生成营销文案,然后将其翻译成多种语言。第一步可以是使用增强型 LLM 创建引人注目的英文营销文案。此步骤的输出,即英文营销文案,成为第二步的输入。第二步可以是一系列增强型 LLM 调用,每个调用都将英文营销文案翻译成不同的目标语言。比如,采用提示链编写详细文档,首先使用增强型 LLM 生成文档大纲,然后实施门控来检查大纲是否满足特定要求(例如,关键主题的覆盖范围、 逻辑流程等),最后使用批准的大纲作为蓝图,供增强型 LLM 编写详细文档。
在 LangGraph 中,使用 Graph API 可以轻松实现提示链。提示链中的每个步骤都被表示为图中的一个节点,边定义了信息的顺序流。条件边可用于实现“门控”功 能,允许工作流根据程序化检查的结果进行分支(如图所示)。LangGraph 中的 Functional API 也提供了一种表达提示链的直观方式,允许通过以代码为中心的方式定义工作流步骤及其执行顺序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72# 基于 Graph API 的提示链工作流实现
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
# 使用 TypedDict 定义图状态,用于类型提示和状态管理
class State(TypedDict):
topic: str
joke: str
improved_joke: str
final_joke: str
# 配置模型
llm = ChatOpenAI(model="gpt-4o-mini")
# 图中的节点,每个节点代表提示链中的一个步骤
def generate_joke(state: State):
"""第一个 LLM 调用,根据主题生成初始笑话"""
msg = llm.invoke(f" 写一个关于 {state['topic']} 的简短笑话 ") # 使用 状态中的主题调用 LLM
return {"joke": msg.content} # 返回生成的笑话,更新状态中的 joke 键
def check_punchline(state: State):
""" 门控函数,检查笑话中是否有妙语 """
# 简单检查 —— 笑话中是否包含“?”或“!”
if "?" in state["joke"] or "!" in state["joke"]:
return "Fail" # 笑话未能通过妙语检查
return "Pass" # 笑话通过妙语检查
def improve_joke(state: State):
"""第二个 LLM 调用,通过添加文字游戏来改进笑话"""
msg = llm.invoke(f"通过添加文字游戏使这个笑话更有趣:{state['joke']}") # 调用LLM以改进笑话
return {"improved_joke": msg.content} # 返回改进后的笑话,更新状态中的 improved_joke
def polish_joke(state: State):
""" 第三个 LLM 调用,用于最终润色,添加令人惊讶的转折 """
msg = llm.invoke(f" 为这个笑话添加一个令人惊讶的转折:{state['improved_joke']}") # 调用 LLM 以润色笑话
return {"final_joke": msg.content} # 返回润色后的笑话,更新状态中的 final_joke
# 使用StateGraph构建工作流,使用定义的状态进行初始化
workflow = StateGraph(State)
# 将节点添加到工作流图中,将它们与定义的函数关联起来
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
# 定义边以连接节点并建立工作流序列
workflow.add_edge(START, "generate_joke") # 将开始节点连接到 generate_joke 节点
workflow.add_conditional_edges("generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}) # generate_joke 之后的条件边,基于 check_punchline 输出
workflow.add_edge("improve_joke", "polish_joke") # 将 improve_joke 节点连接到 polish_joke 节点
workflow.add_edge("polish_joke",END) # 将 polish_joke 节点连接到结束节点
# 将工作流图编译为可执行链
chain = workflow.compile()
# 使用初始状态(主题:cats)调用编译链
state = chain.invoke({"topic": "cats"})
print("初始笑话: ")
print(state["joke"])
if "improved_joke" in state: # 检查 improved_joke 是否存在于状态中,指示妙语检查失败
print("改进后的笑话:")
print(state["improved_joke"])
print("最终笑话: ")
print(state["final_joke"])
# 初始笑话:
# 为什么猫总是喜欢坐在电脑键盘上?
#
# 因为它们想要“paw-s”一下工作! 🐾😺1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46# 基于 Functional API 的提示链工作流实现
from langchain_openai import ChatOpenAI
from langgraph.func import entrypoint, task
llm = ChatOpenAI(model="gpt-4o-mini")
# 使用 @task 装饰器定义的任务,代表工作流中的步骤
def generate_joke(topic: str):
"""第一个 LLM 调用,生成初始笑话"""
msg = llm.invoke(f" 写一个关于 {topic} 的简短笑话 ") # 调用 LLM 以根据主题生成笑话
return msg.content # 返回生成的笑话
def check_punchline(joke: str):
""" 门控函数,检查笑话中是否有妙语 """
# 简单检查 —— 笑话中是否包含“?”或“!”
if "?" in joke or "!" in joke:
return "Fail" # 笑话未能通过妙语检查
return "Pass" # 笑话通过妙语检查
def improve_joke(joke: str):
""" 第二个 LLM 调用,改进笑话 """
msg = llm.invoke(f" 通过添加文字游戏使这个笑话更有趣:{joke}") # 调用 LLM 以改进笑话
return msg.content # 返回改进后的笑话
def polish_joke(joke: str):
""" 第三个 LLM 调用,用于最终润色 """
msg = llm.invoke(f" 为这个笑话添加一个令人惊讶的转折:{joke}") # 调用 LLM 以润色笑话
return msg.content # 返回润色后的笑话
# 入口点装饰函数使用 Functional API 定义工作流
def workflow(topic: str):
original_joke = generate_joke(topic).result() # 执行 generate_joke 任务
if check_punchline(original_joke) == "Pass": # 基于 check_punchline 输出的条件进行检查
return original_joke # 如果妙语检查通过,则返回初始笑话
improved_joke = improve_joke(original_joke).result() # 如果妙语检查失败,则执行 improve_joke 任务
return polish_joke(improved_joke).result() # 执行 polish_joke 任务并返回最终结果
# 调用工作流并且流式传输每个步骤的更新
state = workflow.invoke("cats")
print(state)
# 为什么猫总是坐在电脑键盘上?
#
# 因为它们想要保持“毛”绒绒的输入!
1.3 路由
路由(Routing)工作流旨在通过对输入进行分类并将其定向到专门的下游任务来处理各种输入。对于需要处理各种输入类型的复杂应用程序来说,此模式尤其有价值,每种输入类型都需要不同的处理方法。路由背后的核心思想是实施决策步骤,通常由增强型 LLM 本身或更传统的分类模型提供支持,该步骤分析输入数据并确定最合适的后续处理路径。这允许关注点分离,从而可以为每种输入类型开发更集中、更优化的流程和提示。若没有路由,尝试为所有输入类型优化单一的、单片的工作流则可能会导致妥协,并导致不同类别任务之间的性能欠佳。
路由工作流的成功取决于分类步骤的准确性。如果输入分类错误,那么它们可能会被定向到不合适的处理路径,从而产生不正确或令人不满意的结果。例如,在客户服务应用程序中,路由可用于区分各种类型的客户查询。一般咨询、退款请求和技术支持都可以被路由到专用的下游流程和提示,并可能被路由到不同的工具集。 其中,一般咨询可能由更简单的 FAQ 检索系统处理;退款请求可能会触发涉及订单历史记录查找和退款处理工具的工作流;技术支持可能会被定向到更专业的技术支持智能体。
路由的另一个作用是优化成本和速度。根据复杂性或紧迫性对传入的请求进行分类,比如将简单的或常见的请求路由到更小、更快、更具成本效益的模型,如 Gemini Flash、Claude 3.5 Haiku,而将复杂的或不寻常的请求定向到功能更强大但可能更昂贵的模型,如 Gemini Pro、Claude 3.5 Sonnet。这种智能的资源分配可显著提升整个系统的效率和成本效益。
在 LangGraph 中,实现路由涉及定义一个执行分类的路由器节点和一组后续节点,每个节点处理特定类型的输入。然后,使用条件边根据分类结果将工作流从路由器节点定向到适当的下游节点(如图所示)。LangGraph 中的 Graph API 和 Functional API 都为构建路由工作流提供了灵活的机制,允许开发者选择最适合其应用程序需求和编码风格的方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93# 基于 Graph API 的路由工作流实现
import json
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
llm = ChatOpenAI(model="gpt-4o-mini")
# 路由工作流的状态定义
class State(TypedDict):
input: str
decision: str
output: str
# 图中的节点,每个节点处理特定的路由(story, joke, poem)
def llm_call_1(state: State):
""" 写一个故事 """
result = llm.invoke(state["input"]) # 调用 LLM 以根据输入写一个故事
return {"output": result.content} # 返回故事,更新状态中的 output
def llm_call_2(state: State):
""" 写一个笑话 """
result = llm.invoke(state["input"]) # 调用 LLM 以根据输入写一个笑话
return {"output": result.content} # 返回笑话,更新状态中的 output
def llm_call_3(state: State):
""" 写一首诗歌 """
result = llm.invoke(state["input"]) # 调用 LLM 以根据输入写一首诗歌
return {"output": result.content} # 返回诗歌,更新状态中的 output
def llm_call_router(state: State):
""" 使用结构化输出将输入路由到适当的节点 """
# 使用结构化输出调用增强型 LLM,以充当路由逻辑
model = ChatOpenAI(model="gpt-4o-mini", model_kwargs={"response_format": {"type": "json_object"}})
ai_msg = model.invoke(
[
SystemMessage(
content="You are a router that directs user input to the appropriate handler. Return a JSON object with a 'step' key and one of these values: 'story', 'joke', or 'poem'. For example: {'step':'joke'}"
# 路由 LLM 的系统消息
),
HumanMessage(content=state["input"]), # 用户输入消息
]
)
decision = json.loads(ai_msg.content)
return {"decision": decision["step"]} # 返回路由决策,更新状态中的 decision
# 条件边函数,根据决策路由到适当的节点
def route_decision(state: State):
# 根据状态中的 decision,返回要访问的下一个节点的名称
if state["decision"] == "story":
return "llm_call_1"
elif state["decision"] == "joke":
return "llm_call_2"
elif state["decision"] == "poem":
return "llm_call_3"
# 使用 StateGraph 构建路由工作流
router_builder = StateGraph(State)
# 将节点添加到图中
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)
# 定义边以连接节点并建立路由逻辑
router_builder.add_edge(START, "llm_call_router") # 将开始节点连接到路由器节点
router_builder.add_conditional_edges(
"llm_call_router",
route_decision,
{ # 由 route_decision 返回的名称:要访问的下一个节点的名称
"llm_call_1": "llm_call_1",
"llm_call_2": "llm_call_2",
"llm_call_3": "llm_call_3",
},
) # 从路由器节点到专用节点的条件边,基于路由决策
router_builder.add_edge("llm_call_1", END) # 将专用节点连接到结束节点
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)
# 编译路由工作流图
router_workflow = router_builder.compile()
# 使用示例输入调用路由工作流
state = router_workflow.invoke({"input": "给我写一个关于猫的笑话"})
print(state["output"])
# 当然可以!这是一个关于猫的笑话:
#
# 为什么猫总是上电脑?
#
# 因为它们想抓住鼠标!1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67# 基于 Functional API 的路由工作流实现
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.func import entrypoint, task
llm = ChatOpenAI(model="gpt-4o-mini")
def llm_call_1(input: str):
""" 写一个故事 """
result = llm.invoke(input) # 调用 LLM 以根据输入写一个故事
return result.content # 返回故事
def llm_call_2(input: str):
""" 写一个笑话 """
result = llm.invoke(input) # 调用 LLM 以根据输入写一个笑话
return result.content # 返回笑话
def llm_call_3(input: str):
""" 写一首诗歌 """
result = llm.invoke(input) # 调用 LLM 以根据输入写一首诗歌
return result.content # 返回诗歌
def llm_call_router(input: str):
""" 使用结构化输出将输入路由到适当的节点 """
# 使用结构化输出调用增强型 LLM,以充当路由逻辑
model = ChatOpenAI(model="gpt-4o-mini", model_kwargs={"response_format": {"type": "json_object"}})
ai_msg = model.invoke(
[
SystemMessage(
content="You are a router that directs user input to the appropriate handler. Return a JSON object with a 'step' key and one of these values: 'story', 'joke', or 'poem'. For example: {'step':'joke'}"
# 路由 LLM 的系统消息
),
HumanMessage(content=state["input"]), # 用户输入消息
]
)
decision = json.loads(ai_msg.content)
return {"decision": decision["step"]} # 返回路由决策,更新状态中的 decision
# 入口点装饰函数定义路由工作流
def router_workflow(input: str):
next_step = llm_call_router(input)["decision"] # 获取路由决策的 decision 值
llm_call = None # 初始化 llm_call 变量
if next_step == "story": # 基于路由决策的条件路由
llm_call = llm_call_1 # 如果路由是 story,则分配 llm_call_1 任务
elif next_step == "joke":
llm_call = llm_call_2 # 如果路由是 joke,则分配 llm_call_2 任务
elif next_step == "poem":
llm_call = llm_call_3 # 如果路由是 poem,则分配 llm_call_3 任务
if llm_call is None:
raise ValueError(f"Invalid routing decision: {next_step}")
return llm_call(input) # 执行选定的 LLM 调用任务并返回结果
# 调用路由工作流并流式传输每个步骤的更新
for step in router_workflow.stream("给我写一个关于猫的笑话", stream_mode="updates"):
print(step)
print("\n")
# {'router_workflow': <Future at 0x1222a5c10 state=pending>}
#
#
# {'llm_call_2': '当然可以!这是一个关于猫的小笑话:\n\n为什么猫总是喜欢坐在电脑键盘上?\n\n因为它们想要“掌控”一切!😸'}
1.4 并行化
并行化(Parallelization)是 Anthropic 的智能体系统分析中的一种工作流模式, 它利用增强型 LLM 同时处理任务不同的方面能力。并行化不是按顺序处理任务,而是允许同时进行 LLM 调用,其输出以编程的方式聚合。这体现在两个主要的变体中:分段 (Sectioning) 和投票 (Voting)。分段涉及将一个任务分解为可以并行执行的独立子任务,每个子任务都由单独的增强型 LLM 调用处理,然后将结果组合起来形成最终的输出。投票涉及多次运行相同的任务,通常使用略有不同的提示或配置,以获得不同的输出,然后聚合这些输出,通常通过投票或共识机制来获得更稳健和更可靠的结果。
并行化的主要好处是可以提高效率,尤其是在子任务真正独立并且可以被并发处理而没有显著的相互依赖性的情况下。并行化还可以带来置信度更高的结果,尤其是在多种视角或尝试有益的情况下。对于涉及多个考虑因素或标准的复杂任务,并行化可能特别有效。与其用众多约束条件压倒单个增强型 LLM 调用,不如让每个考虑因素都由专用的 LLM 调用来处理,从而可以集中精力处理每个特定方面。例如,在 AI 系统中构建护栏可以从并行化中受益。一个增强型 LLM 实例可以理解用户查询的主要意图,而另一个实例可以并行筛选相同的查询,以查找不当的内容或请求,并可能使用专门的内容分析工具。这种分段方法通常比让单个 LLM 调用同时处理核心响应生成和护栏检查更有效。同样,在 LLM 性能的自动评估中,可以采用并行化方法同时评估模型输出的不同方面。每个增强型 LLM 调用都可以负责评估模型性能的特定维度,例如事实准确性、连贯性或风格,并且可以聚合结果以提供全面的评估分数。 在投票变体中,考虑审查代码以查找漏洞的任务,可以并行执行多个增强型 LLM 调用,每个调用都使用略有不同的提示,或者从不同的漏洞检测角度来审查同一段代码。 如果其中几个调用都标记了潜在问题,则会增加对发现结果的信心。同样,评估一段内容是否不当可以通过投票来完成,使用多个增强型 LLM 调用来评估不当内容的不同方面,或者使用不同的阈值来标记内容。
LangGraph 通过其图结构促进并行化,它允许开发者定义可以从单个起点并发 (Concurrency)执行的多个节点;但是如果要实现真正的并行(Parallelization)执行,则需要开发者在节点中通过代码来实现。来自这些并发节点的输出随后可以在后续节点中聚合(如图所示)。LangGraph 中的 Graph API 自然支持在工作流中创建并发分支,而 Functional API 支持使用异步操作进行并发任务的执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109# 基于 Graph API 的并行化工作流实现
from typing_extensions import TypedDict
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END
# 并行化工作流的图状态定义
class State(TypedDict):
topic: str
joke: str
story: str
poem: str
combined_output: str
llm = ChatOpenAI(model="gpt-4o-mini")
# 图中的节点,每个节点并发生成不同类型的内容
def call_llm_1(state: State):
"""第一个 LLM 调用,生成初始笑话"""
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话") # 调用 LLM 以根据主题写一个笑话
return {"joke": msg.content} # 返回笑话,更新状态中的 joke
def call_llm_2(state: State):
"""第二个 LLM 调用,生成故事"""
msg = llm.invoke(f" 写一个关于 {state['topic']} 的故事 ") # 调用 LLM 以根据主题写一个故事
return {"story": msg.content} # 返回故事,更新状态中的 story
def call_llm_3(state: State):
"""第三个 LLM 调用,生成诗歌"""
msg = llm.invoke(f" 写一首关于 {state['topic']} 的诗歌 ") # 调用 LLM 以根据主题写一首诗歌
return {"poem": msg.content} # 返回诗歌,更新状态中的 poem
def aggregator(state: State):
""" 将故事、笑话和诗歌组合成单个输出 """
combined = f" 这是一个关于 {state['topic']} 的故事、笑话和诗歌! \n\n"
# 开始组合输出
combined += f" 故事:\n{state['story']}\n\n" # 将故事添加到组合输出中
combined += f" 笑话:\n{state['joke']}\n\n" # 将笑话添加到组合输出中
combined += f" 诗歌:\n{state['poem']}" # 将诗歌添加到组合输出中
return {"combined_output": combined} # 返回组合输出,更新状态中的combined_output
# 使用 StateGraph 构建并行化工作流
parallel_builder = StateGraph(State)
# 将节点添加到图中
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
# 定义边以连接节点并建立并发执行任务
parallel_builder.add_edge(START, "call_llm_1") # 将开始节点连接到 call_llm_1,以进行并发执行
parallel_builder.add_edge(START, "call_llm_2") # 将开始节点连接到 call_llm_2,以进行并发执行
parallel_builder.add_edge(START, "call_llm_3") # 将开始节点连接到 call_llm_3,以进行并发执行
parallel_builder.add_edge("call_llm_1", "aggregator") # call_llm_1 节点执行完成后,将其连接到聚合器
parallel_builder.add_edge("call_llm_2", "aggregator") # call_llm_2 节点执行完成后,将其连接到聚合器
parallel_builder.add_edge("call_llm_3", "aggregator") # call_llm_3 节点执行完成后,将其连接到聚合器
parallel_builder.add_edge("aggregator", END) # 将聚合器节点连接到结束节点
# 编译并行化工作流图
parallel_workflow = parallel_builder.compile()
# 使用示例输入调用并行化工作流
state = parallel_workflow.invoke({"topic": "cats"})
print(state["combined_output"])
# 这是一个关于 cats 的故事、笑话和诗歌!
#
# 故事:
# 在一个宁静的小镇上,住着一只名叫小白的猫。小白是一只可爱的白色猫咪,性格温顺,喜欢在阳光下打盹,也喜欢与镇上的小朋友们嬉戏。
#
# 一天上午,小白像往常一样在花园里晒太阳,突然它听到了旁边传来的欢笑声。小朋友们在草地上玩耍,追逐一种黑色的小球。小白被他们的快乐所吸引,忍不住站起身,优雅地走过去,想要一起参与。
#
# 小朋友们看到小白,纷纷停下手中的游戏,围了上来。小白于是决定用它最擅长的技巧——爬树,来展示自己。它灵巧地爬上一棵大树,树叶在阳光下闪烁,小白在树枝间跳跃着,时而发出轻柔的喵声,吸引了大家的目光。
#
# 小朋友们欢呼着为小白加油,兴奋得手舞足蹈。就在这个时候,小白发现树下的一个小女孩始终没有加入他们的欢笑。她坐在树下,低着头,似乎有些伤心。小白察觉到了她的情绪,决定下树去看看。
#
# 小白轻轻走到小女孩的身旁,蹭了蹭她的腿。小女孩抬起头,看到小白,顿时露出了微笑。小白用它温暖的身体靠在小女孩身上,仿佛在说:“没关系,我在这里陪着你。”
#
# 小女孩轻轻抚摸着小白的毛发,渐渐放松了心情。原来,她今天因为和朋友发生了争执而感到不快乐。小白似乎听懂了,围着小女孩转了几圈,然后又在她脚边坐下,像是在安慰她。
#
# 看到小女孩的脸上逐渐浮现出笑容,小白感到非常高兴。于是,它陪着小女孩,和她一起观察周围的小虫子和花朵,讲述着它一天的冒险。渐渐地,小女孩的心情变得越来越好,终于站起身,跟着小白一起回到了小朋友们中间。
#
# 小朋友们看到小女孩重新笑了,纷纷向她招手,邀请她一起玩耍。小白也在旁边欢快地跳跃,与小朋友们一起奔跑。那个午后,阳光洒满了整个花园,欢声笑语回荡在小镇的每一个角落。
#
# 从那天起,小白不仅是小镇的宠物,更成了所有孩子们的朋友。它用自己的温暖和善良,教会了大家如何去关心身边的人,传递快乐与爱。每当有小朋友感到沮丧时,小白总会出现在他们身边,陪伴他们,带来幸福的瞬间。
#
# 笑话:
# 为什么猫喜欢电脑?
#
# 因为它们喜欢用鼠标!🐱🖱️
#
# 诗歌:
# 在阳光下的窗台上,
# 懒洋洋的猫咪睡,
# 柔软的身影如云彩,
# 梦中追逐着轻风飞。
#
# 微微摇摆的尾巴,
# 像是夜空中闪烁的星,
# 一双明亮的眼睛里,
# 藏着无尽的秘密和灵灵。
#
# 轻步如风的猎手,
# 在花丛间悄然行,
# 每一次优雅的跳跃,
# 都如舞者般轻盈。
#
# 有时调皮捣蛋,
# 翻倒水杯,撒欢跑,
# 又在夜静时静卧,
# 伴我入梦,柔情满。
#
# 猫咪,你是家中的灵魂,
# 带来欢笑,温暖与安宁,
# 在每一个寻常的日子里,
# 你如星辰,闪烁不息。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60# 基于 Functional API 的并行化工作流实现
from langchain_openai import ChatOpenAI
from langgraph.func import entrypoint, task
llm = ChatOpenAI(model="gpt-4o-mini")
def call_llm_1(topic: str):
"""第一个 LLM 调用,生成初始笑话"""
msg = llm.invoke(f" 写一个关于 {topic} 的笑话 ") # 调用 LLM 以根据主题写一个笑话
return msg.content # 返回笑话
def call_llm_2(topic: str):
"""第二个 LLM 调用,生成故事"""
msg = llm.invoke(f" 写一个关于 {topic} 的故事 ") # 调用 LLM 以根据主题写一个故事
return msg.content # 返回故事
def call_llm_3(topic):
"""第三个 LLM 调用,生成诗歌"""
msg = llm.invoke(f" 写一首关于 {topic} 的诗歌 ") # 调用 LLM 以根据主题写一首诗歌
return msg.content # 返回诗歌
def aggregator(topic, joke, story, poem):
""" 将故事、笑话和诗歌组合成单个输出 """
combined = f" 这是一个关于 {topic} 的故事、笑话和诗歌! \n\n" # 开始组合输出
combined += f" 故事:\n{story}\n\n" # 将故事添加到组合输出中
combined += f" 笑话:\n{joke}\n\n" # 将笑话添加到组合输出中
combined += f" 诗歌:\n{poem}" # 将诗歌添加到组合输出中
return combined # 返回组合输出
# 入口点装饰函数定义并行化工作流
def parallel_workflow(topic: str):
joke_fut = call_llm_1(topic) # 执行 call_llm_1 任务并获取笑话以进行并发执行
story_fut = call_llm_2(topic) # 执行 call_llm_2 任务并获取故事以进行并发执行
poem_fut = call_llm_3(topic) # 执行 call_llm_3 任务并获取诗歌以进行并发执行
return aggregator(topic, joke_fut.result(), story_fut.result(), poem_fut.result()).result()
# 在所有并发任务完成后执行 aggregator 任务
# 从聚合器获取最终结果
# 调用并行化工作流并流式传输每个步骤的更新
for step in parallel_workflow.stream("cats", stream_mode="updates"):
print(step)
print("\n")
# {'call_llm_1': '为什么猫总是喜欢在电脑上走来走去?\n\n因为它们想要“鼠标”!😸'}
#
#
# {'call_llm_3': '在阳光下懒洋洋, \n小猫咪轻轻长觉, \n毛茸茸的身影舞, \n如梦似幻在家中。\n\n眼中闪烁星辰光, \n俏皮玩耍灵动长, \n爬上窗台望远方, \n世界万千尽在掌。\n\n夜幕降临月光照, \n轻声细语伴风笑, \n神秘影子忽闪动, \n夜猫子独自流浪。\n\n温暖的怀抱中, \n柔软的心灵相依, \n小小生命,情无尽, \n与我同在,乐无比。'}
#
#
# {'call_llm_2': '从前,在一个宁静的小村庄里,住着一只名叫咪咪的可爱小猫。咪咪全身雪白,只有尖尖的耳朵和小鼻子是粉红色的。她的眼睛像两颗明亮的绿色宝石,总是闪烁着聪明和好奇的光芒。\n\n咪咪非常喜欢冒险,每天她都会在村庄里悠闲地游荡,探索四周的世界。春天,咪咪在盛开的花丛中追逐蝴蝶;夏天,她会在小溪边嬉戏,享受阳光的温暖;秋天,她则爱钻进落叶堆中,玩耍得不亦乐乎;而冬天,她在雪地里打滚,留下一个个可爱的猫爪印。\n\n有一天,咪咪在森林边缘发现了一条小路。出于好奇,她决定沿着小路走去。一路上,她遇到了很多新朋友,比如一只友好的小松鼠、一只聪明的狐狸,还有一只温柔的老母鸡。它们都热情地邀请咪咪一起玩。\n\n就在咪咪和新朋友们玩得开心的时候,突然天空乌云密布,雷声隆隆,一场暴风雨即将来临。咪咪慌忙找地方躲避,她想到了那棵古老的大树。大树的树干粗壮,树冠茂密,能够为她遮风挡雨。\n\n咪咪蜷缩在大树下,感到安全和温暖。就在这时,小松鼠和狐狸也跑了过来,它们同样找到了这个避风的地方。三只动物聚在一起,耐心地等待着暴风雨的结束。\n\n雨下得很大,咪咪和朋友们开始互相讲故事,分享各自的经历。他们聊到冬天的雪花,聊到寻找食物的冒险,甚至聊到梦想中的旅行。随着时间的流逝,暴风雨渐渐停了,阳光重新洒下大地,世界恢复了宁静。\n\n当雨后彩虹挂在天边时,咪咪和她的朋友们走出大树,欢呼着迎接新的一天。咪咪意识到,虽然她喜欢独自冒险,但有朋友陪伴的时光,才是最快乐的。\n\n从那以后,咪咪和她的新朋友们一起探索森林的每一个角落,分享快乐与冒险,建立了深厚的友谊。而小村庄的生活,也因她们的故事而充满了欢声笑语。\n\n咪咪明白了,无论人生旅程多么曲折,朋友的陪伴才是最珍贵的宝藏。'}
#
#
# {'aggregator': ' 这是一个关于 cats 的故事、笑话和诗歌! \n\n 故事:\n从前,在一个宁静的小村庄里,住着一只名叫咪咪的可爱小猫。咪咪全身雪白,只有尖尖的耳朵和小鼻子是粉红色的。她的眼睛像两颗明亮的绿色宝石,总是闪烁着聪明和好奇的光芒。\n\n咪咪非常喜欢冒险,每天她都会在村庄里悠闲地游荡,探索四周的世界。春天,咪咪在盛开的花丛中追逐蝴蝶;夏天,她会在小溪边嬉戏,享受阳光的温暖;秋天,她则爱钻进落叶堆中,玩耍得不亦乐乎;而冬天,她在雪地里打滚,留下一个个可爱的猫爪印。\n\n有一天,咪咪在森林边缘发现了一条小路。出于好奇,她决定沿着小路走去。一路上,她遇到了很多新朋友,比如一只友好的小松鼠、一只聪明的狐狸,还有一只温柔的老母鸡。它们都热情地邀请咪咪一起玩。\n\n就在咪咪和新朋友们玩得开心的时候,突然天空乌云密布,雷声隆隆,一场暴风雨即将来临。咪咪慌忙找地方躲避,她想到了那棵古老的大树。大树的树干粗壮,树冠茂密,能够为她遮风挡雨。\n\n咪咪蜷缩在大树下,感到安全和温暖。就在这时,小松鼠和狐狸也跑了过来,它们同样找到了这个避风的地方。三只动物聚在一起,耐心地等待着暴风雨的结束。\n\n雨下得很大,咪咪和朋友们开始互相讲故事,分享各自的经历。他们聊到冬天的雪花,聊到寻找食物的冒险,甚至聊到梦想中的旅行。随着时间的流逝,暴风雨渐渐停了,阳光重新洒下大地,世界恢复了宁静。\n\n当雨后彩虹挂在天边时,咪咪和她的朋友们走出大树,欢呼着迎接新的一天。咪咪意识到,虽然她喜欢独自冒险,但有朋友陪伴的时光,才是最快乐的。\n\n从那以后,咪咪和她的新朋友们一起探索森林的每一个角落,分享快乐与冒险,建立了深厚的友谊。而小村庄的生活,也因她们的故事而充满了欢声笑语。\n\n咪咪明白了,无论人生旅程多么曲折,朋友的陪伴才是最珍贵的宝藏。\n\n 笑话:\n为什么猫总是喜欢在电脑上走来走去?\n\n因为它们想要“鼠标”!😸\n\n 诗歌:\n在阳光下懒洋洋, \n小猫咪轻轻长觉, \n毛茸茸的身影舞, \n如梦似幻在家中。\n\n眼中闪烁星辰光, \n俏皮玩耍灵动长, \n爬上窗台望远方, \n世界万千尽在掌。\n\n夜幕降临月光照, \n轻声细语伴风笑, \n神秘影子忽闪动, \n夜猫子独自流浪。\n\n温暖的怀抱中, \n柔软的心灵相依, \n小小生命,情无尽, \n与我同在,乐无比。'}
#
#
# {'parallel_workflow': ' 这是一个关于 cats 的故事、笑话和诗歌! \n\n 故事:\n从前,在一个宁静的小村庄里,住着一只名叫咪咪的可爱小猫。咪咪全身雪白,只有尖尖的耳朵和小鼻子是粉红色的。她的眼睛像两颗明亮的绿色宝石,总是闪烁着聪明和好奇的光芒。\n\n咪咪非常喜欢冒险,每天她都会在村庄里悠闲地游荡,探索四周的世界。春天,咪咪在盛开的花丛中追逐蝴蝶;夏天,她会在小溪边嬉戏,享受阳光的温暖;秋天,她则爱钻进落叶堆中,玩耍得不亦乐乎;而冬天,她在雪地里打滚,留下一个个可爱的猫爪印。\n\n有一天,咪咪在森林边缘发现了一条小路。出于好奇,她决定沿着小路走去。一路上,她遇到了很多新朋友,比如一只友好的小松鼠、一只聪明的狐狸,还有一只温柔的老母鸡。它们都热情地邀请咪咪一起玩。\n\n就在咪咪和新朋友们玩得开心的时候,突然天空乌云密布,雷声隆隆,一场暴风雨即将来临。咪咪慌忙找地方躲避,她想到了那棵古老的大树。大树的树干粗壮,树冠茂密,能够为她遮风挡雨。\n\n咪咪蜷缩在大树下,感到安全和温暖。就在这时,小松鼠和狐狸也跑了过来,它们同样找到了这个避风的地方。三只动物聚在一起,耐心地等待着暴风雨的结束。\n\n雨下得很大,咪咪和朋友们开始互相讲故事,分享各自的经历。他们聊到冬天的雪花,聊到寻找食物的冒险,甚至聊到梦想中的旅行。随着时间的流逝,暴风雨渐渐停了,阳光重新洒下大地,世界恢复了宁静。\n\n当雨后彩虹挂在天边时,咪咪和她的朋友们走出大树,欢呼着迎接新的一天。咪咪意识到,虽然她喜欢独自冒险,但有朋友陪伴的时光,才是最快乐的。\n\n从那以后,咪咪和她的新朋友们一起探索森林的每一个角落,分享快乐与冒险,建立了深厚的友谊。而小村庄的生活,也因她们的故事而充满了欢声笑语。\n\n咪咪明白了,无论人生旅程多么曲折,朋友的陪伴才是最珍贵的宝藏。\n\n 笑话:\n为什么猫总是喜欢在电脑上走来走去?\n\n因为它们想要“鼠标”!😸\n\n 诗歌:\n在阳光下懒洋洋, \n小猫咪轻轻长觉, \n毛茸茸的身影舞, \n如梦似幻在家中。\n\n眼中闪烁星辰光, \n俏皮玩耍灵动长, \n爬上窗台望远方, \n世界万千尽在掌。\n\n夜幕降临月光照, \n轻声细语伴风笑, \n神秘影子忽闪动, \n夜猫子独自流浪。\n\n温暖的怀抱中, \n柔软的心灵相依, \n小小生命,情无尽, \n与我同在,乐无比。'}
1.5 协调器—工作者
协调器—工作者(Orchestrator-Worker)工作流模式专为子任务需求事先未知且需要在执行期间动态确定的复杂任务而设计。在此模式中,中央增强型 LLM 充当协调器 (Orchestrator),负责将初始任务分解为更小、更易于管理的子任务,并将这些子任务委派给充当工作者 (Worker)的增强型 LLM。一旦工作者增强型 LLM 完成了分配的子任务,协调器就会将它们的各个输出合成为连贯的最终结果。这种动态任务分解和委派是协调器—工作者与更简单的并行化工作流不同的地方。虽然并行化工作流通常用于处理预定义的子任务,但协调器—工作者更为灵活,可以适应每个输入的具体情况并动态生成必要的子任务。
协调器—工作者工作流特别适合难以或不可能预测必要子任务的复杂场景。例如编码任务,当被要求实现复杂功能或修复错误时,需要修改的文件数量以及每个文件中需要更改的性质通常取决于任务描述的具体情况。协调器增强型 LLM 可以分析任务,可能使用检索增强功能来访问项目文档或代码上下文,识别需要修改的文件和 代码,然后将每个部分的代码修改任务委派给各个工作者增强型 LLM,这些 LLM 可能配备有代码编辑工具。同样,在涉及从多个来源收集和分析信息的复杂搜索任务中, 协调器可以动态确定要检索的相关来源,并将每个来源的信息收集和分析委派给不同的工作者增强型 LLM。表面上,协调器—工作者与并行化工作流相似,它也使用了多个 LLM 调用。它们的区别在于灵活性。在协调器—工作者中,子任务不是预定义的, 而是由协调器根据特定输入动态确定的,从而为处理复杂和不可预测的任务提供了更具适应性和智能性的方法。
LangGraph 为实现协调器—工作者工作流提供了强大的支持,特别是通过其 Send API,允许在运行时动态创建工作者节点,使协调器节点能够根据需要将子任务分派给工作者节点。每个工作者都独立运行,维护自己的状态,它们的输出被收集在协调器可访问的共享状态键中。这种共享状态机制对于协调器收集所有工作者的输出并将其合成为最终结果至关重要(如图所示)。在 LangGraph 中定义协调器—工作者工作流时,协调器节点通常会生成概述子任务的计划,然后使用 Send API 为每个子任务动态创建和分派工作者节点。LangGraph 中的 Graph API 和 Functional API 都可以用于实现协调器—工作者工作流,Graph API 的图结构链路更易于实现流程变更。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112# 基于 Graph API 的协调器—工作者工作流实现
from langgraph.types import Send
import operator
from typing import Annotated, List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
# 结构化输出模式,用于规划报告章节
class Section(BaseModel):
name: str = Field(
description="报告章节的名称",
)
description: str = Field(
description="本章节中涵盖的主要主题和概念的简要概述",
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="报告的章节",
)
llm = ChatOpenAI(model="gpt-4o-mini")
planner = llm.with_structured_output(Sections, method="function_calling")
# 协调器—工作者工作流的图状态定义
class State(TypedDict):
topic: str # 报告主题
sections: list[Section] # 由协调器规划的报告章节列表
completed_sections: Annotated[
list, operator.add
] # 将所有工作者并行写入此键,使用 operator.add 进行列表连接
final_report: str # 最终合成报告
# 工作者状态定义,特定于工作者节点
class WorkerState(TypedDict):
section: Section
completed_sections: Annotated[list, operator.add] # 将工作者也写入共享的completed_sections键
# 图中的节点
def orchestrator(state: State):
""" 协调器,使用结构化输出生成报告计划 """
# 使用 planner LLM 和结构化输出生成报告计划
report_sections = planner.invoke(
[
SystemMessage(content="生成报告计划。"), # planner LLM 的系统消息
HumanMessage(content=f"这是报告主题:{state['topic']}"), # 包含报告主题的用户消息
]
)
return {"sections": report_sections.sections} # 返回计划中的章节,更新状态中的 sections
def llm_call(state: WorkerState):
""" 工作者根据分配的章节详细信息编写报告章节 """
# 使用 LLM 根据章节的名称和描述生成报告章节的内容
section = llm.invoke(
[
SystemMessage(
content="根据提供的章节的名称和描述编写报告章节,每个章节中不包含序言。使用 Markdown 格式。" # 工作者 LLM 的系统消息
),
HumanMessage(content=f"这是章节的名称:{state['section'].name} 和描述:{state['section'].description}" # 包含章节详细信息的用户消息
)
]
)
# 将生成的章节内容写入共享的'completed_sections'键
return {"completed_sections": [section.content]}
def synthesizer(state: State):
""" 将各个章节的输出合成为完整报告 """
# 从共享状态中检索已完成章节的列表
completed_sections = state["completed_sections"]
# 将已完成章节格式化为单个字符串用于最终报告
completed_report_sections = "\n\n---\n\n".join(completed_sections)
return {"final_report": completed_report_sections} # 返回最终报告, 更新状态中的 final_report
# 条件边函数,用于将工作者动态分配给计划中的每个章节
def assign_workers(state: State):
""" 使用 Send API 将工作者分配给计划中的每个章节,以实现动态工作者创建 """
# 使用 Send API 为每个章节动态创建和发送 llm_call 工作者节点
return [Send("llm_call", {"section": s}) for s in state["sections"]]
# 使用 StateGraph 构建协调器—工作者工作流
orchestrator_worker_builder = StateGraph(State)
# 将节点添加到图中
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)# 工作者节点
orchestrator_worker_builder.add_node("synthesizer", synthesizer)
# 定义边以连接节点并建立协调器—工作者流程
orchestrator_worker_builder.add_edge(START, "orchestrator") # 将开始节点连接到协调器节点
orchestrator_worker_builder.add_conditional_edges(
"orchestrator", assign_workers, ["llm_call"] # 从协调器节点到使用 Send API 动态创建的工作者节点的条件边
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer") # 工作者节点完成任务后,将其连接到合成器节点
orchestrator_worker_builder.add_edge("synthesizer", END) # 将合成器节点连接到结束节点
# 编译协调器—工作者工作流图
orchestrator_worker = orchestrator_worker_builder.compile()
# 使用示例报告主题调用协调器—工作者工作流
state = orchestrator_worker.invoke({"topic": "创建关于 LLM 缩放定律的报告"})
from IPython.display import Markdown
Markdown(state["final_report"]) # 以 Markdown 格式显示最终报告1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75# 基于 Functional API 的协调器—工作者工作流实现
from typing import List
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langgraph.func import entrypoint, task
# 结构化输出模式,用于规划报告章节
class Section(BaseModel):
name: str = Field(
description="报告章节的名称",
)
description: str = Field(
description="本章节中涵盖的主要主题和概念的简要概述",
)
class Sections(BaseModel):
sections: List[Section] = Field(
description="报告的章节",
)
llm = ChatOpenAI(model="gpt-4o-mini")
planner = llm.with_structured_output(Sections, method="function_calling")
def orchestrator(topic: str):
""" 协调器,使用结构化输出生成报告计划 """
# 使用 planner LLM 和结构化输出生成报告计划
report_sections = planner.invoke(
[
SystemMessage(content="生成报告计划。"), # planner LLM 的系统消息
HumanMessage(content=f"这是报告主题:{topic}"), # 包含报告主题的用户消息
]
)
return report_sections.sections # 返回计划中的章节
def llm_call(section: Section):
""" 工作者根据分配的章节详细信息编写报告章节 """
# 使用 LLM 根据章节的名称和描述生成报告章节的内容
result = llm.invoke(
[
SystemMessage(
content="根据提供的章节的名称和描述编写报告章节,每个章节中不包含序言。使用 Markdown 格式。" # 工作者 LLM 的系统消息
),
HumanMessage(content=f"这是章节的名称:{section.name} 和描述:{section.description}" # 包含章节详细信息的用户消息
)
]
)
return result.content # 返回生成的章节内容
def synthesizer(completed_sections: list[str]):
""" 将各个章节的输出合成为完整报告 """
# 将已完成章节格式化为单个字符串用于最终报告
final_report = "\n\n---\n\n".join(completed_sections)
return final_report # 返回最终报告
# 入口点装饰函数定义协调器—工作者工作流
def orchestrator_worker(topic: str):
sections = orchestrator(topic).result() # 执行协调器任务以获取报告章节计划
section_futures = [llm_call(section) for section in sections] # 并行动态创建和执行每个章节的工作者任务
final_report = synthesizer(
[section_fut.result() for section_fut in section_futures] # 在所有工作者任务完成后执行合成器任务
).result() # 获取最终合成的报告
return final_report # 返回最终报告
# 使用示例报告主题调用协调器—工作者工作流
report = orchestrator_worker.invoke("创建关于 LLM 缩放定律的报告")
from IPython.display import Markdown
Markdown(report) # 以 Markdown 格式显示最终报告
1.6 评估器—优化器
评估器—优化器(Evaluator-Optimizer)是 Anthropic 的智能体系统分析中的另一种工作流模式,它体现了一种迭代改进过程,模仿了人类通常通过反馈和修订来改进其工作的方式。在此模式中,一个增强型 LLM 调用【通常称为“生成器”或“优化器”(Optimizer)】负责生成初始响应,而另一个增强型 LLM 调用【通常称为“评估器”(Evaluator)】的任务是提供对此响应的反馈,然后将此反馈返回给生成器增强型 LLM,提示其改进和完善后续输出。我们可以重复进行生成、评估和反馈的循环, 直到获得令人满意的结果或达到预定义的迭代次数。当存在可以明确表达和评估的标准,并且迭代改进能够显著增加输出价值时,评估器—优化器工作流尤其有效。
评估器—优化器工作流的有效性取决于两个关键因素。首先,初始的增强型 LLM 调用应该能够根据反馈显著改进响应,这与人类写作过程类似,在人类写作过程中,来自编辑的反馈可以显著增强文档效果。其次,评估器必须能够提供有用的和可操作的反馈,以指导生成器进行改进。评估器—优化器工作流模式非常适合那些需要实现高质量和细致入微的输出的任务(需要多次迭代)。例如文学翻译任务,增强型 LLM(翻译器)的初始翻译可能捕获了文本的字面意义,但遗漏了细微的差别、 文体元素或文化背景。经过训练或提示的评估器可以关注这些方面,提供有关改进的 反馈,如建议更恰当的措辞或考虑文化背景,翻译器可以根据此反馈在下一次迭代中 改进其翻译效果。同样,对于需要全面收集信息的复杂搜索任务,评估器可以评估目前为止收集的信息是否足够,或者是否需要进一步搜索和分析,并使用检索工具将搜索结果与现有的知识库进行比较。根据评估结果,可以迭代搜索过程,从而实现更彻底和更完整的信息搜索。
在 LangGraph 中,评估器—优化器工作流通过定义一个生成器节点和一个评估器节点来实现,如果需要进一步改进,则使用条件边从评估器返回生成器(如图所示)。此工作流中的状态通常包括生成的输出和来自评估器的反馈,允许生成器节点在后续迭代中访问和利用此反馈。LangGraph 中的 Graph API 和 Functional API 都可以用于创建评估器—优化器工作流,从而为迭代过程提供不同级别的抽象和控制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79# 基于 Graph API 的评估器—优化器工作流实现
from typing_extensions import TypedDict, Literal
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph, START, END
# 结构化输出模式,用于评估、定义反馈结构
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="判断笑话是否有趣。", # 评估等级:有趣或不好笑
)
feedback: str = Field(
description=" 如果笑话不好笑,那么提供改进它的反馈。", # 如果笑话不好笑,则提供改进它的反馈
)
llm = ChatOpenAI(model="gpt-4o-mini")
# 用于评估的增强型 LLM,配置为输出 Feedback 模式
evaluator = llm.with_structured_output(Feedback)
# 评估器—优化器工作流的图状态定义
class State(TypedDict):
joke: str
topic: str
feedback: str
funny_or_not: str
# 图中的节点
def llm_call_generator(state: State):
"""LLM 生成笑话,可能会结合之前评估器的反馈"""
if state.get("feedback"): # 检查状态中是否存在反馈
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话,但要考虑反馈: {state['feedback']}") # 调用 LLM 生成笑话,结合反馈
else:
msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话") # 调用 LLM 生成初始笑话,没有反馈
return {"joke": msg.content} # 返回生成的笑话,更新状态中的 joke
def llm_call_evaluator(state: State):
"""LLM 使用结构化输出评估生成的笑话"""
grade = evaluator.invoke(f"评估笑话 {state['joke']}") # 调用评估器 LLM 来评估笑话
return {"funny_or_not": grade.grade, "feedback": grade.feedback} # 返回评估等级和反馈,更新状态中的 funny_or_not 和 feedback
# 条件边函数,用于根据评估结果进行路由,创建反馈循环
def route_joke(state: State):
""" 根据评估器的反馈,路由回笑话生成器或结束 """
if state["funny_or_not"] == "funny": # 检查笑话是否被评估为有趣
return "Accepted" # 如果笑话被接受,则结束
elif state["funny_or_not"] == "not funny": # 检查笑话是否被评估为不好笑
return "Rejected + Feedback" # 如果笑话被拒绝,则路由回生成器,结合反馈进行改进
# 使用 StateGraph 构建评估器—优化器工作流
optimizer_builder = StateGraph(State)
# 将节点添加到图中
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)
# 定义边以连接节点并建立反馈循环
optimizer_builder.add_edge(START, "llm_call_generator") # 将开始节点连接到生成器节点
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator") # 将生成器节点连接到评估器节点
optimizer_builder.add_conditional_edges(
"llm_call_evaluator",
route_joke,
{ # 由 route_joke 返回的名称:要访问的下一个节点的名称
"Accepted": END, # 如果笑话被接受,则结束
"Rejected + Feedback": "llm_call_generator", # 如果笑话被拒绝则路由回生成器,创建反馈循环,
},
)
# 编译评估器—优化器工作流图
optimizer_workflow = optimizer_builder.compile()
# 使用示例主题调用评估器—优化器工作流
state = optimizer_workflow.invoke({"topic": "Cats"})
print(state["joke"])
# 为什么猫总是喜欢坐电脑上?
#
# 因为它们想要“掌控鼠标”!😸1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57# 基于 Functional API 的评估器—优化器工作流实现
from typing_extensions import Literal
from pydantic import BaseModel, Field
from langgraph.func import entrypoint, task
# 结构化输出模式,用于评估、定义反馈结构
class Feedback(BaseModel):
grade: Literal["funny", "not funny"] = Field(
description="判断笑话是否有趣。", # 评估等级:有趣或不好笑
)
feedback: str = Field(
description=" 如果笑话不好笑,那么提供改进它的反馈。", # 如果笑话不好笑,则提供改进它的反馈
)
llm = ChatOpenAI(model="gpt-4o-mini")
# 用于评估的增强型 LLM,配置为输出 Feedback 模式
evaluator = llm.with_structured_output(Feedback)
# 工作流中的节点,定义为任务
def llm_call_generator(topic: str, feedback: Feedback):
"""LLM 生成笑话,可能会结合之前评估器的反馈"""
if state.get("feedback"): # 检查状态中是否存在反馈
msg = llm.invoke(f"写一个关于 {topic} 的笑话,但要考虑反馈: {feedback}") # 调用 LLM 生成笑话,结合反馈
else:
msg = llm.invoke(f"写一个关于 {topic} 的笑话") # 调用 LLM 生成初始笑话,没有反馈
return msg.content # 返回生成的笑话
def llm_call_evaluator(joke: str):
"""LLM 使用结构化输出评估生成的笑话"""
feedback = evaluator.invoke(f"评估笑话 {joke}") # 调用评估器 LLM 来评估笑话
return feedback # 返回评估器的反馈
# 入口点装饰函数定义评估器—优化器工作流
def optimizer_workflow(topic: str):
feedback = None # 将反馈初始化为 None,用于第一次迭代
while True: # 迭代反馈循环的开始
joke = llm_call_generator(topic, feedback).result() # 执行生成器任务以创建笑话
feedback = llm_call_evaluator(joke).result() # 执行评估器任务以评估笑话并获取反馈
if feedback.grade == "funny": # 检查笑话是否被评估为有趣
break # 如果笑话有趣,则退出循环
return joke # 返回最终优化的笑话
# 调用评估器—优化器工作流并流式传输每个步骤的更新
for step in optimizer_workflow.stream("Cats", stream_mode="updates"):
print(step)
print("\n")
# {'llm_call_generator': '为什么猫咪总是不参与社交活动?\n\n因为它们更喜欢“喵”独处!'}
#
#
# {'llm_call_evaluator': Feedback(grade='funny', feedback='这个笑话利用了‘喵’这个词的双关性,把猫咪的叫声与独处的‘要独处’相结合, create了幽默的效果。简洁明了,也很可爱,容易引起共鸣。')}
#
#
# {'optimizer_workflow': '为什么猫咪总是不参与社交活动?\n\n因为它们更喜欢“喵”独处!'}
1.7 总结
在本节中,我们借鉴了 Anthropic 在智能体系统模式分类方面的基础工作,探讨了五种基本的在智能体系统中可用的工作流模式:提示链、路由、并行化、协调器— 工作者和评估器—优化器。这些工作流都建立在增强型 LLM 的概念之上,为构建和管理 AI 智能体系统的复杂性提供了独特的方法,尤其是在 LangGraph 框架内。提示链提供了一种线性的、循序渐进的任务分解方法,非常适合通过顺序处理可以提高准确性的场景。路由引入了智能输入分类,从而能够专门处理各种请求并优化资源分配。 并行化利用并发的增强型 LLM 处理来提高速度和增强可靠性,适用于可以分解为独立子任务或受益于多重视角的任务。协调器—工作者提供了动态任务分解和委派功能, 这对于运行复杂、不可预测的任务至关重要。评估器—优化器实施了迭代改进循环, 从而可以通过反馈和修订来实现高质量、细致入微的输出。这些常用的智能体系统的工作流模式的特征比较如表所示。
工作流模式 用例 优势 复杂性 LangGraph 突出显示的功能 提示链 顺序执行任务,提高准确性 提高准确性、 过程受控、循序渐进 低 Graph / Functional API、条件边 路由 多样化输入,专门处理 优化提示、关注点分离、提高效率 中 Graph / Functional API、 条件边、路由器节点 并行化 独立子任务, 提高速度,多 重视角 提高速度、置信度更高、多方面处理能力 中 Graph / Functional API、 并行节点、聚合器节点 协调器—工作者 复杂、不可预测的任务 动态任务分解、灵活、自适应 高 Graph / Functional API、 Send API、 动态工作者 评估器—优化器 迭代改进、高质量输出 提高输出质量、输出结果细致入微、迭 代改进 中 Graph / Functional API、 反馈循环、状态管理 这些工作流模式并非互斥的,而是可以组合使用它们或对它们进行定制,以满足广泛的 AI 应用程序需求。对工作流模式或其组合的选择,在很大程度上取决于特定的任务要求、所需的控制级别,以及复杂性、延迟和性能之间的权衡。LangGraph 凭借其 Graph API 和 Functional API,为实现这些工作流提供了一个通用的平台,并且提供了持久化、流式传输和调试等对于构建健壮且可维护的智能体系统至关重要的功能。理解增强型 LLM 并有效利用这些智能体系统的工作流是开发者构建复杂 AI 应用程序的关键,这些应用程序不仅功能强大,而且结构良好、可预测且更易于管理。
2、多智能体架构
随着构建 AI 智能体技术的不断进步,我们所要解决的任务的复杂性通常超越了单一的、单片式的智能体设计。在面对多方面的难题时,即使是具有广泛工具访问权限的增强型 LLM 形式的单一智能体也会变得笨重,这可能会导致糟糕的决策、不堪重负的上下文窗口,以及难以扩展功能或使功能专门化。多智能体系统的概念因此变得非常有价值。我们不再依赖单一智能体来处理所有的事情,而是将 AI 应用程序分解为一组更小的、更专注的智能体,每个智能体都具有特定的职责和专业知识,并且, 这些智能体进行交互和协作,以实现系统的总体目标。这种模块化方式不仅反映了人类团队合作的重要性,而且在 AI 开发中也具有显著的优势。
多智能体系统的核心在于支持构建复杂的业务逻辑。它们通过采用模块化、专业化和受控通信方式,为构建更复杂和可管理的 AI 应用程序提供了途径。模块化是 一个关键优势,因为将复杂的智能体分解为更小的、独立的智能体,简化了开发、测试和维护流程。让每个智能体都可以进行独立的开发和改进,可减轻整个系统的认知负担,并使调试变得更加容易。针对特定领域或任务量身定制专家智能体,如研究智能体、数学智能体、计划智能体等,而不是让通才智能体在各领域苦苦挣扎。这种专业化提高了每个智能体在特定领域的性能和准确性,有助于构建更强大的整体系统。 最后,多智能体架构提供了对智能体交互的控制。与仅仅依靠 LLM 函数调用进行智能体间通信不同,多智能体系统允许开发者显式定义和管理智能体如何通信、交换信息以及协调其行动。这种显式控制对于构建可预测、可靠和可审计的 AI 系统至关重要。
LangGraph 为构建和编排多智能体系统提供了强大的框架,其基于图的架构本身就非常适合表示和管理多个智能体之间的交互(如图所示)。LangGraph 提供了各种连接智能体的模式,每种模式都有其自身的优势和用例。在本节中,我们将探索 LangGraph 中几种关键的多智能体架构,特别关注主管(Supervisor)架构和分层(Hierarchical)架构, 并简要介绍网络(Network) 架构, 展示 LangGraph Supervisor 和 Swarm 类库是如何简化多智能体系统实现的。
![image-20251206144012822]()
2.1 主管架构
主管架构是多智能体系统的核心模式,当需要明确的编排点来管理和指导任务流时,它尤其有效。在这种架构中,指定的主管智能体充当中央协调员的角色, 监督和指导多个专业智能体的活动。想象一下,人类团队中的项目经理(主管智能体)的作用与之类似,接收请求,将任务委派给最合适的专业智能体,然后综合管理响应。专业智能体与外部世界(包括用户)之间的通信大多通过主管智能体进行路由。这种集中控制使主管架构可预测、可管理,并且非常适合需要结构化协调的任务。
要重点理解主管架构与1.5节中讨论的协调器—工作者工作流之间的关系,虽然它们有一些相似之处,但它们在不同的抽象级别上运行,并服务于不同的目的。
乍一看,主管架构可能与协调器—工作者工作流相似。这两种模式都涉及一个中央协调实体(架构中的主管智能体,工作流中的协调器),该实体指导并将工作委派给其他组件(专业智能体与工作者)。二者也都适用于从分解和专业知识中受益的复杂任务。在这两种模式中,中央协调实体都负责制定关于任务分配的决策,以及综合管理结果。甚至可以说,主管智能体是一种协调器——协调其智能体团队的活动。
尽管存在这些相似之处,但主管架构与协调器—工作者工作流在范围和目的上有本质区别。
- 架构模式与工作流模式:主管架构是一种用于组织多个智能体并定义其交互的高级系统架构,它指示如何在多智能体系统中构建智能体,以及实现智能体间的通信。相比之下,协调器—工作者工作流是一种较低级别的任务执行模式,它描述了如何对单个复杂任务进行分解和处理。它是一种可以在各种架构中使用的工作流,包括在作为主管架构一部分的智能体中使用。
- 关注智能体管理与关注任务执行:主管架构的主要关注点是管理和协调多个自主智能体,其中主管智能体决定在给定上下文和问题的情况下调用哪个智能体。协调器—工作者工作流的主要关注点是通过将单个复杂任务分解为多个子任务并将其分发给工作者来高效执行这个复杂任务,其中协调器负责计划和管理特定任务。
- 组件性质:在主管架构中,核心组件是智能体——自主实体,具有决策能力和专门角色。在协调器—工作者工作流中,组件一般是 LLM 调用或函数—— 工作者通常更简单,是更具针对性的任务单元,旨在执行预定义的子任务。
- 应用范围:主管架构描述了多智能体系统的整体组织架构,可能涵盖许多不同类型的任务,以及在较长时间内发生的交互。协调器—工作者工作流通常被应用于更广泛系统中的特定复杂任务。例如,主管架构中的单个专业智能体可能在内部利用协调器—工作者工作流来完成主管智能体分配给它的特别复杂的子任务。
换一个角度,我们可以想象一家大型公司,主管架构就像公司的组织结构,首席执行官(主管智能体)管理不同的部门(专业智能体),例如营销部、销售部和工程部,他决定让哪个部门参与项目。协调器—工作者工作流就像工程部内部使用的项目管理方法,工程经理(协调器)使用此方法将大型软件项目(任务)分解为较小的编码任务,并将它们分配给各个开发者(工作者)。协调器—工作者工作流是工程部 (或任何部门)用于完成工作的工具,而主管架构是整个公司的组织和管理方式。
本质上,主管架构为多智能体系统提供了组织框架,而协调器—工作者工作流是智能体在该架构内可以使用的任务执行策略。
在 LangGraph 中,主管架构通过在图中指定特定节点作为专业智能体,并引入中央主管节点得到了有效实现。这个主管节点通常由 LLM 驱动,负责根据图的当前状态和手头的任务,决定接下来激活哪个智能体节点。主管节点有效地充当动态路由器,将执行流程导向最合适的专业智能体,从而隐式地选择和启动该智能体所体现的工作流。控制此流程的关键机制是 LangGraph 中 Command 对象的使用。当主管智能体做出决策时,它会返回一个 Command 对象,该 Command 对象指定要调用的下一 个智能体节点。这个 Command 对象不仅决定了控制流程,还可以携带状态更新,允许在主管智能体和专业智能体之间传递信息,确保跨工作流阶段保持上下文。主管架构的工作流时序图如图所示。
LangChain 团队新推出的 LangGraph Supervisor 类库提供了一种简化的方式来创建基于主管架构的多智能体系统。它简化了定义专业智能体的过程——每个专业智能体都可以潜在地体现特定的智能体工作流,以及设置主管智能体来管理它们的过程。 该类库在主管智能体如何做出路由决策,以及如何在整个对话历史记录中管理智能体输出方面提供了灵活性。下面以一个使用 LangGraph Supervisor 类库的实际例子来说明核心概念。在此示例中,我们创建了一个主管智能体来管理两个专业智能体:math_expert (数学智能体)和 research_expert(研究智能体)。
1
pip install langgraph-supervisor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
model = ChatOpenAI(model="gpt-4o-mini")
# (1)为专业智能体定义工具
# 定义表示每个专业智能体工具的函数
# add 和 multiply 用于数学智能体,web_search 用于研究智能体
def add(a: float, b: float) -> float:
""" 两个数字相加 """
return a + b
def multiply(a: float, b: float) -> float:
""" 两个数字相乘 """
return a * b
def web_search(query: str) -> str:
""" 在网络上搜索信息 """
return (
"以下是 FAANG 公司 2024 年的员工人数:\n"
"1. **Facebook (Meta)**: 67,317 名员工。\n" "2. **Apple**: 164,000 名员工。\n"
"3. **Amazon**: 1,551,000 名员工。\n"
"4. **Netflix**: 14,000 一名员工。\n"
"5. **Google (Alphabet)**: 181,269 名员工。"
)
# (2)使用 create_react_agent 创建专业智能体
# 使用 LangGraph 的工具函数 create_react_agent 创建每个专业智能体 # 为每个专业智能体配置特定的模型、工具、名称和提示
math_agent = create_react_agent(
model=model,
tools=[add, multiply], # 数学智能体的工具是 add 和 multiply
name="math_expert", # 标识数学智能体的名称
prompt=" 你是一位数学专家,始终使用一个工具。" # 指导数学智能体行为的提示
)
research_agent = create_react_agent(
model=model,
tools=[web_search], # 研究智能体的工具是 web_search
name="research_expert", # 标识研究智能体的名称
prompt="你是一位世界一流的研究专家,可以进行网络搜索,不要做任何数学运算。" # 指导研究智能体行为的提示
)
#(3)使用 create_supervisor 创建主管工作流
# 使用 LangGraph Supervisor 的 create_supervisor 创建主管工作流
# 传递专业智能体列表、主管智能体的模型以及主管智能体的提示
workflow = create_supervisor(
[research_agent, math_agent], # 由主管智能体管理的专业智能体列表
model=model, # 主管智能体的模型
prompt="你是一位团队主管,管理一位研究专家和一位数学专家。研究专家能够利用网络搜索工具进行查询。", # 指导主管智能体行为的提示
)
#(4)编译并运行工作流
# 将工作流编译为 LangGraph 应用程序,并使用用户消息调用它
app = workflow.compile() # 将工作流编译为可执行的 LangGraph 应用程序
result = app.invoke({ # 使用用户消息调用已编译的应用程序
"messages": [
{
"role": "user",
"content": "2024 年 FAANG 公司的总员工人数是多少? " # 用户查询
}
]
})
print(result["messages"][-1].content)
# 截至2024年,FAANG 公司的总员工人数为 1,977,586 名。具体数据如下:
#
# 1. **Facebook (Meta)**: 67,317 名
# 2. **Apple**: 164,000 名
# 3. **Amazon**: 1,551,000 名
# 4. **Netflix**: 14,000 名
# 5. **Google (Alphabet)**: 181,269 名
#
# 如需进一步的信息,请随时告诉我!在此示例中,langgraph-supervisor 库中的 create_supervisor 函数封装了构建主管架构的复杂性。它接收预构建的专业智能体列表(research_agent、math_agent)、主管智能体的 LLM 模型,以及指导主管智能体行为的提示。compile() 方法将主管工作流转换为可执行的 LangGraph 应用程序,可以随时通过用户输入进行调用。在调用时, 主管智能体会智能地将用户的查询路由到适当的专业智能体(在本示例中,有研究智能体和数学智能体),并编排该过程以得出最终答案。
2.2 分层架构
虽然与单智能体系统相比,主管架构在管理复杂性方面有了显著进步,但是在处理大量专业智能体或高度复杂的任务时,即使是主管智能体也可能不堪重负。随着主管智能体下属的智能体数量的增长,主管智能体的决策效率可能会降低,并且它需要管理的上下文可能会变得过于庞大和复杂。为了解决可扩展性问题,我们可以引入多智能体分层架构。
分层架构涉及创建主管智能体的主管。这里不是让单个主管智能体直接管理所有的专业智能体,而是将专业智能体组成团队,每个团队由团队级主管智能体管理, 并且,引入“顶层主管”来监督和协调这些团队主管智能体的活动。这样就创建了一个分层结构,类似于人类团队中的组织层次结构。假如有一个“研究团队”主管智能体,管理研究智能体和数学智能体,以及一个“写作团队”主管智能体,管理写作智能体和出版智能体。“顶层主管”将根据用户的请求决定是聘请“研究团队”还是“写作团队”。这种分层方法大大减轻了任何单个主管智能体的认知负荷,因为每个主管 智能体都负责管理一个更小、更专注的智能体或主管智能体组。它还增强了层次结构中不同级别的模块化和专业化。
LangGraph Supervisor 类库可以自然扩展以支持分层系统。我们可以为每个团队创建单独的主管工作流,然后将它们组合在“顶层主管”下。这种可组合性是 LangGraph 及其 Supervisor 类库的关键优势,允许我们相对轻松地构建复杂的多层智能体系统。下面我们通过一个例子来说明这一点。在前面例子的基础上,我们创建一 个分层系统,其中包含一个“研究团队”主管智能体和一个“写作团队”主管智能体, 由“顶层主管”管理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119from langchain_openai import ChatOpenAI
from langgraph_supervisor import create_supervisor
from langgraph.prebuilt import create_react_agent
# 这里推荐使用工具调用能力较强的 LLM,如 OpenAI GPT-4o
model = ChatOpenAI(model="gpt-4o-mini")
# (1)定义研究团队智能体及其主管智能体
# 定义研究团队(math_expert 和 research_expert)的工具和智能体
def add(a: float, b: float) -> float:
""" 两个数字相加 """
return a + b
def multiply(a: float, b: float) -> float:
""" 两个数字相乘 """
return a * b
def web_search(query: str) -> str:
""" 在网络上搜索信息 """
return (
"以下是 FAANG 公司 2024 年的员工人数:\n"
"1. **Facebook (Meta)**: 67,317 名员工。\n" "2. **Apple**: 164,000 名员工。\n"
"3. **Amazon**: 1,551,000 名员工。\n"
"4. **Netflix**: 14,000 名员工。\n"
"5. **Google (Alphabet)**: 181,269 名员工。"
)
math_agent = create_react_agent(
model=model,
tools=[add, multiply],
name="math_expert",
prompt="你是一位数学专家。"
)
research_agent = create_react_agent(
model=model,
tools=[web_search],
name="research_expert",
prompt=" 你是一位世界一流的研究专家,可以进行网络搜索。不要做任何数学运算。"
)
research_team_supervisor = create_supervisor(
[research_agent, math_agent], # 研究团队内的智能体
model=model,
prompt="你正在管理一个研究团队,该团队由研究专家和数学专家组成。研究专家能够利用网络搜索工具进行查询。",
# 指导研究团队主管智能体行为的提示
)
research_team = research_team_supervisor.compile(name="research_team") # 编译研究团队工作流并命名
# (2)定义写作团队智能体及其主管智能体
# 定义写作团队(writing_expert 和 publishing_expert)的工具和智能体
def write_report(topic: str) -> str:
""" 撰写关于给定主题的报告 """
return f" 关于 {topic} 的报告:(报告的详细内容)"
def publish_report(report: str) -> str:
""" 发布报告 """
return f" 报告已发布:{report}"
writing_agent = create_react_agent(
model=model,
tools=[write_report],
name="writing_expert",
prompt="你是一位写作专家。"
)
publishing_agent = create_react_agent(
model=model,
tools=[publish_report],
name="publishing_expert",
prompt="你是一位出版专家。"
)
writing_team_supervisor = create_supervisor(
[writing_agent, publishing_agent], # 写作团队内的智能体
model=model,
prompt="你正在管理一个写作团队,该团队由写作专家和出版专家组成。", # 指导写作团队主管智能体行为的提示
)
writing_team = writing_team_supervisor.compile(name="writing_team") # 编译写作团队工作流并命名
# (3)定义顶层主管
# 定义顶层主管以管理研究团队和写作团队
top_level_supervisor_agent = create_supervisor(
[research_team, writing_team], # 传递已编译的研究团队和写作团队工作流作为智能体
model=model,
prompt="你是一位顶层主管,管理研究团队和写作团队。", # 指导顶层主管行为的提示
)
top_level_supervisor = top_level_supervisor_agent.compile(name="top_level_supervisor") # 编译顶层主管工作流并命名
# (4)调用顶层主管
# 使用用户查询调用顶层主管
result = top_level_supervisor.invoke({
"messages": [
{
"role": "user",
"content": "2024 年 FAANG 公司的总员工人数是多少? " # 用户查询
}
]
})
print(result["messages"][-1].content)
# 截至2024年,FAANG公司的总员工人数大约为1,977,586名员工。以下是各公司的员工数:
#
# 1. **Facebook (Meta)**: 67,317名员工
# 2. **Apple**: 164,000名员工
# 3. **Amazon**: 1,551,000名员工
# 4. **Netflix**: 14,000名员工
# 5. **Google (Alphabet)**: 181,269名员工
#
# 这些数据可能会有所变动,因此建议查阅最新的公司官方报告以获取准确数字。在这个示例中,我们首先定义和编译了两个团队工作流:research_team 和 writing_team,每个工作流都有自己的主管智能体和专业智能体。然后,创建了一 个 top_level_supervisor 来管理这些已编译的团队工作流作为其智能体。关键是将已编译的工作流(research_team、writing_team)作为智能体传递给在定义 top_level_supervisor_agent 时使用的 create_supervisor 函数。通过主管智能体和智能体团队的这 种嵌套创建了所需的分层结构。当在 top_level_supervisor 上调用用户查询时,它首先决定哪个团队最相关(在本示例中为 research_team),然后将任务委派给相应的团 队主管智能体,而团队主管智能体又管理其专业智能体来响应请求。这种分层方法显著提高了复杂多智能体系统的可扩展性和可管理性。
2.3 网络架构
虽然主管架构和分层架构提供了结构化的控制,但网络架构为多智能体系统提供了一种更分散、更灵活的方法。在这种模型中,每个智能体都可以直接与其他网络中的任何智能体进行通信(多对多连接),而不需要中央主管调解所有的交互。一个智能体根据其当前状态、整体系统目标,以及可能已交换的消息自主决定接下来调用哪个智能体。
网络架构在以下情况下尤其有利。
- 任务本质上是协作式的,需要涌现行为。当无法预测智能体交互的最佳顺序时, 网络允许进行更动态和自适应的协作。
- 需要去中心化决策。将控制权分散到智能体之间可以带来具备更强大容错能力的系统,因为一个智能体的故障不一定会使整个系统瘫痪。
- 专业智能体需要灵活交互。如果不同的智能体拥有在任务的各点可能相关的独特专业知识,则网络允许的信息交换会更流畅。
然而,网络架构也引入了需要开发者必须仔细管理的复杂性,并且需要仔细设计以确保多个智能体在协同工作时目标的明确性和行为的连贯性。Swarm 架构是网络架构的一个特别有趣且实用的子类型。正如在 LangChain 团队所构建的 LangGraph Swam 类库中实现的那样,它是一种特定类型的网络架构,其中智能体根据其专业化动态地相互移交控制权。在 Swarm 架构中,智能体被设计为特定领域或任务的专家, 并且系统动态地将对话路由到最适合处理当前上下文的智能体。Swarm 的一个关键特征是它能够“记住”上次激活的智能体。这可以确保在同一个对话线程的后续交互中,系统能够智能地从该智能体处恢复,从而保持上下文和连续性。Swarm 架构的工作流时序图如图所示。
Swarm 架构(langgraph-swarm 类库)具有以下关键特征。
- 具有移交功能的多智能体协作。Swarm 擅长实现专业智能体之间的协作。它为智能体配备了“移交工具”,允许它们显式地将控制权和上下文转移到被认为更适合执行当前任务的其他智能体中。
- 动态智能体路由。Swarm 内对话的路由是动态且依赖上下文的。智能体根据其专业知识和当前的对话状态做出关于移交的决策,从而创建一个灵活且自适应的系统。
- 通过记忆保持对话的连续性。Swarm 维护对话历史记录并“记住”给定线程中上次激活的智能体。这种记忆对于在多次交互中提供连贯且连续的用户体验至关重要。
- 可定制的移交机制。诸如 langgraph-swarm 之类的类库提供了用于创建和定制移交机制的工具。这允许开发者根据其特定的应用程序需求定制移交过程, 包括定义移交触发器、指定在智能体之间传递的信息,以及创建自定义移交工具。
1
pip install langgraph-swarm
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langgraph_swarm import create_handoff_tool, create_swarm
# 这里推荐使用工具调用能力较强的 LLM,如 OpenAI GPT-4o
model = ChatOpenAI(model="gpt-4o-mini")
def add(a: int, b: int) -> int:
""" 两个数字相加 """
return a + b
# (1)创建专业智能体(Alice 和 Bob)
# 使用 create_react_agent 定义两个专业智能体 : Alice 和 Bob
# Alice 是一位数学专家,拥有一个 add 工具和一个移交给 Bob 的移交工具
alice = create_react_agent(
model,
[add, create_handoff_tool(agent_name="Bob")], # Alice has an add工具和一个移交给 Bob 的移交工具
prompt="你是 Alice,一位加法专家,使用这个工具完成所有加法运算。",
name="Alice",
)
# Bob 说话的语气像一个海盗,并且拥有一个移交给 Alice 以寻求数学帮助的移交工具
bob = create_react_agent(
model,
[create_handoff_tool(agent_name="Alice",
description=" 请务必将所有数学问题转移给 Alice,她可以帮助你解决数学问题 ")],
# Bob 拥有一个移交给 Alice 的移交工具
prompt="你是 Bob,你说话的语气像一个海盗。",
name="Bob",
)
# (2)创建用于对话记忆的内存存档点
# InMemorySaver 用于短期记忆,这对于保持对话的连续性至关重要
checkpointer = InMemorySaver()
# (3)使用 create_swarm 创建 Swarm 工作流
# create_swarm 函数使用 Alice 和 Bob 设置 Swarm 架构
# default_active_agent="Alice" 表示将 Alice 设置为新对话的默认激活的智能体
workflow = create_swarm(
[alice, bob], # Swarm 中的智能体列表
default_active_agent="Alice" # 用于启动新对话的默认激活的智能体
)
# (4)使用内存存档点编译工作流
# 编译 Swarm 工作流,传递内存存档点以进行内存管理
app = workflow.compile(checkpointer=checkpointer)
# (5)在多轮对话中调用 Swarm
# 多次调用 Swarm,模拟对话线程
# config={"configurable": {"thread_id": "1"}} 确保消息在同一个对话线程中被跟踪
config = {"configurable": {"thread_id": "1"}}
turn_1 = app.invoke( # 第一轮——用户想和 Bob 说话
{"messages": [{"role": "user", "content": "我想和 Bob 说话"}]},
config,
)
print('第一轮的输出:', turn_1["messages"][-1].content) # 第一轮的输出
turn_2 = app.invoke( # 第二轮——用户提出一个数学问题
{"messages": [{"role": "user", "content": "5 + 7 等于多少? "}]},
config,
)
print('第一轮的输出:', turn_2["messages"][-1].content) # 第二轮的输出
# 第一轮的输出: Ahoy, matey! What brings ye to me swashbucklin' shores? Speak up, and let’s sail the seas of conversation together! Arrr! 🏴☠️
# 第一轮的输出: 5 + 7 等于 12。- 在此示例中,langgraph-swarm 中的 create_swarm 函数简化了 Swarm 网络式多智能体系统的创建,它接收智能体列表并设置动态路由和移交机制。InMemorySaver 内存存档点确保对话历史记录和活动智能体跨轮次持久存在,从而实现有状态且连续的多智能体交互。
诸如 langgraph-Swarm 的类库提供了自定义选项,我们可以根据特定需求定制网络式交互行为。关键的自定义选项如下所示。
- 自定义移交工具。开发者可以创建自定义移交工具,并且可以修改工具的名称、 描述、调用参数,以及在移交期间在智能体之间传递的信息。这允许对智能体通信和上下文传输进行细粒度控制。
- 智能体实现。虽然默认假设是通过多个智能体共享的单个 messages 数据结构进行通信,但开发者可以自定义智能体实现来使用不同的状态模式和消息键。 这对于创建具有私有消息历史记录或专门状态管理要求的智能体非常有用。 但是,这需要仔细管理智能体和 Swarm 之间的状态转换与通信路径。
- 内存管理。有效的内存管理对于 Swarm 架构至关重要,尤其是在长时间运行的对话中。选择合适的内存存档点并尽可能集成长期记忆解决方案,对于维护上下文并使智能体能够随着时间的推移来学习和适应至关重要。
2.4 总结
在本节中,我们探讨了 LangGraph 中的多智能体架构,重点介绍了主管架构和分层架构,并简要讨论了网络架构。主管架构为编排专业智能体提供了中央控制点, 而分层架构通过引入主管智能体的主管(创建团队和团队经理)扩展了这一概念,以管理更广泛的复杂性。网络架构为协作和涌现任务行为提供了更分散、更灵活的替代方案。LangGraph 及其 Supervisor 和 Swarm 类库提供了用于实现基于主管的架构的优秀工具,简化了健壮的、可扩展的和可管理的多智能体系统的创建。通过利用这些模式和工具,开发者可以构建复杂的 AI 应用程序,这些应用程序通过智能体协作有 效地完成复杂任务。主管架构、分层架构和网络架构的比较如表所示。
多智能体架构 结构 复杂性管理 可扩展性 灵活性 用例 主管架构 中央主管智能体、专业智能体 集中控制,可管理中等复杂性 中 低 需要明确协调、 明确定义的专业化任务 分层架构 顶层主管智能体、团队主动智能体、智能体 分层控制,降低每个主动智能体的负载,高度模块化 高 中 复杂任务,有大量智能体, 需要组织结构 网络架构 分散式智能体、多对多通信 分布式控制,涌现行为,更难管理 高 高 协作任务,动态环境、不可 预测的工作流
3、情境感知智能体架构
- 传统的 AI 应用程序,尤其是那些使用了 LLM 的应用程序,主要提供基于聊天的用户体验。虽然聊天模式对用户友好且易于实施,但它本质上将发起和管理交互的责任完全放在人类用户身上。我们作为人类用户,必须积极参与到对话式的互动中, 明确地提示 AI 智能体开始工作,并指导它完成任务的每个步骤。这种交互式模型虽然适用于许多场景,但引入了显著的交互开销,限制了我们利用 AI 大规模提升自身生产力的潜力,并且约束了 LLM,使其无法被充分利用。
- 一种变革性的替代方案是情境感知智能体(Ambient Agent)架构。这种架构设想 AI 智能体在后台主动且持续地运行,勤勉地监控相关信息流并自主地对重要事件做出反应。与保持休眠状态直到被明确提示的基于聊天的智能体形成鲜明对比的是, 情境感知智能体旨在“监听”情境信号——连续的数据流,例如电子邮件收件箱、日历订阅源、新闻行情、应用程序日志或传感器数据等。在检测到预定义的触发器或识别出这些信号中的模式后,情境感知智能体基于嵌入式规则、学习行为或实时分析自主地启动操作。特别重要的是,精心设计的情境感知智能体架构会优先考虑用户注意力,仅在绝对必要时才需要人工输入。这种情况发生在智能体识别出需要人类判断的高价值机会、遇到需要人类指导的决策瓶颈或寻求反馈以改进其正在进行的操作之时。
- 情境感知智能体架构代表了人机交互范式的根本转变。它摆脱了通过聊天窗口进行的直接、持续的互动,转向更具共生的关系,在这种关系中,AI 在后台无缝地增强人类能力。这种架构转变对于释放 AI 助手的真正潜力至关重要,将它们从被动式工具转变为主动式、智能的合作伙伴,以更自然、侵入性更小,以及可扩展性更强的方式增强我们的生产力和决策能力。
3.1 架构模式
- 有效的情境感知智能体的设计和实施依赖几种关键的架构模式,这些模式使其能够实现主动和后台操作。
- 事件驱动架构:从根本上说,情境感知智能体是事件驱动的系统,其架构以持续监控事件流为中心。这些事件充当智能体活动的主要触发器,取代了明确的用户命令。这需要强大的机制来订阅、过滤和处理来自不同数据源的相关事件。
- 异步处理与持久化:由于其后台性质和潜在的长时间运行的任务,情境感知智能体严重依赖异步处理。其架构必须支持非阻塞操作,使智能体能够并发处理多个事件并管理长时间运行的工作流,而不会占用资源或中断用户工作流。特别重要的是强大的持久化层,它允许智能体在每个步骤存档点检查其 状态,使其能够暂停执行、等待外部输入(尤其是人工反馈),并从中断的确切位置无缝恢复操作。
- 智能触发与过滤:为了避免持续的通知淹没用户,情境感知智能体架构必须 结合复杂的触发和过滤机制。并非每个事件都需要用户立即采取行动或引起注意。该架构应支持定义智能触发器,以根据预定义的规则、学习模式或智能体本身进行实时分析来识别真正重要的事件。这确保了仅在高价值场景中进行人工交互。
- 模块化和专业化的智能体设计:为了管理复杂性并实现专业化,情境感知智能体架构通常采用模块化设计,将功能分解为专业的智能体组件。这种模块化具有允许集中开发、更轻松地维护,以及在整个系统中组合和协调不同智能体能力的能力。在构建和部署复杂的情境感知智能体时,第2节中讨论的多智能体架构如主管架构和分层架构变得尤为重要。
- 可信赖且有效的情境感知智能体架构的基石是“人机环路”模式的战略集成。 当真正需要人工输入时,该架构必须促进与用户的无缝沟通且上下文丰富。这需要具有完善的界面,用于向用户呈现信息、征求特定反馈,以及使用户能够轻松提供指导、 执行更正或批准。这种模式不仅可以寻求人工干预,而且需要架构智能体,以便在关键时刻有效地利用人类智能。
- 在情境感知智能体架构中,特定的“人机环路”模式控制着智能体与用户交互的 方式和时间。这些模式不仅仅是交互风格,更是定义智能体参与模型的集成架构组件。
- 通知模式:在架构上,通知模式涉及从智能体到用户的单向通信通道。智能体在检测到重要事件后,会生成通知消息并将其推送到用户界面(例如, LangChain 团队开发的 Agent Inbox —— 第10章将详细介绍此应用)。该架构模式专注于高效传递信息丰富的通知,确保及时提醒用户,而不需要用户立即采取行动。
- 询问模式:询问模式引入了请求—响应交互。当智能体遇到需要人工输入的决策点时,其架构会触发“中断”,暂停工作流的执行并为用户生成结构化问题。该架构模式必须管理暂停的工作流状态,向用户清晰地呈现问题(通常通过专用用户界面),然后处理用户的响应以恢复工作流的执行。
- 审查模式:审查模式为高风险操作提供关键的控制机制。在架构上,它在智能体的工作流中插入强制性的人工审查阶段。在执行可能产生重大后果的操作之前,智能体会生成审查请求,向用户呈现建议的操作(及其基本原理)。 该架构模式必须提供清晰的界面,供用户检查、编辑、批准或拒绝建议的操作。 此外,用户决策决定了智能体执行的后续流程。
- 这些“人机环路”模式不仅仅是附加组件,更是基本的架构组件,塑造了智能 体的行为,建立了用户信任,并实现了主动式 AI 系统的实际部署。
3.2 人机环路交互设计
为了在 LangGraph 中有效地实现人机环路交互,尤其是在与 Agent Inbox 等用户界面集成时,设计人机环路的交互对象结构体(例如,HumanInterrupt 和 HumanResponse)至关重要。下面示例中的 Python 代码使用 TypedDict 定义了 HumanInterrupt 和 HumanResponse 结构体。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24from typing import TypedDict, Literal, Optional, Union
# HumanInterruptConfig:定义人工中断操作的配置选项
class HumanInterruptConfig(TypedDict):
allow_ignore: bool # 布尔值,是否允许用户忽略中断
allow_respond: bool # 布尔值,是否允许用户发送自由格式的响应
allow_edit: bool # 布尔值,是否允许用户编辑操作参数
allow_accept: bool # 布尔值,是否允许用户按原样接受操作
# ActionRequest:定义向人类请求的操作
class ActionRequest(TypedDict):
action: str # 字符串,操作的描述性名称或标题
args: dict # 字典,与操作关联的参数(例如,工具调用参数)
# HumanInterrupt:表示人工中断请求的主要模式
class HumanInterrupt(TypedDict):
action_request: ActionRequest # 有关请求操作的详细信息
config: HumanInterruptConfig # 人工响应的配置选项
description: Optional[str] # 可选字符串,中断的详细描述,可以是 Markdown 格式
# HumanResponse:从 Agent Inbox 用户界面接收的人工响应模式
class HumanResponse(TypedDict):
text: Literal['accept', 'ignore', 'response', 'edit'] # 来自用户的响应类型(accept、ignore、response、edit)
args: Union[None, str, ActionRequest] # 与响应关联的参数,根据 type 而变化。其中,accept 和 edit 包含可能已修改的参数;response 包含用户文本响应的 字符串;ignore 表示 None下面的 Python 代码演示了如何在 LangGraph 的工作流中使用 HumanInterrupt 结构体。此示例模拟了一个场景,其中智能体在图函数中需要人工输入来决定是否使用特定的参数调用工具。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71from typing import TypedDict, Literal, Optional, Union
from langchain_core.messages import ToolMessage, HumanMessage, AIMessage
from langgraph. constants import START, END
# 假设 llm 和 tools_by_name 已被定义(如前面的例子所示)
# 定义图状态
class AgentState(MessagesState):
pass # 继承消息状态,用于对话历史记录
# 定义可能触发人工中断的图函数
def agent_node(state: AgentState):
""" 智能体节点,决定是否调用工具或请求人工输入 """
messages = state["messages"]
last_message = messages[-1]
# 假设智能体决定调用工具并具有工具调用详细信息
tool_call_name = "hypothetical_tool"
tool_call_args = {"input_arg": "example_value"}
request: HumanInterrupt = {
"action_request": {
"action": tool_call_name, # 操作名称是工具名称
"args": tool_call_args # 操作参数是工具参数
},
"config": {
"allow_ignore": true, # 允许用户忽略工具调用
"allow_respond": true, # 允许用户提供自由格式的响应
"allow_edit": true, # 允许用户编辑工具参数
"allow_accept": true # 允许用户按原样接受工具调用
},
"description": f"智能体建议使用参数 `{tool_call_args}`调用工具: `{tool_call_name}`。你批准吗? ", # Agent Inbox 用户界面的描述
}
# 调用 interrupt 函数,在列表中传递 HumanInterrupt 请求
response_list = interrupt([request])
response = response_list[0] if response_list else None # 从列表中提取第一个响应
if response:
if response['type'] == "accept":
# 用户接受了工具调用,继续执行工具调用
tool_result = tools_by_name[tool_call_name].invoke(response['args']) # 使用(可能已修改的)参数执行工具
output_message = ToolMessage(content=str(tool_result),tool_call_id=last_message.tool_calls[0].id) # 使用结果创建 ToolMessage
elif response['type'] == "edit":
# 用户编辑了工具调用参数,使用编辑后的参数执行工具
edited_args = response['args']['args'] # 从 ActionRequest 中提取编辑后的参数
tool_result = tools_by_name[tool_call_name].invoke(edited_args) # 使用编辑后的参数执行工具
output_message = ToolMessage(content=str(tool_result), tool_call_id=last_message.tool_calls[0].id) # 使用结果创建 ToolMessage
elif response['type'] == "response":
# 用户提供了文本响应,据此处理
user_response_text = response['args'] # 提取用户的文本响应
output_message = AIMessage(content=f"用户响应:{user_response_text}。根据响应继续进行操作。") # 创建 AIMessage 以确认响应
# 添加逻辑以处理 user_response_text 并确定工作流中的后续步骤
elif response['type'] == "ignore":
# 用户忽略了人工中断,据此处理
output_message = AIMessage(content="人工中断被忽略。继续进行操作,不进行工具调用。") # 创建 AIMessage,指示中断被忽略
# 添加逻辑以继续执行替代工作流路径
else:
output_message = AIMessage(content="未知的人工响应类型。") # 处理意外的响应类型
else:
# 未收到人工响应(例如,中断处理超时或错误)
output_message = AIMessage(content="未收到人工响应,继续进行操作,不进行干预。") # 处理未收到响应的情况
return {"messages": [output_message]} # 返回更新的消息状态
# 构建 LangGraph 工作流
builder = StateGraph(AgentState)
builder.add_node("agent_step", agent_node) # 根据需要为工作流添加边和其他节点
builder.add_edge(START, "agent_step")
builder.add_edge("agent_step", END)
workflow = builder.compile()
# 示例调用(为了简化演示)
inputs = {"messages": [HumanMessage(content="启动工作流并可能触发中断。")]}
result = workflow.invoke(inputs)
print(result) # 输出工作流的最终状态
3.3 用LangGraph实现情境感知智能体架构
LangGraph 专为构建强大复杂的情境感知智能体而设计。LangGraph 具有如下几个特性,特别是将这些特性与 LangGraph 平台(将在下一章中详细介绍)结合使用时, 使其成为构建这类主动式系统的绝佳选择。
- 内置持久层支持:LangGraph 由持久层提供支持,该持久层在每次操作(或图的节点执行完成)之后都保存智能体的状态。这允许智能体基本上“暂停” 执行并等待用户反馈,对于启用人机环路模式和短期对话记忆非常重要。
- 内置人机环路支持:LangGraph 本地支持人机环路模式,内置的持久层是重要组成部分。此外,LangGraph 还添加了 interrupt 函数,这是一种与最终用户通信的新内置方法。
- 内置长期记忆:LangGraph 内置了长期记忆(本质上是命名空间的键值存储,支持语义搜索),这使智能体可以轻松地在人机环路交互后更新其“记忆”。
- 内置 Cron 任务:许多情境感知智能体都按计划运行以检查新事件。 LangGraph 平台内置的 Cron 任务可以支持此功能。
为了介绍情境感知智能体在 LangGraph 中的实际实现,我们考虑一个电子邮件助手的使用场景。电子邮件助手可以在后台持续运行,监控电子邮件收件箱中的新邮件。其在 LangGraph 中构建的工作流大致概括如下。
- (1)事件触发器(Cron 任务):利用 LangGraph 平台创建 Cron 任务来定期触发电子邮件助手工作流,启动检查指定收件箱中的新邮件。
- (2)电子邮件的检索与分析:智能体检索新的、未读的电子邮件,并使用 LLM 分析其内容。此分析旨在了解电子邮件的意图、紧迫性和所需的操作。
- (3)决策与操作分支:根据对电子邮件的分析,智能体采用路由工作流模式对下一步的行动进行分类处理。
- 自动响应:对于需要直接或简单回复的电子邮件(例如,确认、基本信息请求), 智能体使用提示链工作流自主起草电子邮件并发送响应。
- 人机环路“询问”中断:对于需要进行主观决策的电子邮件(例如,会议邀请),智能体使用 interrupt() 函数暂停执行并生成 HumanInterrupt 对象。 此对象遵循 HumanInterrupt 结构体的定义,其参数如下所示。
- action_request:详细说明智能体正在考虑的操作(例如,“决定是否参加会议”)。
- args:提供用于人工决策的相关信息(例如,会议的详细信息、日期、主题)。
- config:指定是否允许人工响应(例如,allow_respond: True、allow_ignore: True)。
- description:提供关于起草电子邮件的背景信息,并在 Agent Inbox 用户界面中以 Markdown 格式呈现,提示用户审查并采取行动。
- 人机环路“审查”中断:在发送可能包含敏感信息的出站电子邮件之前,智能体再次使用 interrupt() 函数,呈现 HumanInterrupt 对象,其包含的参数如下所示。
- action_request:详细说明建议的操作(例如,“发送电子邮件草稿”)。
- args:包括用于审查的电子邮件草稿内容。
- config:设置 allow_edit: True、allow_accept: True、allow_ignore: True, 让用户完全控制编辑、批准或拒绝电子邮件草稿。
- description:提供关于起草电子邮件的背景信息,提示用户审查并采取行动。
- (4)Agent Inbox 用户界面集成:在专用的 Agent Inbox 用户界面中显示由智能体生成的 HumanInterrupt 对象。此界面充当用户审查智能体的请求、提供响应和管理与情境感知智能体交互的中央枢纽。特别重要的是,使用 Agent Inbox 处理从 HumanResponse 对象到 LangGraph 工作流的格式化和传输。
- (5)工作流的恢复与响应处理:当用户与Agent Inbox 交互并提供 HumanResponse(例如,接受会议邀请、编辑并批准电子邮件草稿、提供对问题的文本响应)时,此响应将被传输回等待的 LangGraph 工作流。然后,interrupt 函数返回 HumanResponse 对象,允许智能体根据用户的输入恢复执行。例如,如果用户批准出站电子邮件草稿,则工作流将继续发送电子邮件;如果用户提供对问题的文本响应, 则工作流将使用此响应来起草相关的电子邮件回复。
- (6)长期学习与记忆更新:通过人机环路交互收集的用户反馈可用于更新智能体的长期记忆。例如,如果用户持续拒绝与特定主题相关的会议邀请,则可以将此偏好存储在智能体的记忆中,以便为未来的自动化决策提供信息,并减少不必要的人工中断。
这个电子邮件助手示例展示了 LangGraph 如何与 Agent Inbox 等专用用户界面结合,构建强大的情境感知智能体。这些智能体在后台主动运行,智能管理人工交互, 并随着时间的推移不断学习和改进。后面将进一步探讨 Agent Inbox 应用的实际能力,以及其在促进与情境感知智能体进行有效的人机环路交互中的作用。
情境感知智能体架构标志着 AI 应用程序设计的重大演进:它突破了被动式聊天界面的局限,转向主动式后台助手,以无缝可扩展的方式增强人类能力。这类智能体通过监听情境信号,并与人际环路模式进行战略性结合,有望大幅降低交互成本、提 升人类生产力,同时充分释放 LLM 的潜力。
LangGraph 凭借内置的持久层支持、人际环路支持、长期记忆及 Cron 任务功能, 为构建和部署这类变革性的情境感知智能体系统提供了理想平台。随着对情境感知智能体架构的持续探索与优化,我们正逐步迈向 AI 与日常工作流无缝融合的未来 —— 这将进一步增强人类能力,让我们得以专注于更高级别的战略任务。基于聊天的智能体和情境感知智能体的比较如表所示。
特征 基于聊天的智能体 情境感知智能体 启动 用户启动 事件驱动(后台、主动式) 交互模型 对话式,回合制 后台操作,通过人机环路进行关键决策 可扩展性 受限于单线程对话 高度可扩展,并发操作 主动性 被动式(响应提示) 主动式(监控事件、启动操作) 延迟容忍度 低(预期实时响应) 高(后台处理,用户对延迟不太敏感) 人类角色 积极参与者,持续互动 主管,在关键时刻提供指导 架构重点 交互式用户体验、对话流程 主动式后台操作、事件驱动、人机环路 用例 问答、创意任务、交互式工具 自动化、监控、后台协助、主动发出警报

