LangGraph的状态图结构

1、核心概念

  • LangGraph 的核心在于其简洁而强大的状态图结构,这一模型的基石由四个核心原语构成:状态(State)、节点(Node)、边(Edge)和命令(Command)。可以将这四个核心原语比作搭建智能体系统的基础模块,只有理解它们的概念及其相互作用方式,才能构建出功能强大的 AI 智能体系统。

1.1 状态

  • 在 LangGraph 中,状态是贯穿智能体系统运行始终的核心概念。我们可以将其理解为智能体的“短期记忆”“工作记忆”或者“临时共享数据空间”,它承载着智能体在执行过程中产生的各种信息,包括用户输入、中间结果、工具输出、对话历史等,如图所示。状态不仅是节点间信息传递的桥梁,也是智能体进行决策和行为调整的重要依据。状态的有效管理,是构建具有上下文感知能力的 AI 智能体的基础。

    image-20251101120357128
  • LangGraph 在状态定义上提供了极大的灵活性,允许开发者根据实际需求选择合适的数据结构。可以使用 Python 标准库中的 typing.TypedDict 定义具有类型提示的字典结构状态;也可以使用 dataclasses 或者 Pydantic 来定义状态模型。TypedDict 适合快速定义简单状态结构,而 dataclasses 和 Pydantic 提供了更强大的数据建模和验证能力。尤其是 Pydantic,它支持运行时数据验证,能确保状态的类型和取值符合预期, 这对于构建健壮的 AI 智能体系统至关重要。 例如,可以使用 Pydantic 定义状态模型, 明确指定字段类型(如字符串、整型)并通过校验器约束字段取值范围。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    from typing_extensions import TypedDict
    from pydantic import BaseModel,field_validator

    # 使用TypedDict定义状态
    class TypedDictState(TypedDict):
    user_input: str
    agent_response: str
    tool_output: str

    # 使用Pydantic定义状态,并进行数据验证
    from pydantic import BaseModel,field_validator

    class PydanticState(BaseModel):
    user_input: str
    agent_response: str
    tool_output: str
    mood: str = "neutral" # 默认情绪状态为 neutral

    @field_validator('mood')
    @classmethod
    def validate_mood(cls, value):
    if value not in ["happy", "sad", "neutral"]:
    raise ValueError(" 情绪状态必须是 'happy', 'sad' 或 'neutral'")
    return value
  • 在 LangGraph 系统中,状态作为第一个参数传递给每个节点。节点既可以从状态中读取信息,也可以写入新信息,实现节点间的数据共享和状态更新。状态中的键 (Key)可视为系统中的一个信息通道,节点通过这些通道进行数据交换。默认情况下,节点返回新的状态值会覆盖之前的状态值。此外,LangGraph 还提供了私有 / 公共状态、输入 / 输出状态结构体、状态归约器(State Reducer),以及消息 (Message) 与 MessageState 等高级机制,以满足更复杂的状态管理需求。

1.1.1 私有状态与公共状态

  • 在构建复杂的智能体系统时,我们常常需要区分公共状态 (Public State) 和私有状态 (Private State)。公共状态指需要在多个节点间共享的信息,是整个图结构中所有节点都可见的“共享数据空间”,如对话历史、用户偏好、最终回复等。私有状态则仅在特定节点内部使用,用于存储中间计算结果、临时变量或者敏感数据。这种区分有助于提高系统的模块化程度和安全性,避免不必要的信息泄露和状态污染, 如图所示。

    image-20251101122023631
  • LangGraph 采用多结构体(Schema)机制管理私有状态。我们可以为整个图定义一个全局公共状态结构体(Overall State), 用于存储节点间共享信息。同时,我们可以为特定的节点定义其专属私有状态结构体(Private State)。当节点被定义为接收私有状态输入时,LangGraph 会自动处理状态的转换和传递,确保私有状态只在该节点及其后续的流程中可见,不影响公共状态或其他节点。例如,定义 OverallState 存储全局对话信息,同时为工具调用节点定义 ToolState 作为私有状态,包含工具的配置参数、认证信息等不需要在整个图结构中暴露的信息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    from langgraph.graph import StateGraph
    from typing_extensions import TypedDict


    # 定义全局的公共状态结构体Schema
    class OverallState(TypedDict):
    user_input: str
    agent_response: str


    # 定义节点的私有状态结构体Schema
    class ToolState(TypedDict):
    api_key: str
    tool_config: dict


    # 定义一个使用私有状态的节点
    def tool_node(state: ToolState) -> OverallState:
    # 节点逻辑,例如调用工具 API并根据ToolState中的配置进行操作
    api_client = create_api_client(state['api_key'], state['tool_config'])
    response = api_client.call_api(state['user_input'])
    return {"agent_response": response} # Return the updated public state


    # Graph 构建代码
    builder = StateGraph(OverallState) # 图使用公共状态 OverallState
    builder.add_node("tool_node", tool_node) # 添加工具节点

1.1.2 输入/输出结构体

  • LangGraph 支持为整个图定义输入 / 输出结构体(Input/Output Schema)。输入结构体定义了调用图时可接受的输入数据结构,限定了图的初始状态输入范围;输出结构体则定义了图执行结束后返回的状态数据结构,明确了图的最终输出包含的信息内容。通过定义输入 / 输出结构体,我们可以对图的输入和输出进行更严格的类型约束,使图的接口更加清晰和规范,也方便了图与其他系统或组件进行集成。

  • 通常情况下,我们会定义一个完整的内部状态结构体(Internal State),包含图运行过程中所有可能用到的状态键。在此基础上,可以定义更精简的输入结构体和输 出结构体作为图的“外部接口”。输入结构体通常是全局状态结构体的子集,仅包含必要的初始输入状态键。输出结构体同样是全局状态结构体的子集,仅包含需要输出的状态键。这种设计既保持了图内部状态的完整性和灵活性,又对外提供了简洁规范的接口。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph

    # 定义完整的内部状态结构体
    class InternalState(TypedDict):
    user_query: str
    search_results: list[str]
    llm_response: str
    debug_info: str # 内部调试信息,不需要对外暴露


    # 定义输入结构体(只包含user_query)
    class InputSchema(TypedDict):
    user_query: str


    # 定义输出结构体(只包含llm_response)
    class OutputSchema(TypedDict):
    llm_response: str


    # Graph 构建代码,使用 InternalState 作为图的状态结构体
    graph = StateGraph(InternalState, input_schema=InputSchema, output_schema=OutputSchema) # 定义输入 / 输出结构体
    • 调用 graph.invoke(input_data) 时,LangGraph 会根据 InputSchema 对 input_data 进行校验,确保其符合定义。执行结束后,LangGraph 会根据 OutputSchema 过滤最终的状态,只返回 OutputSchema 中定义的那些状态键,实现对输出的控制。输入 / 输出结构体为 LangGraph 图结构提供了边界约束和接口抽象,使其更易于集成到更大的系统中,并提高了代码的可维护性和可读性。

1.1.3 状态归约器

  • 状态归约器是 LangGraph 提供的核心机制,用于自定义状态更新逻辑。它特别适用于处理并发子状态更新、复杂数据结构,以及需要特定合并策略的场景,如图所示。LangGraph 不仅提供了内置归约函数,还支持自定义归约函数,赋予开发者最大灵活性。

    image-20251101124318915
  • 在默认情况下,节点返回的新状态值会覆盖之前的状态值。然而,在某些场景下, 这种简单策略可能不适用。例如,当多个节点并行更新同一个状态键时,需要协调并发状态更新;或者,需要实现累加数值、合并列表、更新对话历史等特定更新逻辑, 而不是简单的覆盖。状态归约器就是为解决这些问题而设计的。

  • 状态归约器是一个定义了新旧状态合并方式的函数。通过 typing.Annotated 类型提示,可以为状态结构体中的每个状态键指定一个归约函数。当多个节点更新同一个状态键时,LangGraph 会调用对应的归约函数,将已有的状态值和新的状态值作为输入,返回合并后的新状态值。LangGraph 内置了常用归约函数,如 operator.add (用 于数值累加和列表合并)、langgraph.graph.message.add_messages(用于消息列表的追加和去重)等。 当然,我们也可以自定义归约函数,以实现更复杂的状态更新逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    from typing_extensions import TypedDict, Annotated
    from langgraph.graph.message import add_messages
    from langchain_core.messages import BaseMessage


    # 定义状态 Schema,并为 message_history 键指定 add_messages Reducer
    class ChatState(TypedDict):
    message_history: Annotated[list[BaseMessage], add_messages]
    user_intent: str
    tool_output: str


    # 在Graph构建时,使用ChatState作为状态Schema
    builder = StateGraph(ChatState)
    • 在上面的例子中,我们为 ChatState 的 message_history 键指定了 add_messages Reducer。当多个节点同时更新 message_history 时,add_messages Reducer 会将新消息追加到已有的消息列表中,而不是直接覆盖。这种声明式的状态更新方式,使我们可以更精细地控制状态演变过程,在处理并发更新、复杂数据结构或者需要特定合并逻辑的场景中尤为重要。
  • 定义自定义状态归约器函数需要创建一个接收两个参数并返回一个值的 Python 函数。

    • left(已有状态值):归约函数关联的状态键当前值。
    • right(新状态更新值):节点返回的更新值。
  • 归约函数的核心逻辑就是将 left 和 right 这两个值合并为新值返回,返回值类型需要与状态键类型一致。例如,对于类型为 list[str] 的状态键 item_list,若需要将新列表追加到已有的列表末尾并去重,则定义如下自定义归约函数。

    1
    2
    3
    4
    5
    6
    7
    8
    def reducer_extend_unique(left: list[str] | None, right: list[str] | None) -> list[str]:
    """
    自定义归约函数,用于合并两个字符串列表,并进行去重
    """
    existing_items = left if left else [] # 如果 left 为 None,则初始化为空列表
    new_items = right if right else [] # 如果 right 为 None,则初始化为空列表
    combined_items = existing_items + new_items
    return list(set(combined_items)) # 使用 set 去重并转换为 list 返回
    • 在 reducer_extend_unique 函数中,我们首先处理了 left 和 right 可能为 None 的 情况,将其初始化为空列表,然后将两个列表合并,使用 set 数据结构进行去重,最后将去重后的结果转换为 list 返回。
  • 定义自定义归约函数后,需要使用 typing.Annotated 将其关联到状态结构体中的目标状态键。例如,将 reducer_extend_unique Reducer 应用于 ChatState 中的 item_list 状态键,以定义状态结构体。

    1
    2
    3
    4
    5
    6
    7
    8
    from typing_extensions import TypedDict, Annotated
    from langgraph.graph.message import add_messages

    class ChatState(TypedDict):
    message_history: Annotated[list[BaseMessage], add_messages]
    user_intent: str
    tool_output: str
    item_list: Annotated[list[str], reducer_extend_unique] # 应用自定义归约函数
    • 这样,当有节点尝试更新 ChatState 中的 item_list 键时,LangGraph 就会自动调用 reducer_extend_unique 函数来进行状态合并,确保 item_list 中的元素是唯一且累计添加的。

1.1.4 Message与MessageState

  • 在构建对话型 AI 智能体时,对对话历史(Conversation History)的管理至关重要。 LangGraph 为此专门引入了 Message(消息) 和 MessageState 的概念,针对对话场景优化状态管理。

  • 消息是表示对话轮次中用户输入、AI 模型回复、工具调用等信息的标准数据结构, 是构成对话历史的基本单元。LangChain 定义了多种消息类型,以结构化的方式,区分对话中不同参与者的发言和不同性质的信息,具体如下所示。

    • HumanMessage:代表人类用户的消息,例如用户的自然语言输入、问题、 指令等。它是对话的起点,驱动着 AI 智能体系统的运转。例如,“你好” “我想预订一张明天去北京的机票” “请总结一下刚才的对话内容”,等等。
    • AIMessage:代表 AI 模型生成的消息,例如模型对用户输入的回复、对话、 澄清问题、指令、总结、创作内容等。它是对话的核心内容,体现了智能体的智能水平和对话能力。例如,“您好!有什么我可以帮您?” “您想预订哪个时间段的机票呢?” “本次对话主要讨论了……” “好的,这是为您总结的对话内容:……”, 等等。
    • ToolMessage:代表工具调用结果。例如, “天气查询工具已成功调用,北京今天天气:晴,24~32 摄氏度” “机票预订工具调用失败,原因:该时间段机票已售罄” “搜索引擎返回了 5 条相关结果,内容如下:……”,等等。
    • SystemMessage:代表系统发出的消息,通常用于引导 AI 模型的行为,例如设定对话目标,约束回复风格,提供背景知识等。它在对话的幕后发挥作用,影响着 AI 模型的表现。例如,“你是一个专业的客服机器人,请使用友好的语气回复用 户” “请基于以下背景知识回答用户问题:……” “本次对话的目标是帮助用户完成机票预订” “请尽可能详细地回复用户,并主动询问用户是否还有其他问题”等。
  • 每种消息类型都结构化地包含了消息内容(content)和元数据,例如消息发送者 (name)、消息 ID(id)等。content 属性存储消息的文本内容,是消息的核心信息。 元数据则提供了消息的上下文信息和控制信息。这种结构化表示便于管理和操作对话历史,包括追溯对话轮次、清晰地区分用户和 LLM 大模型的发言、准确地追踪工具调用过程,并基于消息类型和内容,进行更精细的对话策略控制 (例如,根据用户消息类型判断用户意图,根据工具消息类型判断工具执行状态)。尤其是在使用聊天模型 ChatModel 时,LLM 通常被设计为原生接受一个结构化的消息列表作为输入, 并返回一个 AIMessage 作为回复。

  • MessageState 是 LangGraph 预置的状态结构体,专门简化消息列表的管理。 MessageState 本质上是 TypedDict 的子类,预定义 messages 状态键,并且默认应用 add_messages Reducer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    from langgraph.graph import MessagesState

    class MyChatState(MessagesState):
    """
    自定义的ChatState, 继承自MessageState, 自动包含 messages 状态键和 add_messages Reducer
    """
    user_intent: str
    tool_output: str
    # 可以添加其他自定义的状态键
    • 内置 messages 状态键:MessageState 强制预置了 messages 状态键,用于存储对话历史消息列表。开发者无须手动定义,遵循“约定优于配置”的原则,提高开发效率和代码一致性。 messages 是 MessageState 的核心,也是其所有特性的基础。
    • 默认 add_messages Reducer:MessageState 强制为 messages 状态键绑定了 add_messages Reducer,自动将新消息追加到已有的消息列表中,处理消息 ID 的更新和消息去重,深度优化了消息管理的效率和正确性,保证了对话历史消息列表的完整性和一致性。add_messages Reducer 是 MessageState 智能消息管理能力的核心驱动力。
    • 消息序列化与反序列化:add_messages Reducer 还隐式且无缝地集成了消息序列化和反序列化的功能,支持以 JSON 字典格式 (例如 { “type”: “human”, “content”: “ 你好 “ }) 传递消息数据,便于状态的持久化和跨组件数据交换。消息序列化与反序列化功能的无缝集成,使 MessageState 具备了良好的跨组件互操作性和状态持久化能力。
    • 可扩展性:作为 TypedDict 子类,MessageState 允许添加任何自定义状态键, 如 user_intent、tool_output 等状态键,以满足业务需求。MessageState 在提供强大的消息管理能力的同时,也保持了良好的可扩展性。
  • 因此,MessageState 是构建对话型 LangGraph 应用的理想选择。当然,对于特殊的消息管理需求,或者需要完全自定义状态更新逻辑,LangGraph 也支持完全自定义状态结构体和 Reducer。

1.1.5 trim_messages和RemoveMessage在消息状态管理中的应用

  • 在了解 Message 和 MessageState 的基础上,我们可以更深入地探讨 trim_ messages 和 RemoveMessage 这两个工具函数在对话历史管理中的具体应用。

    • trim_messages:该函数主要用于限制消息列表中词元(Token)的总长度, 防止对话历史无限增长导致 LLM 处理效率降低和成本增加。在使用 MessageState 时, 我们可以在节点内部调用 trim_messages 函数对 state[‘messages’] 进行修剪,然后将修剪后的消息列表传递给 LLM。这通常在每次调用 LLM 之前进行,以确保 LLM 接收到的消息列表始终保持在合理长度范围内。 trim_messages 通常不与状态归约器直接配合使用,它更多作为一种在节点内部预处理消息的手段。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      from langchain_openai import ChatOpenAI
      from langchain_core.messages import trim_messages
      from langgraph.graph import MessagesState


      def llm_node(state: MessagesState):
      message_history = state['messages'] # 直接从 MessageState 中获取消息列表
      trimmed_messages = trim_messages(
      message_history,
      max_tokens=1000,
      strategy="last",
      token_counter=ChatOpenAI(model="gpt-4o"),
      allow_partial=False
      )
      llm_response = llm.invoke(trimmed_messages)
      return {"messages": [llm_response]} # 将 LLM 响应添加到消息历史 (通过 add_messages Reducer)
    • RemoveMessage:更准确地说,filter_messages 是基于 RemoveMessage 和 add_messages Reducer 的消息过滤机制,它提供了一种更灵活、更精细的对话历史管理方式。由于 MessageState 已经默认使用了 add_messages Reducer,我们可以很方便 地在节点中生成 RemoveMessage 列表,并将其作为状态更新返回,LangGraph 会自动完成对消息的过滤和删除。这种方式充分利用了状态归约器的强大能力,实现了可定制化的消息状态管理。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      from langgraph.graph import MessagesState
      from langchain_core.messages import RemoveMessage

      def filter_message_node(state: MessagesState): # 节点输入类型为 MessageState
      message_history = state['messages'] # 直接从 MessageState 中获取消 息列表
      messages_to_remove = []
      for message in message_history:
      if should_remove_message(message): # 假设有函数来判断消息是否需 要移除 (例如,移除寒暄语)
      messages_to_remove.append(RemoveMessage(id=message.id))
      return {"messages": messages_to_remove} # 返回RemoveMessage列表,LangGraph 会自动处理

1.2 节点

1.2.1 图添加节点

  • 节点是 LangGraph 图结构中的基本计算单元。每一个节点都封装了一个独立的执行逻辑,例如调用语言模型、执行工具、进行条件判断,或者仅仅是一个简单的数据处理函数。你可以将节点视为 AI 智能体系统中的“执行器”,它们负责完成具体的任务,并驱动着整个 AI 智能体的运转。

  • 在 LangGraph 中,节点本质上就是一个 Python 函数。这个函数接收当前的状态作为输入,并返回一个新的状态(或者状态的更新部分)作为输出。这种函数式的设计使节点具有良好的可测试性和可复用性,同时让 LangGraph 图结构的定义更加清晰和简洁。

    1
    2
    3
    4
    5
    6
    7
    def my_node(state):
    # 从状态中读取数据
    input_data = state.get("some_key")
    # 执行节点执行逻辑 (例如调用 LLM、工具等)
    output_data = process_data(input_data)
    # 返回新的状态(或状态的更新部分)
    return {"some_key": output_data, "another_key": new_value}
  • 值得注意的是,节点函数不一定要返回完整的状态对象,可以只返回需要更新的状态键值对。LangGraph 会自动将节点返回的更新部分合并到当前状态中。例如,若状态包含了 user_input、 agent_response 和 tool_output 三个键,而某个节点只需要更新 agent_response,则该节点函数只需要返回 {“agent_response”: new_response} 即可。 LangGraph 会自动将 new_response 更新到状态的 agent_response 键,而 user_input 和 tool_output 键的值则保持不变。

  • LangGraph 的节点类型非常灵活,任何 Python 可调用对象,只要其接受状态作为第一个参数并返回一个字典,都可以作为 LangGraph 的节点。这为开发者提供了极大的自由度,可将 LangChain 的 Chain、Agent、Tool,甚至是简单的 Python 函数,封装成节点, 从而充分利用 LangChain 生态系统丰富的资源。例如,将预定义的 LCEL 调用链或者 Runnable 对象封装为节点,负责生成智能体的回复;或者将工具函数封装为节点,让智能体具备调用外部 API 的能力;甚至将简单的条件判断函数封装为节点,实现流程的分支控制。节点的多样性和灵活性使 LangGraph 能够适应各种复杂的智能体系统构建需求。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    # 一个包含 LLM 节点的 LangGraph 图
    from langgraph.graph import StateGraph, START, END, MessagesState
    from langchain_core.prompts import ChatPromptTemplate
    from langchain_openai import ChatOpenAI


    # 定义状态结构体
    class ChatState(MessagesState): # 使用 ChatState 替代原有的 State,并继承 自 MessageState
    user_question: str # 保留 user_question 状态键
    llm_response: str # 保留llm_response状态键

    # 定义LLM节点
    def llm_node(state):
    prompt = ChatPromptTemplate.from_messages([
    ("human", "{question}")
    ])
    model = ChatOpenAI(model="gpt-4-turbo")
    chain = prompt | model
    response = chain.invoke({"question": state['user_question']}).content
    return {"llm_response": response}

    # 构建图
    builder = StateGraph(ChatState) # 使用 ChatState 替代原有的 State
    builder.add_node("llm_node", llm_node)
    builder.add_edge(START, "llm_node")
    builder.add_edge("llm_node", END)
    graph = builder.compile()

    # 调用图
    result = graph.invoke({"user_question": " 你好,LangGraph ! "})
    print(result)
    # {'messages': [], 'user_question': ' 你好,LangGraph ! ', 'llm_response': '你好!有什么我可以帮助您的吗?'}
  • 在实际的 AI 智能体系统中,节点执行过程可能会遇到各种瞬时性的错误或异常。 例如,当节点调用外部 API(如 LLM API、搜索引擎 API、 数据库 API 等)时,可能会遇到网络抖动、服务限流、API 超时等问题,导致 API 调用失败或返回错误。为了提高 AI 智能体系统的健壮性和稳定性,避免因瞬时错误导致流程中断,LangGraph 提供了节点重试(Node Retries)机制。通过为节点配置重试策略 (Retry Policy), LangGraph 可以在节点执行失败时自动按照预设的策略重试,提高节点容错能力,确保流程平稳运行。

1.2.2 节点重试策略

  • 为 LangGraph 节点配置重试策略时,使用 builder.add_node() 方法,要通过 retry 参数传入 RetryPolicy 对象。该对象定义了具体的重试策略,包括重试条件、重试次数、 重试间隔、退避策略等。LangGraph 提供了 langgraph.pregel.RetryPolicy 类,用于创建 RetryPolicy 对象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    import operator
    import sqlite3
    from typing import Annotated, Sequence
    from typing_extensions import TypedDict
    from langchain_openai import ChatOpenAI
    from langchain_community.utilities import SQLDatabase
    from langchain_core.messages import AIMessage, BaseMessage
    from langgraph.graph import StateGraph, START, END
    from langgraph.types import RetryPolicy # 导入RetryPolicy类

    # 定义数据库和模型
    db = SQLDatabase.from_uri("sqlite:///:memory:")
    model = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct")


    # 定义图的状态和逻辑节点
    class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]


    def query_database(state):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}


    def call_model(state):
    response = model.invoke(state["messages"])
    return {"messages": [response]}


    # 定义图 builder
    builder = StateGraph(AgentState)

    # 为 query_database 节点配置重试策略:针对 sqlite3.OperationalError 异常 进行重试
    builder.add_node(
    "query_database",
    query_database,
    retry=RetryPolicy(retry_on=sqlite3.OperationalError), # 配置RetryPolicy,retry_on 参数指定重试条件为sqlite3.OperationalError异常
    )

    # 为model节点配置重试策略:最多重试 5 次 (默认重试条件)
    builder.add_node(
    "model",
    call_model,
    retry=RetryPolicy(max_attempts=5), # 配置 RetryPolicy,max_attempts 参数指定最大重试次数为 5
    )
    builder.add_edge(START, "model")
    builder.add_edge("model", "query_database")
    builder.add_edge("query_database", END)

    graph = builder.compile()
    • query_database 节点配置了 RetryPolicy(retry_on=sqlite3.OperationalError), 其中 retry_on 参数指定了重试条件为 sqlite3.OperationalError 异常。当 query_database 节点执行过程中抛出 sqlite3.OperationalError 异常时,LangGraph 会自动重试,直到节点执行成功 (不再抛出 sqlite3.OperationalError 异常) 或达到最大重试次数 (默认为 3 次)。retry_on 参数可接受单个异常类或异常类列表,本示例中只针对数据库操作错误 sqlite3.OperationalError 进行重试,其他类型的异常 (例如,代码逻辑错误、参数类型错误等)将直接抛出异常。
    • model 节点配置了 RetryPolicy(max_attempts=5),未指定 retry_on 参数时采用默认重试条件,即针对绝大多数异常类型都进行重试 (具体可参考 RetryPolicy 类的 default_retry_on 函数定义)。当 model 节点执行过程中抛出任何异常时, LangGraph 最多重试 5 次。如果重试 5 次后,节点仍然执行失败,则不再重试,抛出异常。max_attempts 参数用于限制最大重试次数,防止节点无限重试,导致系统资源耗尽。
  • RetryPolicy 类提供了丰富的参数,用于定制重试策略,以满足不同应用场景的需求。

    • initial_interval:浮点数,初始重试间隔默认为 0.5s。
    • backoff_factor:浮点数,重试间隔退避(Backoff)倍数,默认为 2.0。例如, 如果 initial_interval=0.5, backoff_factor=2.0,那么第一次重试间隔为 0.5s, 第二次重试间隔为 0.5×2 = 1s,第三次重试间隔为 1×2 = 2s,依次类推,重 试间隔呈指数级增长,避免短时间内频繁重试,加剧下游服务的压力。
    • max_interval:最大重试间隔,默认为 128.0s。即使重试间隔按照退避因子增长, 也不会超过 max_interval 的限制,防止重试间隔无限增长导致的长时间等待。
    • max_attempts:最大重试次数,默认为 3。包括首次执行和后续重试在内,节 点总共最多尝试执行 max_attempts 次。超过最大重试次数后,节点将不再重试, 直接抛出异常。
    • jitter:是否在重试间隔时间上添加随机抖动,默认为 True。添加抖动可以避 免多个客户端同时在同一时刻重试,造成下游服务的瞬时压力,提高系统的鲁棒性。
    • retry_on:指定重试的异常类型。可以接受单个异常类或异常类列表。如果不 指定 retry_on,则默认针对绝大多数异常类型进行重试 (除了如 ValueError、 TypeError 等代码逻辑错误和 OSError 等操作系统错误,以及 requests 和 httpx 等 HTTP 请求库的 5××服务器错误)。建议根据节点的具体功能和可能遇到 的错误类型,精确地设置 retry_on,避免对不应该重试的错误进行不必要的重试,浪费计算资源。
  • 节点重试机制适用于处理瞬时性错误,如下所示。

    • API 调用错误:网络波动、服务限流、API 超时等导致的 API 调用(如 LLM API、工具 API)失败。
    • 数据库操作错误:数据库连接超时、数据库繁忙、SQL 执行错误(如 sqlite3. OperationalError) 等。
    • 资源竞争错误:并发访问共享资源(如文件、缓存)时,可能出现的资源竞争错误。
  • 对于代码逻辑错误、参数类型错误、配置错误等不可恢复的错误,不建议使用 重试机制,直接抛出异常(Fail-fast)。重试机制应该只针对明确可恢复的错误场景。

1.3 边

  • 边在 LangGraph 中负责连接不同节点,定义 AI 智能体系统的执行流程。边决定 了在执行完当前节点后下一步应该执行的节点,将独立的节点串联成有机整体,赋予 图结构动态执行能力。LangGraph 主要支持两种类型的边:普通边和条件边。理解这 两种类型边的特性及应用场景是设计复杂 LangGraph 流程的关键。
    • 普通边定义了节点间固定的、无条件的连接关系,用于构建线性、顺序执行流程。若需要在执行完节点 A 后总是执行节点 B,则可以用 builder.add_edge(start_node, end_node) 在节点 A 和节点 B 之间添加普通边, 其中 start_node 为起始节点,end_node 为目标节点。执行时,系统执行完 start_node 后会无条件转移到 end_node。
    • 条件边提供了基于状态动态路由的能力,用于构建分支的、非线性且动态可变的流程。它通过条件函数定义路由逻辑:该函数接收当前状态作为输入,并根据状态内容动态返回下一步要执行的节点名称(字符串)。LangGraph 会根据返回值,使执行路径随状态变化而变化,从而构建出更智能灵活的系统。条件边使用 builder.add_ conditional_edges(start_node, conditional_function) 来 定 义, 其 中 conditional_function 的返回值必须是图中已定义的节点名称。

1.3.1 意图识别与技能路由

  • 在智能客服系统中,当用户输入问题后,系统首先通过“意图识别节点”识别用户意图 (如查询天气、预订机票、投诉建议等),然后,根据识别出的用户意图,动态跳转到对应的“技能执行节点” (如“查询天气节点”“预订机票节点”“投诉建议处理节点”等)。这时,“意图识别节点”和“技能执行节点”之间就需要使用条件边来连接,实现基于意图识别的动态路由。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    def route_to_skill(state): # 条件函数,根据用户意图路由到不同的技能节点
    user_intent = state['user_intent']
    if user_intent == " 查询天气 ":
    return "weather_query_node" # 跳转到查询天气节点
    elif user_intent == " 预订机票 ":
    return "flight_booking_node" # 跳转到预订机票节点
    elif user_intent == " 投诉建议 ":
    return "complaint_suggestion_node" # 跳转到投诉建议处理节点
    else:
    return END # 无法识别意图,结束流程

    builder.add_conditional_edges("intent_recognition_node", route_to_skill) # 意图识别节点 → 技能执行节点 ( 条件边)

1.3.2 工具选择与结果处理

  • AI 智能体需根据任务目标动态选择工具。“工具选择节点”根据当前状态决定调用哪个工具,并根据执行结果动态决定后续流程:工具执行成功则进入“结果处理节点”进行分析总结;执行失败则进入“错误处理节点”进行恢复或重试。“工具选择节点”和后续处理节点之间,以及“工具执行节点”和结果 / 错误处理节点之间,都需要使用条件边来实现动态路由。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    def route_after_tool_selection(state): # 条件函数,根据工具选择结果路由
    tool_name = state['selected_tool']
    if tool_name == " 搜索引擎 ":
    return "search_tool_node" # 跳转到搜索引擎节点
    elif tool_name == " 计算器 ":
    return "calculator_tool_node" # 跳转到计算器节点
    else:return END

    def route_after_tool_execution(state): # 条件函数,根据工具执行结果路由
    tool_status = state['tool_status']
    if tool_status == " 成功 ":
    return "tool_result_processing_node" # 跳转到结果处理节点
    else:
    return "tool_error_handling_node" # 跳转到错误处理节点

    builder.add_conditional_edges("tool_selection_node", route_after_tool_selection)#工具选择节点→ 工具执行节点(条件边)
    builder.add_conditional_edges("tool_execution_node", route_after_tool_execution)#工具执行节点→ 结果/错误处理节点(条件边)
  • 条件边的引入极大地增强了 LangGraph 图结构的灵活性和动态性,使我们可以 构建出能够根据不同情况采取相应行为的 AI 智能体系统。 例如,可在对话流程中实现意图识别和分支处理;在工具调用场景中,基于执行结果选择后续流程;实现基于智能体自身状态的反思迭代过程。条件边是构建复杂自适应性 AI 智能体系统的关键机制。

  • 普通边和条件边各有适用场景:普通边适合构建线性静态流程;条件边适合构建分支动态流程。实际应用中通常需要将普通边和条件边结合使用,既有线性主干流程,也有动态分支流程,从而构建灵活可控的复杂 AI 智能体系统。边的合理设计是 LangGraph 流程编排的核心。

  • 通过状态、节点和边这三个核心原语的组合,LangGraph 提供了一种强大而灵活的图计算模型,以构建各种复杂的 AI 智能体系统。状态承载信息,节点执行计算, 边定义流程,三者协同构成了 LangGraph 智能体系统的骨架。

1.4 命令

  • 命令(Command)是 LangGraph 新增的强大工具,允许在单个节点中整合状态更新和流程控制逻辑。一般情况下,节点主要负责状态更新,边控制流程跳转。但在实际应用中,需要节点同时完成这两项功能。命令正是为此设计的,打破了节点和边的传统分工,赋予节点更强的流程控制能力。

  • 命令是作为节点的返回值的特殊对象,主要包含以下两个部分。

    • update:状态更新字典,功能与普通节点返回值相同。
    • goto:流程跳转字符串,用于指定下一步执行节点。值必须是图中已定义的节点名称。
  • 使用命令的优势在于将状态更新逻辑和流程控制逻辑紧密结合,使节点功能更内聚强大。在某些场景下,命令能简化图结构定义,提升代码的可读性和可维护性。

  • 使用命令的步骤如下。

    • 从 langgraph.types 模块导入 Command。
    • 节点函数不再直接返回状态更新字典,而是创建一个 Command 对象并返回。 在创建 Command 对象时,需要通过 update 参数指定状态更新字典,通过 goto 参数指定下一个节点的名称。
    • 为节点函数添加类型提示(Type Hint),使用 typing.Literal 指定 goto 参数可能跳转到的节点名称列表。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from langgraph.types import Command
    from typing import Literal


    def my_node(state) -> Command[Literal["node_B", "node_C"]]:
    """ 使用 Command 的节点函数示例 """
    # 节点计算逻辑
    next_node_name = decide_next_node(state) # 根据状态决定下一个节点
    return Command(
    update={"processed_data": result_data}, # 状态更新
    goto=next_node_name # 流程跳转指令
    )
    • 在该示例中,my_node 函数通过 decide_next_node 函数动态决定跳转到 node_ B 或 node_C,同时更新状态中的 processed_data 键。函数返回值类型提示−> Command[Literal[“node_B”,”node_C”]] 主要起到以下作用。
      • 提升代码可读性:清晰标明节点可能跳转到的目标节点,便于理解流程控制逻辑。
      • 实现图结构的可视化渲染:LangGraph 通过类型提示静态分析潜在执行路径,生成准确的可视化图表,而无须实际运行代码即可理解流程。
      • 辅助类型检查:配合 MyPy 等静态类型检查工具,在代码编写阶段提前发现类型错误,如无效节点名称,或者类型不匹配等问题。
  • 因此,在使用命令时,请务必添加正确的类型提示。这不仅是一种良好的 Python 编程习惯,更是流程可视化和正确执行的重要保障。命令和条件边都提供了流程的动态路由能力,在实际应用中,可以遵循以下选择原则。

    • 优先使用命令的场景:
      • 节点需要同时处理状态更新和流程跳转时;
      • 实现多智能体协作中的任务交接行为时;
      • 需要表达“处理—交接”的完整逻辑流程时。
    • 优先使用条件边的场景:
      • 流程跳转决策逻辑相对独立时;
      • 节点主要职责为状态更新时;
      • 需要保持“关注点分离”的设计原则时。
    • 必须使用命令的场景:
      • 子图节点需要跳转至父图节点时;
      • 实现跨图层控制流交接时。

2、流程控制:分支与并发

  • 在实际 AI 智能体中,线性流程有明显局限性,例如包括:
    • 无法实现条件判断和动态路径选择。
    • 无法实现并行任务执行。例如,在一个信息检索系统中,我们可能需要同 时从多个数据源 (如网页搜索、知识库、数据库) 并行检索信息,以提高检索效率 和覆盖面。线性流程无法满足这种并行需求。
    • 执行效率较低,计算资源利用率不足。
  • LangGraph 通过分支 (Branching) 和并发 (Concurrency)机制解决这些问题。 通过分支实现条件判断和动态路由;通过并发实现多任务并行执行,充分利用计算资源,提高系统性能。

2.1 并行分支:扇出与扇入

  • LangGraph 实现并行分支的核心机制是扇出(Fan-out)和扇入(Fan-in)。扇出指从一个节点同时触发多个下游节点,使流程并行地向多个方向分支;扇入指将多个并行分支汇聚到一个共同的下游节点,实现并行流程的同步和汇合。通过扇出和扇入的组合,我们可以构建出复杂的并行数据流图,实现高效的并发处理。

2.1.1 扇出

  • 实现扇出的最简单方式是为一个节点添加多个出边,将其同时连接到多个下游节点。当 LangGraph 执行到该节点时,会并发触发所有出边指向的下游节点,使流程从该节点并行地向多个分支扩散,形成扇出的效果,其流程如图所示。

    image-20251101135333269
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    from typing import Any

    class State(TypedDict):
    aggregate: Annotated[list, operator.add]

    builder = StateGraph(State)
    class ReturnNodeValue: # ReturnNodeValue 节点的定义
    def __init__(self, node_secret: str):
    self._value = node_secret

    def __call__(self, state: State) -> Any:
    print(f"Adding {self._value} to {state['aggregate']}")
    return {"aggregate": [self._value]}

    # 添加节点 A、 B、 C、 D
    builder.add_node("a", ReturnNodeValue("I'm A"))
    builder.add_node("b", ReturnNodeValue("I'm B"))
    builder.add_node("c", ReturnNodeValue("I'm C"))
    builder.add_node("d", ReturnNodeValue("I'm D"))

    # 定义流程:节点 A 扇出到节点 B 和 C,节点 B 和 C 扇入到节点 D
    builder.add_edge(START, "a")
    builder.add_edge("a", "b") # 节点 A 出边 1:指向节点 B
    builder.add_edge("a", "c") # 节点 A 出边 2:指向节点 B
    builder.add_edge("b", "d") # 节点 D 入边 1:来自节点 B
    builder.add_edge("c", "d") # 节点 D 入边 2:来自节点 C
    builder.add_edge("d", END) # END 节点
    graph = builder.compile()
    • 在这个例子中,节点 A 配置了两条出边,分别指向节点 B 和节点 C,实现了从 节点 A 到节点 B 、节点 C 的扇出。当 LangGraph 执行到节点 A 时,会并发启动节点 B 和 节点 C 的执行,而无须等待其中一个节点执行完成。注意,这里是并发执行, 并非真正的并行执行,具体差异请参见后续章节。

2.1.2 扇入

  • 扇入是指将多个并行分支汇聚到同一个下游节点的操作。实现扇入时,需为目标节点添加多条入边,使其能够接收多个上游节点的输入。在 LangGraph 中,当执行至扇入节点时,系统会等待所有上游节点完成执行后,才触发该扇入节点的运行, 以此实现并行流程的同步汇合。例如在上述示例中,节点 D 作为扇入节点,配置了 分别来自节点 B 和节点 C 的两条入边。只有当节点 B 与节点 C 均执行完毕后,节点 D 才会启动执行,从而确保并行分支的正确汇聚。

  • 通过扇出和扇入的灵活组合,我们可以构建出高效的并行数据流图。例如,在上述示例中,节点 A 的扇出并发执行节点 B 和 节点 C,然后通过节点 D 的扇入汇 聚,形成了并行数据流。假设节点 B 和 节点 C 分别需要 3 秒和 5 秒的执行时间,那 么串行执行,需要 8 秒;而并行执行仅需 5 秒,效率提升接近一倍。扇出和扇入是构建 LangGraph 并行流程的基本模式。

  • 需要注意的是,并行分支的状态管理会变得更加重要。如果多个并行执行的节点需要更新同一个状态键,就需要特别注意状态更新冲突问题。状态归约器机制可以帮助我们安全处理并发状态更新。

    1
    2
    3
    4
    5
    import operator
    from typing import Annotated

    class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    • 我们定义了状态结构体 State,并将 operator.add Reducer 应用 到 aggregate 状态键上。operator.add Reducer 的作用是将新的状态值追加到已有的状态值上(如果状态值是列表类型,则进行列表拼接)。当节点 B 和节点 C 并发更新 aggregate 状态键时,operator.add Reducer会确保每个节点的更新都被正确追加到 aggregate 列表中,避免相互覆盖或丢失,从而解决并行状态更新的冲突问题,保证状态数据的一致性和完整性。在构建 LangGraph 并行分支流程时,合理地使用状态归约器是保证状态管理正确性的关键。
  • 完整的并行分支流程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    import operator
    from typing import Annotated, Any
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END

    class State(TypedDict):
    aggregate: Annotated[list, operator.add]

    def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}

    def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}

    def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}

    def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}

    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_edge(START, "a")
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "d")
    builder.add_edge("c", "d")
    builder.add_edge("d", END)

    graph = builder.compile()
    graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
    # Adding "A" to []
    # Adding "B" to ['A']
    # Adding "C" to ['A']
    # Adding "D" to ['A', 'B', 'C']
    #
    # {'aggregate': ['A', 'B', 'C', 'D']}
    • 从执行结果可见,节点 A 通过扇出机制并发执行节点 B 和节点 C,然后节点 D 通过扇入机制将两个并行分支汇聚起来,形成完整的并行分支流程。
  • 若某分支包含多个步骤(如节点 B 后需执行节点 B_2),则该如何实现呢?

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    def b_2(state: State):
    print(f'Adding "B_2" to {state["aggregate"]}')
    return {"aggregate": ["B_2"]}

    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(b_2)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_edge(START, "a")
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "b_2")
    builder.add_edge(["b_2", "c"], "d")
    builder.add_edge("d", END)
    graph = builder.compile()

    # Adding "A" to []
    # Adding "C" to ['A']
    # Adding "B" to ['A']
    # Adding "B_2" to ['A', 'B', 'C'] Adding "D" to ['A', 'B', 'C', 'B_2']
    #
    # {'aggregate': ['A', 'B', 'C', 'B_2', 'D']}
    • 我们可以使用 ass_edge([“b_2”, “c”], “d”) 来强制节点 D 仅在节点 B_2 和节点 C 都执行完成后才会执行。否则,若分别为节点 B_2 和节点 C 添加独立边,就会导致节点 D 执行两次:在节点 B_2 和节点 C 执行完成后都执行 (无论哪个节点先执行完成)。
  • 如果节点扇出不确定,那么可使用 add_conditional_edges 直接添加条件边。

2.2 并发而非并行

  • 在前面的讨论中,我们一直使用“并行分支”“并行执行”等术语来描述 LangGraph 的流程控制机制。需要明确的是,LangGraph 实现的是并发(Concurrency), 而并非真正的并行 (Parallelism)。这两个概念既有联系又有区别。

  • 并发和并行的主要区别如下。

  • 并行指同时执行多个独立的任务,通常需要多个物理计算资源 (例如,多 核 CPU、多台机器) 真正地同时运行不同的代码,以缩短总执行时间。例如,分布式计算框架 Apache Pregel 将任务分配到多台机器上并行执行。

  • 并发指在单计算资源 (例如,单核 CPU)上“看似同时”执行多个任务, 通过时间片轮转或异步 I/O 实现。宏观上,多个任务“同时”运行,微观上 CPU 仍串行执行。其目的是提高系统的响应性和资源利用率,但不一定能缩短总执行时间(甚 至可能因为任务切换的开销而略微增加)。

  • LangGraph 的并发模型基于 Superstep (超步)的概念构建,借鉴自 Google 的 分布式计算框架 Pregel,以及其他 BSP Bulk Synchronous Parallel(计算模型)。 Superstep 是图执行的基本迭代单元,在每个 Superstep 中,LangGraph 会尽可能地并发执行所有就绪节点 (例如,扇出的下游节点,或满足条件可以执行的节点),并将这些节点的状态更新暂存起来。当 Superstep 内的所有节点都执行完成后,会进行全局同步,状态更新并进入下一个 Superstep,直到满足终止条件 (例如,到达 END 节点,或达到迭代限制),如图所示。

    image-20251101141312393
  • 需要再次强调的是,LangGraph Superstep 实现的是并发而非并行。同一个 Superstep 内的节点在单线程、异步非阻塞的环境下并发执行,并非真正地并行运行在多核 CPU 或分布式集群上 (与 Apache Pregel 不同)。

2.3 递归限制与并行分支

  • 递归限制(Recursion Limit)用于限制 LangGraph 图执行过程中的最大 Superstep 的迭代次数,防止图无限循环执行,耗尽计算资源。递归限制同样适用于并行分支流程,并且需要特别注意以下几点。

    • 递归限制的计数单位是 Superstep,而不是节点。无论一个 Superstep 内部并发执行了多少个节点,都只会计为一次 Superstep 迭代。
    • 在扇出扇入流程中,并发执行的多个节点属于同一个 Superstep,整个流程的 Superstep 迭代次数由执行路径的深度决定,而非并行分支的宽度。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    import operator
    from typing import Annotated, Any
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langgraph.errors import GraphRecursionError # 导入 GraphRecursionError


    class State(TypedDict):
    # operator.add 是状态归约器,确保状态键 aggregate 为 appendonly 列表
    aggregate: Annotated[list, operator.add]


    def node_a(state):
    return {"aggregate": ["I'm A"]}


    def node_b(state):
    return {"aggregate": ["I'm B"]}


    def node_c(state):
    return {"aggregate": ["I'm C"]}


    def node_d(state):
    return {"aggregate": ["I'm D"]}


    builder = StateGraph(State)
    builder.add_node("a", node_a)
    builder.add_edge(START, "a")
    builder.add_node("b", node_b)
    builder.add_node("c", node_c)
    builder.add_node("d", node_d)
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "d")
    builder.add_edge("c", "d")
    builder.add_edge("d", END)
    graph = builder.compile()

    try:
    #设置recursion_limit=3,少于流程正常执行所需的Superstep数量
    graph.invoke({"aggregate": []}, {"recursion_limit": 3})
    except GraphRecursionError as e: # 捕获 GraphRecursionError 异常
    print(f"GraphRecursionError 异常被成功捕获 : {e}")

    # GraphRecursionError 异常被成功捕获 : Recursion limit of 3 reached without hitting a stop condition. You can increase the limit by setting the `recursion_limit` config key.
    # For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/GRAPH_RECURSION_LIMIT
    • 运行以上代码时,GraphRecursionError 异常会被成功捕获,并且输出相应的错误信息,这表明递归限制机制在并行分支流程中仍然有效,能够按照预期限制图执行的最大迭代次数。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      START → a → b → d → END

      c → d → END

      从 START 到 a(第 1 步);
      a 同时分支到 b 和 c(LangGraph 中并行分支会在同一个 “superstep” 中处理,计为 1 步);
      b 和 c 分别到 d(两个分支汇总到 d,计为 1 步);
      d 到 END(第 4 步)。
      因此,整个流程需要 4 个 supersteps 才能完成,超出限制的 3 步
  • 在设计包含并行分支的 LangGraph 流程时,需要根据流程的复杂度合理地设置 recursion_limit 参数。如果设置得过低,则可能导致流程在并行分支执行完成前就因为达到限制而中止,抛出 GraphRecursionError 异常。如果 recursion_limit 设置得过高, 则可能无法有效防止流程陷入无限循环,造成不必要的计算资源消耗。建议通过实验和调优,根据流程的实际迭代次数和复杂度选择一个合适的 recursion_limit 值。对于包含复杂并行分支、可能存在较深 Superstep 迭代的流程,建议适当提高 recursion_ limit 的取值,以确保流程正常执行。

3、MapReduce模式:任务分解与并行处理

  • 在构建复杂 AI 智能体系统时,经常需要处理大规模数据或执行计算密集型任务,如批量处理海量文档、并行生成创意文案、分布式分析用户行为数据等。传统的线性流程往往难以胜任此类任务,效率低下且扩展性差。MapReduce (映射—归约)模式作为经典并行计算模型,为这类问题提供了高效、可扩展的通用解决方案。 LangGraph 框架原生支持 MapReduce 模式,可轻松构建 MapReduce 分支,充分利用并行计算的优势,提高 AI 智能体系统的数据处理能力和性能。

3.1 MapReduce 模式的核心思想

  • MapReduce 模式的核心思想是“分而治之”,将复杂的大规模计算任务分解成两个相互协作的阶段。
    • Map(映射)阶段:“分”的过程。将原始输入数据分割成多个独立子数据集, 并将每个子数据集分配给不同的计算节点 (在 LangGraph 中,可以理解为不同的节点实例) 并行处理。每个计算节点执行相同的“映射”操作,生成中间结果 (通常是 Key-Value 键—值对形式)。Map 阶段的关键特点是“并行”和“独立”,每个子任务之间互不干扰,可以最大程度地利用并行计算资源。
    • Reduce(归约)阶段:“治”的过程。聚合 Map 阶段并行生成的多个中间结果,通过“归约”操作,合并成全局最终结果。Reduce 阶段通常是一个聚合、汇总、筛选、排序的过程,将 Map 阶段的“半成品” 组装成“成品”。Reduce 阶段的关键特点是“聚合”和“归纳”,将 Map 阶段并行处理的结果进行有效的整合和提炼,最终得到我们需要的答案或结果。
  • MapReduce 模式的优势如下所述。
    • 并行处理:分解任务并行执行,显著提高数据处理效率,缩短任务完成时间。
    • 高扩展性:通过增加计算节点来线性扩展计算能力。
    • 容错性:单节点故障不影响整体任务执行。
    • 简化编程模型:开发者只需要关注 Map 和 Reduce 两个阶段的业务逻辑。
  • MapReduce 模式非常适合可分解成独立子任务且最终结果可被聚合的场景。 LangGraph 对 MapReduce 模式的良好支持,使开发者可以充分利用这种强大模型构建高效、智能、可扩展的 AI 智能体系统。

3.2 LangGraph中的MapReduce实现

  • LangGraph 框架通过结合状态、节点、边及 Send API,实现了灵活高效的 MapReduce 模式。构建 MapReduce 流程主要包含以下关键步骤。

3.2.1 定义Map阶段的分割节点

  • 分割节点(Splitter Node)负责将原始输入数据分割成多个独立的、规模较小的子数据集,并动态地生成 Send 对象列表。每个 Send 对象对应一个子任务,并指定目标节点和初始状态。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    from langgraph.types import Send

    def split_input_data(state: OverallState): # 分割节点函数,输入状态为 OverallState
    input_data = state["large_input_data"] # 从状态中获取大规模输入数据
    sub_datasets = split_large_data(input_data, num_sub_tasks=10) # 将大规模数据分割成 10 个子数据集 (假设 split_large_data 函数实现了分割逻辑)
    send_list = []
    for sub_dataset in sub_datasets: # 遍历每个子数据集
    send_list.append(Send("map_node", {"sub_data": sub_dataset}) # 为每个子数据集创建一个 Send 对象,目标节点为 map_node,子任务状态为 {"sub_data": sub_dataset})
    return send_list # 返回 Send 对象列表,用于动态路由到多个 Map 节点实例
    • split_input_data 节点函数首先从状态中获取大规模的输入数据 large_input_data,然后调用 split_large_data 函数(需要开发者自行实现) 将大规模数据分割成多个子数据集 sub_datasets。接着,遍历 sub_datasets 列表,为每个 sub_dataset 创建一个 Send 对象。每个 Send 对象都指定了目标节点名称“map_node”(假设我们后续会定义一个名为 “map_node” 的映射节点),以及处理该子任务的初始状态 {‘’sub_data’’:sub_dataset} (将子数据集作为 sub_data 键的值传递给映射节点)。 最后,将 Send 对象列表 send_list 作为节点函数的返回值返回。分割节点不进行实际的 Map 计算,它的主要职责是“分发任务”,为后续的映射节点准备好子任务和任务指令。

3.2.2 定义Map阶段的映射节点

  • 映射节点(Map Node)负责接收分割节点分发的子任务,并对子任务数据执行实际的 “映射” 计算,生成中间结果,映射节点独立处理子任务,不依赖其他映射节点的执行结果,输出局部中间结果,用于后续 Reduce 阶段聚合。

    1
    2
    3
    4
    5
    6
    7
    class MapState(TypedDict): # 定义映射节点的私有状态结构体,用于接收分割节 点传递的子任务数据
    sub_data: Any # 子任务数据类型可以是任意类型

    def map_node(state: MapState): # 映射节点函数,输入状态为 MapState
    sub_data = state["sub_data"] # 从状态中获取子任务数据
    intermediate_result = process_sub_data(sub_data) # 处理子任务数据, 生成中间结果 (假设 process_sub_data 函数实现了映射计算逻辑)
    return {"intermediate_result":intermediate_result} # 返回中间结果, 用于后续 Reduce 阶段聚合
    • 我们定义了一个 MapState 状态结构体,用于定义映射节点的输入状态,其中包含了 sub_data 状态键,用于接收分割节点通过 Send 对象传递的子任务数据。map_node 函数接收 MapState 作为输入状态,从状态中获取子任务数据 sub_data, 然后调用 process_sub_data 函数(需要开发者自行实现) 对子任务数据执行实际的“映射”计算,生成中间结果 intermediate_result。最后,将中间结果封装到状态更新字典 {‘’intermediate_result’’:intermediate_result} 中返回。映射节点的核心职责是“数据映射”, 将输入的子数据集转换为中间结果,为后续的 Reduce 阶段准备好“原材料”。

3.2.3 定义Reduce阶段的归约节点

  • 归约节点(Reduce Node)负责收集 Map 阶段并行生成的多个中间结果,并将这些中间结果进行“归约”操作,合并成最终结果,通常是 MapReduce 流程的“终点”。

    1
    2
    3
    4
    5
    6
    7
    class ReduceState(TypedDict): # 定义归约节点的状态结构体,用于接收映射节点 输出的中间结果
    intermediate_results: Annotated[list, operator.add] # 使用 operator.add Reducer, 确保中间结果列表为 append-only 列表

    def reduce_node(state: ReduceState): # 归约节点函数,输入状态为 ReduceState
    intermediate_results = state["intermediate_results"] # 从状态中获 取 Map 阶段生成的中间结果列表
    final_result = aggregate_results(intermediate_results) # 聚合中间 结果,生成最终结果 (假设 aggregate_results 函数实现了归约计算逻辑)
    return{"final_result":final_result}# 返回最终结果
    • 我们定义了一个 ReduceState 状态结构体,用于定义归约节点 的输入状态,其中包含了 intermediate_results 状态键,用于收集来自多个映射节点实 例的中间结果。特别注意的是,我们为 intermediate_results 状态键指定了 operator.addReducer。这是非常重要的!因为映射节点是并行执行的,它们会并发地向 intermediate_ results 状态键写入中间结果,使用 operator.add Reducer 可以确保所有映射节点的中间结果都被正确地追加到 intermediate_results 列表中,而不会发生数据的覆盖或丢失,保证 了中间结果的完整性和一致性,为后续的 Reduce 计算提供可靠的数据基础。
    • reduce_node 函数接收 ReduceState 作为输入状态,从状态中获取中间结果列表 intermediate_results,然后调用 aggregate_results 函数(需要开发者自行实现) 对中间结果列表执行实际的“归约”计算,生成最终结果 final_result。最后,将最终结果封装到状态更新字典 {‘’final_result’’:final_result} 中返回。归约节点的核心职责是“结果 归约”,将 Map 阶段并行生成的分散的中间结果聚合,提炼成最终的、全局的、有意义的结果,完成 MapReduce 流程的“最后一步”。

3.2.4 连接MapReduce流程中的节点和边

  • 定义好 MapReduce 流程中的三个核心节点 (分割节点、映射节点、归约节点) 之后,我们需要使用 LangGraph 的边,将这些节点连接起来,构建完整的 MapReduce 流程图,如图所示。MapReduce 流程的边连接方式是相对固定的, 遵循“分割节点 → 映射节点(条件边,使用 Send API)→ 归约节点 → END”的模式。

    image-20251101144705472
    • 分割节点 → 映射节点 (条件边,使用 Send API):分割节点通过条件边 add_conditional_edges 连接到映射节点,条件边的路由函数就是分割节点本身 (例如,示例中的 split_input_data 函数)。分割节点生成的 Send 对象列表,会被 LangGraph 自动解析,动态地将每个 Send 对象路由到目标映射节点 “map_node” 的不同实例,并将 Send 对象中包含的状态数据作为映射节点的输入状态传递过去,实现Map任务的动态分发和并行执行 (扇出)。

      1
      builder.add_conditional_edges("split_node", split_input_data, ["map_node"])#分割节点→ 映射节点(条件边)
    • 映射节点 → 归约节点 (普通边):映射节点通过普通边 add_edge 连接到归约节点。当一个映射节点实例执行完成后,会将状态更新 (包含中间结果) 传递给归约节点。由于映射节点是并行执行的,归约节点会接收到来自多个映射节点实例的状态更新。为了正确地收集和处理这些并发的状态更新,归约节点的输入状态 Schema (如 ReduceState)必须被合理地定义,特别是用于收集中间结果的状态键(如 intermediate_results) 必须指定合适的归约函数(如 operator.add Reducer),确保中间结果被正确地聚合到 归约节点的状态中 (扇入)。

      1
      builder.add_edge("map_node", "reduce_node") # 映射节点 → 归约节点 ( 普通边)
    • 归约节点 → END (普通边):归约节点通过普通边 add_edge 连接 到 END 节点,标志着 MapReduce 流程的结束。当归约节点执行完成后,整 个 MapReduce 流程完成,最终结果 (通常存储在归约节点的状态中) 可以从 LangGraph 图的 invoke 或 stream 方法的返回值中获取。

  • 通过以上步骤即可完成基本的 LangGraph MapReduce 流程构建。在实际应用中, 可根据业务需求灵活地调整和扩展 MapReduce 流程,例如,增加 Map 阶段的节点数量和类型,在 Reduce 阶段之后添加后处理节点,引入错误处理和重试机制,甚至可以嵌套 MapReduce 流程,以构建更复杂、更强大的 AI 智能体系统。LangGraph 提供 的 MapReduce 模式为高效处理大规模数据和计算密集型任务,构建高性能、高扩展性的 AI 应用提供了有力支持。

3.3 MapReduce的应用场景

  • LangGraph 中 MapReduce 有以下典型应用场景。

3.3.1 文档批量处理与分析

  • 对于海量的文档数据 (例如,网页内容、新闻报道、研究论文、合同文本等), 可以使用 MapReduce 模式进行批量处理和分析,例如批量信息提取 (从大量文档中 并行提取关键信息,例如人名、地名、组织机构名等)、批量文本摘要 (并行生成大量文档的摘要,快速概括文档内容)、批量情感分析 (并行分析大量文本的情感 倾向,了解用户情感分布),等等。
  • Map 阶段负责处理单个文档或文档片段,提取局部特征,Reduce 阶段负责汇总所有文档的处理结果,生成全局性的分析报告或结论。
  • 示例:大规模合同文本的关键条款批量提取与风险评估、海量商品评论的情感倾向分析与用户画像构建。

3.3.2 信息并行检索与聚合

  • 在信息检索或问答系统中,用户提出的问题可能需要从多个不同的数据源(例如, 网页搜索、知识库、数据库、本地文档) 检索相关信息,为了提高检索效率和信息覆盖面,可以使用 MapReduce 模式进行并行检索与聚合。
  • Map 阶段负责从单个数据源并行检索相关信息,Reduce 阶段负责合并来自不同 数据源的检索结果,去重、排序、筛选,生成最终的检索结果。
  • 示例:构建多数据源的联合知识库问答系统,实现全网信息与本地知识的融合 检索与问答。

3.3.3 多路对话分支与意图路由

  • 在复杂对话系统中,用户的对话可能根据不同的意图或上下文,需要进入不同 的对话分支或技能执行模块。可以使用 MapReduce 模式实现多路对话分支与意图路由。
  • Map 阶段负责并行探索不同的对话分支 (例如,基于不同的意图理解假设,模拟不同的对话路径),Reduce 阶段负责评估不同对话分支的优劣,选择最佳的对话路径或回复策略。
  • 示例:构建具备多意图识别和多技能执行能力的复杂对话机器人,实现更灵活、 更智能的对话交互。

3.3.4 多智能体协作与任务分解

  • 在多智能体系统中,复杂的任务可以分解成多个子任务,分配给不同的智能体并行协作完成。可以使用 MapReduce 模式协调多智能体的协作。
  • Map 阶段负责将总任务分解成多个子任务,并将子任务分配给不同的智能体节点并行执行,Reduce 阶段负责收集和整合各智能体的执行结果,完成总体任务。
  • 示例:构建多人协作的 AI 团队,例如,AI 产品设计团队 (由设计师 Agent、工 程师 Agent、测试 Agent 并行协作完成产品设计任务)或 AI 研究团队 (由文献检索 Agent、实验分析 Agent、报告撰写 Agent 并行协作完成研究项目)。
  • LangGraph MapReduce 提供了强大的并行处理能力和任务分解机制,使 LangGraph 能够更高效地处理大规模数据、执行复杂计算任务、应对多样化的 AI 应用场景。掌握这一技术将有助于构建更具实用价值的 AI 智能体系统。

3.4 MapReduce的核心API:Send函数

  • Send 函数是 LangGraph 实现动态分支、并行处理,以及 MapReduce 模式的核心机制,负责“动态”“按需”“一对多”地创建分支任务,并将任务数据分发给不同的节点实例进行并行处理。

  • Send 函数的基本用法是 Send(node_name, state),解释如下。

    • node_name:字符串类型,指定接收任务的目标节点且必须是图中已经定义的节点名称。Send 函数会指示 LangGraph 引擎,将任务动态地路由到名为 node_name 的节点。在 MapReduce 模式中,node_name 通常指向映射节点。
    • state:字典类型,指定发送给目标节点 node_name 的状态数据(任务数据), 可以是任何符合 LangGraph 状态 Schema 的数据,也可以与 LangGraph 图的全局状态 Schema 完全不同,拥有独立的 Schema。Send 函数允许我们为每个 并行子任务定制化地传递不同的状态数据,使每个映射节点实例都可以处理不同的子任务数据。在 MapReduce 模式中,state 参数通常用于传递分割后的子数据集。
  • Send 函数通常应用于分割节点的条件边路由函数中,通过生成 Send 对象列表来描述并行子任务。

  • LangGraph 引擎会自动解析 Send 对象列表,并根据 Send 对象中指定的目标节点名称 node_name 和状态数据 state,动态地创建和分发并行子任务。Send 函数是 LangGraph 实现“动态扇出”功能的关键。

    1
    2
    3
    def continue_to_jokes(state: OverallState):  # 代码省略
    send_list = [Send("generate_joke", {"subject": s}) for s in state["subjects"]] # 为每个 subject 创建一个 Send 对象
    return send_list # 返回 Send 对象列表
    • continue_to_jokes 函数遍历state[‘’subjects’’] 列表中的每个 subject, 为每个 subject 创建对应的 Send 对象。每个 Send 对象都指定了目标节点 名称 “generate_joke”及相应的任务数据 {‘’subject’’:s},从而动态生成多个子任务, LangGraph 引擎会自动并行执行这些子任务,完成生成的 Map 阶段。
  • Send API 的特性如下所述。

    • 动态分支:支持运行时动态完成创建和分发并行子任务,分支数量可以根据输入数据或中间状态动态变化,无须预先在图结构中静态定义。
    • 状态定制:允许为每个子任务定制状态数据,支持独立的结构体设计。
    • 控制流与数据流解耦:通过分离任务生成(由分割节点负责)、任务处理(由 映射节点负责)和任务调度(由 LangGraph 引擎负责)的职责,实现更清晰的模块化设计。
  • 这些特性使 Send 函数成为实现 LangGraph MapReduce 模式,以及更复杂的动态 分支和并行处理的关键。掌握 Send API 的用法是深入理解 LangGraph 并发处理机制的“必修课”。

4、子图机制:模块化与复用设计

  • 在构建复杂 AI 智能体系统时,模块化(Modularity)和复用(Reuse)是关键技术。 LangGraph 通过子图 (Subgraph) 机制提供了强大的模块化和复用能力。

4.1 子图的概念与优势

  • 在 LangGraph 中,子图(Subgraph)指嵌套在父图(Parent Graph)内部的图结构, 如图所示。

    image-20251101150223067
  • 子图机制具有以下核心优势。

    • 模块化设计。将复杂系统分解为多个功能单一的子图模块,每个子图模块都专注于特定子任 务。父图负责协调各子图的执行流程,形成完整系统。这种设计有助于降低系统复杂度,提高可维护性。各子图可独立开发测试,提升开发效率。
    • 高复用性。子图作为独立模块可在不同父图中复用。通用功能模块(如文档摘要、信息检 索等)只需开发一次,即可在不同应用中重复使用,显著提升代码复用率和开发效率。
    • 状态隔离。子图拥有独立的状态空间,与父图的状态相互隔离。子图节点只能访问自身状态, 确保模块间的独立性,提高系统安全性和健壮性。
    • 命名空间管理。采用独立的子图名称,有效避免节点命名冲突,支持构建更复杂的图结构。
    • 复杂性控制。
      • 通过层级嵌套的子图设计,将复杂系统分解为多个结构清晰的子系统,使每个 模块的复杂度保持在可控范围内,降低整体系统的开发维护难度。
      • 该机制将软件工程的模块化设计思想引入 AI 智能体开发,为构建复杂系统提供 了有效的方法论支持。

4.2 在LangGraph中定义和使用子图

  • 在 LangGraph 中定义和使用子图主要有两种方式。

4.2.1 将已编译的子图作为节点添加到父图中

  • 这是最基础直接的子图调用方式。当父图与子图需要共享状态键(如“messages” 对话历史状态键),且无须状态转换时,可将已编译的子图(Compiled Graph)作为 特殊“节点”直接嵌入父图。在此模式下,LangGraph 会自动处理父图与子图间通过共享状态键进行的数据传递与状态更新。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 天气查询子图subgraph的定义和编译代码,此处省略
    # 定义父图
    class ParentState(TypedDict): # 父图的状态结构体
    messages: list # 与子图共享 messages 状态键

    builder = StateGraph(ParentState) # 创建父图 StateGraph, 指定状态结构体为 ParentState
    # 父图的其他节点定义,例如 node_1、node_2,此处省略
    builder.add_node("weather_graph", subgraph)

    # 将已编译的子图 subgraph 作为节点添加到父图中,节点名为 weather_graph
    builder.add_edge("node_1", "weather_graph") # 父图节点 node_1 → 子图节 点 weather_graph ( 普通边 )
    graph = builder.compile() # 编译父图
    • 在这个例子中,我们首先定义并编译了“天气查询子图”subgraph,然后在父图中通过 builder.add_node(‘’weather_graph’’, subgraph) 将其添加为节点,命名为“weather_ graph”。父图通过 builder.add_edge(‘’node_1’’, ‘’weather_graph’’) 添加边,将父图的节 点“node_1”连接到“weather_graph”子图节点,实现了父图节点调用子图的功能, 父图和子图通过共享的 messages 状态键进行对话消息的传递和交换。这种方式适用于父图和子图之间状态结构体兼容且需要共享状态键的场景,如多智能体系统中共享对话历史的场景。

4.2.2 使用节点函数调用子图并进行状态转换

  • 当父图和子图的状态结构体可能完全不同,没有共享的状态键时,父图不能直接将已编译的子图作为节点添加,而需要创建一个节点函数(Node Function)作为中介,在该节点函数内部“手动”调用子图,并在调用子图之前,将父图状态转换为子图所需的状态格式 (输入状态转换);在子图执行完成后将其输出状态转换回父图状态格式 (输出状态转换)。通过节点函数实现父图和子图之间状态的“翻译”和“适配”,使状态结构体完全不同的父图和子图也能够协同工作。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    # 子图 child_graph 的定义和编译代码,此处省略
    # 定义父图
    class ParentState(TypedDict): # 父图的状态结构体
    my_key: str # Parent state key: my_key

    def call_child_graph(state: ParentState) -> ParentState: # 节点函数 call_child_graph, 输入和输出状态类型都为ParentState
    # 状态转换 : 父图状态 → 子图状态 (ParentState.my_key → ChildState. my_child_key)
    child_graph_input = {"my_child_key": state["my_key"]} # 将父图状 态的 my_key 值,赋给子图状态的 my_child_key 键,作为子图的输入状态

    # 调用子图
    child_graph_output = child_graph.invoke(child_graph_input) # 调 用子图 child_graph,输入状态为 child_graph_input
    # 状态转换 : 子图状态 → 父图状态 (ChildState.my_child_key → ParentState.my_key)
    return {"my_key": child_graph_output["my_child_key"]} # 将子图输 出状态的 my_child_key 值,赋给父图状态的 my_key 键,作为父图的状态更新

    builder = StateGraph(ParentState) # 创建父图 StateGraph,指定状态结构体 为 ParentState
    # 父图的其他节点定义,例如 parent_1、parent_2,此处省略
    builder.add_node("child", call_child_graph) # 将节点函数 call_child_graph 作为节点添加到父图,节点名为 child
    builder.add_edge("parent_1", "child") # 父图节点 parent_1 → 节点函数 call_child_graph ( 普通边 )
    # 其他边定义
    graph = builder.compile() # 编译父图
    • 在这个例子中,父图 ParentState 和子图 ChildState 的状态结构体完全不同,没有共享的状态键。父图通过定义一个节点函数 call_child_graph 来调用子图 child_ graph。在 call_child_graph 函数内部,首先将父图的状态 ParentState 转换为子图 child_graph 能够接受的输入状态 ChildState(通过状态转换代码 child_graph_input = {“my_child_key”: state[“my_key”]} 实现),然后调用子图 child_graph.invoke(child_ graph_input) 执行子图,获取子图的输出状态 child_graph_output,最后再将子图的输 出状态 child_graph_output 转换回父图能够理解的状态格式 ParentState(通过状态转 换代码 return {“my_key”: child_graph_output[“my_child_key”]} 实现),并作为节点函数的返回值返回。
    • 这种方式更加灵活,可以支持父图和子图之间使用完全不同的状态结构体和状态空间,实现更复杂、更模块化的系统设计。例如,在多智能体 RAG 系统中,父图 (例如 Supervisor Agent)可能只需要关注最终的 RAG 报告,而子图(例如 ReAct Agent)可能需要维护详细的对话历史和工具调用记录,父图和子图的状态结构体可以完全不同,这时就需要使用“节点函数 + 状态转换”的方式来调用子图。

4.2.3 总结

  • 选择哪种方式添加和使用子图,主要取决于父图和子图之间状态关系和数据交互需求。
    • 如果父图和子图需要共享状态键(例如,对话历史 messages 等),并且状态结构体兼容,那么优先选择“将已编译的子图作为节点添加到父图”的方式,代码更简洁,效率更高。
    • 如果父图和子图之间没有共享状态键,状态结构体完全不同,或者需要在父图和子图之间进行复杂的状态转换,则必须使用“使用节点函数调用子图, 并进行状态转换”的方式,虽然代码相对复杂一些,但灵活性更高,可以支持更复杂的系统设计和模块化需求。
  • 在实际应用中,可以根据业务需求灵活地选择合适的子图使用方式。对于简单的、 状态共享的子图场景,优先选择第一种方式以简化代码;对于复杂的、状态隔离或需要状态转换的子图场景,则选择第二种方式以获得更大的灵活性和可定制性。掌握这两种子图使用方式,将有助于我们更有效地利用 LangGraph 的子图机制,构建出模块化、可复用、易于维护和扩展的复杂 AI 智能体系统。

5、工具调用:扩展智能体的能力边界

  • 在构建智能体系统的过程中,智能体通过工具调用 (Tool Calling)扩展自身的能力边界,完成更复杂、更实用的任务,例如信息检索、数据分析、代码执行,以及与外部世界的交互等。LangGraph 预置了 ToolNode 组件,极大地简化了工具集成和 调用的流程。

5.1 ToolNode:LangGraph的工具调用中心

  • ToolNode 是 LangGraph 框架预置的一个核心组件,专门用于处理工具调用操作。 作为 LangGraph 的工具调用中心,ToolNode 接收包含消息列表的状态作为输入,输出包含工具调用结果的状态更新。ToolNode 可以无缝地与支持工具调用的 LLM (例 如,OpenAI 的 GPT-4o、 Anthropic 的 Claude 3.5 Sonnet 等) 集成,接收 LLM 在对话过程中生成的工具调用请求。
  • ToolNode 内部封装了工具的执行逻辑,可以根据 LLM 的工具调用请求,动态地调度和调用预先注册的工具,并将工具的执行结果返回。ToolNode 支持同步和异步工具调用,并自动处理工具执行过程中的错误 (例如,工具不存在、参数校验失败、 工具运行时异常)。
  • ToolNode 会将工具的执行结果封装成 ToolMessage 类型的消息,并将其添加到图的状态 (消息历史列表) 中,作为状态更新输出。
  • 该组件有以下优势。
    • 简化工具集成:自动处理工具的注册、调度、执行和结果返回等。
    • 原生支持 LangChain 工具:可以直接使用 @tool 装饰器定义的工具,也支持 RunnableTool 等更灵活的工具形式,充分利用 LangChain 生态系统中丰富的工具资源。
    • 内置错误处理机制:自动捕获工具执行异常,并将错误信息以 ToolMessage 的形式返回给模型,使模型能够感知到工具调用失败,并进行自我修正和重试,从而提高 AI 智能体系统的鲁棒性和可靠性 (错误处理机制可以灵活配置和自定义)。
    • 兼容 ReAct 设计模式:可以与 create_react_agent 等函数配合使用。
  • 总之,ToolNode 极大简化了工具调用的过程,是构建具备工具调用能力 AI 智能体系统的首选组件。

5.2 定义工具:使用@tool装饰器

  • 要使用 ToolNode 进行工具调用,首先需要定义可供 LangGraph 智能体调用的工具函数。LangGraph 推荐使用 LangChain 的 @tool 装饰器来定义工具函数。@tool 装饰器可以将普通的 Python 函数快速地转换成 LangChain Tool,并自动生成工具描述信息和参数 Schema,便于 LLM 理解和调用工具。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from langchain_core.tools import tool


    @tool # 使用 @tool 装饰器,将 Python 函数转换为 LangChain Tool
    def get_weather(location: str): # 定义工具函数 get_weather,location 参 数用于接收城市名称
    """ 获取当前天气 """
    if location.lower() in ["sf", "san Francisco"]: # 工具函数的 docstring 会被作为工具的描述信息,提供给 LLM
    return "It's 16 degrees and foggy."
    else:
    return "It's 32 degrees and sunny."


    @tool
    def get_coolest_cities():
    """ 获取天气最冷的城市名 """
    return "NYC, SF"
    • 我们使用了 @tool 装饰器定义了 get_weather 和 get_coolest_ cities 两个工具函数。get_weather 工具函数接收 location 参数 (城市名称)查询指定城市的天气信息;get_coolest_cities 工具函数用于获取“最酷城市”列表。工具函数 的 docstring 会被 @tool 装饰器自动提取作为工具的描述信息,帮助 LLM 理解工具功能和使用方法。因此,清晰、详细的 docstring 有助于 LLM 正确调用工具。
  • 定义工具有以下关键要素。

    • @tool 装饰器:必须使用 @tool 装饰器将 Python 函数转换为 LangChain Tool 工具。
    • 清晰详细的 docstring:尽可能使用自然语言清晰详细地描述工具的功能、输入参数和输出结果,以便 LLM 更好地理解。
    • 类型提示:建议为工具函数的参数和返回值添加明确的类型提示(Type Hints),例如,location: str、year: int、→ str、→ list[str] 等。
    • 合理的工具名称:可通过 @tool(“自定义工具名称”) 显式指定,应简洁明确,具有描述性,例如,“get_weather_information” “search_wikipedia” 等。
  • 定义好工具函数后,只有将其注册到 ToolNode 组件中才能在 LangGraph 流程中使用。

5.3 手动调用ToolNode

  • 在将 ToolNode 组件集成到 LangGraph 图之前,先学习其手动调用方式,以便更好地理解其工作原理和使用方法。ToolNode 作为一个 LangChain Runnable 对象,可以通过 invoke 方法进行手动调用。该方法接收必须包含 messages 键的字典作为输入。 messages 键值为消息列表,列表中的最后一个消息必须是工具调用请求的 AIMessage 类型。ToolNode 会解析 AIMessage 中的 tool_calls 信息,调度执行相应的工具,并将执行结果封装成 ToolMessage 类型的消息返回。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    from langchain_core.messages import AIMessage
    from langgraph.prebuilt import ToolNode

    # 创建 ToolNode 实例,注册工具列表 ( 包含 get_weather 和 get_coolest_ cities 两个工具)
    tools = [get_weather, get_coolest_cities]
    tool_node = ToolNode(tools)
    # 构造包含单个工具调用请求的 AIMessage
    message_with_single_tool_call = AIMessage( # 创建 AIMessage
    content="", # content 为空字符串,表示该消息主要用于工具调用,不包含文本内容
    tool_calls=[ # tool_calls 参数,包含工具调用请求列表
    {
    "name": "get_weather", # 工具名称,必须与注册的工具名称一致
    "args": {"location": "sf"}, # 工具参数,必须与工具函数定义的参数匹配
    "id": "tool_call_id", # 工具调用 ID,用于唯一标识工具调用,可以自定义
    "type": "tool_call", # 消息类型,固定为 tool_call
    }
    ],
    )
    # 手动调用 ToolNode,输入为包含 AIMessage 的状态字典
    tool_node_output = tool_node.invoke({"messages": [message_with_single_tool_call]})
    print(tool_node_output) # 打印 ToolNode 的输出结果
    # {'messages': [ToolMessage(content="It's 16 degrees and foggy.", name='get_weather', tool_call_id='tool_call_id')]}
    • 我们首先创建 ToolNode 实例 tool_node,并将已定义的工具列 表 tools(包含 get_weather 和 get_coolest_cities 两个工具)注册到 ToolNode 中;然后, 构造了一个 AIMessage 类型的消息 message_with_single_tool_call,其 tool_calls 参数包含了一个调用 get_weather 工具的请求,并传递参数 {‘’location’’:’’sf’’};最后调用 tool_node.invoke() 方法,将包含 AIMessage 的状态字典 {‘’messages’’: [message_with_ single_tool_call]} 作为输入传递给 ToolNode 执行工具调用。
    • ToolNode 的 invoke 方法返回一个包含了状态更新的字典 {‘messages’:[…]},其中包含了 ToolMessage 类型的消息,该消息记录了工具的执行结果:“It’s 16 degrees and foggy.”、工具名称 name=’get_weather’,以及工具调用 ID tool_call_id=’tool_call_ id’。ToolNode 的输出结果可以直接作为下一步 LLM 的输入,实现基于工具执行结果的迭代对话。
  • ToolNode 也支持并行工具调用。当 AIMessage 的 tool_calls 参数包含多个工具调用请求时,ToolNode 会并发执行这些工具,并将多个执行结果封装为多个 ToolMessage 类型消息返回。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    from langchain_core.messages import AIMessage
    from langgraph.prebuilt import ToolNode

    # 创建 ToolNode 实例,注册工具列表 ( 包含 get_weather 和 get_coolest_ cities 两个工具)
    tools = [get_weather, get_coolest_cities]
    tool_node = ToolNode(tools)

    # 构造包含多个工具调用请求的AIMessage
    message_with_multiple_tool_calls = AIMessage( # 创建 AIMessage
    content="",
    tool_calls=[ # tool_calls 参数,包含多个工具调用请求
    {
    "name": "get_coolest_cities", # 工具 1:get_coolest_cities
    "args": {},
    "id": "tool_call_id_1",
    "type": "tool_call",
    },
    {
    "name": "get_weather", # 工具 2:get_weather
    "args": {"location": "sf"},
    "id": "tool_call_id_2",
    "type": "tool_call",
    }
    ]
    )

    # 手动调用ToolNode,输入为包含AIMessage的状态字典
    tool_node_output = tool_node.invoke({"messages": [message_with_multiple_tool_calls]})
    print(tool_node_output) # 打印 ToolNode 的输出结果
    # {'messages': [ToolMessage(content='NYC, SF', name='get_coolest_cities', tool_call_id='tool_call_id_1'), ToolMessage(content="It's 16 degrees and foggy.", name='get_weather', tool_call_id='tool_call_id_2')]}
    • message_with_multiple_tool_calls 消息的 tool_calls 参数包含了两个工具调用请求:分别调用 get_coolest_cities 和 get_weather 工具。ToolNode 的 invoke 方法会并发地执行这两个工具,并将执行结果分别封装为两个 ToolMessage 类型的消息返回。输出结果的消息列表包含这两个 ToolMessage ,分别对应两个工具的执行结果。通过手动调用 ToolNode,我们可以直观地理解和验证其工具调用和结果返回机制,为后续在 LangGraph 图中集成 ToolNode 组件打下基础。

5.4 在LangGraph图中使用ToolNode

  • 理解了 ToolNode 的工作原理和手动调用方法后,即可将其集成到 LangGraph 图中,构建具备工具调用能力的 ReAct 智能体。ToolNode 通常与 LLM 节点和条件路由节点配合使用,构成 ReAct 智能体的基本运行流程。

    • LLM 节点 (Agent Node):负责接收用户输入和对话历史,调用 LLM 生成智能体行为。如果 LLM 决定调用工具,则会返回包含 tool_calls 的 AIMessage,指示需要调用哪些工具,以及工具的参数;如果 LLM 决定直接回复用户,则会返回不包含 tool_calls 的 AIMessage,指示对话结束,或等待用户进一步输入。
    • ToolNode (工具节点):负责接收来自 LLM 节点的工具调用请求,调度和执行相应的工具,并将执行结果封装为 ToolMessage 返回。
    • 条件路由节点 (Conditional Edge):负责根据 LLM 节点的输出结果,动态地决定流程走向。若返回了 tool_calls (指示需要调用工具),则路由到 ToolNode 节点执行工具调用,否则将路由到 END 节点结束对话流程,或等待用户进一步输入。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    from langgraph.graph import StateGraph, MessagesState, START, END  # 导入 MessageState
    from langgraph.prebuilt import ToolNode # 导入 ToolNode
    from langchain_openai import ChatOpenAI

    # get_weather, get_coolest_cities 工具函数的定义,此处省略

    tools = [get_weather, get_coolest_cities] # 工具列表
    tool_node = ToolNode(tools) # 创建 ToolNode 实例,注册工具列表
    model_with_tools = ChatOpenAI(model="gpt-4-turbo", temperature=0).bind_tools(tools) # 绑定工具列表到 LLM


    def should_continue(state: MessagesState): # 条件路由函数,判断是否继续工 具调用
    messages = state["messages"]
    last_message = messages[-1] # 获取最后一条消息 (LLM 的输出 )
    if last_message.tool_calls: # 判断最后一个消息是否包含 tool_calls( 工具调用请求 )
    return "tools" # 如果包含 tool_calls,则路由到 tools 节点(ToolNode),执行工具调用
    return END # 如果不包含 tool_calls,则路由到 END 节点,结束流程


    def call_model(state: MessagesState): #LLM 模型节点函数
    messages = state["messages"]
    response = model_with_tools.invoke(messages) # 调用 LLM,生成 AI消息 (可能包含 tool_calls)
    return {"messages": [response]} # 返回包含 AI 消息的状态更新


    workflow = StateGraph(MessagesState) # 创建 StateGraph 实例,状态 Schema 为 MessageState
    workflow.add_node("agent", call_model) # 添加 LLM 节点,节点名为 agent
    workflow.add_node("tools", tool_node) # 添加 ToolNode 节点,节点名为 tools
    workflow.add_edge(START, "agent") # 定义从 START 节点到 agent 节点的边 ( 流 程入口 )
    workflow.add_conditional_edges("agent", should_continue, ["tools", END]) # 定义条件边,根据 agent 节点的输出,动态路由到 tools 节点或 END 节点
    workflow.add_edge("tools", "agent") # 定义从 tools 节点到 agent 节点的边 (ReAct 循环 )
    app = workflow.compile() # 编译 LangGraph 图
    • 我们创建了一个实用的 StateGraph 工作流实例。我们定义了 agent (LLM 节点) 和 tools(ToolNode 节点)两个核心节点,以及一个条件路由函数 should_continue。该函数根据 LLM 节点的输出动态地控制流程的走向,在工具调用(路由到 tools 节点)和结束对话(路由到 END 节点)之间进行选择。通过边连接这些节点,构建了基本的 ReAct 循环。最后,调用 workflow.compile() 方法编译 LangGraph 图,完成 ReAct 循环构建。
    • 此示例展示了使用 ToolNode 构建的 ReAct 智能体能够成功地接收用户输入,调用 get_weather 工具查询天气信息,并基于工具的执行结果生成最终的回复,完成基于工具调用的 ReAct 循环。ToolNode 组件作为工具调度和执行的核心模块,简化了工具集成和 ReAct 智能体构建流程,使开发者能更专注于智能体核心逻辑和业务功能的开发。

5.5 处理工具调用错误

  • ToolNode 组件虽然内置了基础的错误处理机制,能够捕获工具执行的异常,并返回给 LLM,但在某些场景下仍显不足。LLM 本身并不擅长理解和处理工具调用错误,可能会重复犯同样的错误 (例如,使用错误的工具参数,调用不存在的工具), 或流程阻塞。为了构建健壮的 AI 智能体系统,需要实现更精细的定制化工具调用,错误处理策略主要如下所示。

    • 自定义错误处理节点:当 ToolNode 节点检测到工具调用错误时,可将流程路由到错误处理节点。该节点分析错误原因并采取补救措施,如清理错误信息、修改用户输入、更换模型、重试工具调用或降级处理等。例如,创建“工具参数修正节点”,参数校验错误时自动指导 LLM 修正参数,并重试。
    • 裁剪失败的尝试:对不影响流程核心逻辑的工具调用错误,可以选择“裁剪” 或“忽略”,创建专门用于移除失败的工具调用的节点,保持消息历史的简洁,减少模型上下文负担。
    • 模型降级与重试:当错误与当前 LLM 的工具调用能力不足有关 (例如, 模型无法正确理解工具的描述信息或参数 Schema,导致无法生成符合要求的工具参数)时,可以切换到能力更强、更擅长工具调用的 LLM 进行重试。例如,设置 fallback_agent 节点,当工具调用失败时,使用性能更强的模型重试工具调用。
    • 人机环路机制:对复杂或者需人工判断的错误 (例如,用户权限不足、工具 API 调用配额超限、工具返回结果不符合预期等),可引入人机环路机制,将错误交由人工处理。例如,人工修正工具参数,选择其他工具,调整流程策略或直接回复用户。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    import json
    from typing import Literal
    from langchain_core.messages import AIMessage, ToolMessage
    from langchain_core.messages.modifier import RemoveMessage
    from langgraph.graph import MessagesState, StateGraph, END, START
    from langchain_core.tools import tool
    from langchain_openai import ChatOpenAI
    from langchain_core.output_parsers import StrOutputParser
    from pydantic import BaseModel, Field

    class HaikuRequest(BaseModel):
    topic: list[str] = Field(
    max_items=3,
    min_items=3,
    )

    @tool
    def master_haiku_generator(request: HaikuRequest):
    """Generates a haiku based on the provided topics."""
    model = ChatOpenAI(model="Qwen/Qwen2-1.5B-Instruct", temperature=0)
    chain = model | StrOutputParser()
    topics = ", ".join(request.topic)
    haiku = chain.invoke(f"Write a haiku about {topics}")
    return haiku

    def call_tool(state: MessagesState):
    # 创建工具名称到工具函数的映射字典
    tools_by_name = {master_haiku_generator.name:master_haiku_generator}
    messages = state["messages"]
    last_message = messages[-1] # 获取最后一条消息
    output_messages = []
    # 遍历最后一条消息中的所有工具调用
    for tool_call in last_message.tool_calls:
    try:
    # 根据工具名称找到对应的工具函数并调用,传入参数
    tool_result = tools_by_name[tool_call["name"]].invoke(tool_call["args"])
    # 将工具调用结果封装为ToolMessage添加到输出消息列表
    output_messages.append(
    ToolMessage(
    content=json.dumps(tool_result),
    name=tool_call["name"],
    tool_call_id=tool_call["id"],
    )
    )
    except Exception as e:
    # 若工具调用失败,则捕获异常并返回错误信息
    # 将错误信息封装为ToolMessage,并在additional_kwargs中标记
    output_messages.append(
    ToolMessage(
    content=str(e),
    name=tool_call["name"],
    tool_call_id=tool_call["id"],
    additional_kwargs={"error": e}, # 在额外参数中存储错误对象,用于后续错误处理
    )
    )
    return {"messages": output_messages}

    # 初始化基础模型(性能较弱的模型)
    model = ChatOpenAI(model="Qwen/Qwen2-1.5B-Instruct", temperature=0)
    model_with_tools = model.bind_tools([master_haiku_generator])

    # 初始化更强大的模型(用于降级策略)
    better_model = ChatOpenAI(model="Qwen/Qwen2.5-7B-Instruct", temperature=0)
    better_model_with_tools = better_model.bind_tools([master_haiku_generator])

    def should_continue(state: MessagesState):
    # 决定是否继续工具调用循环或结束流程
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
    # 若最后一条消息包含工具调用请求
    return "tools" # 则继续执行工具调用
    return END # 否则结束流程

    def should_fallback( state: MessagesState,)-> Literal["agent", "remove_failed_tool_call_attempt"]:
    # 决定是否需要转到更强大的模型
    messages = state["messages"]
    # 查找是否有失败的工具调用消息(通过 additional_kwargs 中的 error 标记识别)
    failed_tool_messages = [
    msg
    for msg in messages
    if isinstance(msg, ToolMessage)
    and msg.additional_kwargs.get("error") is not None
    ]
    if failed_tool_messages: # 若存在失败的工具调用
    return "remove_failed_tool_call_attempt" # 则路由到移除失败尝试的节点
    return "agent" # 否则继续使用当前模型

    def call_model(state: MessagesState):
    # 使用基础模型处理消息
    messages = state["messages"]
    response = model_with_tools.invoke(messages)
    return {"messages": [response]}

    def remove_failed_tool_call_attempt(state: MessagesState):
    # 移除失败的工具调用尝试,清理消息历史
    messages = state["messages"]
    # 从后向前查找最近的AI消息索引
    last_ai_message_index = next(
    i
    for i, msg in reversed(list(enumerate(messages)))
    if isinstance(msg, AIMessage)
    )
    # 获取需要移除的消息(从最近的AI消息开始的所有消息)
    messages_to_remove = messages[last_ai_message_index:]
    # 返回移除指令,通过RemoveMessage标记需要移除的消息
    return {"messages": [RemoveMessage(id=m.id) for m in messages_to_remove]}

    # 降级策略:使用更强大的模型重试
    def call_fallback_model(state: MessagesState):
    # 使用更强大的模型处理消息
    messages = state["messages"]
    response = better_model_with_tools.invoke(messages)
    return {"messages": [response]}

    # 创建状态图
    workflow = StateGraph(MessagesState)
    # 添加节点
    workflow.add_node("agent", call_model) # 基础模型节点
    workflow.add_node("tools", call_tool) # 工具调用节点
    workflow.add_node("remove_failed_tool_call_attempt", remove_failed_tool_call_attempt) # 清理失败尝试节点
    workflow.add_node("fallback_agent", call_fallback_model) # 降级模型节点
    # 添加边和条件边
    workflow.add_edge(START,"agent") # 流程从agent节点开始
    workflow.add_conditional_edges("agent", should_continue, ["tools", END]) # 根据should_continue函数决定是继续工具调用还是结束
    # 根据工具调用结果决定是继续使用当前模型还是清理失败尝试
    workflow.add_conditional_edges("tools", should_fallback, path_map = {"agent": "agent", "remove_failed_tool_call_attempt": "remove_failed_tool_call_attempt"})
    workflow.add_edge("remove_failed_tool_call_attempt", "fallback_agent") # 清理失败尝试后使用降级模型
    workflow.add_edge("fallback_agent", "tools") # 降级模型生成的工具调用请求继续由 tools 节点处理
    app = workflow.compile()
  • 通过组合使用自定义错误处理节点、裁剪失败尝试、模型降级重试等策略,构建更智能、更健壮的工具调用错误处理机制,提高 AI 智能体系统应对工具调用错误的能力,同时提升用户体验和系统可靠性。在实际应用中,开发者可以根据具体业务需求灵活地选择和组合使用各种错误处理策略,构建合适的 AI 智能体系统。

5.6 从工具中更新图状态

  • 在某些应用场景下,我们不仅需要工具返回结果,还希望工具能够直接更新 LangGraph 图的状态。例如,在客户支持系统中,对话的初始阶段可能需要调用“用户信息查询工具”查询用户的基本信息 (例如,用户 ID、用户名、会员等级、历史订单等),并将这些信息保存到图状态,供后续的对话流程使用,从而实现对话的个性化和上下文感知。要实现这一功能,我们需要让工具函数返回 Command 对象,并在 Command 对象中指定需要更新的状态键值对。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    from typing_extensions import Annotated, Any
    from langchain_core.tools import tool, InjectedToolCallId
    from langchain_core.runnables.config import RunnableConfig
    from langchain_core.messages import ToolMessage
    from langgraph.types import Command
    from langgraph.graph import StateGraph, START, END
    from langchain.agents import AgentState
    from langgraph.prebuilt import ToolNode

    USER_INFO = [ # 用户信息列表 ( 示例数据 )
    {"user_id": "1", "name": "Bob Dylan", "location": "New York,NY"},
    {"user_id": "2", "name": "Taylor Swift", "location": "Beverly Hills, CA"},
    ]

    USER_ID_TO_USER_INFO = {info["user_id"]: info for info in USER_INFO} # 用户ID→用户信息字典


    class State(AgentState): # 定义图状态结构体,继承自 AgentState,并添加 user_info 状态键
    user_info: dict[str, Any]


    @tool
    def lookup_user_info( # 定义工具函数 lookup_user_info
    tool_call_id: Annotated[str, InjectedToolCallId],
    config: RunnableConfig # config 参数,使用 RunnableConfig 类型提示,接收运行时配置信息
    ):
    """Use this to look up user information to better assist them with their questions.""" # 工具描述信息
    user_id = config.get("configurable", {}).get("user_id") # 从 config 参数中获取运行时参数 user_id
    if user_id is None:
    raise ValueError("Please provide user ID")
    if user_id not in USER_ID_TO_USER_INFO:
    raise ValueError(f"User '{user_id}' not found")
    user_info = USER_ID_TO_USER_INFO[user_id] # 根据 user_id 查询用户信息
    return Command( # 工具函数返回 Command 对象
    update={ # Command 对象包含状态更新指令用户信息
    "user_info": user_info, # 更新 user_info 状态键,值为查询到的
    "messages": [ # 更新 messages 状态键,添加 ToolMessage
    ToolMessage(
    "Successfully looked up user information", tool_call_id=tool_call_id
    )
    ],
    }
    )


    # 初始化状态图
    graph = StateGraph(State)


    # 定义节点
    def agent_node(state: State):
    """ 智能体节点,处理用户请求 """
    messages = state["messages"]
    user_info = state.get("user_info", {})
    # 如果有用户信息,则将其添加到系统消息中
    if user_info:
    system_message = f"You are assisting {user_info['name']} who lives in {user_info['location']}."
    else:
    system_message = "You are a helpful assistant."
    # 调用模型处理请求
    model = ChatOpenAI(model="Qwerty/Qwen2.5-7B-Instruct", temperature=0)
    model_with_tools = model.bind_tools([lookup_user_info])
    response = model_with_tools.invoke([{"role": "system", "content": system_message}] + messages)
    return {"messages": [response]}


    def should_use_tools(state: State):
    """ 决定是否使用工具 """
    messages = state["messages"]
    last_message = messages[-1]
    # 检查最后一条消息是否包含工具调用
    if hasattr(last_message, "tool_calls") and last_message.tool_calls:
    return "tools"
    return "end"


    # 使用 ToolNode 简化工具调用逻辑
    tools_node = ToolNode([lookup_user_info])
    # 添加节点到图中
    graph.add_node("agent", agent_node)
    graph.add_node("tools", tools_node)
    # 添加边和条件边
    graph.add_edge(START, "agent")
    graph.add_edge("tools", "agent")
    graph.add_conditional_edges("agent", should_use_tools, {"tools": "tools", "end": END})

    # 编译图
    agent = graph.compile()
    # 调用ReAct智能体,通过config参数传递运行时参数user_id
    for chunk in agent.stream(
    {"messages": [("human", "Who are you and where is vivo?")]},
    {"configurable": {"user_id": "1"}}, # 通过 config 参数传递运行时参数
    user_id="1"
    ):
    print(chunk)
    • lookup_user_info 工具函数不再直接返回执行结果(如字符串或字典),而是返回包含了 update 参数的 Command 对象。update 参数是一个字典,用于指定需要更新的图状态。
    • user_info:更新为查询到的用户信息 user_info,将工具执行结果保存到图状态中供后续节点访问。
    • messages:添加一个 ToolMessage 类型消息,记录工具执行成功的信息, 并关联工具调用 ID tool_call_id。这符合 LangGraph 和 ReAct Agent 最佳实践。
  • 要使 LangGraph 流程能够正确处理 Command 对象,并应用状态更新,则需要注意以下要点。

    • 使用预置组件:必须使用 create_react_agent 或 ToolNode 等 LangGraph 预置组件来构建和执行 ReAct 智能体流程。这些预置组件已经内置了处理 Command 对象的逻辑,能够自动识别和应用状态更新。自定义节点函数需要手动实现相关逻辑, 开发复杂度较高。
    • 工具函数必须返回 Command 类型的对象,而不是直接返回执行结果。
    • Command 对象必须包含 update 参数,否则 LangGraph 无法识别状态更新指令。
    • Command 对象更新 messages 状态键 (可选但推荐):建议在 Command 对象的 update 参数中更新 messages 消息历史列表,添加 ToolMessage,以保持对话历史的完整性和可追溯性,为后续推理提供更丰富的上下文信息。

5.7 向工具传递运行时参数

  • 在某些复杂的 LangGraph 应用场景中,不仅需要从工具中更新图状态,还需要在运行时,动态地向工具函数传递参数。这些运行时参数不由 LLM 生成,而是由 LangGraph 流程的外部环境或父图流程动态传入。例如,在多用户、多会话的 AI 系统中,每次工具调用时动态地将当前用户的ID(user_id)传递给工具函数,以便工具函数能访问相应用户的数据或资源。运行时参数与工具函数的普通参数不同,后者由 LLM 生成,前者则由开发者显式指定。

  • LangGraph 框架提供了两种传递运行时参数的主要方式。

    • 通过 RunnableConfig 的 config 参数传递。作为 LangChain Runnable 接口的标准机制,可在 graph.invoke() 或 graph. stream() 等调用时传入 config 字典参数。工具函数通过 config: RunnableConfig 类型提示接收,并使用config.get(“configurable”, {}).get(“user_id”) 等方式,获取运行时配置信息(例如,user_id)。推荐优先使用这种方式。

    • 使用 Annotated 和 InjectedState 注解。InjectedState 注解可以标记工具函数的某些参数为“注入参数”,这些参数不会暴露给 LLM 模型,而由 LangChain 框架运行时自动注入。InjectedState 注解通常与 Annotated 类型提示配合使用,用于声明注入参数的类型(例如,Annotated[str, InjectedState] 表示 user_id 参数是一个字符串类型的注入参数)。InjectedState 注解主要用于“隐藏”特定参数,防止 LLM 错误控制,完全由开发者显式地指定和控制。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      from langgraph.prebuilt import ToolNode, InjectedState


      class State(AgentState): # 定义图状态结构体,继承自 AgentState,并添加 user_info 状态键
      user_info: dict[str, Any]
      user_id: str


      @tool
      def lookup_user_info( # 定义工具函数 lookup_user_info
      tool_call_id: Annotated[str, InjectedToolCallId],
      user_id: Annotated[str, InjectedState("user_id")]
      ):
      """Use this to look up user information to better assist them with their questions.""" # 工具描述信息
      if user_id is None:
      raise ValueError("Please provide user ID")
      if user_id not in USER_ID_TO_USER_INFO:
      raise ValueError(f"User '{user_id}' not found")
      user_info = USER_ID_TO_USER_INFO[user_id] # 根据 user_id 查询用户信 息
      return Command( # 工具函数返回 Command 对象
      update={ # Command 对象包含状态更新指令
      "user_info": user_info, # 更新 user_info 状态键,值为查到的用户信息
      "messages": [ # 更新 messages 状态键,添加 ToolMessage
      ToolMessage("Successfully looked up user information", tool_call_id=tool_call_id
      )
      ]
      }
      )


      # 图的定义和编译代码省略
      # 调用ReAct智能体,通过config参数传递运行时参数
      for chunk in agent.stream(
      # 通过user_id状态键传递运行时参数
      {"messages": [("human", "Who am i and where do i live?")], "user_id": "1"},
      ):
      print(chunk)
      • 我们修改了 lookup_user_info 工具函数的定义,为其添加了user_id: Annotated[str, InjectedState(“user_id”)] 参数,使用 Annotated 和 InjectedState 注解将 user_id 参数标记为“注入参数”。这样,在流程运行时,LangGraph 框架会自动从状态结构体中提取 user_id 的值并注入 lookup_user_info 工具函数的 user_id 参数中,模型无须在工具调用请求中生成 user_id 值。
  • 在实际应用中,可根据业务需求灵活选择运行时参数传递方式。大多数场景使用 RunnableConfig 的 config 参数就可满足需求。如果需要更精细的参数控制和隐藏某些参数,则可考虑使用 Annotated 和 InjectedState 注解。

6、图的可视化

  • 随着 LangGraph 流程变得越来越复杂,特别是引入子图 (Subgraph) 机制后,仅通过代码理解流程结构和执行逻辑变得越来越困难。图的可视化 (Graph Visualization) 成为重要辅助工具。通过直观展示流程的整体结构、节点连接关系, 以及数据流动,可更高效地设计、调试和维护复杂的 AI 智能体系统。LangGraph 框架提供了多种内置的图可视化方法,可以满足不同场景下的可视化需求。

6.1 Mermaid语法

  • Mermaid 是一种流行的文本描述语言,用于快速创建流程图、时序图、甘特图等各类图表。LangGraph 内置了将 Graph 对象转换为 Mermaid 语法的功能。通过调用 graph.get_graph().draw_mermaid() 方法,可获取当前 LangGraph 图结构的 Mermaid 文本描述。该 Mermaid 文本可粘贴到任何支持 Mermaid 语法的在线编辑器(例如, Mermaid Live Editor)或 Markdown 编辑器中,实时渲染出清晰的流程图。Mermaid 语法具有轻量级、跨平台、易于编辑和分享等优势,无须额外依赖库,可直接在浏览器或 Markdown 文档中查看编辑图表。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    import operator
    from typing import Annotated, Any
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END


    class State(TypedDict):
    aggregate: Annotated[list, operator.add]


    def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


    def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


    def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


    def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_edge(START, "a")
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "d")
    builder.add_edge("c", "d")
    builder.add_edge("d", END)

    graph = builder.compile()

    mermaid_syntax = graph.get_graph().draw_mermaid() # 调用 draw_ mermaid()方法获取 Mermaid 语法描述
    print(mermaid_syntax) # 打印 Mermaid 语法描述
    # ---
    # config:
    # flowchart:
    # curve: linear
    # ---
    # graph TD;
    # __start__([<p>__start__</p>]):::first
    # a(a)
    # b(b)
    # c(c)
    # d(d)
    # __end__([<p>__end__</p>]):::last
    # __start__ --> a;
    # a --> b;
    # a --> c;
    # b --> d;
    # c --> d;
    # d --> __end__;
    # classDef default fill:#f2f0ff,line-height:1.2
    # classDef first fill-opacity:0
    # classDef last fill:#bfb6fc

    将以上 Mermaid 语法复制粘贴到 Mermaid Live Editor 中,即可实时渲染出 LangGraph 流程图,便于直观地查看和分析图结构。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    ---
    config:
    flowchart:
    curve: linear
    ---
    graph TD;
    __start__([<p>__start__</p>]):::first
    a(a)
    b(b)
    c(c)
    d(d)
    __end__([<p>__end__</p>]):::last
    __start__ --> a;
    a --> b;
    a --> c;
    b --> d;
    c --> d;
    d --> __end__;
    classDef default fill:#f2f0ff,line-height:1.2
    classDef first fill-opacity:0
    classDef last fill:#bfb6fc

6.2 PNG图片

  • 除了 Mermaid 语法,LangGraph 还支持将 Graph 对象直接渲染成 PNG 图片,方便在 Notebook、 网页或 Markdown 文档中直接嵌入和展示图表。LangGraph 提供了三种 PNG 图片渲染方式,可以根据实际需求选择合适的方式。

6.2.1 使用Mermaid.ink API(默认)

  • graph.get_graph().draw_mermaid_png() 方法默认使用 Mermaid.ink API 进行 PNG 图片渲染。Mermaid.ink 是一个在线 Mermaid 图表渲染服务,无须在本地安装任何额外的依赖库,只需要联网即可使用,非常方便快捷,推荐作为默认的 PNG 渲染方式。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    import operator
    from typing import Annotated, Any
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END
    from IPython.display import Image
    from langchain_core.runnables.graph import MermaidDrawMethod


    class State(TypedDict):
    aggregate: Annotated[list, operator.add]


    def a(state: State):
    print(f'Adding "A" to {state["aggregate"]}')
    return {"aggregate": ["A"]}


    def b(state: State):
    print(f'Adding "B" to {state["aggregate"]}')
    return {"aggregate": ["B"]}


    def c(state: State):
    print(f'Adding "C" to {state["aggregate"]}')
    return {"aggregate": ["C"]}


    def d(state: State):
    print(f'Adding "D" to {state["aggregate"]}')
    return {"aggregate": ["D"]}


    builder = StateGraph(State)
    builder.add_node(a)
    builder.add_node(b)
    builder.add_node(c)
    builder.add_node(d)
    builder.add_edge(START, "a")
    builder.add_edge("a", "b")
    builder.add_edge("a", "c")
    builder.add_edge("b", "d")
    builder.add_edge("c", "d")
    builder.add_edge("d", END)

    graph = builder.compile()

    png_bytes = graph.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod. API) # 使用 MermaidDrawMethod.API 指定使用 Mermaid.ink API 渲染
    display(Image(png_bytes)) # 在 Notebook 中显示 PNG 图片
    image-20251102133518669

6.2.2 使用Mermaid和Pyppeteer

  • graph.get_graph().draw_mermaid_png(draw_method=MermaidDrawMethod.PYP PETEER) 方法支持使用 Mermaid 和 Pyppeteer 库在本地进行 PNG 图片渲染。

  • Pyppeteer 作为 Python 版本的 Puppeteer 库,可通过 Chromium/Chrome 浏览器实现 Web 页面的自动化操作(含页面截图功能)。使用这种方法渲染,需要在本地安装 pyppeteer 和 nest_asyncio 库(用于在 Jupyter Notebook 中运行异步函数),并且首次运行时,要安装 Chromium 浏览器(如果本地没有安装)。这种方法的优点是渲染速度较快,不依赖外部 API,支持离线使用,并提供更多 Mermaid 的高级定制化选项(如曲线风格、节点颜色、标签换行、背景色、边距等)。

    1
    pip install pyppeteer nest_asyncio
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    import random
    from typing import Annotated, Literal
    from typing_extensions import TypedDict
    from langgraph.graph import StateGraph, START, END
    from langgraph.graph.message import add_messages


    class State(TypedDict):
    messages: Annotated[list, add_messages]


    class MyNode:
    def __init__(self, name: str):
    self.name = name

    def __call__(self, state: State):
    return {"messages": [("assistant", f"Called node {self.name}")]}


    def route(state) -> Literal["entry_node", "__end__"]:
    if len(state["messages"]) > 10:
    return "__end__"
    return "entry_node"


    def add_fractal_nodes(builder, current_node, level, max_level):
    if level > max_level:
    return
    # 定义在此层级创建的节点数量
    num_nodes = random.randint(1, 3)
    for i in range(num_nodes):
    nm = ["A", "B", "C"][i]
    node_name = f"node_{current_node}_{nm}"
    builder.add_node(node_name, MyNode(node_name))
    builder.add_edge(current_node, node_name)
    # 创建更多层级的随机节点
    r = random.random()
    if r > 0.2 and level + 1 < max_level:
    add_fractal_nodes(builder, node_name, level + 1, max_level)
    elif r > 0.05:
    builder.add_conditional_edges(node_name, route, node_name)
    else:
    # 把节点连接到终点
    builder.add_edge(node_name, "__end__")


    def build_fractal_graph(max_level: int):
    builder = StateGraph(State)
    entry_point = "entry_node"
    builder.add_node(entry_point, MyNode(entry_point))
    builder.add_edge(START, entry_point)
    add_fractal_nodes(builder, entry_point, 1, max_level)
    # 可选:把入口节点也直接连接到终点
    builder.add_edge(entry_point, END)
    return builder.compile()

    app = build_fractal_graph(3)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    from IPython.display import Image, display
    from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles
    import nest_asyncio

    # 修复 asyncio 运行时错误
    nest_asyncio.apply()

    png_bytes = app.get_graph().draw_mermaid_png( # 使用 draw_mermaid_png() 方法渲染 PNG 图片
    draw_method=MermaidDrawMethod.PYPPETEER, # 指定使用 MermaidDrawMethod.PYPPETEER,使用 Mermaid + Pyppeteer 渲染
    curve_style=CurveStyle.LINEAR, # 设置曲线风格为线性
    node_colors=NodeStyles(first="#ffdfba", last="#baffc9", default="#fad7de"), # 自定义节点颜色
    wrap_label_n_words=9, # 设置节点标签自动换行,每行最多 9 个单词
    output_file_path=None, # 不输出到文件
    background_color="white", # 设置背景色为白色
    padding=10, # 设置边距为 10 像素
    )
    display(Image(png_bytes)) # 在 Notebook 中显示 PNG 图片

    运行后会在 Notebook 中直接显示渲染好的 LangGraph 流程图 PNG 图片,并且使用了自定义的曲线风格、节点颜色、标签换行等高级定制化选项,使图表更加美观和易读,如图所示。

6.2.3 使用Graphviz

  • graph.get_graph().draw_png() 方法可以使用 Graphviz 库在本地进行 PNG 图片渲染。Graphviz 是一个功能强大的开源图表可视化工具包,能生成专业级的渲染效果,但配置和安装相对复杂,需要安装 Graphviz 库和 pygraphviz 库,并且可能需要安装 Graphviz 软件(依赖操作系统环境)。优点是渲染效果最为精细和专业,支持更多高级的图表布局和样式定制,适用于对图表美观度和定制化要求较高的场景。

    1
    pip install pygraphviz
    1
    2
    3
    from IPython.display import Image, display
    png_bytes = app.get_graph().draw_png() # 使用 draw_png() 方法,默认使用 Graphviz 库渲染 PNG 图片
    display(Image(png_bytes)) # 在 Notebook 中显示 PNG 图片

    运行后会在 Jupyter Notebook 中直接显示渲染好的 LangGraph 流程图 PNG 图片,如图所示,Graphviz 渲染的图表通常具有更专业的布局和样式,例如,节点形状更丰富,边的路由更智能,整体视觉效果更精细。

6.3 总结

  • 在实际应用中,可以根据具体需求和偏好,选择合适的 PNG 渲染方式,如表所示。对于快速原型开发和简单可视化,推荐使用默认的 Mermaid.ink API 方式,简单方便,无须额外配置;对于需要本地离线渲染,或需要定制化图表样式的场景,可以选择 Mermaid + Pyppeteer 方式;对于对图表美观度和专业性要求极高的场景,可以尝试 Graphviz 方式,但需要投入更多的时间和精力进行配置和学习。

    渲染方式 优点 缺点 依赖安装 适用场景
    Mermaid. ink API 最简单易用,无须本地安装任何依赖,跨平台,轻量级 需要联网,渲染速度可能受网速影响,定制化选项较少 快速原型开发,对图表美观度要求不高,网络环境良好
    Mermaid+ Pyppeteer 渲染速度快,离线可用,定制化选项 丰富,本地渲染,安全性较高 首次运行需要下载 Chromium 浏览器,需要 安装 pyppeteer 库和 nest_ asyncio 库,配 置相对复杂 Pyppeteer, nest_asyncio (和 Chromium 浏览器) 需要本地离线渲染,需要定制化图表样式,对渲染速度有 要求,网络环境不稳定或受限
    Graphviz 渲染效果最专业,布局和样式高度可定制,本地渲染,功能强大 配置和安装复杂,需要安装 Graphviz 库和 Pygraphviz 库,可能需要安装 Graphviz 软件 Graphviz, Pygraphviz 软件 (可选,依赖操作系统环境) 对图表美观度和专业性要求极高,需要深度定制化图表, 例如,出版物、技术文档、正式 PPT

6.4 X-Ray子图可视化

  • 对于包含子图 (Subgraph) 的 LangGraph 流程,默认的可视化方法可能仅显示父图的顶层结构,而隐藏了子图的内部细节。为深入理解复杂嵌套图结构, LangGraph 提供了 X-Ray 子图可视化功能,可穿透子图的封装展示其内部结构,特别适用于分析多智能体系统等层级化结构。

  • 启用 X-Ray 子图可视化需要在调用 graph.get_graph() 方法时设置 xray 参数:

    • xray=True:完全展开所有层级的子图,展现最完整的图结构细节,适用于代码审查、架构设计及深度调试等场景。
    • xray=<depth>:按指定深度展开子图,若 xray=1,则只展开第一层子图, xray=2 表示展开前两层子图,依次类推,可在复杂度和细节之间取得平衡, 适用于系统架构概览、模块间关系分析等场景。
    1
    2
    3
    4
    5
    6
    7
    8
    from IPython.display import Image, display
    # 包含子图的graph 的定义和编译代码,此处省略
    # 使用 X-Ray 子图可视化,完全展开所有层级子图
    png_bytes_xray_full = graph.get_graph(xray=True).draw_mermaid_png() # 设置 xray=True,完全展 开子图
    display(Image(png_bytes_xray_full))# 显示完全展开的子图可视化图表
    # 使用 X-Ray 子图可视化,展开深度为2的子图
    png_bytes_xray_depth_2 = graph.get_graph(xray=2).draw_mermaid_png() # 设置 xray=2,展开深度为 2 的子图
    display(Image(png_bytes_xray_depth_2)) # 显示展开深度为 2 的子图可视化图表

    运行示例,会分别生成完全展开子图和展开深度为 2 的子图的 PNG 图片,并在 Jupyter Notebook 中显示出来,如图所示。

    使用 X-Ray 子图可视化功能,有助于深入理解系统的内部结构和运行机制,提高系统设计、开发、调试和维护的效率和质量。