Pregel
langgraph.pregel.NodeBuilder ¶
| 方法 | 描述 |
|---|---|
subscribe_only |
订阅单个通道。 |
subscribe_to |
添加要订阅的通道。当其中任何一个通道被调用时,节点将被调用 |
read_from |
添加指定的通道以进行读取,但不订阅它们。 |
do |
添加指定的节点。 |
write_to |
添加通道写入。 |
meta |
向节点添加标签或元数据。 |
build |
构建节点。 |
langgraph.pregel.Pregel ¶
基类: PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]
Pregel 管理 LangGraph 应用程序的运行时行为。
概述¶
Pregel 将 行动者(actors) 和 通道(channels) 组合成一个单一的应用程序。行动者 从通道读取数据并向通道写入数据。Pregel 遵循 Pregel 算法/批量同步并行(Bulk Synchronous Parallel) 模型,将应用程序的执行组织成多个步骤。
每个步骤包含三个阶段
- 计划:确定在此步骤中执行哪些行动者。例如,在第一步中,选择订阅特殊输入通道的行动者;在后续步骤中,选择订阅前一步骤中已更新通道的行动者。
- 执行:并行执行所有选定的行动者,直到全部完成、一个失败或达到超时。在此阶段,通道的更新对行动者不可见,直到下一步骤。
- 更新:用此步骤中行动者写入的值更新通道。
重复此过程,直到没有行动者被选中执行,或达到最大步骤数。
行动者¶
一个行动者就是一个 PregelNode。它订阅通道,从中读取数据,并向其写入数据。可以将其视为 Pregel 算法中的一个行动者。PregelNodes 实现了 LangChain 的 Runnable 接口。
通道¶
通道用于在行动者(PregelNodes)之间进行通信。每个通道都有一个值类型、一个更新类型和一个更新函数——它接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据发送给链自身。LangGraph 提供了许多内置通道
基本通道:LastValue 和 Topic¶
LastValue:默认通道,存储发送到通道的最后一个值,可用于输入和输出值,或用于将数据从一个步骤发送到下一个步骤Topic:一个可配置的 PubSub Topic(发布/订阅主题),可用于在行动者之间发送多个值,或用于累积输出。可以配置为对值进行去重,和/或在多个步骤中累积值。
高级通道:Context 和 BinaryOperatorAggregate¶
Context:公开上下文管理器的值,并管理其生命周期。可用于访问需要设置和/或拆卸的外部资源。例如client = Context(httpx.Client)BinaryOperatorAggregate:存储一个持久化的值,通过将一个二元运算符应用于当前值和发送到通道的每个更新来更新它,可用于计算多个步骤的聚合。例如total = BinaryOperatorAggregate(int, operator.add)
示例¶
大多数用户将通过 StateGraph(图 API)或通过 entrypoint(函数式 API)与 Pregel 进行交互。
然而,对于高级用例,可以直接使用 Pregel。如果你不确定是否需要直接使用 Pregel,那么答案可能是否定的——你应该使用图 API 或函数式 API。这些是更高级别的接口,它们在底层会被编译为 Pregel。
这里有一些示例,让你了解它的工作原理
单节点应用程序
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
app = Pregel(
nodes={"node1": node1},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b"],
)
app.invoke({"a": "foo"})
使用多个节点和多个输出通道
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
)
node2 = (
NodeBuilder().subscribe_to("b")
.do(lambda x: x["b"] + x["b"])
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": LastValue(str),
"c": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b", "c"],
)
app.invoke({"a": "foo"})
使用 Topic 通道
from langgraph.channels import LastValue, EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": Topic(str, accumulate=True),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
使用 BinaryOperatorAggregate 通道
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
)
def reducer(current, update):
if current:
return current + " | " + update
else:
return update
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": BinaryOperatorAggregate(str, operator=reducer),
},
input_channels=["a"],
output_channels=["c"],
)
app.invoke({"a": "foo"})
引入循环
此示例演示了如何在图中引入循环,通过让一个链写入它所订阅的通道。执行将继续,直到一个 None 值被写入该通道。
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry
example_node = (
NodeBuilder()
.subscribe_only("value")
.do(lambda x: x + x if len(x) < 10 else None)
.write_to(ChannelWriteEntry(channel="value", skip_none=True))
)
app = Pregel(
nodes={"example_node": example_node},
channels={
"value": EphemeralValue(str),
},
input_channels=["value"],
output_channels=["value"],
)
app.invoke({"value": "a"})
| 方法 | 描述 |
|---|---|
stream |
为单个输入流式传输图的步骤。 |
astream |
为单个输入异步流式传输图的步骤。 |
invoke |
使用单个输入和配置运行图。 |
ainvoke |
异步调用具有单个输入的图。 |
get_state |
获取图的当前状态。 |
aget_state |
获取图的当前状态。 |
get_state_history |
获取图状态的历史记录。 |
aget_state_history |
异步获取图状态的历史记录。 |
update_state |
使用给定的值更新图的状态,就像它们来自 |
aupdate_state |
异步地使用给定的值更新图的状态,就像它们来自 |
bulk_update_state |
批量应用更新到图状态。需要设置一个检查点记录器。 |
abulk_update_state |
异步批量应用更新到图状态。需要设置一个检查点记录器。 |
get_graph |
返回计算图的可绘制表示。 |
aget_graph |
返回计算图的可绘制表示。 |
get_subgraphs |
获取图的子图。 |
aget_subgraphs |
获取图的子图。 |
with_config |
创建具有更新配置的 Pregel 对象的副本。 |
stream ¶
stream(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
subgraphs: bool = False,
debug: bool | None = None,
**kwargs: Unpack[DeprecatedKwargs],
) -> Iterator[dict[str, Any] | Any]
为单个输入流式传输图的步骤。
| 参数 | 描述 |
|---|---|
输入
|
图的输入。
类型: |
配置
|
用于运行的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
流式输出的模式,默认为
你可以将列表作为 有关更多详细信息,请参阅 LangGraph 流式处理指南。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
要流式传输的键,默认为所有非上下文通道。 |
interrupt_before
|
在此之前中断的节点,默认为图中的所有节点。 |
interrupt_after
|
在此之后中断的节点,默认为图中的所有节点。 |
durability
|
图执行的持久性模式,默认为
类型: |
subgraphs
|
是否从子图内部流式传输事件,默认为 False。如果为 有关更多详细信息,请参阅 LangGraph 流式处理指南。
类型: |
| YIELDS | 描述 |
|---|---|
dict[str, Any] | Any
|
图中每个步骤的输出。输出形状取决于 |
astream async ¶
astream(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
subgraphs: bool = False,
debug: bool | None = None,
**kwargs: Unpack[DeprecatedKwargs],
) -> AsyncIterator[dict[str, Any] | Any]
为单个输入异步流式传输图的步骤。
| 参数 | 描述 |
|---|---|
输入
|
图的输入。
类型: |
配置
|
用于运行的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
流式输出的模式,默认为
你可以将列表作为 有关更多详细信息,请参阅 LangGraph 流式处理指南。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
要流式传输的键,默认为所有非上下文通道。 |
interrupt_before
|
在此之前中断的节点,默认为图中的所有节点。 |
interrupt_after
|
在此之后中断的节点,默认为图中的所有节点。 |
durability
|
图执行的持久性模式,默认为
类型: |
subgraphs
|
是否从子图内部流式传输事件,默认为 False。如果为 有关更多详细信息,请参阅 LangGraph 流式处理指南。
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[dict[str, Any] | Any]
|
图中每个步骤的输出。输出形状取决于 |
invoke ¶
invoke(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode = "values",
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
使用单个输入和配置运行图。
| 参数 | 描述 |
|---|---|
输入
|
图的输入数据。它可以是字典或任何其他类型。
类型: |
配置
|
图运行的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
图运行的流模式。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
从图运行中检索的输出键。 |
interrupt_before
|
在此之前中断图运行的节点。 |
interrupt_after
|
在此之后中断图运行的节点。 |
durability
|
图执行的持久性模式,默认为
类型: |
**kwargs
|
传递给图运行的其他关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any] | Any
|
图运行的输出。如果 |
dict[str, Any] | Any
|
如果 |
ainvoke async ¶
ainvoke(
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode = "values",
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
异步调用具有单个输入的图。
| 参数 | 描述 |
|---|---|
输入
|
计算的输入数据。它可以是字典或任何其他类型。
类型: |
配置
|
计算的配置。
类型: |
context
|
用于运行的静态上下文。 在 0.6.0 版本中添加
类型: |
stream_mode
|
计算的流模式。
类型: |
print_mode
|
接受与
类型: |
output_keys
|
结果中要包含的输出键。 |
interrupt_before
|
在此之前中断的节点。 |
interrupt_after
|
在此之后中断的节点。 |
durability
|
图执行的持久性模式,默认为
类型: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any] | Any
|
计算的结果。如果 |
dict[str, Any] | Any
|
如果 |
get_state ¶
get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
获取图的当前状态。
aget_state async ¶
aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot
获取图的当前状态。
get_state_history ¶
get_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None,
) -> Iterator[StateSnapshot]
获取图状态的历史记录。
aget_state_history async ¶
aget_state_history(
config: RunnableConfig,
*,
filter: dict[str, Any] | None = None,
before: RunnableConfig | None = None,
limit: int | None = None,
) -> AsyncIterator[StateSnapshot]
异步获取图状态的历史记录。
update_state ¶
update_state(
config: RunnableConfig,
values: dict[str, Any] | Any | None,
as_node: str | None = None,
task_id: str | None = None,
) -> RunnableConfig
使用给定的值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确的话)。
aupdate_state async ¶
aupdate_state(
config: RunnableConfig,
values: dict[str, Any] | Any,
as_node: str | None = None,
task_id: str | None = None,
) -> RunnableConfig
异步地使用给定的值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确的话)。
bulk_update_state ¶
bulk_update_state(
config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig
批量应用更新到图状态。需要设置一个检查点记录器。
| 参数 | 描述 |
|---|---|
配置
|
要应用更新的配置。
类型: |
supersteps
|
一个超级步骤列表,每个超级步骤包含一个要按顺序应用于图状态的更新列表。每个更新都是一个形式为 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果未设置检查点记录器或未提供更新。 |
InvalidUpdateError
|
如果提供了无效的更新。 |
| 返回 | 描述 |
|---|---|
RunnableConfig
|
更新后的配置。
类型: |
abulk_update_state async ¶
abulk_update_state(
config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig
异步批量应用更新到图状态。需要设置一个检查点记录器。
| 参数 | 描述 |
|---|---|
配置
|
要应用更新的配置。
类型: |
supersteps
|
一个超级步骤列表,每个超级步骤包含一个要按顺序应用于图状态的更新列表。每个更新都是一个形式为 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果未设置检查点记录器或未提供更新。 |
InvalidUpdateError
|
如果提供了无效的更新。 |
| 返回 | 描述 |
|---|---|
RunnableConfig
|
更新后的配置。
类型: |
get_graph ¶
get_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph
返回计算图的可绘制表示。
aget_graph async ¶
aget_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph
返回计算图的可绘制表示。
get_subgraphs ¶
aget_subgraphs async ¶
aget_subgraphs(
*, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]
获取图的子图。
| 参数 | 描述 |
|---|---|
namespace
|
用于过滤子图的命名空间。
类型: |
recurse
|
是否递归进入子图。如果为
类型: |
| 返回 | 描述 |
|---|---|
AsyncIterator[tuple[str, PregelProtocol]]
|
|
with_config ¶
with_config(config: RunnableConfig | None = None, **kwargs: Any) -> Self
创建具有更新配置的 Pregel 对象的副本。