类型 (Types)
langgraph.types ¶
| 函数 | 描述 |
|---|---|
中断 |
从节点内部通过可恢复的异常来中断图。 |
StreamMode 模块属性 ¶
StreamMode = Literal[
"values", "updates", "checkpoints", "tasks", "debug", "messages", "custom"
]
stream 方法应如何发出输出。
"values":在每个步骤后发出状态中的所有值,包括中断。与函数式 API 一起使用时,值在工作流结束时发出一次。"updates":仅在每个步骤后发出节点或任务返回的节点或任务名称及更新。如果在同一步骤中进行了多次更新(例如运行了多个节点),则这些更新将分别发出。"custom":使用StreamWriter从节点或任务内部发出自定义数据。"messages":逐个标记地发出 LLM 消息,同时附带节点或任务内部任何 LLM 调用的元数据。"checkpoints":在创建检查点时发出一个事件,格式与get_state()返回的格式相同。"tasks":在任务开始和结束时发出事件,包括其结果和错误。"debug":为调试目的发出"checkpoints"和"tasks"事件。
StreamWriter 模块属性 ¶
接受单个参数并将其写入输出流的 Callable。如果作为关键字参数请求,总是会注入到节点中,但在不使用 stream_mode="custom" 时,它是一个空操作。
RetryPolicy ¶
CachePolicy 数据类 ¶
Interrupt 数据类 ¶
PregelTask ¶
StateSnapshot ¶
Send ¶
发送到图中特定节点的消息或数据包。
`Send` 类在 `StateGraph` 的条件边中使用,以便在下一步骤中用自定义状态动态调用一个节点。
重要的是,发送的状态可以与核心图的状态不同,从而实现灵活和动态的工作流管理。
一个这样的例子是“map-reduce”工作流,其中您的图使用不同的状态并行调用同一个节点多次,然后在将结果聚合回主图的状态之前。
| 属性 | 描述 |
|---|---|
节点 |
要将消息发送到的目标节点的名称。
类型: |
参数 |
要发送给目标节点的状态或消息。
类型: |
示例
from typing import Annotated
from langgraph.types import Send
from langgraph.graph import END, START
from langgraph.graph import StateGraph
import operator
class OverallState(TypedDict):
subjects: list[str]
jokes: Annotated[list[str], operator.add]
def continue_to_jokes(state: OverallState):
return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]
builder = StateGraph(OverallState)
builder.add_node("generate_joke", lambda state: {"jokes": [f"Joke about {state['subject']}"]})
builder.add_conditional_edges(START, continue_to_jokes)
builder.add_edge("generate_joke", END)
graph = builder.compile()
# Invoking with two subjects results in a generated joke for each
graph.invoke({"subjects": ["cats", "dogs"]})
# {'subjects': ['cats', 'dogs'], 'jokes': ['Joke about cats', 'Joke about dogs']}
| 方法 | 描述 |
|---|---|
__init__ |
初始化 `Send` 类的新实例。 |
Command 数据类 ¶
基类:Generic[N], ToolOutputMixin
一个或多个用于更新图状态并向节点发送消息的命令。
| 参数 | 描述 |
|---|---|
图
|
要发送命令的图。支持的值有
类型: |
更新
|
要应用于图状态的更新。
类型: |
恢复
|
用于恢复执行的值。与 `interrupt()` 一起使用。可以是以下之一
|
前往
|
可以是以下之一
|
Overwrite 数据类 ¶
绕过 reducer,将包装的值直接写入 `BinaryOperatorAggregate` 通道。
在单个超步骤中为同一个通道接收多个 `Overwrite` 值将引发 `InvalidUpdateError`。
示例
from typing import Annotated
import operator
from langgraph.graph import StateGraph
from langgraph.types import Overwrite
class State(TypedDict):
messages: Annotated[list, operator.add]
def node_a(state: TypedDict):
# Normal update: uses the reducer (operator.add)
return {"messages": ["a"]}
def node_b(state: State):
# Overwrite: bypasses the reducer and replaces the entire value
return {"messages": Overwrite(value=["b"])}
builder = StateGraph(State)
builder.add_node("node_a", node_a)
builder.add_node("node_b", node_b)
builder.set_entry_point("node_a")
builder.add_edge("node_a", "node_b")
graph = builder.compile()
# Without Overwrite in node_b, messages would be ["START", "a", "b"]
# With Overwrite, messages is just ["b"]
result = graph.invoke({"messages": ["START"]})
assert result == {"messages": ["b"]}
interrupt ¶
从节点内部通过可恢复的异常来中断图。
`interrupt` 函数通过暂停图的执行并向客户端呈现一个值来实现人机协同工作流。这个值可以传达上下文或请求恢复执行所需的输入。
在给定的节点中,此函数的第一次调用会引发 `GraphInterrupt` 异常,从而停止执行。提供的 `value` 会包含在异常中并发送给执行图的客户端。
恢复图的客户端必须使用 `Command` 原语来为中断指定一个值并继续执行。图会从节点的开头恢复,重新执行所有逻辑。
如果一个节点包含多个 `interrupt` 调用,LangGraph 会根据它们在节点中的顺序将恢复值与中断进行匹配。这个恢复值列表的作用域限定于执行该节点的特定任务,并且不在任务之间共享。
要使用 `interrupt`,您必须启用一个检查点(checkpointer),因为该功能依赖于持久化图的状态。
示例
import uuid
from typing import Optional
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
class State(TypedDict):
"""The graph state."""
foo: str
human_value: Optional[str]
"""Human value will be updated using an interrupt."""
def node(state: State):
answer = interrupt(
# This value will be sent to the client
# as part of the interrupt information.
"what is your age?"
)
print(f"> Received an input from the interrupt: {answer}")
return {"human_value": answer}
builder = StateGraph(State)
builder.add_node("node", node)
builder.add_edge(START, "node")
# A checkpointer must be enabled for interrupts to work!
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
config = {
"configurable": {
"thread_id": uuid.uuid4(),
}
}
for chunk in graph.stream({"foo": "abc"}, config):
print(chunk)
# > {'__interrupt__': (Interrupt(value='what is your age?', id='45fda8478b2ef754419799e10992af06'),)}
command = Command(resume="some input from a human!!!")
for chunk in graph.stream(Command(resume="some input from a human!!!"), config):
print(chunk)
# > Received an input from the interrupt: some input from a human!!!
# > {'node': {'human_value': 'some input from a human!!!'}}
| 参数 | 描述 |
|---|---|
value
|
当图被中断时,向客户端呈现的值。
类型: |
| 返回 | 描述 |
|---|---|
任意
|
在同一节点内(准确地说是同一任务内)的后续调用中,返回第一次调用时提供的值
类型: |
| 引发 | 描述 |
|---|---|
GraphInterrupt
|
在节点内的第一次调用时,暂停执行并向客户端呈现提供的值。 |