图 (Graphs)
langgraph.graph.state.StateGraph ¶
Bases: Generic[StateT, ContextT, InputT, OutputT]
一种图,其节点通过读取和写入共享状态进行通信。每个节点的签名为 State -> Partial
每个状态键都可以选择性地使用一个归约函数进行注解,该函数将用于聚合从多个节点接收到的该键的值。归约函数的签名为 (Value, Value) -> Value。
| 参数 | 描述 |
|---|---|
state_schema
|
定义状态的模式类。
类型: |
context_schema
|
定义运行时上下文的模式类。使用它向您的节点暴露不可变的上下文数据,例如
类型: |
input_schema
|
定义图输入的模式类。
类型: |
output_schema
|
定义图输出的模式类。
类型: |
config_schema 已弃用
config_schema 参数在 v0.6.0 中已弃用,并将在 v2.0.0 中移除支持。请改用 context_schema 来指定运行范围内的上下文模式。
示例
from langchain_core.runnables import RunnableConfig
from typing_extensions import Annotated, TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime
def reducer(a: list, b: int | None) -> list:
if b is not None:
return a + [b]
return a
class State(TypedDict):
x: Annotated[list, reducer]
class Context(TypedDict):
r: float
graph = StateGraph(state_schema=State, context_schema=Context)
def node(state: State, runtime: Runtime[Context]) -> dict:
r = runtime.context.get("r", 1.0)
x = state["x"][-1]
next_value = x * r * (1 - x)
return {"x": next_value}
graph.add_node("A", node)
graph.set_entry_point("A")
graph.set_finish_point("A")
compiled = graph.compile()
step1 = compiled.invoke({"x": 0.5}, context={"r": 3.0})
# {'x': [0.5, 0.75]}
| 方法 | 描述 |
|---|---|
add_node |
向状态图中添加一个新节点。 |
add_edge |
添加一条从起始节点(或起始节点列表)到结束节点的有向边。 |
add_conditional_edges |
添加一条从起始节点到任意数量目标节点的条件边。 |
add_sequence |
添加一个将按所提供顺序执行的节点序列。 |
compile |
将状态图编译成一个 |
add_node ¶
add_node(
node: str | StateNode[NodeInputT, ContextT],
action: StateNode[NodeInputT, ContextT] | None = None,
*,
defer: bool = False,
metadata: dict[str, Any] | None = None,
input_schema: type[NodeInputT] | None = None,
retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
cache_policy: CachePolicy | None = None,
destinations: dict[str, str] | tuple[str, ...] | None = None,
**kwargs: Unpack[DeprecatedKwargs],
) -> Self
向状态图中添加一个新节点。
| 参数 | 描述 |
|---|---|
node
|
该节点将运行的函数或 runnable。如果提供的是字符串,它将被用作节点名称,而 action 将被用作函数或 runnable。
类型: |
action
|
与节点关联的动作。如果
类型: |
defer
|
是否将节点的执行推迟到运行即将结束时。
类型: |
metadata
|
与节点关联的元数据。 |
input_schema
|
节点的输入模式。(默认:图的状态模式)
类型: |
retry_policy
|
节点的重试策略。如果提供了一个序列,将应用第一个匹配的策略。
类型: |
cache_policy
|
节点的缓存策略。
类型: |
destinations
|
指示节点可以路由到的目标。这对于返回 注意 这仅用于图的渲染,对图的执行没有任何影响。 |
示例
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableConfig
from langgraph.graph import START, StateGraph
class State(TypedDict):
x: int
def my_node(state: State, config: RunnableConfig) -> State:
return {"x": state["x"] + 1}
builder = StateGraph(State)
builder.add_node(my_node) # node name will be 'my_node'
builder.add_edge(START, "my_node")
graph = builder.compile()
graph.invoke({"x": 1})
# {'x': 2}
自定义名称
| 返回 | 描述 |
|---|---|
Self
|
状态图的实例,允许方法链式调用。
类型: |
add_edge ¶
add_conditional_edges ¶
add_conditional_edges(
source: str,
path: Callable[..., Hashable | Sequence[Hashable]]
| Callable[..., Awaitable[Hashable | Sequence[Hashable]]]
| Runnable[Any, Hashable | Sequence[Hashable]],
path_map: dict[Hashable, str] | list[str] | None = None,
) -> Self
添加一条从起始节点到任意数量目标节点的条件边。
| 参数 | 描述 |
|---|---|
source
|
起始节点。此条件边将在退出此节点时运行。
类型: |
path
|
确定下一个节点或多个节点的可调用对象。如果未指定
类型: |
path_map
|
可选的路径到节点名称的映射。如果省略, |
| 返回 | 描述 |
|---|---|
Self
|
图的实例,允许方法链式调用。
类型: |
警告
如果在 path 函数的返回值上没有类型提示(例如,-> Literal["foo", "__end__"]:)或没有 path_map,图的可视化将假定该边可以转换到图中的任何节点。
add_sequence ¶
add_sequence(
nodes: Sequence[
StateNode[NodeInputT, ContextT] | tuple[str, StateNode[NodeInputT, ContextT]]
],
) -> Self
添加一个将按所提供顺序执行的节点序列。
| 参数 | 描述 |
|---|---|
nodes
|
一个由
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果序列为空。 |
ValueError
|
如果序列包含重复的节点名称。 |
| 返回 | 描述 |
|---|---|
Self
|
状态图的实例,允许方法链式调用。
类型: |
compile ¶
compile(
checkpointer: Checkpointer = None,
*,
cache: BaseCache | None = None,
store: BaseStore | None = None,
interrupt_before: All | list[str] | None = None,
interrupt_after: All | list[str] | None = None,
debug: bool = False,
name: str | None = None,
) -> CompiledStateGraph[StateT, ContextT, InputT, OutputT]
将状态图编译成一个 CompiledStateGraph 对象。
编译后的图实现了 Runnable 接口,可以被调用、流式传输、批处理和异步运行。
| 参数 | 描述 |
|---|---|
checkpointer
|
一个检查点保存器对象或标志。如果提供,此
类型: |
interrupt_before
|
一个可选的在执行前中断的节点名称列表。 |
interrupt_after
|
一个可选的在执行后中断的节点名称列表。 |
debug
|
一个指示是否启用调试模式的标志。
类型: |
name
|
用于已编译图的名称。
类型: |
| 返回 | 描述 |
|---|---|
CompiledStateGraph
|
已编译的状态图。
类型: |
langgraph.graph.state.CompiledStateGraph ¶
Bases: Pregel[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]
| 方法 | 描述 |
|---|---|
stream |
为单个输入流式传输图的步骤。 |
astream |
为单个输入异步流式传输图的步骤。 |
invoke |
使用单个输入和配置运行图。 |
ainvoke |
在单个输入上异步调用图。 |
get_state |
获取图的当前状态。 |
aget_state |
获取图的当前状态。 |
get_state_history |
获取图的状态历史记录。 |
aget_state_history |
异步获取图的状态历史记录。 |
update_state |
使用给定值更新图的状态,就像它们来自 |
aupdate_state |
异步使用给定值更新图的状态,就像它们来自 |
bulk_update_state |
批量应用对图状态的更新。需要设置检查点器。 |
abulk_update_state |
异步批量应用对图状态的更新。需要设置检查点器。 |
get_graph |
返回计算图的可绘制表示。 |
aget_graph |
返回计算图的可绘制表示。 |
get_subgraphs |
获取图的子图。 |
aget_subgraphs |
获取图的子图。 |
with_config |
创建具有更新配置的 Pregel 对象的副本。 |
stream ¶
stream(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
subgraphs: bool = False,
debug: bool | None = None,
**kwargs: Unpack[DeprecatedKwargs],
) -> Iterator[dict[str, Any] | Any]
为单个输入流式传输图的步骤。
| 参数 | 描述 |
|---|---|
输入
|
图的输入。
类型: |
配置
|
用于运行的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
流式输出的模式,默认为
您可以将一个列表作为 更多详情请参阅 LangGraph 流式处理指南。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
要流式传输的键,默认为所有非上下文通道。 |
interrupt_before
|
在这些节点前中断,默认为图中的所有节点。 |
interrupt_after
|
在这些节点后中断,默认为图中的所有节点。 |
durability
|
图执行的持久性模式,默认为
类型: |
subgraphs
|
是否从子图内部流式传输事件,默认为 False。如果为 更多详情请参阅 LangGraph 流式处理指南。
类型: |
| YIELDS | 描述 |
|---|---|
dict[str, Any] | Any
|
图中每一步的输出。输出形状取决于 |
astream async ¶
astream(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
subgraphs: bool = False,
debug: bool | None = None,
**kwargs: Unpack[DeprecatedKwargs],
) -> AsyncIterator[dict[str, Any] | Any]
为单个输入异步流式传输图的步骤。
| 参数 | 描述 |
|---|---|
输入
|
图的输入。
类型: |
配置
|
用于运行的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
流式输出的模式,默认为
您可以将一个列表作为 更多详情请参阅 LangGraph 流式处理指南。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
要流式传输的键,默认为所有非上下文通道。 |
interrupt_before
|
在这些节点前中断,默认为图中的所有节点。 |
interrupt_after
|
在这些节点后中断,默认为图中的所有节点。 |
durability
|
图执行的持久性模式,默认为
类型: |
subgraphs
|
是否从子图内部流式传输事件,默认为 False。如果为 更多详情请参阅 LangGraph 流式处理指南。
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[dict[str, Any] | Any]
|
图中每一步的输出。输出形状取决于 |
invoke ¶
invoke(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode = "values",
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
使用单个输入和配置运行图。
| 参数 | 描述 |
|---|---|
输入
|
图的输入数据。它可以是字典或任何其他类型。
类型: |
配置
|
图运行的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
图运行的流模式。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
从图运行中检索的输出键。 |
interrupt_before
|
在这些节点之前中断图运行。 |
interrupt_after
|
在这些节点之后中断图运行。 |
durability
|
图执行的持久性模式,默认为
类型: |
**kwargs
|
传递给图运行的其他关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any] | Any
|
图运行的输出。如果 |
dict[str, Any] | Any
|
如果 |
ainvoke async ¶
ainvoke(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode = "values",
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
在单个输入上异步调用图。
| 参数 | 描述 |
|---|---|
输入
|
计算的输入数据。它可以是字典或任何其他类型。
类型: |
配置
|
计算的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
计算的流模式。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
结果中要包含的输出键。 |
interrupt_before
|
在这些节点之前中断。 |
interrupt_after
|
在这些节点之后中断。 |
durability
|
图执行的持久性模式,默认为
类型: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any] | Any
|
计算的结果。如果 |
dict[str, Any] | Any
|
如果 |
get_state ¶
get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
获取图的当前状态。
aget_state async ¶
aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
获取图的当前状态。
get_state_history ¶
get_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None,
) -> Iterator[StateSnapshot]
获取图的状态历史记录。
aget_state_history async ¶
aget_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None,
) -> AsyncIterator[StateSnapshot]
异步获取图的状态历史记录。
update_state ¶
update_state(
config: RunnableConfig,
values: dict[str, Any] | Any | None,
as_node: str | None = None,
task_id: str | None = None,
) -> RunnableConfig
使用给定值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确)。
aupdate_state async ¶
aupdate_state(
config: RunnableConfig,
values: dict[str, Any] | Any,
as_node: str | None = None,
task_id: str | None = None,
) -> RunnableConfig
异步使用给定值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确)。
bulk_update_state ¶
bulk_update_state(
config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig
批量应用对图状态的更新。需要设置检查点器。
| 参数 | 描述 |
|---|---|
配置
|
要应用更新的配置。
类型: |
supersteps
|
一个超步列表,每个超步包含一个要顺序应用于图状态的更新列表。每个更新都是 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果未设置检查点器或未提供更新。 |
InvalidUpdateError
|
如果提供了无效的更新。 |
| 返回 | 描述 |
|---|---|
RunnableConfig
|
更新后的配置。
类型: |
abulk_update_state async ¶
abulk_update_state(
config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig
异步批量应用对图状态的更新。需要设置检查点器。
| 参数 | 描述 |
|---|---|
配置
|
要应用更新的配置。
类型: |
supersteps
|
一个超步列表,每个超步包含一个要顺序应用于图状态的更新列表。每个更新都是 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果未设置检查点器或未提供更新。 |
InvalidUpdateError
|
如果提供了无效的更新。 |
| 返回 | 描述 |
|---|---|
RunnableConfig
|
更新后的配置。
类型: |
get_graph ¶
get_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph
返回计算图的可绘制表示。
aget_graph async ¶
aget_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph
返回计算图的可绘制表示。
get_subgraphs ¶
aget_subgraphs async ¶
aget_subgraphs(
*, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]
获取图的子图。
| 参数 | 描述 |
|---|---|
namespace
|
用于过滤子图的命名空间。
类型: |
recurse
|
是否递归进入子图。如果为
类型: |
| 返回 | 描述 |
|---|---|
AsyncIterator[tuple[str, PregelProtocol]]
|
一个 |
with_config ¶
with_config(config: RunnableConfig | None = None, **kwargs: Any) -> Self
创建具有更新配置的 Pregel 对象的副本。
langgraph.graph.message ¶
| 函数 | 描述 |
|---|---|
add_messages |
通过 ID 合并两个消息列表,更新现有消息。 |
add_messages ¶
add_messages(
left: Messages,
right: Messages,
*,
format: Literal["langchain-openai"] | None = None,
) -> Messages
通过 ID 合并两个消息列表,更新现有消息。
默认情况下,这确保状态是“仅追加”的,除非新消息与现有消息具有相同的 ID。
| 参数 | 描述 |
|---|---|
left
|
类型: |
right
|
要合并到基础列表中的
类型: |
format
|
返回消息的格式。如果为 要求 必须安装
类型: |
| 返回 | 描述 |
|---|---|
消息
|
一个新的消息列表,其中来自 |
消息
|
如果 |
示例
from langchain_core.messages import AIMessage, HumanMessage
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [AIMessage(content="Hi there!", id="2")]
add_messages(msgs1, msgs2)
# [HumanMessage(content='Hello', id='1'), AIMessage(content='Hi there!', id='2')]
msgs1 = [HumanMessage(content="Hello", id="1")]
msgs2 = [HumanMessage(content="Hello again", id="1")]
add_messages(msgs1, msgs2)
# [HumanMessage(content='Hello again', id='1')]
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
class State(TypedDict):
messages: Annotated[list, add_messages]
builder = StateGraph(State)
builder.add_node("chatbot", lambda state: {"messages": [("assistant", "Hello")]})
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
graph = builder.compile()
graph.invoke({})
# {'messages': [AIMessage(content='Hello', id=...)]}
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, add_messages
class State(TypedDict):
messages: Annotated[list, add_messages(format="langchain-openai")]
def chatbot_node(state: State) -> list:
return {
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": "Here's an image:",
"cache_control": {"type": "ephemeral"},
},
{
"type": "image",
"source": {
"type": "base64",
"media_type": "image/jpeg",
"data": "1234",
},
},
],
},
]
}
builder = StateGraph(State)
builder.add_node("chatbot", chatbot_node)
builder.set_entry_point("chatbot")
builder.set_finish_point("chatbot")
graph = builder.compile()
graph.invoke({"messages": []})
# {
# 'messages': [
# HumanMessage(
# content=[
# {"type": "text", "text": "Here's an image:"},
# {
# "type": "image_url",
# "image_url": {"url": "data:image/jpeg;base64,1234"},
# },
# ],
# ),
# ]
# }