LangGraph的核心API
1、create_react_agent
- LangGraph 作为 AI 智能体开发框架,在灵活性和功能性方面展现出独特优势。 该框架不仅支持高度定制化的智能体架构开发,同时为快速原型设计和开发者入门提 供了便捷工具。其中,create_react_agent 函数作为预构建组件,实现了 ReAct(推理— 行动)模式的快速部署,显著降低了开发门槛。本节将演示 create_react_agent 的核心功能、自定义选项,以及如何在更广泛的 LangChain 生态系统中无缝集成。
1.1 create_react_agent的核心功能和参数
LangGraph 框架中的 create_react_agent 函数为开发者提供了快速构建 ReAct 智能体的标准化方案。该函数通过自动化生成底层图结构,显著简化了智能体开发流程。 其主要参数配置如下所述。
基本参数有两个。
- model:指定语言模型,通常是使用 LangChain 定义的 ChatModel 对话模型对象。建议根据性能需求和成本预算选择合适的 LLM。
- tools:定义 LangChain 工具列表,包括基础工具(计算器 / 搜索引擎)、 复杂集成工具(数据库 /API 连接),确定智能体可执行的操作。
除了这些核心参数,create_react_agent 还提供了几个可选参数,可以实现进一步的自定义。
- prompt:定义系统提示,默认为优化过的 ReAct 专用提示,也可以自定义以调整响应风格、设定特定角色、注入领域知识。
- response_format:定义输出格式,包括 Pydantic 模型,能自动校验输出格式, 适合需要严格数据结构的集成的应用场景。
- checkpointer:提供状态持久化方案,以维护对话上下文,实现跨会话记忆、 保持连贯对话。
- interrupt_before/interrupt_after:提供人机协作控制点,通过指定节点名称列表,开发者可以暂停执行,以进行安全审查、质量校验或修改。
以下是一个基础实现示例(天气查询智能体)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# 初始化语言模型
model = ChatOpenAI(model="gpt-4-turbo", temperature=0)
# 定义一个简单的工具来获取天气信息
def get_weather(city: str):
"""Use this tool to get the weather information for a city."""
if city.lower() == "nyc":
return "It might be cloudy in New York."
elif city.lower() == "san francisco":
return "It's always sunny in San Francisco."
else:
return "Unable to get the weather information for this city."
tools = [get_weather]
# 创建ReAct智能体
graph = create_react_agent(model, tools=tools)
# 调用智能体
inputs = {"messages": [("user", "How's the weather in San Francisco?")]}
response = graph.invoke(inputs)
print(response['messages'][-1].content)
# The weather in San Francisco is currently sunny.- 此示例演示了使用 create_react_agent 构建功能性 ReAct 智能体的最小可行配置。 开发者只需提供语言模型和工具列表就可以快速创建一个天气查询智能体。
在后续章节中,我们将深入探讨 create_react_agent 提供的自定义选项,例如, prompt、checkpointer、interrupt_before 和 response_format 等参数以构建更专业的 AI 智能体系统。
1.2 自定义选项
- create_react_agent 不仅提供了开箱即用的 ReAct 智能体构建能力,更通过多层次的可定制性满足不同场景需求。该函数支持从基础行为到高级功能的全面定制,主要包含以下维度。
1.2.1 自定义系统提示
create_react_agent 的 prompt 参数允许开发者覆盖默认的系统提示,并注入自定义指令,以指导智能体的行为。这是一种基本的自定义技术,可用于塑造智能体的个性、响应风格和整体表现。可以指示智能体以特定语言回复,采用特定语调(如专业、 幽默或简洁),或为其提供额外的背景信息或约束条件。假设希望 ReAct 智能体专门以中文回复,则可通过向 prompt 参数提供“请以中文回复”的系统提示来轻松实现此目标。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
# 初始化语言模型
model = ChatOpenAI(model="gpt-4-turbo", temperature=0)
# 定义一个简单的工具来获取天气信息
def get_weather(city: str):
"""Use this tool to get the weather information for a city."""
if city.lower() == "new york":
return "It might be cloudy in New York."
elif city.lower() == "san francisco":
return "It's always sunny in San Francisco."
else:
return "Unable to get the weather information for this city."
tools = [get_weather]
# 定义用于中文回复的自定义系统提示
chinese_prompt = "Respond in Chinese"
# 使用自定义提示创建ReAct智能体
graph_chinese = create_react_agent(model, tools=tools, prompt=chinese_prompt)
# 示例调用
inputs = {"messages": [("user", "How's the weather in NYC?")]}
response_chinese = graph_chinese.invoke(inputs)
print(response_chinese['messages'][-1].content)
# 纽约的天气可能是多云的。- 通过设置 prompt=chinese_prompt,所有响应都将以中文生成。这证明了此参数对智能体输出的直接控制。
1.2.2 添加对话记忆
对于需要在多轮对话中维护上下文的应用程序,整合记忆功能至关重要。create_react_agent 通过 checkpointer 参数简化了这一过程:开发者只需提供一个 LangGraph Checkpointer 实例,即可让智能体跨由唯一 thread_id 标识的对话线程持久化状态(包括消息历史记录)。LangGraph 提供了多种存档点器实现,例如适用于开发和测试场景的内存存储型 MemorySaver,以及适用于生产环境的 SQLiteSaver。
以下示例演示了如何向 ReAct 智能体添加内存聊天记忆功能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
# 初始化语言模型
model = ChatOpenAI(model="gpt-4-turbo", temperature=0)
# 定义一个简单的工具来获取天气信息
def get_weather(city: str):
"""Use this tool to get the weather information for a city."""
if city.lower() == "new york":
return "It might be cloudy in New York."
elif city.lower() == "san francisco":
return "It's always sunny in San Francisco."
else:
return "Unable to get the weather information for this city."
tools = [get_weather]
# 初始化内存存档点
memory = MemorySaver()
# 创建具有记忆功能的 ReAct 智能体
graph_with_memory = create_react_agent(model, tools=tools, checkpointer=memory)
# 首次交互
config = {"configurable": {"thread_id": "user_thread_1"}} # 唯一线程 ID
inputs_1 = {"messages": [("user", "How's the weather in sf?")]}
response_1 = graph_with_memory.invoke(inputs_1, config=config)
print(response_1['messages'][-1].content)
# The weather in San Francisco is currently sunny. Enjoy the beautiful day!
# 同一线程中的第二次交互:智能体记住上下文
inputs_2 = {"messages": [("user", "How is chicago?")]}
response_2 = graph_with_memory.invoke(inputs_2, config=config)
print(response_2['messages'][-1].content)
# I'm unable to retrieve the weather information for Chicago at the moment. Please try again later.- 在此示例中,MemorySaver() 用于创建基于内存的存档点。通过将此存档点传递 给 checkpointer 参数,并在同一对话的每次交互中通过 config 字典提供一致的 thread_ id,AI 智能体不仅能保留对话历史记录,而且可以根据上下文对后续问题做出响应, 如关于另一个城市的第二次交互所示。选择合适的 Checkpointer 实现对于生产应用程序至关重要,以确保 LangGraph 智能体的记忆管理既可靠又可扩展。
1.2.3 整合人机环路工作流
对于需要人工监督的应用场景,create_react_agent 通过 interrupt_before 参数支持人机环路工作流。该参数允许开发者指定需要暂停执行的节点列表(在 ReAct 智能体中通常设为 [“tools”]),从而在关键操作前设置断点,实现人工审查和操作修改。
要注意的是,这必须配合 Checkpointer 使用,以确保中断期间状态持久化。以下示例演示了如何启用在工具调用之前暂停的人机环路工作流。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from langgraph.checkpoint.memory import MemorySaver
# 初始化语言模型
model = ChatOpenAI(model="gpt-4-turbo", temperature=0)
# 定义一个简单的工具来获取天气信息
def get_weather(city: str):
"""Use this tool to get the weather information for a city."""
print("city:", city)
if city.lower() == "new york":
return "It might be cloudy in New York."
elif city.lower() == "san francisco":
return "It's always sunny in San Francisco."
else:
return "Unable to get the weather information for this city."
tools = [get_weather]
# 初始化内存存档点
memory = MemorySaver()
# 创建启用人机环路的ReAct智能体,在工具之前中断
graph_hitl = create_react_agent(model, tools=tools, interrupt_before=["tools"], checkpointer=memory)
# 首次交互:智能体将在工具调用之前暂停
config_hitl = {"configurable": {"thread_id": "user_thread_hitl"}}
inputs_hitl = {"messages": [("user", "How's the weather in sf?")]}
stream = graph_hitl.stream(inputs_hitl, config=config_hitl, stream_mode="values")
for output in stream:
print(output) # 打印流输出以观察智能体的状态
# 此时,智能体已暂停。人工可以使用graph_hitl.get_state(config_hitl)检查状态, # 并可以在继续之前修改工具调用,使用 graph_hitl.stream(None, config_hitl, stream_mode="values")
# {'messages': [HumanMessage(content="How's the weather in sf?", additional_kwargs={}, response_metadata={}, id='486b71ed-413a-4101-95d5-9d541b806b88')]}
# {'messages': [HumanMessage(content="How's the weather in sf?", additional_kwargs={}, response_metadata={}, id='486b71ed-413a-4101-95d5-9d541b806b88'), AIMessage(content='', additional_kwargs={'refusal': None}, response_metadata={'token_usage': {'completion_tokens': 15, 'prompt_tokens': 58, 'total_tokens': 73, 'completion_tokens_details': None, 'prompt_tokens_details': None}, 'model_provider': 'openai', 'model_name': 'gpt-4-turbo-2024-04-09', 'system_fingerprint': 'fp_5603ee5e2e', 'id': 'chatcmpl-CZupF2PNwDj4GbpedPuhxATNH044j', 'finish_reason': 'tool_calls', 'logprobs': None}, id='lc_run--7f450710-b619-4c55-8e4e-587b6a320f02-0', tool_calls=[{'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_0qsLAefauM00ZtAVfjdcEzMX', 'type': 'tool_call'}], usage_metadata={'input_tokens': 58, 'output_tokens': 15, 'total_tokens': 73, 'input_token_details': {}, 'output_token_details': {}})]}- 在此示例中,interrupt_before=[“tools”] 参数配置使智能体在执行任何工具调用前自动暂停。当第一次交互完成后,系统将进入等待状态,此时人工操作员可以审查智能体状态,重点检查拟执行工具调用的合理性,执行必要干预,例如,修改工具调用参数,完全阻止工具执行,确认后继续智能体运行。
通过该机制,开发者可在保持 AI 智能体自动化优势的同时,嵌入必要的人工监督环节,实现人机协同的最佳平衡。
1.2.4 返回结构化输出
在许多应用场景中,需要智能体以结构化的格式返回响应,以便程序化解析和处理输出。create_react_agent 通过 response_format 参数简化了结构化输出的生成过程。 开发者只需提供 Pydantic 模型作为此参数的值,即可指示 AI 智能体在 ReAct 循环结束时进行额外的 LLM 调用,将最终响应根据指定模式格式化。这确保了输出符合预定义的结构,有助于与其他需要结构化数据的系统或应用程序无缝集成。以下示例演示了如何配置 create_react_agent ,使其以 WeatherResponse 格式返回天气信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent
from pydantic import BaseModel, Field
# 初始化语言模型
model = ChatOpenAI(model="gpt-4-turbo", temperature=0)
# 定义一个简单的工具来获取天气信息
def get_weather(city: str):
"""Use this tool to get the weather information for a city."""
if city.lower() == "new york":
return "It might be cloudy in New York."
elif city.lower() == "san francisco":
return "It's always sunny in San Francisco."
else:
return "Unable to get the weather information for this city."
tools = [get_weather]
# 由于硅基流动平台的部分模型暂不支持原生结构化输出,本例推荐使用 OpenAI 原生 模型进行测试
# 定义结构化输出的Pydantic模型
class WeatherResponse(BaseModel):
"""Respond with a weather description."""
conditions: str = Field(description="weather conditions")
# 创建启用结构化输出的ReAct智能体
graph_structured = create_react_agent(model, tools=tools, response_format=WeatherResponse)
# 示例调用
inputs_structured = {"messages": [("user", "How's the weather in sf?")]}
response_structured = graph_structured.invoke(inputs_structured)
print(response_structured["structured_response"]) # 访问结构化响应
# conditions='sunny'- 通过设置 response_format=WeatherResponse,智能体将以 WeatherResponse 对象的形式返回最终答案,该对象可以通过输出字典中的 structured_response 键访问。其中,conditions 字段将包含智能体提取并根据模式格式化的天气信息。这种结构化输出机制不仅可以确保输出符合预定义格式,便于系统间数据交互,可直接对接 API, 无缝接入数据库系统,兼容数据分析管道,还可以通过 (prompt, schema) 元组配置自定义格式化提示,实现更精细的输出控制。
1.2.5 启用长期记忆功能
- 虽然通过 checkpointer 参数实现的对话记忆使智能体能够在单个交互线程中维护上下文,但真正的长期记忆涉及跨多个会话和长时间持续存储与检索信息的能力。 create_react_agent 本身并不直接提供长期语义记忆功能,但其提供了必要的基础和可扩展性,可以将长期记忆功能集成到 ReAct 智能体中。
- 实现长期记忆通常需要借助外部存储解决方案,例如向量数据库或知识图谱,以存储并有效检索与智能体的任务和用户交互相关的信息。其中,基于向量嵌入的语义搜索是使智能体能够根据含义和上下文(而非简单的关键字匹配)检索相关记忆的关键技术。
- 基于 create_react_agent 的智能体添加长期记忆功能的步骤如下所示。
- 选择持久化记忆存储:选择一个合适的记忆存储(例如 InMemoryStore、PostgresStore)来存储智能体记忆。
- 创建记忆管理工具:存储记忆工具向长期记忆添加新信息;检索记忆工具基于语义相似性查询记忆。
- 将记忆工具加入 create_react_agent 的 tools 列表中。
- 修改智能体的提示(可选):指导 AI 智能体使用记忆工具的时机和方式。
- 可选记忆注入提示:自动检索相关记忆,并将其注入 LLM 的上下文。
- create_react_agent 灵活的架构和工具集成功能使其成为构建具有长期记忆的智能体的理想起点。通过将 create_react_agent 与自定义构建的记忆管理工具和持久化存储解决方案相结合,可以创建具有持续学习能力的 ReAct 智能体,实现更个性化、上 下文感知和有效的 AI 助手。
1.3 create_react_agent的应用
create_react_agent 不仅是一个独立工具,更是深度集成于 LangChain 生态系统的重要组件。它与 LangChain 其他模块的协同工作体现在以下方面。
- 直接支持 ChatOpenAI 等 LangChain 对话模型对象,兼容 LangChain 工具库, 既可使用现成工具,也支持自定义工具开发。
- 具有开发体验优势,LangChain 开发者可快速上手,沿用熟悉的开发模式, 保持 LangChain 的灵活度。
- 原生集成 LangSmith 平台,可实现执行流程可视化,中间步骤检查,进行性能监控与优化,问题诊断与调试。
create_react_agent 封装 ReAct 核心逻辑,简化图构建过程,进行快速原型开发, 还支持对话记忆,实现长期记忆,提供人机环路、结构化输出,适合新手入门学习, 构建从简单信息检索到复杂交互等各种 AI 智能体系统。
该组件作为连接 LangGraph 执行引擎与 LangChain 生态的桥梁,既保留了底层框架的强大能力,又提供了符合开发者习惯的接口规范,形成了完整的智能体开发生态。
create_react_agent 的参数总结,如表所示。
model langchain.chat_models.base.BaseChatModel 用于智能体推理和决策的 LangChain 聊天模型 必需 tools List[langchain_core.tools.BaseTool] 智能体可以使用的 LangChain 工具列表,用于与外部世界交 互或访问特定信息 必需 prompt Union[str, Callable] 自定义系统提示,用于指导 AI 智能体的行为和角色。可 以是字符串或接受状态并返回提示的可调用对象 可选 response_format Union[BaseModel, Tuple[str, BaseModel]] 结构化输出的格式。可以是 Pydantic 模型,也可以是包含自定义提示和 Pydantic 模型的元组,用于最终的结构化输出生成步骤 可选 checkpointer langgraph.checkpoint.base.Checkpointer LangGraph 存档点实例,用于在对话线程中实现状态持久化和对话记忆 可选 interrupt_before/interrupt_ after List[str] 节点名称列表,指定 AI 智能体执行应暂停的节点,以实现人机环路工作流。通常设置为 [“tools”] ,即在 ReAct 智能体工具调用前暂停 可选 store langgraph.store.base.BaseStore 内存存储实例,用于集成长期记忆功能。通过 prompt 参数或工具注入。 请注意, store 参数本身不是 create_react_agent 的直接参数, 而是通过其他可定制参数集成 可选
2、Functional API
- 在深入探讨 Functional API 之前,为了更好地理解其设计理念和优势,我们有必 要先回顾一下 LangGraph 的 Graph API。正如本书之前介绍的那样,Graph API 是一种基于图的编程范例,允许开发者通过显式定义节点和边来构建复杂的 AI 工作流。 StateGraph 作为 Graph API 的核心组件,提供了用于编排节点执行流程并管理状态转换的框架。StateGraph 支持创建有向无环图,其中节点代表工作流中的各步骤(例如, LLM 调用、工具使用、数据处理),而边则定义了节点间的依赖关系和执行顺序。 Graph API 的强大之处在于其灵活性和对复杂工作流的精细控制能力,非常适合构建具有复杂分支、并行处理和状态管理的 AI 智能体系统。
- 然而,LangChain 团队认识到,并非所有 AI 应用程序都适合图编程模型。许多开发者更习惯传统命令式编程范式。为了弥合这一差距并扩大 LangGraph 的适用范围,LangChain 推出了 Functional API。
2.1 Functional API的优势
- Functional API 的设计初衷源于对开发者体验的深入思考。虽然 Graph API 在构建复杂智能体架构方面具有无可比拟的灵活性和控制力,但其图编程范式对于习惯命令式或函数式编程的开发者来说存在较高的入门门槛。Functional API 正是为了满足这类开发者的需求而诞生的,它提供了更加平缓的 LangGraph 学习路径。
- Functional API 的主要优势包括以下五个方面。
- 降低学习门槛。
- 为不熟悉图编程的开发者提供更友好的入门方式。
- 通过抽象图定义细节,让开发者专注业务逻辑。
- 使用熟悉的 Python 函数和控制流构建工作流。
- 显著减轻认知负担,加快应用开发速度。
- 渐进式集成。
- 支持在现有 Python 代码库中逐步引入 LangGraph 功能。
- 无须大规模重构为图结构。
- 通过装饰器增强现有函数,添加持久化等特性。
- 提升开发效率。
- 简化工作流构建和迭代流程。
- 特别优化线性流程的开发体验。
- 减少样板代码,加速原型设计。
- 支持快速实验和优化 AI 应用。
- 范式互补性。
- 与 Graph API 形成互补关系而非替代关系。
- 共享相同的底层运行时引擎。
- 支持在单一应用中混合使用两种范式。
- 复杂组件仍可使用 Graph API。
- 简单工作流适合 Functional API。
- 为不同场景提供最佳工具选择。
- 核心功能民主化。
- 确保人机协作机制、状态持久化、记忆管理和流式处理等关键功能对各类开 发者开放。
- 降低高级功能的使用门槛。
- 促进更广泛 AI 应用的智能化升级。
- 降低学习门槛。
- Functional API 从根本上扩展了 LangGraph 的适用边界,使其强大能力能够服务更广泛的开发者群体,同时保持对编程范式选择的灵活性,让开发者可以根据项目需求和个人专长选择最适合的开发方式。
2.2 核心组件:@entrypoint和@task
- Functional API 的构建基于两个基本装饰器:@entrypoint 和 @task。这些装饰器以函数为中心的方式定义和编排工作流的构建块。
2.2.1 @entrypoint:定义工作流边界和入口点
@entrypoint 装饰器用于将 Python 函数标记为 LangGraph 工作流的执行入口点。 负责封装工作流的整体逻辑并管理其执行流程,包括处理长时间运行的任务、支持持 久化和人机环路中断。
当函数被 @entrypoint 装饰后,LangGraph 会将其转换为一个 Pregel 对象实例, 该实例提供以下方法,用于实时获取处理进度、基于存档点恢复工作流,以及持久化支持。
- invoke():同步执行工作流。
- ainvoke():异步执行工作流。
- stream():同步流式处理。
- astream():异步流式处理。
@entrypoint 装饰器通过 checkpointer 参数提供自动状态保存 / 恢复、工作流中断续接等功能。其输入 / 输出必须支持 JSON 可序列化,以确保状态持久化和可靠性。可注入参数(仅关键字参数)如下所示。
- previous:Any = None:访问前次执行状态。
- store:BaseStore,长期记忆存储接口。
- writer:StreamWriter,自定义数据流写入。
- config:RunnableConfig,运行时配置访问。
典型定义示例如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore, BaseStore
from langgraph.types import StreamWriter
from langchain_core.runnables import RunnableConfig
from typing import Any
store = InMemoryStore()
checkpointer = MemorySaver()
def my_workflow(
user_input: dict, # 输入参数
*,
previous: Any = None, # 用于先前状态的可注入参数
store: BaseStore, # 用于长期内存存储的可注入参数
writer: StreamWriter, # 用于自定义流写入器的可注入参数
config: RunnableConfig # 用于运行时配置的可注入参数
) -> str:
""" 一个复杂的演示入口点参数的工作流 """
api_key_from_config = config["configurable"].get("api_version")
# 访问 kwargs
writer(f"工作流 '{config["metadata"]["thread_id"]}' 以 API 版本启动:{api_key_from_config}") # 使用注入参数的工作流逻辑
return f"工作流处理的输入: {user_input}"
# 示例调用
config = {"configurable": {"thread_id": "complex_workflow_1"}}
result = my_workflow.invoke({"message": "Hello"}, config)
print(result)
# 工作流处理的输入: {'message': 'Hello'}def my_workflow(…) → str:定义了工作流函数 my_workflow,接受 user_input: dict 作为输入,并使用类型提示 → str 声明返回值为字符串类型。
previous:Any = None。
- 功能:访问工作流先前状态。
- 用途:实现短期记忆功能。
store:BaseStore。
- 功能:长期内存存储接口。
- 用途:持久化数据存取。
writer:StreamWriter。
- 功能:自定义数据流写入。
- 用途:实时数据流传输。
config:RunnableConfig。
- 功能:运行时配置访问。
- 用途:获取执行环境参数。
配置参数访问示例:api_key_from_config = config[“configurable”].get(“api_version”) 通过 config[“configurable”] 字典获取 @entrypoint 装饰器传递的自定义参数。
工作流配置示例:config = {“configurable”: {“thread_id”: “complex_workflow_1”}} 指定工作流实例标识。对于有状态工作流(使用 checkpointer 时),thread_id 用于隔离不同实例的状态。
该示例展示了 @entrypoint 装饰器的参数使用方式,以及如何在装饰函数中访问各类可注入参数,充分体现了 Functional API 通过简洁的装饰器语法实现高级工作流功能的能力。
2.2.2 @task:封装工作单元
@task 装饰器用于定义 LangGraph 工作流中的各工作单元。它的调用返回结果是一个 Python 的 Future 对象。
@task 的详细参数如下。
- name:(Optional[str])。
- 描述:标识任务。主要用于 LangSmith 中的日志记录和跟踪,有助于在执行跟踪中识别任务并监视工作流进度。
- 默认值:装饰函数的名称。
- 示例:name=”fetch_weather_data”。
- retry:(Optional[RetryPolicy])。
- 描述:故障重试策略定义。例如,最大重试次数和重试条件(例如,特定异常类型)。
- 默认值:None。默认情况下不执行自动重试。
- 示例:retry=RetryPolicy(max_attempts=3, retry_on=ValueError),在 ValueError 异常时最多重试 3 次。
- kwargs:(Any)。
- 描述:预留扩展参数并保留字段,无实际功能。
- 默认值:None。
- name:(Optional[str])。
@task 装饰函数的函数签名要求如下。
- 第一个参数(输入):必须接受至少一个位置参数作为任务输入数据。
- 不支持注入参数:如 previous、store、writer 或 config,在入口点工作流的上下文中运行,并接收来自入口点的输入数据。
- JSON 可序列化输出:@task 函数的返回值必须是 JSON 可序列化的,以确保存档点正常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27from langgraph.func import task
from langgraph.types import RetryPolicy
retry_policy = RetryPolicy(max_attempts=2, retry_on=TimeoutError)
def fetch_api_data(api_endpoint: str) -> dict:
"""从带有重试策略的 API 端点获取数据的任务"""
import time, random
time.sleep(random.random()) # 模拟网络延迟
if random.random() < 0.3: # 模拟 30% 的超时概率
raise TimeoutError("API 请求超时")
return {"status": "success", "data": f" 来自 {api_endpoint} 的数据"}
# 在入口点内调用任务的示例
def data_processing_workflow(endpoint_url: str) -> dict:
"""调用 fetch_api_data 任务的工作流"""
api_result = fetch_api_data(api_endpoint=endpoint_url).result()
return {"workflow_result": "数据已处理", "api_response": api_result}
config = {"configurable": {"thread_id": "task_params_workflow_1"}}
result = data_processing_workflow.invoke("https://api.example.com/data", config)
print(result)
# {'workflow_result': '数据已处理', 'api_response': {'status': 'success', 'data': ' 来自 https://api.example.com/data 的数据'}}- retry_policy = RetryPolicy(max_attempts=2, retry_on=TimeoutError) 创建了一个 RetryPolicy 实例,配置为在发生 TimeoutError 时最多重试 2 次。
- @task(name=’’api_data_fetcher’’, retry=retry_policy) 使用 @task 装饰器定义任务。其中,name=”api_data_fetcher” 为任务指定了一个名称,便于在 LangSmith 跟踪中标识此任务。
- retry=retry_policy 将之前定义的 retry_policy 应用于此任务。这意味着如果 fetch_api_data 函数抛出 TimeoutError,LangGraph 运行时将自动重试该任务最多两次。
- def fetch_api_data(api_endpoint: str) → dict 定义了实际的任务函数 fetch_api_ data。它接受 api_endpoint: str 作为输入,并使用类型提示→ dict 注释了返回值类型为字典。
- time.sleep(random.random()) 和 if random.random() < 0.3: raise TimeoutError(“API 请求超时”) 用于模拟网络延迟和 API 请求可能超时的场景,以便演示重试策略的效果。在实际应用中,fetch_api_data 函数将执行真正的 API 调用。
- @entrypoint(checkpointer=MemorySaver()) def data_processing_ workflow(endpoint_url: str) → dict 定义了一个名为 data_processing_workflow 的入口点,并配置了 MemorySaver 存档点。此入口点函数演示了如何在工作流中调用 @task 函数。
- api_result = fetch_api_data(api_endpoint=endpoint_url).result() 在 data_processing_ workflow 入口点内调用 fetch_api_data 任务。.result() 方法用于同步等待任务完成并获取其结果。如果 fetch_api_data 任务失败(抛出 TimeoutError),那么 LangGraph 运行时将根据配置的 retry_policy 自动重试该任务。
此示例说明了如何定义具有自定义名称和重试策略的 @task,展示了 @task 装饰器提供的参数选项,以及如何在工作流中使用重试策略来提高应用程序的鲁棒性。
2.3 使用Functional API构建和执行工作流
- 使用 Functional API 构建工作流涉及定义 @entrypoint 和 @task 函数,然后在@entrypoint 函数中编排它们的执行。在 @entrypoint 函数中使用标准 Python 构造的控制流隐式地定义了工作流的结构。
2.3.1 定义工作流逻辑
工作流逻辑主要在 @entrypoint 装饰函数的正文中定义。此函数编排 @task 函数 的执行,并使用标准 Python 控制流(例如,if、for、while 语句)来确定工作流的路径。
在 @entrypoint 函数中,我们可以完成以下任务。
- 调用 @task 函数:调用 @task 函数以执行各工作单元。请记住,调用 @task 函数会立即返回类似 Future 的对象,而不是结果本身。
- 检索任务结果:要获取 @task 的结果,请使用 .result() 方法进行同步检索, 或使用 await 进行异步检索。使用 .result() 时,工作流执行将阻塞,直到任务 完成并且其结果可用。
- 实现控制流:使用 Python 的条件语句(if、elif、else)和循环(for、while) 基于工作流输入、任务结果或其他条件来控制任务执行的顺序。这定义了工作流的动态路径。
- 处理人机环路:在 @task 函数中加入 interrupt() 函数,以暂停工作流执行并在工作流中的特定点请求人工输入。
- 管理状态(短期记忆):利用 previous 可注入参数和 entrypoint.final() 来管理短期记忆,以及同一线程工作流调用的状态持久化。
- 与长期记忆交互:store 可注入参数 , 用于持久化内存存储交互,以进行长期内存操作,例如存储用户首选项或检索历史数据。
- 流式传输自定义数据:使用 writer 可注入参数从工作流中流式传输自定义数据事件,从而实现实时监控和用户反馈。
- 返回最终结果:@entrypoint 函数应返回工作流的最终结果。此输出将在工作流执行完成后可供调用方使用。或者,使用 entrypoint.final() 将返回值与保存在存档点中以供下次调用的值分离。
为了说明 @entrypoint 中的工作流逻辑,请参见以下示例,其中包含了控制流和任务调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
def is_even(number: int) -> bool:
""" 检查数字是否为偶数的任务 """
return number % 2 == 0
def multiply_by_two(number: int) -> int:
""" 将数字乘以 2 的任务 """
return number * 2
def number_workflow(input_number: int) -> int:
""" 根据数字是奇数还是偶数处理数字的工作流 """
if is_even(input_number).result():
result = multiply_by_two(input_number).result()
return result
else:
return input_number # 若为奇数,则返回原始数字
config = {"configurable": {"thread_id": "workflow_3"}}
print(number_workflow.invoke(4, config)) # 输出:8(偶数,已相乘)
print(number_workflow.invoke(5, config)) # 输出:5(奇数,直接返回)- @task def is_even(number: int) → bool 和 @task def multiply_by_two(number: int) → int 定义了两个简单的 @task 函数,分别用于检查数字是否为偶数和将数字乘以 2。
- @entrypoint(checkpointer=MemorySaver()) def number_workflow(input_number: int) → int 定义了名为 number_workflow 的入口点,并配置了 MemorySaver 存档点。
- if is_even(input_number).result() 和 result = multiply_by_two(input_number).result() 在 number_workflow 入口点内调用 is_even 任务并使用 .result() 方法同步等待其结果。基于 is_even 任务的布尔结果,工作流逻辑会分支。
此示例演示了如何在 Functional API 工作流中使用控制流(if/else 语句)和任务 调用来构建条件逻辑。工作流的行为会随 is_even 任务的结果而动态变化。这展示了 Functional API 的灵活性。
2.3.2 执行工作流
- 定义 @entrypoint 函数后,可以使用 invoke、ainvoke、stream 和 astream 方法执行它,类似于 LangChain Runnable 对象的执行方式。
- invoke(input, config=None):同步执行工作流,阻塞直到完成并返回最终结果。
- ainvoke(input, config=None):异步执行工作流,返回可等待以获取最终结果的协程。
- stream(input, config=None, stream_mode=[‘’updates’’]):执行工作流并按发生 顺序流式传输更新。stream_mode 参数指定要流式传输的数据类型(例如, updates 表示工作流进度,messages 表示 LLM 词元,custom 表示自定义数据事件)。返回生成器,该生成器产生流式数据块。
- astream(input, config=None, stream_mode=[“updates”]):stream 的异步版本, 返回异步生成器。
1 | config = {"configurable": {"thread_id": "workflow_4"}} |
- 此代码段演示了如何使用 .invoke() 方法以同步方式执行 Functional API 工作流。 同步执行适用于需要立即获得工作流结果的场景,例如,在命令行界面或脚本中运行 工作流。执行工作流时,通常需要提供 config 字典。config 字典对于指定 thread_id 至关重要。因为 LangGraph 使用 thread_id 来管理不同工作流实例的持久化和状态。 对于具有长期内存的工作流,config 字典还可以用于传递特定于用户的标识符或其他上下文信息。
- config = {‘’configurable’’: {‘’thread_id’’: ‘’workflow_4’’}}:配置对象,指定工作流执行的 thread_id 为 ‘’workflow_4’’。
- result = my_workflow.invoke(“Synchronous Input”, config):使用 .invoke() 方法 同步调用 my_workflow 入口点。’’Synchronous Input’’ 字符串作为输入传递给工作流。执行将阻塞,直到工作流完成执行并返回最终结果。
1 | config = {"configurable": {"thread_id": "workflow_5"}} |
此代码段演示了如何使用 .stream() 方法流式执行 Functional API 工作流。流式执 行适用于需要向用户提供实时反馈或处理长时间运行工作流的场景。通过订阅不同的流(例如,updates messages 和 custom),可以接收不同类型的流式数据。
- config = {‘’configurable’’: {‘’thread_id’’: ‘’workflow_5’’}} 配置对象,指定工作流执行的 thread_id 为 workflow_5。
- for chunk in my_workflow.stream(‘’Streaming Input’’, config, stream_ mode=[‘’updates”]) 使用 .stream() 方法以流式方式执行 my_workflow 入口点。 Streaming Input 字符串作为输入传递给工作流。
- stream_mode=[“updates”] 指定只流式传输 updates 流。updates 流通常包含有 关工作流执行进度的信息,例如,已完成的任务和中间结果。
对于人机环路工作流,在 interrupt() 之后恢复是通过再次调用 stream、invoke 或 ainvoke 方法来实现的,但需要同时提供 Command 对象作为输入。Command 对象封装了 resume 值,该值是从人工收集的数据,并且是继续从中断点开始的工作流执行 所需的数据。可以使用 None 输入和相同的 thread_id 调用入口点在错误发生后尝试恢复工作流。
1
2
3
4
5
6
7
8from langgraph.types import Command
config = {"configurable": {"thread_id": "workflow_6"}}
# 假设工作流在human_feedback任务中中断
# 通过人工输入的内容来恢复工作流
resume_command = Command(resume="Human provided input")
for chunk in graph.stream(resume_command, config): # 假设 graph 是具有中断的工作流
print(f" 恢复流式块 : {chunk}")此代码段演示了如何使用 Command 对象和 .stream() 方法恢复 Functional API 工作流的执行,该工作流之前已在人机环路场景中被 interrupt() 函数中断。恢复机制允许工作流在中断后从断点处继续执行,并用了人工提供的输入或其他恢复数据。
- config = {‘’configurable’’: {‘’thread_id’’: ‘’workflow_6’’}} 配置对象,指定工作流 恢复执行的 thread_id。重要的是,线程 ID 与中断的工作流实例的线程 ID 相同。
- resume_command = Command(resume=’’Human provided input’’) 创建了一个 Command 对象,用于封装恢复命令。
- resume=’’Human provided input’’ 中的 resume 参数指定了在中断点处需要给工作流提供的恢复数据。在本例中,假设中断发生在 human_feedback 任务处, 并且需要人工输入数据作为恢复数据。
- for chunk in graph.stream(resume_command, config) 使用 .stream() 方法恢复之前中断的工作流。resume_command 对象作为输入传递给 .stream() 方法,以指示这是一个恢复操作并提供恢复数据。
2.4 与LangChain和LangSmith集成
与 Graph API 一样,Functional API 与 LangChain 生态系统深度集成。使用 Functional API 构建的工作流可无缝地利用 LangChain 的各类模型和工具对象实现。 这确保了兼容性,并允许开发者在 Functional API 范例中利用他们现有的 LangChain 知识和组件。
此外,使用 Functional API 创建的工作流还受益于 LangSmith 集成。LangSmith 为 Functional API 工作流提供跟踪、调试和监视的功能,就像为 Graph API 工作 流提供这些功能一样。执行跟踪(包括任务输入、输出和中间步骤)可以被记录到 LangSmith 中,便于调试、性能分析和迭代工作流改进。这种可观测性对于使用 Functional API 构建可靠且可用于生产的 AI 应用程序至关重要。
为了进一步强调 LangChain 集成,请考虑此示例。该示例在 @task 中使用了 LangChain 的 ChatOpenAI 对话模型。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20from langchain_openai import ChatOpenAI
from langgraph.func import task, entrypoint
from langgraph.checkpoint.memory import MemorySaver
llm = ChatOpenAI(model="Qwen/Qwen3-8B")
def generate_response(user_query: str) -> str:
""" 生成使用 LangChain LLM 的响应的任务 """
response = llm.invoke(user_query).content
return response
def chatbot_workflow(query: str) -> str:
""" 使用 LangChain LLM 的简单聊天机器人工作流 """
agent_response = generate_response(query).result()
return agent_response
config = {"configurable": {"thread_id": "chatbot_1"}}
response = chatbot_workflow.invoke(" 你好,你好吗? ", config)
print(f" 聊天机器人响应:{response}")此示例清楚地表明,Functional API 工作流可以轻松地与 LangChain 组件集成。 通过将 LangChain LLM 封装在 @task 函数中,开发者可以利用 LangGraph 的持久化、 可观测性和其他功能来构建由 LangChain 提供支持的 AI 应用程序。下图则展示了 Functional API 在 LangSmith 中的可视化表现。
- llm = ChatOpenAI(model=”Qwen/Qwen3-8B”) 初始化一个 LangChain ChatOpenAI 对话模型实例。这演示了 Functional API 工作流与 LangChain 组件的无缝集成。
- @task def generate_response(user_query: str) → str 定义了一个 @task 函数 generate_response,它接受 user_query: str 作为输入,并使用 LangChain LLM 生成文本响应。
- response = llm.invoke(user_query).content 在 generate_response 任务内部,调用 LangChain ChatModel 的 .invoke() 方法来根据用户查询生成响应。.content 属性用于提取 LLM 响应的文本内容。
- @entrypoint(checkpointer=MemorySaver()) def chatbot_workflow(query: str) → str 定义了一个名为 chatbot_workflow 的入口点,并配置了 MemorySaver 存档 点。此工作流旨在创建一个简单的聊天机器人。
2.5 常见工作流模式
- Functional API 支持多种常见的工作流模式。了解这些模式可以帮助开发者有效地利用 API 并构建强大的 AI 智能体系统。
2.5.1 任务的并行执行
@task 函数的异步特性天然支持并行执行。开发者可在 @entrypoint 函数中:
- 对于异步工作流:使用 asyncio.gather 并发多个 @task 调用。
- 对于同步工作流:顺序调用各 future 对象的 .result() 方法。
这种并行机制特别适合 I/O 密集型任务(如调用 LLM API),能显著提升工作流性能。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30from langgraph.func import task, entrypoint
from langgraph.checkpoint.memory import MemorySaver
from typing import Any
def task_one(item):
""" 模拟任务一 """
return f" 任务一处理 : {item}"
def task_two(item):
""" 模拟任务二 """
return f" 任务二处理 : {item}"
def parallel_workflow(items: list) -> dict[str, Any]:
""" 演示并行任务执行的工作流 """
futures = [task_one(item) for item in items] # 并行启动任务
results_task_one = [f.result() for f in futures] # 等待 task_one 结果
futures = [task_two(item) for item in items] # 并行启动任务
results_task_two = [f.result() for f in futures] # 等待 task_two 结果
return {"task_one_results": results_task_one, "task_two_ results": results_task_two}
config = {"configurable": {"thread_id": "parallel_workflow_1"}}
items_to_process = ["item_a", "item_b", "item_c"]
parallel_results = parallel_workflow.invoke(items_to_process, config)
print(parallel_results)
# {'task_one_results': [' 任务一处理 : item_a', ' 任务一处理 : item_b', ' 任务一处理 : item_c'], 'task_two_ results': [' 任务二处理 : item_a', ' 任务二处理 : item_b', ' 任务二处理 : item_c']}- Functional API 通过 @task 和 @entrypoint 装饰器实现高效的并行任务处理。示例中定义了两个任务函数 task_one 和 task_two 来模拟不同的处理步骤。工作流入口 parallel_workflow 使用 MemorySaver 存档点机制,通过列表推导式并发启动多个 @task 调用:首先生成 task_one 的 future 对象列表实现非阻塞并行执行,随后通过遍历 future 对象列表并调用 .result() 方法同步等待所有任务完成;接着以相同模式并行执行 task_two。
这一示例充分发挥了 Functional API 的异步优势,特别适合需要顺序执行多个并行阶段的 I/O 密集型场景(如批量调用 LLM API),通过 future 对象的非阻塞特性和显式结果同步机制,既能实现任务并行化提升吞吐量,又能确保阶段间的执行顺序和依赖关系。整个流程展现了如何优雅地协调并发执行与结果同步,为构建高性能 AI 工作流提供了标准化范式。
2.5.2 调用子图和其他入口点
Functional API 与 Graph API 具有天然的互操作性,开发者可以根据实际需求灵活选择或混合使用这两种编程范式。在系统架构层面,这两种 API 能够实现深度整合:既可以在 @entrypoint 函数中直接调用 Graph API 构建的工作流,也可以在 Graph 工作流中嵌入 Functional API 模块。这种双向调用机制为工作流设计提供了极大的灵活性——开发者可采用 Graph API 构建复杂的多节点业务流程,同时利用 Functional API 实现轻量级的任务编排逻辑。值得注意的是,系统还支持 @entrypoint 函数之间的相互调用,以及从 @task 中触发其他 @entrypoint 工作流,这种分层调用结构不仅实现了业务逻辑的模块化封装,更形成了可递归组合的工作流体系。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
def inner_workflow(input_value: str) -> str:
""" 内部工作流 """
return f" 内部工作流处理 : {input_value}"
# 父工作流,将重用存档点
def outer_workflow(input_value: str) -> dict:
""" 调用内部工作流的外部工作流 """
inner_result = inner_workflow.invoke(input_value) # 调用内部工作流
return {"outer_input": input_value, "inner_result": inner_result}
config = {"configurable": {"thread_id": "outer_workflow_1"}}
result = outer_workflow.invoke("Outer Input", config)
print(result)
# {'outer_input': 'Outer Input', 'inner_result': ' 内部工作流处理 : Outer Input'}在示例中定义了两个层级的工作流:inner_workflow 作为基础处理单元;outer_ workflow 作为组合控制器。这种设计具有三个主要特点。
- 存档点继承机制。当 outer_workflow 调用 inner_workflow 时,被调用的工作流会自动继承调用方的存档点配置,包括 MemorySaver 存储和 thread_id。这种设计确保了跨工作流的状态一致性,同时避免了重复配置。
- 显式调用语法。通过 .invoke() 方法实现的显式调用(如 inner_workflow.invoke(input_value)),既保持了调用语法的清晰度,又明确了工作流边界。这种设计比隐式调用更有利于调试和状态追踪。
- 组合式架构支持原子工作流的多次复用、 多层级的业务抽象、可独立测试的组件单元。这种嵌套结构特别适合需要分阶段状态管理的复杂业务场景,例如:
- 需要保存中间结果的 LLM 链式调用;
- 分步骤执行的批处理任务;
- 带故障恢复机制的长时间运行流程。
通过合理运用工作流组合和嵌套,开发者可以构建出既能保持组件独立性,又能实现复杂业务协同的 AI 应用架构。
2.5.3 流式传输自定义数据
@entrypoint 函数中的 writer 参数支持在工作流执行期间流式传输自定义数据事件。这对于向用户提供实时反馈、记录工作流进度或与监视系统集成非常有用。可将任何 JSON 可序列化数据写入 writer,这些数据将作为工作流输出的一部分进行流式传输,从而使客户端可订阅自定义数据流。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import StreamWriter
def streaming_workflow(input_data: str, writer: StreamWriter) -> str:
""" 演示自定义数据流式传输的工作流 """
writer(" 工作流开始处理输入......") # 自定义流数据
result = f" 已处理 : {input_data}"
writer(" 工作流处理完成。") # 更多自定义流数据
return result
config = {"configurable": {"thread_id": "streaming_workflow_1"}}
for chunk in streaming_workflow.stream("Example Input", config, stream_mode=["custom", "updates"]):
print(f" 流式块 : {chunk}")
# 流式块 : ('custom', ' 工作流开始处理输入......')
# 流式块 : ('custom', ' 工作流处理完成。')
# 流式块 : ('updates', {'streaming_workflow': ' 已处理 : Example Input'})在定义 streaming_workflow 入口点函数时,通过 checkpointer=MemorySaver() 配置了内存存档点机制。该函数的参数列表中包含一个特殊的 writer 参数,其类型标注为 StreamWriter。需要注意的是,只有当参数名称确认为 writer 且类型标注严格为 StreamWriter 时,LangGraph 运行时才会自动注入 StreamWriter 实例。
在函数实现中,可以通过调用 writer 函数将自定义数据流式传输到自定义流中, 例如输出工作流开始和结束的提示信息。这种方式支持多次调用,能够实现多批次数据的流式传输。当以流模式执行工作流时,可以通过 stream_mode 参数指定需要订阅的数据流类型,包括自定义流和状态更新流。执行过程中,系统会按顺序返回包含各类数据的混合数据块。
此示例展示了如何利用 Functional API 实现自定义数据的流式传输功能。通过在 入口点函数中声明 writer 参数并在适当位置调用其方法,开发者可以方便地将自定义数据实时推送给客户端,从而为系统监控和用户交互提供更丰富的实时反馈能力。
2.5.4 实现重试策略
对于可能出现瞬时性故障的任务,@task 装饰器提供了完善的重试机制支持。用户可以根据实际需求配置不同的重试策略,包括指数退避、固定延迟等多种方式,并灵活设定触发自动重试的特定条件。这一功能显著提升了工作流系统的可靠性和容错能力,有效避免了因临时性问题导致的流程中断,确保任务能够顺利完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25from langgraph.func import task, entrypoint
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import RetryPolicy
retry_policy = RetryPolicy(max_attempts=3, retry_on=ValueError) # 在 ValueError 时重试
def unreliable_task() -> str:
""" 可能失败并且需要重试的任务 """
import random
if random.random() < 0.7: # 模拟 70% 的失败率
raise ValueError(" 任务失败 !")
return " 任务成功 "
def retry_workflow(input: str) -> str:
""" 在任务上使用重试策略的工作流 """
result = unreliable_task().result()
return result
config = {"configurable": {"thread_id": "retry_workflow_1"}}
try:
result = retry_workflow.invoke("Test input", config)
print(f" 工作流结果 : {result}")
except ValueError as e:
print(f" 工作流在重试后失败 : {e}")在任务重试机制的实现中,RetryPolicy 实例可配置为在特定异常发生时自动重试。例如,retry_policy = RetryPolicy(max_attempts=3, retry_on=ValueError) 表示:当 任务抛出 ValueError 异常时,系统将最多重试 3 次。其中 max_attempts 参数指定最 大重试次数,retry_on 参数定义触发重试的异常类型。此外,RetryPolicy 还支持更复杂的重试条件配置,包括多种异常类型组合或自定义谓词函数。
通过 @task(retry=retry_policy) 装饰器,开发者可以将配置好的重试策略应用到具体任务函数。例如,在 unreliable_task 函数中,可以使用随机数模拟 70% 的失败概率:当 random.random() 小于 0.7 时抛出 ValueError 异常。工作流入口函数应当使用 try…except 块捕获可能的异常,确保即使在最大重试次数后任务仍然失败时,系统也能妥善处理。
此示例展示了 Functional API 通过 @task 装饰器的 retry 参数提供的高效重试策略。此重试策略显著提升了工作流的鲁棒性,使其能够自动处理瞬时性错误,同时避免了复杂的错误处理代码。
2.5.5 管理状态和内存
Functional API 提供了完善的短期和长期内存管理机制,使开发者能够构建具有状态感知能力的 AI 智能体。
在短期内存管理方面,系统通过 previous 参数维护线程内的对话上下文,配合 entrypoint.final() 方法实现当前会话的状态处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16# 在 Functional API 中实现短期记忆
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
from typing import Any
def counter_workflow(increment: int, *, previous: Any = None) -> entrypoint.final[int, int]:
""" 使用先前状态维护计数器的工作流 """
current_count = previous or 0
new_count = current_count + increment
return entrypoint.final(value=current_count, save=new_count) # 返回 prev,保存 new
config = {"configurable": {"thread_id": "counter_workflow_1"}}
print(counter_workflow.invoke(1, config)) # 输出:0(返回先前的计数)
print(counter_workflow.invoke(2, config)) # 输出:1(返回先前的计数)
print(counter_workflow.invoke(3, config)) # 输出:3(返回先前的计数)counter_workflow 入口点函数通过 @entrypoint(checkpointer=MemorySaver()) 装饰器定义,并配置了 MemorySaver 存档点。该函数签名包含以下两个关键部分。
- increment:int 作为输入参数。
- previous:Any = None 作为可注入参数,用于获取工作流前次状态。
函数返回类型标注为 → entrypoint.final[int, int],其中:
第一个 int 表示返回给调用方的值。
第二个 int 表示需要保存到存档点的状态值。
函数内部实现逻辑如下:
- current_count = previous or 0 实现状态初始化:
- 首次调用时 previous 为 None,current_count 初始化为 0。
- 后续调用时 current_count 取 previous 注入值。
- new_count = current_count + increment 计算新状态值。
- return entrypoint.final(value=previous, save=new_count) 返回结果;
- value=previous 确保每次返回前次状态值。
- save=new_count 将更新后的值保存为下次调用的初始状态。
三次调用过程具体如下。
首次调用 counter_workflow.invoke(1, config):
previous=None → current_count=0。
new_count=0+1=1。
返回 value=0,保存 save=1。
第二次调用 counter_workflow.invoke(2, config):
- previous=1 → current_count=1 。
- new_count=1+2=3。
- 返回 value=1,保存 save=3。
第三次调用 counter_workflow.invoke(3, config):
- previous=3 → current_count=3。
- new_count=3+3=6。
- 返回 value=3,保存 save=6。
此示例完整展示了 Functional API 通过 previous 参数和 entrypoint.final 类型实现的状态管理机制,使工作流能够跨多次执行维护上下文信息。
长期内存管理则通过 store 可注入参数实现,结合 LangGraph Store 的持久化存储能力,支持跨线程或会话的数据保存与读取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19# 在 Functional API 中实现长期记忆
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
from langgraph.store.memory import InMemoryStore
from langgraph.store.base import BaseStore
store = InMemoryStore() # 长期内存存储
def retrieve_user_preferences(user_id: str, store: BaseStore):
""" 从长期内存中检索用户偏好的任务 """
preferences = store.get(("user_preferences", user_id), "preferences") # 命名空间键
return preferences
# 注入存储
def personalized_workflow(user_id: str, query: str, store: BaseStore):
""" 访问长期记忆的工作流 """
user_prefs = retrieve_user_preferences(user_id, store).result()
# 在工作流逻辑中使用 user_prefs
return f" 为用户 {user_id} 个性化的工作流,偏好 : {user_prefs}"store = InMemoryStore() 创建了一个 InMemoryStore 实例,用作长期内存存储。 在实际应用中,建议使用更持久的存储解决方案,如向量数据库或关系数据库。
- @task def retrieve_user_preferences(user_id: str, store: BaseStore) 定义了一个 @task 函数,用于获取用户偏好数据。
- store: BaseStore:函数签名中包含了 store: BaseStore 参数。虽然 @task 函 数本身不直接支持可注入参数,但可以通过调用它的 @entrypoint 函数传递 store 参数。
- preferences = store.get((“user_preferences”, user_id), “preferences”): 使 用 store.get() 方法从长期内存存储中检索用户偏好数据。
- @entrypoint(checkpointer=MemorySaver(), store=store) def personalized_workflow(user_id: str, query: str, store: BaseStore):定义了个性化工作流的入 口点。
此概念代码片段概述了 Functional API 如何支持长期内存集成。通过在 @entrypoint 装饰器中配置 store 参数,并在入口点函数和任务函数中访问 store 可注入参数,实现跨工作流实例和用户会话持久化。
2.6 常见陷阱
使用 Functional API 时需注意常见陷阱并遵循最佳实践,以确保 LangGraph 应用的稳健性和效率。关键决策点在于确定逻辑应封装在 @task 中还是直接在 @ entrypoint 函数中实现。虽然小型工作流可直接在 @entrypoint 中编码,但 @task 为复杂生产级应用提供显著优势。
建议在以下场景使用 @task。
- 涉及 I/O 绑定或耗时操作时,如网络请求(API 调用、数据库查询、LLM 调 用)或大量计算任务,应使用 @task。这允许 LangGraph 异步执行这些操作, 提高工作流并发性和响应能力。@task 还能确保这些长时间运行的操作得到正确存档点,对工作流恢复和人机环路场景至关重要。
- 操作可能遇到瞬时故障,如网络故障、API 超时、速率限制等时,应将其封装在 @task 中并使用 retry 参数,实现自动重试以增强工作流鲁棒性。
- 需要存档点和恢复功能时,如希望操作结果被可靠保存并在工作流恢复时重用(例如人机环路中断或系统故障后),应使用 @task。只有 @task 结果会自动执行存档。
- 需要可观察性和跟踪时,将操作包装在 @task 函数中,可使它们在 LangSmith 中被单独跟踪。这种细粒度可观察性对调试、性能监视和了解工作流执行流程非常宝贵。
- 要求确定性工作流执行时,如人机环路等场景,应将可能不确定的操作(如 API 调用、随机数生成、时间敏感逻辑)封装在 @task 函数中,确保恢复工作流将遵循相同步骤序列并检索先前计算结果。
以下情况可谨慎在 @entrypoint 中使用直接代码:
- 逻辑简单且纯粹是计算性时,如非常简单的 CPU 绑定操作,保证快速可靠且不涉及任何 I/O 或潜在故障,可考虑直接在 @entrypoint 中实现。但即使逻辑简单,通常也建议使用 @task 以保持一致性和面向未来。
- 用于控制流和编排时,@entrypoint 函数本身旨在编排工作流逻辑,包括条件语句、循环和调用 @task 函数。可在 @entrypoint 中使用直接代码实现这些编排目的。
1
2
3
4
5
6
7
8
9
10
11
12# 不正确的工作流——API 调用不在 @task 中
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import MemorySaver
import requests
def weather_workflow(city_name: str) -> str:
"""不正确的工作流——API 调用不在 @task 中"""
api_url = f"https://weather-api.example.com/weather?city={city_ name}"
response = requests.get(api_url) # 直接 API 调用:潜在的陷阱
weather_data = response.json()
return f" 城市 {city_name} 的天气:{weather_data['condition']}"1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18# 正确的工作流——API 调用在 @task 中
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
import requests
def get_weather_data(api_url: str) -> dict:
""" 从 API 获取天气数据的任务 """
response = requests.get(api_url)
response.raise_for_status() # 为错误的响应引发 HTTPError(4xx 或5xx)
return response.json()
def weather_workflow(city_name: str) -> str:
"""正确的工作流:API 调用在@task中"""
api_url = f"https://weather-api.example.com/weather?city={city_ name}"
weather_data = get_weather_data(api_url).result() # API 调用现在 是一个任务
return f" 城市 {city_name} 的天气:{weather_data['condition']}"在代码实现中,不正确的示例是直接将 requests.get() 调用放在 @entrypoint 函数内部,这种做法存在三个主要问题。
- 存档点缺失:工作流中断后恢复时,API 调用会被重复执行,可能导致重复请求和数据不一致。
- 容错能力缺失:对于因网络故障导致工作流失败的场景, 缺乏自动重试机制。
- 可观测性受限:LangSmith 无法单独跟踪 API 调用,增加调试难度。
正确示例是将 API 调用封装在 @task 装饰的 get_weather_data 函数中,这解决了上述问题。
需要注意的是,@entrypoint 和 @task 的输入 / 输出必须是 JSON 可序列化的, 推荐使用 Python 原生类型或 Pydantic 模型,以避免序列化错误。
文件操作、邮件发送等具有副作用的操作必须封装在 @task 中,确保工作流恢复时的确定性行为,防止副作用被重复执行。
2.7 Functional API与Graph API的比较
Functional API 与 Graph API 的比较如表所示。
功能 Functional API Graph API (StateGraph) 控制流 隐式,由 Python 函数结构定义 显式,由图节点和边定义 状态管理 隐式,函数作用域,previous 参数 显式,State 对象,reducers 存档点 在 @entrypoint 执行后,任务结果更新 在每个 Superstep(节点执行) 之后 可视化 不直接支持 图可视化随时可用 代码风格 以函数为中心,命令式 / 函数式 以图为中心,声明式 复杂性处理 适用于更简单、线性的工作流 非常适合复杂、分支和并行工作流 最佳用例 快速原型设计,更简单的工作流, 与现有代码集成 复杂的智能体架构,多智能体系统,需要细粒度控制的工作流 何时选择 Functional API:
- 快速原型验证与概念测试。
- 处理线性或简单分支逻辑的工作流。
- 现有代码库的渐进式改造场景。
- 偏好标准 Python 控制流的开发模式。
- 不需要可视化辅助的调试或理解的场景。
何时选择 Graph API:
- 构建多智能体协同系统。
- 实现复杂状态机与决策树。
- 需要精确控制执行路径的流程。
- 涉及多方组件交互的分布式工作流。
- 依赖可视化调试的复杂架构。
重要提示:两种 API 都构建在相同的 LangGraph 运行时之上,并且可以在同一应用程序中一起使用。开发者可以根据项目需求选择最适合的 API,将 Functional API 的简洁性与 Graph API 的表达能力结合起来,以满足不同场景的需求。
总之,Functional API 通过提供以函数为中心的替代方案,扩展了 LangGraph 的可访问性和吸引力,使开发者能够使用更熟悉和直观的编程范例来更好地应用 LangGraph 的核心功能——持久化、内存、人机环路和流式传输。无论是快速原型设计、集成现有代码库,还是偏好基于函数的工作流设计方法,Functional API 都提供了强大而灵活的工具集。
同时,Graph API 则为复杂智能体架构、多智能体系统和需要细粒度控制的工作流提供了更强大的支持。通过可视化工作流和声明式定义,Graph API 在调试和复杂场景中表现出色。
通过同时提供 Functional API 和 Graph API,LangGraph 确保开发者可以选择最符合其技能、项目需求和开发风格的范例,从而推动更多创新,并促进 LangGraph 在构建下一代 AI 应用程序中的广泛应用。
3、API的选择
本节设计了一个简易的决策树,如图所示,以帮助开发者选择合适的 API, 之后还会通过真实场景的用例来说明在不同情况下如何做出最佳选择。
![image-20251130112209123]()
3.1 LangGraph API选择决策树
- 此决策树旨在指导开发者根据其项目需求和偏好选择最合适的 LangGraph API。 下面将分解每个决策点及其背后的原理。
- 开始:使用 LangGraph 构建 AI 智能体。这是决策流程的初始阶段。假设开发者计划使用 LangGraph 框架构建 AI 智能体, 现需评估并选择最适合项目需求的 API 接口方案。
- 简单的 ReAct 智能体是否满足应用需求。
- “是”分支:使用 create_react_agent。
- 分析:create_react_agent 作为预置解决方案,通过封装 ReAct 模式有效减少了样板代码和配置时间,支持快速原型开发和功能型 ReAct 智能体部署。该 API 特别适用于开发效率优先且标准 ReAct 架构能够满足需求的场景。
- 副分支:使用FunctionalAPI或GraphAPI ——若开发者对ReAct模式了解有限, 同时希望深入掌握 LangGraph 框架核心机制,可直接选用功能更完备的 Functional API 或 Graph API 进行开发。
- “否”分支:进入下一个决策点。
- “是”分支:使用 create_react_agent。
- 是否偏好基于函数的编程和快速开发。
- “是”分支:使用 Functional API。
- 分析:Functional API 在易用性和功能性之间实现了平衡。该接口既支持使用 LangGraph 核心功能(包括状态持久化、记忆管理、人机交互和流式处理),又能通过标准 Python 函数和控制流进行开发。相较于 Graph API,Functional API 减少了冗余代码,显著提升了中等复杂度工作流的开发效率。这一设计特别适合偏好函数式编程、不采用图结构开发模式的开发者群体。
- “否”分支:使用 Graph API(灵活性和可扩展性的默认选择)。
- 分析:Graph API 作为构建 LangGraph 应用程序的通用范式,虽然初期学习曲线较为陡峭,但提供了最强大的功能支持。即便当前需求相对简单,选择 Graph API 也 能为未来可能增长的复杂性或需求变更预留充足的扩展空间。当应用场景没有明确指向 create_react_agent 或 Functional API 时,Graph API 可作为稳健的默认选择方案。
- “是”分支:使用 Functional API。
3.2 API选择的案例分析
- 本节尝试用更贴近现实场景的案例,说明 API 的选择。
案例一:简单的天气信息聊天机器人
- 场景:开发一个基于天气工具的聊天机器人,用于查询多个预定义城市的天气信息。
- API 选择(对于经验丰富的开发者——ReAct 专家):create_react_agent。
- 理由:对于 ReAct 专家来说,create_react_agent 是最快捷和最容易的方式。他们已经了解 ReAct 模式,并且可以利用 create_react_agent 快速部署聊天机器人,而无须深入研究 Functional API 或 Graph API 的复杂性。
- API 选择(对于 LangGraph 新手开发者——没有 ReAct 经验):Functional API。
- 理由:刚接触 LangGraph 但对 Python 函数感到舒适的开发者可能会选择 Functional API 作为起点。它允许他们在构建功能性聊天机器人的同时,以更熟悉的编程风格学习 LangGraph 概念,然后再尝试更复杂的 Graph API。对于想要了解底层机制的初学者来说,create_react_agent 似乎像个“黑匣子”。
案例二:电子商务平台的复杂客户服务智能体
- 场景:开发一个客户服务智能体,它可以处理多轮对话,将查询路由到不同的部门,将复杂的问题升级到人工智能体,访问产品知识库,处理订单,并与 CRM 系统集成。
- API 选择(对于经验丰富的开发者——Graph API 专家):Graph API。
- 理由:经验丰富的开发者自然会为这种复杂的应用程序而选择 Graph API 。他们可以利用自己的专业知识,使用 Graph API 的强大功能设计高度定制和可扩展的智能体架构。
- API 选择(对于中级开发者——有 Functional API 经验):Graph API,需要投入时间学习。
- 理由:具有 Functional API 经验但刚接触 Graph API 的开发者仍然可能会为此复杂应用程序选择 Graph API,因为他们认识到 Graph API 对于处理复杂的需求是必不可少的。
带有人工审核和批准的博客文章草稿生成器
- 场景:创建一个工具,用于生成博客文章草稿,暂停以进行人工审核和编辑, 然后在人工批准后最终确定文章。
- API 选择(对于经验丰富的开发者——Functional API 专家):Functional API。
- 理由:熟悉 Functional API 的开发者能够运用函数和 interrupt 机制高效构建该应用程序,充分发挥现有技能实现快速开发。针对此类应用场景,Functional API 在功能完备性与实现简洁性之间实现了理想平衡,其基于函数式的开发范式与内容创作及审核的线性流程特性高度契合。
- API 选择(对于 LangGraph 新手和初级开发者):若可接受简化的 ReAct 类行为, 可选用 create_react_agent;若以学习为目的,则推荐 Functional API。
- 理由:刚接触 LangGraph 的初级开发者往往会优先选择 create_react_agent,因其操作简便;但如果希望深入了解 LangGraph 并构建更具定制化的工作流(例如人机环路),Functional API 会是更优的学习路径。
案例四:个性化电影推荐系统
- 场景:构建一个推荐系统,该系统根据长期用户偏好、实时观看历史记录和复杂的排名算法提供个性化的电影推荐。
- API 选择(无论开发者经验如何):Graph API。
- 理由:由于个性化推荐系统固有的复杂性,Graph API 几乎总是最合适的选择。Graph API 的灵活性和控制力对于构建健壮且可用于生产的推荐引擎至关重要。
案例五:自动化数据提取和转换管道
- 场景:开发一个管道,用于从多个来源提取数据,执行数据验证和清理,将数据转换为所需的格式,并将其加载到数据仓库中。
- API 选择(对于中级开发者——熟悉 Python 脚本):Functional API。
- 理由:熟悉 Python 脚本和基于函数的编程的开发者可以使用 Functional API 有效地构建此管道。数据管道的线性性质与 Functional API 的优势非常吻合。
