跳转到内容

类型 (Types)

langgraph.types

函数 描述
中断

从节点内部通过可恢复的异常来中断图。

All 模块属性

All = Literal['*']

特殊值,表示图应在所有节点上中断。

StreamMode 模块属性

StreamMode = Literal[
    "values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
]

stream 方法应如何发出输出。

  • "values":在每个步骤后发出状态中的所有值,包括中断。与函数式 API 一起使用时,值在工作流结束时发出一次。
  • "updates":仅在每个步骤后发出节点或任务返回的节点或任务名称及更新。如果在同一步骤中进行了多次更新(例如运行了多个节点),则这些更新将分别发出。
  • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
  • "messages":逐个标记地发出 LLM 消息,同时附带节点或任务内部任何 LLM 调用的元数据。
  • "checkpoints":在创建检查点时发出一个事件,格式与 get_state() 返回的格式相同。
  • "tasks":在任务开始和结束时发出事件,包括其结果和错误。
  • "debug":为调试目的发出 "checkpoints""tasks" 事件。

StreamWriter 模块属性

StreamWriter = Callable[[Any], None]

接受单个参数并将其写入输出流的 Callable。如果作为关键字参数请求,总是会注入到节点中,但在不使用 stream_mode="custom" 时,它是一个空操作。

RetryPolicy

基类:NamedTuple

用于重试节点的配置。

在版本 0.2.24 中添加

initial_interval 类属性 实例属性

initial_interval: float = 0.5

在第一次重试发生前必须经过的时间量。单位为秒。

backoff_factor 类属性 实例属性

backoff_factor: float = 2.0

每次重试后间隔增加的乘数。

max_interval 类属性 实例属性

max_interval: float = 128.0

两次重试之间可能经过的最长时间。单位为秒。

max_attempts 类属性 实例属性

max_attempts: int = 3

放弃前尝试的最大次数,包括第一次。

jitter 类属性 实例属性

jitter: bool = True

是否在重试间隔之间添加随机抖动。

retry_on 类属性 实例属性

retry_on: type[Exception] | Sequence[type[Exception]] | Callable[[Exception], bool] = (
    default_retry_on
)

应触发重试的异常类列表,或对于应触发重试的异常返回 `True` 的可调用对象。

CachePolicy 数据类

基类:Generic[KeyFuncT]

用于缓存节点的配置。

key_func 类属性 实例属性

key_func: KeyFuncT = default_cache_key

从节点输入生成缓存键的函数。默认为使用 pickle 对输入进行哈希处理。

ttl 类属性 实例属性

ttl: int | None = None

缓存条目的存活时间,单位为秒。如果为 `None`,则条目永不过期。

Interrupt 数据类

关于在节点中发生的中断的信息。

在版本 0.2.24 中添加

v0.4.0 版本中变更

  • `interrupt_id` 被引入作为一个属性

v0.6.0 版本中变更

以下属性已被移除

  • ns
  • when
  • resumable
  • `interrupt_id`,已弃用,推荐使用 `id`

id 实例属性

id: str

中断的 ID。可用于直接恢复中断。

value 实例属性

value: Any = value

与中断关联的值。

PregelTask

基类:NamedTuple

一个 Pregel 任务。

StateSnapshot

基类:NamedTuple

图在某一步骤开始时的状态快照。

values 实例属性

values: dict[str, Any] | Any

通道的当前值。

next 实例属性

next: tuple[str, ...]

在此步骤中为每个任务要执行的节点的名称。

config 实例属性

用于获取此快照的配置。

metadata 实例属性

metadata: CheckpointMetadata | None

与此快照关联的元数据。

created_at 实例属性

created_at: str | None

快照创建的时间戳。

parent_config 实例属性

parent_config: RunnableConfig | None

用于获取父快照的配置(如果有的话)。

tasks 实例属性

tasks: tuple[PregelTask, ...]

在此步骤中要执行的任务。如果已经尝试过,可能包含错误。

interrupts 实例属性

interrupts: tuple[Interrupt, ...]

在此步骤中发生的、待解决的中断。

Send

发送到图中特定节点的消息或数据包。

`Send` 类在 `StateGraph` 的条件边中使用,以便在下一步骤中用自定义状态动态调用一个节点。

重要的是,发送的状态可以与核心图的状态不同,从而实现灵活和动态的工作流管理。

一个这样的例子是“map-reduce”工作流,其中您的图使用不同的状态并行调用同一个节点多次,然后在将结果聚合回主图的状态之前。

属性 描述
节点

要将消息发送到的目标节点的名称。

类型: str

参数

要发送给目标节点的状态或消息。

类型: Any

示例

from typing import Annotated
from langgraph.types import Send
from langgraph.graph import END, START
from langgraph.graph import StateGraph
import operator

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], operator.add]

def continue_to_jokes(state: OverallState):
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

builder = StateGraph(OverallState)
builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()

# Invoking with two subjects results in a generated joke for each
graph.invoke({"subjects": ["cats", "dogs"]})
# {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
方法 描述
__init__

初始化 `Send` 类的新实例。

__init__

__init__(node: str, arg: Any) -> None

初始化 `Send` 类的新实例。

参数 描述
节点

要将消息发送到的目标节点的名称。

类型: str

参数

要发送给目标节点的状态或消息。

类型: Any

Command 数据类

基类:Generic[N], ToolOutputMixin

一个或多个用于更新图状态并向节点发送消息的命令。

参数 描述

要发送命令的图。支持的值有

  • None:当前图
  • Command.PARENT:最近的父图

类型: str | None 默认值: None

更新

要应用于图状态的更新。

类型: Any | None 默认值: None

恢复

用于恢复执行的值。与 `interrupt()` 一起使用。可以是以下之一

  • 中断 ID 到恢复值的映射
  • 用于恢复下一次中断的单个值

类型: dict[str, Any] | Any | None 默认值: None

前往

可以是以下之一

  • 接下来要导航到的节点的名称(属于指定 `graph` 的任何节点)
  • 接下来要导航到的一系列节点名称
  • `Send` 对象(使用提供的输入执行一个节点)
  • 一系列 `Send` 对象

类型: Send | Sequence[Send | N] | N 默认值: ()

Overwrite 数据类

绕过 reducer,将包装的值直接写入 `BinaryOperatorAggregate` 通道。

在单个超步骤中为同一个通道接收多个 `Overwrite` 值将引发 `InvalidUpdateError`。

示例

from typing import Annotated
import operator
from langgraph.graph import StateGraph
from langgraph.types import Overwrite

class State(TypedDict):
    messages: Annotated[list, operator.add]

def node_a(state: TypedDict):
    # Normal update: uses the reducer (operator.add)
    return {"messages": ["a"]}

def node_b(state: State):
    # Overwrite: bypasses the reducer and replaces the entire value
    return {"messages": Overwrite(value=["b"])}

builder = StateGraph(State)
builder.add_node("node_a", node_a)
builder.add_node("node_b", node_b)
builder.set_entry_point("node_a")
builder.add_edge("node_a", "node_b")
graph = builder.compile()

# Without Overwrite in node_b, messages would be ["START", "a", "b"]
# With Overwrite, messages is just ["b"]
result = graph.invoke({"messages": ["START"]})
assert result == {"messages": ["b"]}

value 实例属性

value: Any

要直接写入通道的值,绕过任何 reducer。

interrupt

interrupt(value: Any) -> Any

从节点内部通过可恢复的异常来中断图。

`interrupt` 函数通过暂停图的执行并向客户端呈现一个值来实现人机协同工作流。这个值可以传达上下文或请求恢复执行所需的输入。

在给定的节点中,此函数的第一次调用会引发 `GraphInterrupt` 异常,从而停止执行。提供的 `value` 会包含在异常中并发送给执行图的客户端。

恢复图的客户端必须使用 `Command` 原语来为中断指定一个值并继续执行。图会从节点的开头恢复,重新执行所有逻辑。

如果一个节点包含多个 `interrupt` 调用,LangGraph 会根据它们在节点中的顺序将恢复值与中断进行匹配。这个恢复值列表的作用域限定于执行该节点的特定任务,并且不在任务之间共享。

要使用 `interrupt`,您必须启用一个检查点(checkpointer),因为该功能依赖于持久化图的状态。

示例

import uuid
from typing import Optional
from typing_extensions import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command


class State(TypedDict):
    """The graph state."""

    foo: str
    human_value: Optional[str]
    """Human value will be updated using an interrupt."""


def node(state: State):
    answer = interrupt(
        # This value will be sent to the client
        # as part of the interrupt information.
        "what is your age?"
    )
    print(f"> Received an input from the interrupt: {answer}")
    return {"human_value": answer}


builder = StateGraph(State)
builder.add_node("node", node)
builder.add_edge(START, "node")

# A checkpointer must be enabled for interrupts to work!
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

config = {
    "configurable": {
        "thread_id": uuid.uuid4(),
    }
}

for chunk in graph.stream({"foo": "abc"}, config):
    print(chunk)

# > {'__interrupt__': (Interrupt(value='what is your age?', id='45fda8478b2ef754419799e10992af06'),)}

command = Command(resume="some input from a human!!!")

for chunk in graph.stream(Command(resume="some input from a human!!!"), config):
    print(chunk)

# > Received an input from the interrupt: some input from a human!!!
# > {'node': {'human_value': 'some input from a human!!!'}}
参数 描述
value

当图被中断时,向客户端呈现的值。

类型: Any

返回 描述
任意

在同一节点内(准确地说是同一任务内)的后续调用中,返回第一次调用时提供的值

类型: Any

引发 描述
GraphInterrupt

在节点内的第一次调用时,暂停执行并向客户端呈现提供的值。

© . This site is unofficial and not affiliated with LangChain, Inc.