AI智能体的交互体验

1、流式处理

  • 在 AI 智能体领域,用户体验至关重要。无论智能体的推理能力多么复杂或功能多么强大,若其响应缓慢,则难以满足用户的期望。现代用户已经习惯数字交互的即时性。他们同样期望 AI 智能体敏捷响应,实时提供反馈并展示进度。这对于对话式智能体或需要实时生成文本、代码等内容的应用来说尤其重要。大语言模型 (LLM) 和复杂人工智能工作流中固有的延迟问题,对实现期望的响应水平构成重大挑战。完整的输出需要长时间等待,这可能会导致用户感到沮丧、放弃,并认为 AI 智能体运行缓慢或不可靠。

  • 流式处理技术能克服延迟问题,显著提高 AI 智能体的感知响应速度。流式处理 不是等待整个输出生成完毕后再显示给用户,而是允许以分块的方式逐步交付信息。 这种即时反馈循环创造了一种实时交互感,让用户在智能体生成完整响应的过程中保持参与。例如,聊天机器人逐字显示回复,模仿人类对话,或者报告生成智能体实时显示已完成部分,都体现了流式处理如何将耗时过程转变为动态体验。

  • LangGraph 在架构设计阶段就将流式处理列为核心功能之一,如图所示,它提供了多种不同的流式处理模式,分别针对不同用例及监控需求而优化。这些模式使开发者能够利用实时数据流,涵盖从高级状态更新到细粒度 LLM 词元流等各个层面。本节将深入探讨值流、更新流、自定义流、消息流和调试流等流式处理模式。

    image-20251102142336459
  • 需要特别说明的是,一个包含子图的图需要流式传输输出,就必须在父图的 .stream() 或 .astream() 方法中指定 subgraphs=True,否则,子图的输出不会被流式传输。

    1
    2
    for chunk in graph.stream(inputs, stream_mode="values", subgraphs=True):
    print(chunk)

1.1 流式处理模式

  • LangGraph 通过 .stream() 和 .astream() 方法提供多样化的流式处理模式,使开发者能够选择在图执行期间流式传输的数据类型和粒度。这些方法有同步和异步两种形式,为不同应用程序架构的流式处理集成提供了灵活性。stream_mode 参数是关键配置项,接受一个字符串或字符串列表来指定所需的流式处理行为。主要的流式处理模式说明如下。

1.1.1 值流

  • 值流模式在每个步骤后流式传输图的完整状态,提供图执行的高级视图。具体来说,在 LangGraph 中每个节点或超级步骤执行完毕后,包含所有已定义的状态变量及其当前值的整个状态对象会作为一个完整的数据块在流中发出。此模式对于调试和理解图的整体进展特别有价值。通过观察状态如何逐步演变,开发者可以深入了解其 LangGraph 应用程序中的数据转换和决策逻辑。尽管值流模式可能显得过于冗长而不适合直接面向用户显示,但它非常适用于构建内部监控仪表板或日志记录系统,以跟踪智能体的内部状态转换过程。

  • 让我们通过一个简单的 LangGraph 示例来进一步理解这一模式:该示例旨在优化用户提供的主题,并围绕该主题生成笑话。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    from typing import TypedDict
    from langgraph.graph import StateGraph, START

    class State(TypedDict):
    topic: str
    joke: str

    def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}

    def generate_joke(state: State):
    return {"joke": f"This is a joke about {state['topic']}"}

    graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
    )
    • 当我们以 stream_mode=’’values’’ 执行此图时,可以观察到在每个节点执行完毕后,图的完整状态都被流式传输。

      1
      2
      3
      4
      5
      for chunk in graph.stream({"topic": "ice cream"}, stream_mode="values",):
      print(chunk)
      # {'topic': 'ice cream'}
      # {'topic': 'ice cream and cats'}
      # {'topic': 'ice cream and cats', 'joke': 'This is a joke about ice cream and cats'}

      输出结果显示,第一个数据块仅包含初始状态的“topic”。第二个数据块反映了 refine_topic 节点执行后的状态,其中“topic”已被更新。最后一个数据块显示了 generate_joke 节点执行后的完整状态,此时状态中同时包含了 “topic” 和 “joke” 两个字段。这种逐步展示完整状态的方式,正是值流模式的核心特点。

1.1.2 更新流

  • 值流模式相比,更新流模式提供了更集中和简洁的信息流。更新流模式不会流式传输整个状态,而是仅传输每个节点对状态所做的特定更新。开发者将会收到键值对流,其中键是生成更新的节点名称,值是包含该节点修改状态变量的字典。此模式对于向用户界面提供增量进度更新特别有用。例如,显示多阶段智能体工作流当前执行的步骤,或者突出显示每个阶段生成的特定数据。更新流模式相比值流模式更加简洁,直接反映图中每个单独节点的输出内容。

  • 以同样的笑话生成程序为例,观察使用 stream_mode=’’updates’’ 的输出。

    1
    2
    3
    4
    for chunk in graph.stream({"topic": "ice cream"}, stream_mode="updates",):
    print(chunk)
    # {'refine_topic': {'topic': 'ice cream and cats'}}
    # {'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

    这里的输出结果更加精简,仅显示每个节点关联的更新内容。refine_topic 更新显示了 topic 的更改,generate_joke 节点更新显示了新生成的 joke。此模式有效地隔离并仅呈现每个步骤中发生的更改,使其成为实现针对性、渐进式 UI 更新的理想选择。

1.1.3 自定义流

  • 自定义流模式提供了极致的灵活性,允许开发者从其 LangGraph 节点内部流式传输任意数据。这是通过 StreamWriter 对象实现的,该对象可以在任何节点函数中访问。节点在执行过程中随时使用 StreamWriter 将任意类型的数据发送到流中。这种模式非常适合流式传输进度更新、中间结果或任何其他与用户体验或监控需求相关的自定义信息。例如,在工具调用智能体中,实时传输工具调用结果;或者在数据处理管道中即时展示已处理数据块的预览。自定义流模式为需要定制化流式处理的场景提供了理想解决方案。

  • 以下示例通过修改 generate_joke 节点,使用 StreamWriter 在返回笑话前发送自定义消息。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    from langgraph.types import StreamWriter


    def generate_joke(state: State, writer: StreamWriter):
    writer({"custom_key": "Writing custom data while generating a joke"})
    return {"joke": f"This is a joke about {state['topic']}"}


    graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
    )

    for chunk in graph.stream({"topic": "ice cream"}, stream_mode="custom",):
    print(chunk)
    # {'custom_key': 'Writing custom data while generating a joke'}

    上述通过 StreamWriter 成功接收了自定义字典。此模式支持将任何相关信息嵌入数据流中,高度适配特定应用场景需求。当与工具结合使用时,自定义流模式在提供复杂智能体行为实时洞察方面展现出更强大的能力。

1.1.4 消息流

  • 对于涉及 LLM 交互的应用程序,尤其是对话式智能体,消息流模式具有重要价值。此模式流式传输 LangGraph 中 LLM 生成的单个词元(Token)及其相关元数据。这种词元级流式处理实现最细粒度的实时反馈,为聊天机器人等文本生成应用程序实现“打字效果”界面。此外,与每个词元数据块还附带包括有关 LangGraph 步骤、 LLM 调用节点信息及相关标签的元数据,支持对词元流进行细粒度的过滤和控制。 消息流模式对于创建真正具有交互性和迅速响应能力的体验至关重要。

  • 以下示例通过增强笑话生成图来观察消息流模式的输出效果。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    from langchain_openai import ChatOpenAI
    import os
    import dotenv

    dotenv.load_dotenv()
    os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY1")
    os.environ["OPENAI_BASE_URL"] = os.getenv("OPENAI_BASE_URL")

    llm = ChatOpenAI(model="gpt-4-turbo")


    def generate_joke(state: State):
    llm_response = llm.invoke(
    [{"role": "user", "content": f"Generate a joke about {state['topic']}"}])
    return {"joke": llm_response.content}


    graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
    )

    for message_chunk, metadata in graph.stream({"topic": "ice cream"}, stream_mode="messages", ):
    if message_chunk.content:
    print(message_chunk.content, end="|", flush=True)
    # Why| did| the| cat| sit| on| the| ice| cream|?
    #
    # |Because| it| wanted| to| have| a| "|p|urr|-f|ect|ly|"| cool| spot| to| chill|!|

    输出结果显示,系统现在以单个词元为单位持续传输生成的笑话内容,支持逐字甚至逐字符的实时显示效果。随每个词元一同传输的 metadata 字典提供了宝贵的上下文信息。

1.1.5 调试流

  • 调试流模式专为需要全面监控 LangGraph 执行过程的开发者设计。该模式会流式传输包含丰富调试信息的事件流,在图执行的每个步骤提供详细数据,涵盖任务调度、执行结果、错误信息及状态转换等关键事件。调试流模式虽然因其技术性和信息密度较高而不适合直接面向用户展示,但在开发调试阶段和需要深度监控的场景中为开发者提供了不可或缺的分析工具,特别有助于进行性能优化和复杂行为分析。

  • 以下是通过调试流模式运行笑话生成图时的输出示例。

    1
    2
    3
    4
    5
    6
    for chunk in graph.stream({"topic": "ice cream"}, stream_mode="debug", ):
    print(chunk)
    # {'step': 1, 'timestamp': '2025-11-02T06:47:49.029428+00:00', 'type': 'task', 'payload': {'id': '9f0f0f7a-2670-3a54-fe6f-8645455187a5', 'name': 'refine_topic', 'input': {'topic': 'ice cream'}, 'triggers': ('branch:to:refine_topic',)}}
    # {'step': 1, 'timestamp': '2025-11-02T06:47:49.029549+00:00', 'type': 'task_result', 'payload': {'id': '9f0f0f7a-2670-3a54-fe6f-8645455187a5', 'name': 'refine_topic', 'error': None, 'result': {'topic': 'ice cream and cats'}, 'interrupts': []}}
    # {'step': 2, 'timestamp': '2025-11-02T06:47:49.029621+00:00', 'type': 'task', 'payload': {'id': '4db13db9-b092-f6e0-6ebc-2f964ba63b5a', 'name': 'generate_joke', 'input': {'topic': 'ice cream and cats'}, 'triggers': ('branch:to:generate_joke',)}}
    # {'step': 2, 'timestamp': '2025-11-02T06:47:51.428682+00:00', 'type': 'task_result', 'payload': {'id': '4db13db9-b092-f6e0-6ebc-2f964ba63b5a', 'name': 'generate_joke', 'error': None, 'result': {'joke': 'Why did the cat sit on the ice cream cone?\n\nBecause it wanted to be a cool cat with a purr-fect scoop!'}, 'interrupts': []}}

    输出结果为 JSON 格式的字典数据流,每个字典都代表图中执行过程中的不同事件。’type’: ‘task’ 事件指示节点执行的开始,而 ‘type’: ‘task_result’ 事件显示结果。每个事件中的 ‘payload’ 字段包含详细信息,如节点名称、输入、触发条件和执行结果等。 分析此详细流对于理解复杂的图行为、定位潜在问题非常有用。

1.1.6 组合流式处理模式

  • LangGraph 支持同时组合多种流式处理模式,通过将流模式字符串列表传递给 stream_mode 参数,开发者可以接收包含多种模式数据的交错流。组合模式时,流式处理输出将变为 (stream_mode, data) 元组,其中 stream_mode 指示数据类型,data 是该模式的实际流式内容。此功能支持更丰富的监控和用户反馈场景,例如同时查看高级状态更新和细粒度的 LLM 词元流。

  • 以下示例为笑话生成图组合更新流和自定义流模式。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    from langgraph.types import StreamWriter


    def generate_joke(state: State, writer: StreamWriter):
    writer({"custom_key": "Writing custom data while generating a joke"})
    return {"joke": f"This is a joke about {state['topic']}"}


    graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile()
    )
    for stream_mode, chunk in graph.stream({"topic": "ice cream"}, stream_mode=["updates", "custom"], ):
    print(f"Stream mode: {stream_mode}")
    print(chunk)
    print("\n")

    # Stream mode: updates
    # {'refine_topic': {'topic': 'ice cream and cats'}}
    #
    #
    # Stream mode: custom
    # {'custom_key': 'Writing custom data while generating a joke'}
    #
    #
    # Stream mode: updates
    # {'generate_joke': {'joke': 'This is a joke about ice cream and cats'}}

    此示例的输出结果包含多个元组。每个元组都明确标出 stream_mode (updates 或 custom) 及其对应的数据块。这种结构化输出方式便于处理不同类型的流式处理信息, 以构建全面的实时监控和用户反馈系统。例如,将便于 messages 消息流模式与其他模式组合使用,可以同时实现面向用户界面的逐词元输出显示和用于日志记录或调试的高级别状态更新。

1.1.7 总结

  • LangGraph 流式处理模式总结如表所示。

    流式处理模式 描述 用例
    值流 流式传输每个步骤后图的完整状态 调试,监控图的整体状态演变,构建内部监控仪表板
    更新流 流式传输每个节点对状态进行的更新 UI 进度更新,突出显示每个步骤的数据变化,逐步展示智能体工作流程
    自定义流 允许节点使用 StreamWriter 流式传输任意自定义数据 流式传输中间结果,工具输出,定制化进度信息,灵活满足各种应用场景
    消息流 流式传输 LLM 生成的词元及相关元数据,适用 于聊天模型 聊天机器人“打字效果”, 实时展示 LLM 生成内容,构建交互式对话体验
    调试流 流式传输详细的调试事件,包含任务调度、结果、 错误等信息 深入故障排除,性能分析,理解图执行细节,开发和高级监 控阶段的利器

1.2 事件流式处理

  • 除了 .stream() 和 .astream() 提供的基于模式的流式处理,LangGraph 还提供了 .astream_events() 方法,用于访问图中执行期间发生的较低级别事件流。此方法适用于捕获节点内部事件,便于开发者更精细地了解图的内部工作原理。.astream_events() 与 LangChain 对象中标准的事件流式处理接口一致,使其符合 LangChain 生态系统开发者的使用习惯。

  • .astream_events() 输出的每个事件都是一个字典,其中包含以下关键字段。

    • event:事件类型(例如,’on_chain_start’’,on_chat_model_stream’’,on_chain_end’),事件的完整列表可参考 LangChain 文档。
    • name:与事件关联的名称,通常指示发出事件的组件(例如,图事件为 ‘LangGraph’,节点事件为节点名称,LLM 事件为模型名称)。
    • data:事件的有效负载,包含事件类型的数据。例如,’on_chat_model_ stream’ 事件的 data 字段中包含流式传输的词元块。
  • 事件触发机制如下。

    • 节点执行:开始执行时触发 ‘on_chain_start’,执行期间触发 ‘on_chain_ stream’(适用于流式处理 Runnable),执行完成时触发 ‘on_chain_end’。节点事件的 ‘name’ 字段包含节点名称。
    • 图执行:图开始时发出 ‘on_chain_start’,每次节点执行后发出 ‘on_chain_ stream’,图完成时发出 ‘on_chain_end’。图事件的 ‘name’ 字段包含 ‘LangGraph’。
    • 状态通道写入:状态变量更新会触发 ‘on_chain_start’ 和 ‘on_chain_end’ 事件。
    • 内部节点事件:节点内部生成的事件,例如,LLM 事件 ‘on_chat_model_ start’’,on_chat_model_stream’’,on_chat_model_end’或工具事件,也会出现在.astream_ events() 流的输出中。
  • 以下示例是使用 .astream_events() 输出一个包含单个 LLM 调用节点的简化图。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    from langchain_openai import ChatOpenAI
    from langgraph.graph import StateGraph, MessagesState, START, END

    model = ChatOpenAI(model="gpt-4-turbo")


    def call_model(state: MessagesState):
    response = model.invoke(state['messages'])
    return {"messages": response}


    workflow = StateGraph(MessagesState)
    workflow.add_node(call_model)
    workflow.add_edge(START, "call_model")
    workflow.add_edge("call_model", END)
    app = workflow.compile()
    inputs = [{"role": "user", "content": "hi!"}]

    async for event in app.astream_events({"messages": inputs}, version="v1"):
    kind = event["event"]
    print(f"{kind}: {event['name']}")

    # on_chain_start: LangGraph
    # on_chain_start: call_model
    # on_chat_model_start: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_stream: ChatOpenAI
    # on_chat_model_end: ChatOpenAI
    # on_chain_stream: call_model
    # on_chain_end: call_model
    # on_chain_stream: LangGraph
    # on_chain_end: LangGraph
    • 此输出结果展示了图操作的完整执行顺序,具体如表所示。从图初始化阶段(on_chain_start: LangGraph)开始,经过输入处理(start 节点),继而执行 call_model 节点。在 call_model 节点执行过程中,我们看到了 LLM 调用事件序列(包括 on_chat_model_start、每个词元的 on_chat_model_stream、on_chat_model_end)。

      事件类型 (event) 名称 (name) 数据内容 (data) 描述
      on_ chain_ start LangGraph 或节点名称 (node_ name) 或 ChannelWrite <node_name, channel_name> input (对于图和节点), key 和 value (对于 ChannelWrite),以及其他运行元数据 (run_ id、tags、metadata 等) 表示图、节点 或通道写入操 作的开始
      on_ chain_ end LangGraph 或节点名称(node_ name) 或 ChannelWrite <node_name, channel_name> output (对于图和节 点),key 和 value(对 于 ChannelWrite),以及其他运行元数据(run_ id、tags、metadata 等) 表示图、节点 或通道写入操 作的结束
      on_ chain_ stream LangGraph 或节点名称 (node_ name) chunk (通常是状态更新的字典) 和其他运行元数据 (run_id、 tags、 metadata 等) 在节点或图的执行过程中, 流式传输的数据块,通常用 于 .stream() 方法的输出
      on_chat_ model_ start 模型名称 (例如 ChatOpenAI) llm (调用的 LLM 实例), messages (发送给模型的聊天消息列表), 以及其他运行元数据 (run_id、 tags、 metadata 等) 表示聊天模型调用的开始
      on_chat_ model_ stream 模型名称 (例如 ChatOpenAI) chunk(AIMessageChunk 对象,包含部分内 容), 以及其他运行元数据 (run_id、 tags、 metadata 等) 表示来自聊天模型的流式词 元块,用于实现实时 “打字效果”
      on_chat_ model_ end 模型名称 (例如 ChatOpenAI) response (完整的 ChatResult 对象), 以 及其他运行元数据(run_ id、tags、metadata 等) 表示聊天模型调用的结束, 包含完整的模 型响应
      Channel Write <node_ name, channel_ name> ChannelWrite <node_name, channel_name> key 和 value (写入通道的状态键和值), 以及其他运行元数据(run_ id、tags、metadata 等) 表示状态通道的写入操作, 用于跟踪状态更新
    • 最后,该过程以通道写入(ChannelWrite)、节点完成(on_chain_end: call_model)和图完成(on_chain_end: LangGraph)结束。.astream_events() 方法提供了对整个执行流程的精细化跟踪,这对于高级调试和理解 LangGraph 运行生命周期具有重要价值。

1.3 LangGraph流式处理的底层原理

  • LangGraph 提供的无缝流式处理功能并非抽象概念,而是建立在成熟的网络技术基础之上,这些技术支持高效的实时数据传输。了解这些底层机制有助于开发者更好地了解流式处理的工作原理,并有效地利用它。LangGraph 流式处理实现的核心技术包括分块数据传输和服务器发送事件 (Server-Sent Event,SSE)。

1.3.1 分块数据传输:分片交付数据

  • 以互联网下载大文件为例:传统方式下,浏览器需要预先获知整个文件大小才能开始下载;而通过分块数据传输技术,服务器可以直接以较小的“块”为单位发送文件片段,无须预先声明总数据量。这个过程可以形象地比喻为面包店在切面包时逐片递送给顾客,而不是等待整个面包全部切完后再一次性交付。
  • 在 HTTP 中,分块数据传输编码允许服务器以一系列数据块的形式发送响应。 服务器在每个数据块可用时发送,客户端(如浏览器或使用 graph.stream() 的 Python 脚本)则负责将这些数据块组装起来以重建完整的数据流。LangGraph 利用此原理避免在发送前在内存中缓冲整个输出。每当图中节点完成部分处理或 LLM 生成新词元时,LangGraph 会立即将数据打包成块并发送到数据流中。这种“即时交付”机制是实现流式处理快速响应的基础。
  • 这种分块数据传输方法对于服务器和客户端来说都是高效的。服务器不需要分配大型缓冲区来保存整个响应,客户端则可以更早地开始处理和显示数据,从而显著提升用户体验,尤其对于长时间运行的 AI 智能体操作而言效果更为明显。

1.3.2 服务器发送事件:实时推送通信

  • 为了实现数据块从 LangGraph 后端到应用程序的实时传输,LangGraph 通常采用服务器发送事件或类似的基于推送的通信模式。SSE 是一种 Web 标准,它使服务器 能够与客户端建立持久的单向连接,并在新数据可用时主动推送到客户端。
  • SSE 可视为 LangGraph 应用程序向客户端“广播”更新的专用通道。一旦客户端发起流式处理请求(如调用 graph.stream()),就会建立 SSE 连接。然后,LangGraph 使用此连接将各类数据块(包括状态更新、自定义消息或 LLM 词元)作为 SSE 事件推送到客户端。客户端侦听此连接,实时接收事件并更新用户界面或执行其他操作。
  • 要注意的是,底层推送机制除 SSE 外,也可能使用其他技术(如纯 HTTP 流式 处理或 WebSockets),具体取决于特定的 LangGraph 部署环境和所选的通信协议。 但是,其核心概念保持不变:建立持久连接,使 LangGraph 能够在生成数据块时主动推送到客户端。
  • 通过理解这些技术基础,开发者可以更好地理解 LangGraph 流式处理实现的高效性和优雅性,从而构建出真正具有交互性和快速响应能力的应用程序。
  • 流式处理绝非 LangGraph 中的可选功能,而是构建用户友好且响应迅速的 AI 智能体的基础架构。通过实施流式处理,开发者能够将原本耗时的人工智能处理流程转化为引人入胜的实时交互体验,从而显著提升用户满意度。LangGraph 通过丰富的 stream_mode 选项和底层的 .astream_events() 方法,提供了一系列可定制化的流式处理解决方案,以满足不同应用场景的特定需求。

2、持久化

  • 在上一节中,我们探讨了流式处理如何提升 AI 智能体实时响应能力,重点关注用 户即时反馈和交互体验。然而,真正强大且以用户为中心的 AI 智能体不仅需要速度,还需要有连续性。用户期望 AI 智能体能够记住过往交互,跨多个会话保持上下文,并能优雅地从中断或错误中恢复。这正是持久化技术的价值所在。LangGraph 通过内置的存档点系统实现持久化,使 AI 智能体能够维持交互状态,从而支持一系列对构建实用且引人入胜的 AI 智能体至关重要的高级功能。
  • 一个每次交互后都会遗忘,或遇到错误就丢失进度的智能体不仅令人沮丧, 功能上也存在严重局限。持久化机制通过保存和恢复智能体内部状态来解决这些 问题。该机制会定期捕获 LangGraph 中所有已定义状态变量的值,并将其存储在持久化存储后端,使 AI 智能体能够“记忆”过往交互、当前进度乃至对话细节, 从而提供更连贯、更具上下文感知能力的用户体验。
  • LangGraph 的持久化基于线程(Thread)和存档点(Checkpoint)两大概念之上。 线程代表图的独立执行上下文或会话,由 thread_id 唯一标识。存档点本质上是图在执行的不同阶段的状态快照,由可插拔的存档点器(Checkpointer)管理。

2.1 线程和存档点的概念

  • 线程和存档点是构建 LangGraph 持久化机制的基石,能够跨交互和会话持久保存和恢复智能体状态。

  • 在 LangGraph 框架中,线程代表图的独立执行上下文。可以将其理解为特定对话历史记录或工作流程执行的专属实例。每个线程都通过唯一的thread_id进行标识, 开发者在调用支持持久化的 LangGraph 时必须提供该标识符。这个 thread_id 作为命名空间,确保每个执行过程关联的存档点和状态数据与其他执行实例保持隔离。这种隔离机制对于处理并发用户交互或管理同一 LangGraph 应用中的多个独立工作流程具有关键作用。

  • 当启动支持持久化的 LangGraph 运行时,必须在 configurable 配置部分明确指定 thread_id 参数。

    1
    config = {"configurable": {"thread_id": "user_conversation_123"}}
  • 存档点器使用此 thread_id 来组织和存储此执行过程的专属存档点。同一对话或工作流中的后续交互应重复使用相同的 thread_id,以保持连续性并访问持久化状态。如要开启新的独立对话或工作流,只需使用不同的唯一 thread_id。这种线程级隔离对于构建多用户应用程序和管理复杂的智能体工作流至关重要,能解决不同执行上下文的独立性问题。

  • LangGraph 中的存档点是图在特定时间点的状态快照。当启用持久化功能时, LangGraph 会在图执行的每个 Superstep 后自动创建这些快照。Superstep 通常对应图中一个或多个节点的执行,然后由控制流逻辑确定下一步。每个存档点都会完整捕获图的当前状态,包括所有已定义状态通道的值。此状态被封装在 StateSnapshot 对象中, 该对象包含以下关键组成部分。

    • config:与此存档点关联的配置,包括 thread_id 和唯一的 checkpoint_id。
    • metadata:与存档点关联的元数据,包括执行步骤、存档点来源,以及写入操作的上下文信息。
    • values:包含存档点创建时所有状态通道值的字典。这是持久化的核心状态数据。
    • next:指示在图流程中计划执行的下一个节点的元组。
    • tasks:由 PregelTask 对象组成的元组,提供关于下一步任务的详细信息,包括任务 ID、节点名称,以及可能出现的错误或中断记录。
  • 这些 StateSnapshot 对象由存档点器存储在持久化存储后端中,并与 thread_id 关联。LangGraph 自动处理存档点过程,在每个 Superstep 后创建新的存档点。这确保了智能体状态的定期备份,从而实现状态的恢复、重放和检查。通过检索和加载这些存档点,LangGraph 可以从先前保存的状态开始“回溯”或“恢复”智能体,从而支持诸如记忆保持和时间旅行等高级特性。

  • 重要的是,要理解 LangGraph 的持久化功能,特别是存档点,主要支持的是 AI 智能体的短期记忆。这种短期记忆使 AI 智能体在单个对话线程或工作流执行范围内保留和利用信息。每个存档点都记录了 AI 智能体在特定步骤的即时工作状态,使其能够连贯地继续执行当前任务。这种“短期”特性对于维持单个交互中的上下文、用 户输入,以及实现预期结果所需的中间结果保存至关重要。

2.2 存档点器的实现

  • LangGraph 的持久化机制设计灵活,能适配各种存储基础设施。这种灵活性是通过由 BaseCheckpointSaver 类定义的存档点器接口实现的。LangGraph 提供了多种内置存档点器实现,每种实现基于不同的存储后端,使开发者能根据应用程序需求和部署环境选择最合适的实现方案。

2.2.1 MemorySaver:内存存档点器

  • MemorySaver 是最基础的存档点器实现,顾名思义,它将所有存档点数据保存在应用程序的内存中。这种实现方案非常快速且易于设置,非常适用于实验、开发及不需要跨应用程序重启持久化的场景。但是,由于数据仅存储在内存中,所以 MemorySaver 不适用于生产环境。因为在生产环境中,数据持久化和跨应用程序关闭的持久化至关重要。如果应用程序重启或崩溃,那么使用 MemorySaver 存储的数据将会丢失。

    1
    2
    3
    4
    from langgraph.checkpoint.memory import MemorySaver

    checkpointer = MemorySaver()
    graph = builder.compile(checkpointer=checkpointer)
  • MemorySaver 适用于以下场景。

    • 原型设计和开发:快速添加持久化以测试和实验 LangGraph 功能。
    • 学习和教程:演示持久化概念,而无须设置外部数据库。
    • 短期应用程序:不需要超出当前应用程序生命周期的数据持久化的应用程序。

2.2.2 SqliteSaver:基于文件的持久化

  • SqliteSaver 利用 SQLite 数据库提供基于文件的持久化方案。SQLite 是一种轻量级、独立的数据库引擎,将数据存储在单个文件中。这使 SqliteSaver 成为需要持久化到磁盘但不需要成熟数据库服务器的可扩展性或复杂性的应用程序的良好选择。它适用于单用户应用程序、本地部署,以及只需要简单基于文件的持久化解决方案的场景。SqliteSaver 提供跨应用程序重启的持久化,比 MemorySaver 更适合本地用例。

  • 使用 SqliteSaver 要先单独安装 langgraph-checkpoint-sqlite 包:

    1
    pip install langgraph-checkpoint-sqlite
    1
    2
    3
    4
    5
    import sqlite3
    from langgraph.checkpoint.sqlite import SqliteSaver

    conn = sqlite3.connect("checkpoints.sqlite")
    checkpointer = SqliteSaver(conn)
  • SqliteSaver 适用于以下场景。

    • 本地应用程序:需要状态持久化的桌面应用程序或命令行工具。
    • 小规模部署:并发性和数据量均有限的应用程序。
    • 开发和测试:提供基于文件的持久化,便于进行更真实的测试。

2.2.3 PostgresSaver:生产级持久化

  • 对于需要健壮、可扩展持久化的生产环境应用,LangGraph 提供了 PostgresSaver。此存档点器利用强大且广泛使用的 PostgreSQL 关系数据库。 PostgreSQL 以其可靠性、可扩展性及对并发访问的支持而闻名,这使 PostgresSaver 成为高流量、生产级 AI 智能体的理想选择。它提供了一个强大的持久化后端,能满足苛刻的工作负载需求。

  • 使用 PostgresSaver 要先安装 langgraph-checkpoint-postgres 包。

    1
    pip install psycopg psycopg-pool langgraph-checkpoint-postgres
    1
    2
    3
    4
    5
    6
    7
    from langgraph.checkpoint.postgres import PostgresSaver
    db_uri = "postgresql://postgres:postgres@localhost:5442/ postgres?sslmode=disable" # 替换为实际 PostgreSQL 连接 URI
    with PostgresSaver.from_conn_string(db_uri) as checkpointer:
    # 注意:第一次使用 checkpointer 时需要调用 setup()
    checkpointer.setup()
    config = {"configurable": {"thread_id": "1"}}
    graph = builder.compile(checkpointer=checkpointer, config=config)
  • PostgresSaver 专为以下场景设计。

    • 生产环境:需要高可靠性和可扩展性的生产级应用部署。
    • 多用户应用程序:支持多用户或智能体的并发访问。
    • 云部署:与基于云的 PostgreSQL 服务集成,以实现可扩展的持久化方案。

2.2.4 MongoDBSaver:文档数据库持久化

  • MongoDBSaver 依托主流 NoSQL 文档数据库 MongoDB 提供持久化服务。 MongoDB 凭借灵活性高、可扩展性强及处理海量非结构化数据的能力广受认可,因此 MongoDBSaver 尤其适用于采用文档型数据库,且对可扩展性和数据存储灵活性有需求的应用场景。

  • 使用 MongoDBSaver 要先安装 langgraph-checkpoint-mongodb 包。

    1
    pip install pymongo langgraph-checkpoint-mongodb
    1
    2
    3
    4
    from langgraph.checkpoint.mongodb import MongoDBSaver
    DB_URI = "mongodb://localhost:27017/" # 替换为实际 MongoDB 连接URI
    with MongoDBSaver.from_conn_string(DB_URI) as checkpointer:
    graph = builder.compile(checkpointer=checkpointer)
  • MongoDBSaver 适用于以下场景。

    • 可扩展应用程序:需要水平扩展并处理大型数据集的应用程序。
    • 以文档为中心的数据模型:图状态自然映射到面向文档的数据模型的应用程序。
    • 现代应用程序堆栈:与基于 MongoDB 的应用程序架构集成。

2.2.5 总结

  • 存档点器实现总结如表所示。

    存档点器名称 存储后端 优点 缺点 用例
    MemorySaver 内存 快速、易于设置 不适用于生产环境 原型设计和开发
    SqliteSaver SQLite 轻量级、跨平台 不适合高流量场景 本地应用程序
    PostgresSaver PostgreSQL 可靠、可扩展 需要数据库服务器 生产环境
    MongoDBSaver MongoDB 灵活、可扩展 需要文档数据库 现代应用程序

2.3 持久化的实际应用

2.3.1 对话记忆

  • 持久化的核心优势之一是支持对话记忆功能,具体流程如图所示。通过存档点保存对话历史记录(通常存储在状态的 messages 通道中),LangGraph 智能体可 以“记住”过往的对话回合。这使得智能体能够实现上下文感知的对话,回顾交互的早期部分,理解用户先前表达的偏好,并在多个回合中保持连贯的对话流程。若没有持久化,则每次与对话式智能体的交互都将从头开始,导致用户体验碎片化且令人沮丧。持久化确保智能体可以像人类一样进行有意义的多回合对话。

    image-20251102152640551
  • 这种由 LangGraph 持久化实现的对话记忆,实际上是一种短期对话记忆。智能体会记住当前对话中的内容,因为消息历史记录和相关状态会在每个回合通过存档点器保存。这种短期记忆对于维持基本的对话连贯性和响应性至关重要。

2.3.2 人机环路工作流

  • 持久化机制在 LangGraph 中对于实现人机环路工作流(如图所示)至关重要。这些工作流需要在智能体执行的各阶段引入人工干预,以实现批准、反馈或直接的状态编辑。持久化支持图执行的无缝中断和恢复,允许人工操作员检查存档点处的智能体状态、做出决策、提供输入或修改状态,然后指示图从该点继续执行。

    image-20251102152733966
  • 此功能对于构建需要人工监督、质量保证或处理复杂或不确定情况(需要人工判断)的智能体至关重要。诸如断点、动态中断和状态编辑等功能都依赖存档点器提供的底层持久化机制实现。

2.3.2 时间旅行调试和分支

  • LangGraph 的持久化机制在支持时间旅行方面展现了其强大能力,为高级开发和调试工作提供重要支持。时间旅行在此能指导并查看 LangGraph 执行的历史状态, 使开发者能检查过去的决策、从特定点重放执行过程,甚至分支执行路径以探索替代结果。这一功能显著提升开发效率,使调试工作更高效,实验更加便捷,同时深化对复杂智能体行为的整体理解。

  • 时间旅行的基础是浏览执行历史记录的能力。在 config 中提供 thread_ id 时,graph.get_state(config) 方法会返回指定线程的当前状态。graph.get_state_history(config) 方法会返回按时间顺序排列的 StateSnapshot 对象列表。此列表记录指定线程的完整执行历史,最新状态快照排在最前。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    from typing import TypedDict
    from langgraph.graph import StateGraph, START

    from langchain_openai import ChatOpenAI
    from langgraph.checkpoint.memory import MemorySaver


    class State(TypedDict):
    topic: str
    joke: str


    llm = ChatOpenAI(model="gpt-4-turbo")


    def refine_topic(state: State):
    return {"topic": state["topic"] + " and cats"}


    def generate_joke(state: State):
    llm_response = llm.invoke([{"role": "user", "content": f"Generate a joke about {state['topic']}"}])
    return {"joke": llm_response.content}


    graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .compile(checkpointer=MemorySaver())
    )

    config = {"configurable": {"thread_id": "my_thread_1"}}
    for chunk in graph.stream({"topic": "ice cream"}, config=config, stream_mode="updates", ):
    print(chunk)
    print("=" * 20)
    print(graph.get_state(config).values)
    state_history = list(graph.get_state_history(config))
    for snapshot in state_history:
    print(f"存档点 ID: {snapshot.config['configurable']['checkpoint_id']}")
    print(f" 步骤元数据 : {snapshot.metadata}")
    print(f" 父图状态值 : {snapshot.values}")
    print(f" 下一个节点 : {snapshot.next}")
    print("=" * 20)

    # {'refine_topic': {'topic': 'ice cream and cats'}}
    # {'generate_joke': {'joke': 'Why did the cat sit on the ice cream cone?\n\nBecause it wanted to be a cool cat with a purr-fect scoop!'}}
    # ====================
    # {'topic': 'ice cream and cats', 'joke': 'Why did the cat sit on the ice cream cone?\n\nBecause it wanted to be a cool cat with a purr-fect scoop!'}
    # 存档点 ID: 1f0b7be0-5806-650c-8002-a5659001c2f0
    # 步骤元数据 : {'source': 'loop', 'step': 2, 'parents': {}}
    # 父图状态值 : {'topic': 'ice cream and cats', 'joke': 'Why did the cat sit on the ice cream cone?\n\nBecause it wanted to be a cool cat with a purr-fect scoop!'}
    # 下一个节点 : ()
    # ====================
    # 存档点 ID: 1f0b7be0-45a4-6440-8001-aaddd058a36c
    # 步骤元数据 : {'source': 'loop', 'step': 1, 'parents': {}}
    # 父图状态值 : {'topic': 'ice cream and cats'}
    # 下一个节点 : ('generate_joke',)
    # ====================
    # 存档点 ID: 1f0b7be0-45a3-677a-8000-925e5503d5e5
    # 步骤元数据 : {'source': 'loop', 'step': 0, 'parents': {}}
    # 父图状态值 : {'topic': 'ice cream'}
    # 下一个节点 : ('refine_topic',)
    # ====================
    # 存档点 ID: 1f0b7be0-45a1-6af6-bfff-8d3723827603
    # 步骤元数据 : {'source': 'input', 'step': -1, 'parents': {}}
    # 父图状态值 : {}
    # 下一个节点 : ('__start__',)
    # ====================
  • LangGraph 的持久化机制不仅支持查看历史状态,更实现了真正的时间旅行功能。 开发者可以在调用 graph.stream() 或 graph.invoke() 时,通过在 config 参数中指定特定的 checkpoint_id 指示系统回溯到该存档点记录的状态,并从此处重新开始执行图流程。

  • LangGraph 能够智能识别哪些步骤在指定 checkpoint_id 存档点前就已经执行过。 系统会从存档点开始创建新的执行分支,即使某些步骤之前已经执行过,在新的分支中仍会完整重演后续所有步骤。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    # 假设来自先前示例的state_history
    checkpoint_to_replay = state_history[2] # 让我们从历史记录中的第 3 个存 档点重放
    print(f"存档点 ID: {checkpoint_to_replay.config['configurable']['checkpoint_id']}")
    print(f" 步骤元数据 : {checkpoint_to_replay.metadata}")
    print(f" 父图状态值 : {checkpoint_to_replay.values}")
    print(f" 下一个节点 : {checkpoint_to_replay.next}")
    # 存档点 ID: 1f0b7c08-a97b-6428-8000-e964ae2645ac
    # 步骤元数据 : {'source': 'loop', 'step': 0, 'parents': {}}
    # 父图状态值 : {'topic': 'ice cream'}
    # 下一个节点 : ('refine_topic',)

    replay_config = checkpoint_to_replay.config
    for event in graph.stream(None, replay_config, stream_mode="values"):
    print(event)
    # {'topic': 'ice cream'}
    # {'topic': 'ice cream and cats'}
    # {'topic': 'ice cream and cats', 'joke': 'Why did the cat sit next to the ice cream cone?\n\nBecause it heard it was a "cool" cat\'s favorite treat!'}
  • 重放功能为调试工作提供强大支持。如果在图执行期间遇到意外行为或错误,则可以先使用 get_state_history() 识别问题发生前的状态快照,再从该存档点重放数据,检查智能体的执行步骤,隔离问题节点或决策点,并在受控环境中重现错误。这种由重放功能支持的迭代调试,相比在实时端到端执行调试要高效得多。

  • 时间旅行功能不仅限于重放历史执行,还可以通过分支来探索替代执行路径。 分支允许开发者从历史存档点分叉,修改该点的智能体状态,然后从修改后的状态恢复执行。这会在执行历史记录中创建一个“分支”,使开发者可以探索不同的方案或测试状态修改的影响,保持原始执行流程不变。

  • 分支功能通过 graph.update_state(config, values) 方法实现。只需提供包含历史状态的 checkpoint_id 的 config,以及包含所需修改状态的 values 字典,即可创建一个新的分支存档点。这个新的存档点会继承原始存档点的执行上下文和元数据,还会合并状态更新。(update_state只是更新状态,但不会重新执行已经执行过的节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    checkpoint_to_branch = state_history[1] # 让我们从历史记录中的第 2 个存档 点创建分支
    print(f"存档点 ID: {checkpoint_to_branch.config['configurable']['checkpoint_id']}")
    print(f" 步骤元数据 : {checkpoint_to_branch.metadata}")
    print(f" 父图状态值 : {checkpoint_to_branch.values}")
    print(f" 下一个节点 : {checkpoint_to_branch.next}")
    # 存档点 ID: 1f0b7c08-a97c-6fd0-8001-c5e93c9ab4b0
    # 步骤元数据 : {'source': 'loop', 'step': 1, 'parents': {}}
    # 父图状态值 : {'topic': 'ice cream and cats'}
    # 下一个节点 : ('generate_joke',)

    branch_config = checkpoint_to_branch.config
    # 创建一个新存档点,使用相同的checkpoint_id来更新存档状态
    new_branch = graph.update_state(branch_config, {"topic": "ice cream and dogs"})
    # 恢复图的执行,它现在将使用分支的存档点
    for event in graph.stream(None, new_branch, stream_mode="values"):
    print(event)
    # {'topic': 'ice cream and dogs', 'joke': 'Why did the cat sit on the ice cream cone?\n\nBecause it wanted to be a cool cat with a purr-fect scoop!'}
  • LangGraph 提供了以下几种主要的子图状态管理和访问方式。

    • 获取子图状态快照。在父图中使用 graph.get_state(config, subgraphs=True) 方法获取包括父图和所有子图的完整状态快照。其中,subgraphs=True 指示 LangGraph 引擎返回父图和所有子图的状态信息。graph.get_state() 方法会返回一个 StateSnapshot 对象,包含了父图的当前状态值 values,以及所有处于“Pending (挂起)”状态的子图任务 tasks。对于每个子图任务 task,task.state 属性都包含了该子图的状态快照(State Snapshot),因此,开发者可从父图中逐层访问嵌套子图的状态。

      1
      2
      3
      4
      5
      6
      7
      8
      state = graph.get_state(config, subgraphs=True) # 获取包含子图状态的完整状态快照
      print("Grandparent State:")
      print(state.values) # 打印父图的状态值
      print("---------------")
      print("Parent Graph State:")
      print(state.tasks[0].state.values) # 访问第一个子图任务,打印其状态值 print("---------------")
      print("Subgraph State:")
      print(state.tasks[0].state.tasks[0].state.values) # 访问第一个子图任务的第一个子任务,打印其状态值

      上面的示例可以逐层深入访问嵌套子图的状态信息,例如 state.tasks[0].state. tasks[0].state.values 可以获取孙子图 (Grandchild Subgraph) 的状态值。获取子图状 态快照是在父图中管理和访问子图状态的基础,也是实现后续子图状态更新、流程控制和人机交互的关键。

    • 更新子图状态。在父图中使用 graph.update_state(config, updates) 方法更新指定子图的状态值。更新时需要将子图的 config 作为第一个参数传入,指明目标子图,updates 参数则指定要更新的状态键值对。

      1
      2
      # 更新子图状态:修改子图状态中的city状态键的值为la
      graph.update_state(state.tasks[0].state.config, {"city": "la"}) # 将子图状态的 config 作为第一个参数传入,updates 参数指定要更新的状态键值对{"city": "la"}

      此示例演示了如何获取子图 weather_graph 的状态快照 state,将 state.tasks[0]. state.config 作为 config 参数, {“city”:”la”} 作为 updates 参数,将子图中的“city”状态值从“sf”修改为“la”。更新子图状态是实现人机环路交互和流程干预的重要手段,允许在父图执行过程中根据用户反馈或外部信息动态调整子图行为参数。

    • 以子图节点身份更新状态。除了直接更新子图状态,LangGraph 还支持“以子图节点身份”更新状态,模拟子图节点执行完成后的状态更新效果。调用 graph.update_state() 方法时,除了传入子图的 config 参数,还需要通过 as_node 参数指定要模拟的子图节点名称。

      1
      2
      3
      4
      5
      6
      # 以子图 weather_graph 的 weather_node 节点身份更新状态
      graph.update_state(
      state.tasks[0].state.config, # 将子图状态的 config 作为 graph.update_state() 的 config 参数传入
      {"messages": [{"role": "assistant", "content": "rainy"}]}, # updates 参数指定“虚假的”天气查询结果
      as_node="weather_node", # 指定要模拟的子图节点名称为 weather_node
      )

      此示例通过 as_node=’’weather_node’’ 参数,模拟了 weather_node 节点执行后返回 rainy 天气查询结果的状态更新效果。这种方式实现了更精细的子图状态控制,允许在父图中精确模拟子图内部的节点行为和状态变化,支持高级流程干预和定制需求。

  • 这种在任何图嵌套级别有选择性地分支和修改状态的能力,是微调智能体行为、 实验不同的子图特定参数,以及创建高度适应性 AI 智能体系统的基础。

  • LangGraph 持久化提供的时间旅行功能有以下优势。

    • 更短的调试周期:通过重放与分支集中检查,可快速定位并复现问题,从而缩短调试时长。
    • 迭代开发:支持测试更改而无须重新运行。
    • 增强行为理解:通过浏览历史记录和逐步执行过去的操作,开发者能更深入地理解 AI 智能体的工作原理、决策过程和状态转换。
    • 增强实验能力:支持受控实验,包括状态修改和替代输入,从而推动更具探索性和数据驱动的开发过程。
  • 需要强调的是,时间旅行功能完全基于持久化机制实现。如果没有保存和检索存档点的能力,浏览历史记录、重放执行和分支执行路径将无法实现。持久化为捕获和保存智能体在不同时间点的状态提供了必要的基础,使时间旅行成为现实。存档点器作为持久化引擎,支撑着所有时间旅行操作,确保历史状态得到可靠存储并可随时访问,以支持调试、分析和实验。

3、人机环路协作

  • 虽然自动化是 AI 智能体的核心优势,但在处理复杂或敏感场景时,完全自主的运行模式并不总是最佳选择,有时甚至不可行。人机环路 (Human In The Loop, HITL) 提供了一种有效范式,将 AI 智能体的优势与人类用户不可替代的判断力和监督力相结合,如图 4-4 所示。 LangGraph 框架深度集成了人机环路机制,使开发者能够构建兼具智能、协作性和透明性的 AI 智能体系统。通过在智能体工作流中合理引入人工输入环节,我们可以创建更可靠、更值得信赖且更符合用户需求和道德考量的 AI 智能体系统。

    image-20251102160524968
  • 如果一个 AI 智能体要做出关键决策、处理敏感操作(如金融交易或数据修改) 或生成需要人工验证的内容(如法律文档或营销文案),那么全自动操作可能风险过高或根本不符合实际需求。人机环路工作流通过在执行过程中设置存档点来解决这一问题,在这些存档点中引入人工审查、批准或反馈。这种协作方法不仅提高了 AI 智能体输出的可靠性和准确性,还增强了用户对 AI 系统的信任感和控制力。

  • 人机环路工作流的核心在于 interrupt() 函数,它允许开发者在指定的节点暂停图执行,向人类用户呈现相关信息,然后根据人工输入决定后续执行流程。本节将探讨 LangGraph 中人机环路协作的各方面,包括实用的设计模式、实现技术,以及将人类智能与 AI 智能体工作流融合的好处。我们将首先讨论静态断点这一基础执行暂停的机制,再探索更高级的人机环路设计模式。

3.1 静态断点:定义固定的人工干预点

  • 通过静态断点将人工干预点引入智能体工作流中,是一种很直接的实施人机环路的方法。静态断点在图编译时通过 graph.compile() 方法的 interrupt_before 和 interrupt_after 参数进行定义。这些参数允许开发者指定节点名称,在这些节点处图执行将自动暂停,从而在工作流中创建固定的人工交互点。

  • interrupt_before 参数接收节点名称列表。当节点名称包含在此列表中时,图执行将在进入并执行指定节点之前暂停。这适用于需要在执行特定节点的逻辑之前检查图 的状态和预期操作的场景。例如,执行 API 调用、执行数据库写入或生成潜在敏感内容之前暂停执行。

  • 当图的执行由于 interrupt_before 断点而暂停时,LangGraph 会发出 Interrupt 信号, 并保存当前图状态的存档点,执行将保持暂停状态,直到显式使用 Command 恢复执行。

    1
    2
    3
    4
    # 编译图,在action节点之前设置断点,当图执行到达 action节点时,它将暂停
    graph = graph_builder.compile(interrupt_before=["action"], checkpointer=MemorySaver())
    # 编译图,在assistant节点之后设置断点,当图执行完成assistant节点时,它将 暂停
    graph = graph_builder.compile(interrupt_after=["assistant"], checkpointer=MemorySaver())
  • 相应地,interrupt_after 参数同样接收节点名称列表。当节点名称包含在 interrupt_after 列表中的时候,图执行将在指定节点完成执行之后、继续执行下一个节点之前自动暂停。这适用于审查节点执行输出,以及在节点逻辑应用之后的状态更改的场景,例如,在 LLM 调用后暂停执行,以审查生成的内容,或在工具执行后检查工具输出,然后再让智能体做出进一步的决策。与 interrupt_before 类似,interrupt_ after 在触发断点时也发出 Interrupt 信号并保存图状态的存档点,从而暂停执行,直到显式使用 Command 恢复执行。

  • 使用 interrupt_before 和 interrupt_after 定义的静态断点提供了一种基本但有效的人工干预机制,特别适用于以下场景。

    • 简单审批步骤:在特定节点前后快速添加人工审查关卡。
    • 开发时调试:在开发阶段检查中间状态和输出。
    • 演示教学:用于演示人机环路概念的基本实现方式。
  • 需要注意的是,静态断点在需要动态判断干预时机的场景下灵活性不足。对于更复杂、需根据运行时条件决定干预时机的需求,interrupt() 函数能实现更灵活、上下文感知的人机环路工作流程。

3.2 人机环路的核心设计模式:基于操作的干预

  • LangGraph 中的人机环路工作流通常围绕三种基本类型的人工操作构建:审批、 编辑图状态和输入提供。这些操作代表了人类与 AI 智能体交互和指导的主要方式, 每种操作在增强控制、准确性和用户体验方面都具有独特的作用。

3.2.1 审批:验证智能体决策

  • 审批工作流旨在在 AI 智能体执行关键或不可逆操作之前引入人工监督。当智能体即将执行具有重大影响的操作时,此模式尤其重要,例如触发外部流程的 API 调用、 执行数据库修改或进行金融交易。通过预先设置人工审批步骤,可以确保人工审查员检查智能体的预期操作、评估其适当性并明确授权执行。这为可能产生重大影响的场景增加了至关重要的安全控制层。

  • 以金融投资 AI 智能体为例,在下达交易指令前,AI 智能体可以暂停执行,并将建议的交易详情(例如,证券、数量、价格等)呈现给人工财务顾问以供审查和批准。 只有在获得明确的人工批准后,AI 智能体才会继续执行交易。这一审批步骤有效降低了因算法错误或市场异常导致财务损失的风险。

  • 在 LangGraph 中实现审批工作流通常包括以下步骤。

    • 在关键操作节点前插入一个“人工审批节点”。
    • 在审批节点中使用 interrupt() 函数暂停执行,并向用户展示相关信息。
    • 提供“批准”或“拒绝”的操作选项。
    • 使用 Command 对象的 resume 参数将人工输入传递回 LangGraph。若批准, 则继续执行关键操作;若被拒绝,则图可能会采取替代路径(如修改操作或中止工作流)。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    from typing import Literal, TypedDict
    from langgraph.graph import StateGraph
    from langgraph.types import interrupt, Command
    from langgraph.checkpoint.memory import MemorySaver


    # 定义状态类型
    class State(TypedDict):
    topic: str
    proposed_action_details: str


    # 定义节点函数
    def propose_action(state: State) -> State:
    """ 提出一个需要人工审批的操作 """
    return {
    **state,
    "proposed_action_details": f" 基于主题 '{state['topic']}' 的操作提议"
    }


    def human_approval_node(state: State) -> Command[Literal["execute_action", "revise_action"]]:
    """ 在执行关键行动前请求人工审批 """
    approval_request = interrupt(
    {
    "question": "Approve the execution of the following action?",
    "action_details": state["proposed_action_details"] # 待审批操作的详情
    }
    )
    if approval_request["user_response"] == "approve": # 用户批准
    return Command(goto="execute_action") # 路由到执行操作的节点
    else: # 用户拒绝
    return Command(goto="revise_action") # 路由到修改操作的节点


    def execute_action(state: State) -> State:
    """ 执行已批准的操作 """
    return {
    **state,
    "proposed_action_details": f" 已执行操作 : {state['proposed_action_details']}"
    }


    def revise_action(state: State) -> State:
    """ 修改被拒绝的操作 """
    return {
    **state,
    "proposed_action_details": f" 修改后的操作 : {state['proposed_action_details']} ( 已调整 )"
    }


    # 构建图
    graph_builder = StateGraph(State)
    # 添加节点
    graph_builder.add_node("node_proposing_action", propose_action)
    graph_builder.add_node("human_approval", human_approval_node)
    graph_builder.add_node("execute_action", execute_action)
    graph_builder.add_node("revise_action", revise_action)
    # 添加边
    graph_builder.add_edge("node_proposing_action", "human_approval")
    graph_builder.add_edge("revise_action", "human_approval") # 修改后再发布审批请求
    # 设置入口节点
    graph_builder.set_entry_point("node_proposing_action")
    # 编译图
    graph = graph_builder.compile(checkpointer=MemorySaver())
    # 执行图
    config = {"configurable": {"thread_id": "approval_thread"}}
    graph.invoke({"topic": " 重要决策 "}, config=config)
    print(graph.get_state(config))
    print("=" * 20)
    # 恢复图执行,拒绝提议
    graph.invoke(Command(resume={"user_response": "deny"}), config=config)
    print(graph.get_state(config))
    print("=" * 20)
    # 恢复图执行,批准提议
    graph.invoke(Command(resume={"user_response": "approve"}), config=config)
    print(graph.get_state(config))

    # StateSnapshot(values={'topic': ' 重要决策 ', 'proposed_action_details': " 基于主题 ' 重要决策 ' 的操作提议"}, next=('human_approval',), config={'configurable': {'thread_id': 'approval_thread', 'checkpoint_ns': '', 'checkpoint_id': '1f0b7c60-9422-6de4-8001-39cd646a4da7'}}, metadata={'source': 'loop', 'step': 1, 'parents': {}}, created_at='2025-11-02T08:29:23.522498+00:00', parent_config={'configurable': {'thread_id': 'approval_thread', 'checkpoint_ns': '', 'checkpoint_id': '1f0b7c60-9421-646c-8000-339ba1fcd217'}}, tasks=(PregelTask(id='00f4d559-732f-3a40-1ad2-76bde2d5071a', name='human_approval', path=('__pregel_pull', 'human_approval'), error=None, interrupts=(Interrupt(value={'question': 'Approve the execution of the following action?', 'action_details': " 基于主题 ' 重要决策 ' 的操作提议"}, id='33641a80a24b3065031feea9dfbb9c36'),), state=None, result=None),), interrupts=(Interrupt(value={'question': 'Approve the execution of the following action?', 'action_details': " 基于主题 ' 重要决策 ' 的操作提议"}, id='33641a80a24b3065031feea9dfbb9c36'),))
    # ====================
    # StateSnapshot(values={'topic': ' 重要决策 ', 'proposed_action_details': " 修改后的操作 : 基于主题 ' 重要决策 ' 的操作提议 ( 已调整 )"}, next=('human_approval',), config={'configurable': {'thread_id': 'approval_thread', 'checkpoint_ns': '', 'checkpoint_id': '1f0b7c60-9428-6a1e-8003-b21cf6ff29a3'}}, metadata={'source': 'loop', 'step': 3, 'parents': {}}, created_at='2025-11-02T08:29:23.524861+00:00', parent_config={'configurable': {'thread_id': 'approval_thread', 'checkpoint_ns': '', 'checkpoint_id': '1f0b7c60-9427-69f2-8002-45a5ea47cde4'}}, tasks=(PregelTask(id='50b7295a-77b4-048c-c38a-7878cc0ef22c', name='human_approval', path=('__pregel_pull', 'human_approval'), error=None, interrupts=(Interrupt(value={'question': 'Approve the execution of the following action?', 'action_details': " 修改后的操作 : 基于主题 ' 重要决策 ' 的操作提议 ( 已调整 )"}, id='30343d1d67ff7a0fc0ce985d1f0b925c'),), state=None, result=None),), interrupts=(Interrupt(value={'question': 'Approve the execution of the following action?', 'action_details': " 修改后的操作 : 基于主题 ' 重要决策 ' 的操作提议 ( 已调整 )"}, id='30343d1d67ff7a0fc0ce985d1f0b925c'),))
    # ====================
    # StateSnapshot(values={'topic': ' 重要决策 ', 'proposed_action_details': " 已执行操作 : 修改后的操作 : 基于主题 ' 重要决策 ' 的操作提议 ( 已调整 )"}, next=(), config={'configurable': {'thread_id': 'approval_thread', 'checkpoint_ns': '', 'checkpoint_id': '1f0b7c60-942d-68de-8005-2fba4db53f15'}}, metadata={'source': 'loop', 'step': 5, 'parents': {}}, created_at='2025-11-02T08:29:23.526878+00:00', parent_config={'configurable': {'thread_id': 'approval_thread', 'checkpoint_ns': '', 'checkpoint_id': '1f0b7c60-942c-6f88-8004-59b36423c038'}}, tasks=(), interrupts=())
    image-20251102163058863

3.2.2 编辑图状态:纠正和优化智能体行为

  • 编辑图状态工作流允许人工用户通过审查和修改 AI 智能体的内部状态来直接干预其执行过程。这对于纠正错误、优化智能体行为或在运行时提供额外的上下文或约束特别有用。通过增加“编辑智能体思维”的功能,开发者可以精细地控制执行流程, 并将 AI 智能体引导至期望的结果,尤其是在初始轨迹偏离预期的情况下。

  • 以 AI 智能体生成长文档摘要为例,智能体可能会生成事实准确但遗漏了关键细节的摘要。工作流会在生成初始摘要后暂停编辑,将其呈现给人工编辑,并允许编辑直接修改摘要文本。智能体恢复执行时,将编辑后的摘要合并到后续流程中,例如将其作为回答问题的上下文。

  • 在 LangGraph 中实现状态编辑通常涉及以下步骤。

    • 在需要审查的位置插入“人工编辑节点”。
    • 在编辑节点中使用 interrupt() 函数暂停执行,并将相关的状态变量(或派生信息)呈现给用户审查。
    • 提供便于直接编辑状态信息的用户界面。
    • 在恢复执行时,使用人工编辑的值更新图状态。通过带有 update 参数的 Command 对象,将人工编辑后的状态注入回 LangGraph。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    from langgraph.types import Interrupt, Command


    def human_editing_node(state: State):
    """ 对于已创建的内容申请人工审核 """
    current_summary = state["generated_summary"]
    edited_summary = Interrupt(
    {
    "task": "Review and edit the generated summary.",
    "current_summary": current_summary
    }
    )
    return {
    "generated_summary": edited_summary # 使用人工编辑的摘要更新状态
    }

    # 图定义
    graph_builder.add_node("human_editing", human_editing_node)
    graph_builder.add_edge("node_generating_summary", "human_editing")

    # 在图human_editing_node处中断后
    edited_text_from_human = get_user_edited_summary_from_ui() # 从用户界面获取编辑后的摘要
    # 恢复图执行,使用编辑后的摘要更新状态
    graph.invoke(
    Command(resume={"edited_text": edited_text_from_human}), # 将编辑后的文本作为恢复值传递
    config=thread_config)

3.2.3 输入提供:指导对话和收集上下文

  • 输入提供工作流通过人工交互收集 AI 智能体有效执行操作所需的额外信息或上下文。这种模式适用于对话式智能体或需要澄清需求的工作流,以及需要获取用户偏好或现实世界数据的工作流。通过在关键点请求人工输入,可以引导对话、消除歧义并确保智能体获得生成准确响应所需的必要信息。

  • 以旅行计划 AI 智能体为例说明。AI 智能体最初可能会询问用户目的地和旅行日期等基本信息。如果用户请求含糊不清(如“明年夏天我想去欧洲”),那么 AI 智能体会触发输入提供工作流进行澄清,询问用户诸如“您对欧洲的哪个地区感兴趣?” 或“您正在寻找哪种旅行体验?”等问题。用户的回答为 AI 智能体提供了优化旅行建议所需的关键上下文。

  • 输入提供工作流程具体如下所述。

    • 在需要用户澄清或额外信息的位置插入“人工输入节点”。
    • 在输入节点中使用 interrupt() 函数暂停执行,并将具体问题或信息请求呈现给用户。
    • 提供专门的用户界面,允许人工输入。
    • 在恢复执行时,使用人工提供的输入更新图状态。通过 Command 对象的 resume 参数,将人工输入传回 LangGraph。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    from langgraph.types import Interrupt, Command


    def human_input_node(state: State):
    """ 请求人工审核的节点 """
    user_location = interrupt(
    "To help me find the best travel options, could you please specify which part of Europe you'are interested in?"
    )
    return {
    "user_location": user_location # 使用用户提供的位置更新状态
    }


    # 图定义
    graph_builder.add_node("human_input", human_input_node)
    graph_builder.add_edge("node_initial_request", "human_input")
    # 在图 human_input_node 处中断后
    user_provided_location = get_user_input_from_ui() # 从用户界面获取用户的位置输入
    thread_config = {"configurable": {"thread_id": "my_thread_1"}}
    # 恢复图执行,传入用户提供的位置
    graph.invoke(
    Command(resume=user_provided_location), # 将用户位置作为恢复值传递
    config=thread_config)

3.3 interrupt()函数的技术细节

  • interrupt() 函数是 LangGraph 实现人机环路工作流的核心机制,能够在策略节点暂停图执行,向用户呈现信息,并根据人工输入无缝恢复执行。理解 interrupt() 的技术细节及其与 Command 对象的交互,对设计和实现高效的人机环路工作流至关重要。

  • 当在 LangGraph 节点中调用 interrupt() 函数时,将触发以下关键操作。

    • 图执行暂停:执行流程在调用 interrupt() 函数的代码位置立即暂停。至关重要的是,在显式使用 Command 恢复执行之前,当前节点或后续节点中的代码都不会继续执行。智能体的工作流完全停止,等待外部人工干预。
    • 发出中断信号:LangGraph 发出结构化 Interrupt 信号(在使用 .stream() 或 .astream() 时可作为流输出的一部分访问)。此信号携带着恢复工作流所需的基本信息,包含以下内容。
      • value:调用 interrupt() 时定义的有效负载。它可以是任何 JSON 可序列化的值, 例如,问题、要审查的文本、操作建议或相关状态数据。此值用于通知人类用户并指导其输入。
      • resumable=True:明确表示图执行可以从此暂停点恢复。对于由 interrupt() 函数创建的中断,此标志始终为 True,表明其在人机环路工作流中的作用。
      • ns:作为图中特定中断实例的唯一标识符,由 LangGraph 的引擎使用,以管 理恢复过程,尤其是在处理嵌套图或复杂工作流中的多个中断时,确保正确路由恢复。
      • when=’during’:表示中断发生在图执行过程中(当前 interrupt() 函数调用仅产生此类型中断)。
    • 图状态存档点保存:为了实现恢复,LangGraph 会暂停,并自动创建完整 的图状态存档点。此存档点包含中断时刻所有状态变量和执行上下文的快照,并保存 到持久化存储后端,确保在暂停期间不会丢失任何进度。与此图状态存档点关联的 checkpoint_id 会被包含在 Interrupt 信号的命名空间中,从而将暂停链接到持久化状态。
    • 控制权返回给用户:在发出 Interrupt 信号并保存存档点状态后, LangGraph 会将控制权交还给调用图执行的用户或应用程序代码(例如,调用 graph. stream() 或 graph.invoke() 的代码)。当使用 .stream() 或 .astream() 时, Interrupt 信号将作为数据流的一部分产生,应用程序检测暂停并提取 value 有效负载,并在用户界面中呈现给人类用户,等待人工输入。
  • 要在中断之后恢复图执行,必须使用与中断运行相同的 thread_id 再次调用图, 但需要提供 Command 对象作为输入。

  • Command 对象中的参数 resume 值将作为 interrupt() 调用在节点中(在其中触发中断)的返回值。这种机制允许将人工输入注入回 LangGraph 执行流程,使 AI 智能体能够根据人工指导继续处理。

  • 需要注意的是,在中断后恢复执行时,图执行会从调用 interrupt() 的节点的开头重新开始。该节点内 interrupt() 调用之前的所有代码,都将重新执行。因此,必须将任何具有副作用的代码(如 API 调用、数据库写入等)放在 interrupt() 调用之后,以避免在恢复时意外地重新执行这些操作。

  • 虽然 interrupt() 函数为人机环路工作流提供了强大而灵活的机制,但开发者应注意以下几个常见陷阱,以确保在 LangGraph 中实现正确而健壮的人机环路工作流。

    • 常见陷阱之一:忘记配置存档点器

      • 最基本的陷阱之一是在编译图时忘记配置存档点器。interrupt() 函数完全依赖 LangGraph 的持久化机制来保存暂停执行时的图状态。如果在 graph.compile() 期间未提供存档点器的情况下编译图,则 interrupt() 函数将无法按预期工作,并可能导致运 行时错误或不可预测的行为。图将无法在中断点保存其状态,因此,恢复将是不可能的。

      • 解决方案:始终确保在编译任何使用 interrupt() 函数的 LangGraph 时提供有效的存档点器实例(例如,MemorySaver、SqliteSaver、PostgresSaver、MongoDBSaver)。

        1
        2
        3
        4
        # 正确:使用存档点器编译图
        graph = graph_builder.compile(checkpointer=MemorySaver())
        # 不正确:在没有存档点器的情况下编译图, interrupt() 将无法正常工作
        graph_correct = graph_builder.compile() # 缺少存档点器!
    • 常见陷阱之二:不正确的 thread_id 管理

      • 人机环路工作流本质上依赖线程的概念,以维护不同对话或工作流执行的独立 执行上下文和持久状态。常见的错误是不正确地管理 thread_id,尤其是在恢复中断执行时。如果使用与中断运行时不同的 thread_id 恢复图,那么 LangGraph 将无法找到 正确的存档点,导致恢复失败。同样,如果在多回合对话的整个生命周期中未持续传 递正确的 thread_id,那么可能会丢失上下文和持久化。
      • 解决方案:
        • 确保在单个对话线程或工作流执行中的所有交互中始终使用相同的 thread_id。存储并重用为给定用户会话生成的初始 thread_id。
        • 恢复中断的图时,请注意不要随意创建新的 thread_id。当使用 Command 恢复时,传递与中断执行关联的原始 thread_config。
    • 常见陷阱之三:恢复时 Command 对象结构不正确

      • Command 对象是恢复中断的 LangGraph 执行并将人工输入传递回工作流的指定机制。使用结构不正确的 Command 对象是一个常见的陷阱。最常见的错误是错误配 置或省略 resume 参数、为其提供错误的数据类型或在 resume 中错误地嵌套数据。
      • 解决方案:
        • 在中断后恢复图执行时,必须使用带有 resume 参数的 Command 对象;确保传递给 Command(resume=value) 的值的数据类型和结构与 human_node 函数期望接收的 interrupt() 返回值的类型和结构相匹配。请参阅节点代码以确定预 期的数据格式。
        • 在调试时,仔细检查 .stream() 或 .astream() 发出的 Interrupt 信号,其 value 字段通常提供有关 resume 值的预期格式的线索。
    • 常见陷阱之四:节点外部状态突变

      • update_state() 方法和 Command 对象的 update 参数虽然可用于修改图状态,但在节点状态突变是一个常见错误,尤其是在人机环路工作流中。状态突变,特别是与中断恢复机制结合使用时,可能导致在并发或异步应用程序中出现竞争条件、状态不一 致等不可预测的行为。
      • 解决方案:
        • 图状态的修改应主要在 LangGraph 节点函数内部进行。
        • 当需要从外部修改状态(如人机环路场景)时,应使用 graph.update_state() 方法或 Command 对象的 update 参数,以确保状态更新已被正确同步并集成到 LangGraph 执行流程中。
        • 不应依赖全局变量或外部副作用直接修改图状态。这种做法会破坏 LangGraph 提供的受控状态管理,并可能引发难以调试的问题。
    • 常见陷阱之五:多重中断节点的复杂性

      • 在单个节点内使用多个 interrupt() 调用,虽然有助于实现诸如验证人工输入之类的模式,但如果处理不当,则会引入复杂性和潜在问题。恢复值与节点内 interrupt() 函数调用之间的索引匹配关系对节点代码结构的修改很敏感。
      • 解决方案:
        • 若节点包含多个 interrupt() 调用,则应避免在执行之间动态添加、删除或重新排序这些调用。要尽可能保持节点的静态结构。
        • 修改包含多个 interrupt() 调用的节点时,务必注意任何可能影响 interrupt() 调用的顺序或数量的更改,以免破坏索引恢复值匹配。
        • 对于需要动态中断处理的复杂场景,建议将节点逻辑重构为多个较小节点, 每个节点都有单个 interrupt() 调用,并通过条件边连接。这可以提高代码清晰 度并减少索引相关问题风险。

3.4 人机协作是构建信任和控制的关键

  • 人机环路工作流不仅仅是 AI 智能体的后备机制,更是一种强大的设计范式,可以实现更协作、更透明和以用户为中心的 AI 应用开发。interrupt() 函数提供了一种灵活且符合工程学原理的方式,支持在关键决策点进行人工干预。

  • 通过采用人机环路工作流,开发者可以构建具备以下特性的 AI 智能体。

    • 更可靠:人工监督降低了错误风险,确保关键任务执行的准确性。
    • 更值得信赖:透明度和人工控制增强了用户对 AI 系统的信任基础。
    • 更具适应性:人工输入使智能体能够处理模棱两可的情况、适应不断变化的需求并从人工反馈中学习。
    • 更以用户为中心:人机环路工作流实现了更具交互性和协作性的用户体验, 并在需要时确保人工主导权。
  • 无论是用于批准敏感操作、编辑生成的内容,还是收集澄清信息,人机环路工作流都是 LangGraph 开发者工具包中的宝贵工具。掌握这些模式使开发者能够构建不仅智能且自动化,而且及时响应人工指导和监督的 AI 智能体。

  • 人机环路的三种操作如表所示。

    人机环路操作 描述 LangGraph实现技术 主要优势
    审批 人工验证并授权智能体在执行前提出的操作 专用节点中的 interrupt() 函数,基于用户响应进行路由 增强安全性,降低意外或有害操作的风险
    编辑图状态 人工审查并修改智能体的内部状态,以纠 正错误或优化行为 专用节点中的 interrupt() 函数, 带有人工编辑状态的 ommand(resume=…) 精细控制,纠正智能体的错误降低,提高输出质量
    输入提供 智能体显式请求并收集来自人类用户的澄清信息或上下文 专用节点中的 interrupt() 函数, 带有用户输入的 ommand(resume=…) 指导对话,消除歧义,更丰富的上下文理解