跳转到内容

Pregel

langgraph.pregel.NodeBuilder

方法 描述
subscribe_only

订阅单个通道。

subscribe_to

添加要订阅的通道。当其中任何一个通道被调用时,节点将被调用

read_from

添加指定的通道以进行读取,但不订阅它们。

do

添加指定的节点。

write_to

添加通道写入。

meta

向节点添加标签或元数据。

build

构建节点。

subscribe_only

subscribe_only(channel: str) -> Self

订阅单个通道。

subscribe_to

subscribe_to(*channels: str, read: bool = True) -> Self

添加要订阅的通道。当这些通道中的任何一个被更新时,节点将被调用,并将通道值的字典作为输入。

参数 描述
channels

要订阅的通道名称

类型: str 默认: ()

read

如果为 True,通道将被包含在节点的输入中。否则,它们将触发节点但不会作为输入发送。

类型: bool 默认值: True

返回 描述
Self

Self,用于链式调用

read_from

read_from(*channels: str) -> Self

添加指定的通道以进行读取,但不订阅它们。

do

do(node: RunnableLike) -> Self

添加指定的节点。

write_to

write_to(*channels: str | ChannelWriteEntry, **kwargs: _WriteValue) -> Self

添加通道写入。

参数 描述
*channels

要写入的通道名称

类型: str | ChannelWriteEntry 默认: ()

**kwargs

通道名称和值的映射关系

类型: _WriteValue 默认: {}

返回 描述
Self

Self,用于链式调用

meta

meta(*tags: str, **metadata: Any) -> Self

向节点添加标签或元数据。

build

build() -> PregelNode

构建节点。

langgraph.pregel.Pregel

基类: PregelProtocol[StateT, ContextT, InputT, OutputT], Generic[StateT, ContextT, InputT, OutputT]

Pregel 管理 LangGraph 应用程序的运行时行为。

概述

Pregel 将 行动者(actors)通道(channels) 组合成一个单一的应用程序。行动者 从通道读取数据并向通道写入数据。Pregel 遵循 Pregel 算法/批量同步并行(Bulk Synchronous Parallel) 模型,将应用程序的执行组织成多个步骤。

每个步骤包含三个阶段

  • 计划:确定在此步骤中执行哪些行动者。例如,在第一步中,选择订阅特殊输入通道的行动者;在后续步骤中,选择订阅前一步骤中已更新通道的行动者
  • 执行:并行执行所有选定的行动者,直到全部完成、一个失败或达到超时。在此阶段,通道的更新对行动者不可见,直到下一步骤。
  • 更新:用此步骤中行动者写入的值更新通道。

重复此过程,直到没有行动者被选中执行,或达到最大步骤数。

行动者

一个行动者就是一个 PregelNode。它订阅通道,从中读取数据,并向其写入数据。可以将其视为 Pregel 算法中的一个行动者PregelNodes 实现了 LangChain 的 Runnable 接口。

通道

通道用于在行动者(PregelNodes)之间进行通信。每个通道都有一个值类型、一个更新类型和一个更新函数——它接收一系列更新并修改存储的值。通道可用于将数据从一个链发送到另一个链,或在未来的步骤中将数据发送给链自身。LangGraph 提供了许多内置通道

基本通道:LastValue 和 Topic
  • LastValue:默认通道,存储发送到通道的最后一个值,可用于输入和输出值,或用于将数据从一个步骤发送到下一个步骤
  • Topic:一个可配置的 PubSub Topic(发布/订阅主题),可用于在行动者之间发送多个值,或用于累积输出。可以配置为对值进行去重,和/或在多个步骤中累积值。
高级通道:Context 和 BinaryOperatorAggregate
  • Context:公开上下文管理器的值,并管理其生命周期。可用于访问需要设置和/或拆卸的外部资源。例如 client = Context(httpx.Client)
  • BinaryOperatorAggregate:存储一个持久化的值,通过将一个二元运算符应用于当前值和发送到通道的每个更新来更新它,可用于计算多个步骤的聚合。例如 total = BinaryOperatorAggregate(int, operator.add)

示例

大多数用户将通过 StateGraph(图 API)或通过 entrypoint(函数式 API)与 Pregel 进行交互。

然而,对于高级用例,可以直接使用 Pregel。如果你不确定是否需要直接使用 Pregel,那么答案可能是否定的——你应该使用图 API 或函数式 API。这些是更高级别的接口,它们在底层会被编译为 Pregel。

这里有一些示例,让你了解它的工作原理

单节点应用程序
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo'}
使用多个节点和多个输出通道
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
)

app.invoke({"a": "foo"})
{'b': 'foofoo', 'c': 'foofoofoofoo'}
使用 Topic 通道
from langgraph.channels import LastValue, EphemeralValue, Topic
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": Topic(str, accumulate=True),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{"c": ["foofoo", "foofoofoofoo"]}
使用 BinaryOperatorAggregate 通道
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder


node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
)


def reducer(current, update):
    if current:
        return current + " | " + update
    else:
        return update


app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
)

app.invoke({"a": "foo"})
{'c': 'foofoo | foofoofoofoo'}
引入循环

此示例演示了如何在图中引入循环,通过让一个链写入它所订阅的通道。执行将继续,直到一个 None 值被写入该通道。

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder, ChannelWriteEntry

example_node = (
    NodeBuilder()
    .subscribe_only("value")
    .do(lambda x: x + x if len(x) < 10 else None)
    .write_to(ChannelWriteEntry(channel="value", skip_none=True))
)

app = Pregel(
    nodes={"example_node": example_node},
    channels={
        "value": EphemeralValue(str),
    },
    input_channels=["value"],
    output_channels=["value"],
)

app.invoke({"value": "a"})
{'value': 'aaaaaaaaaaaaaaaa'}
方法 描述
stream

为单个输入流式传输图的步骤。

astream

为单个输入异步流式传输图的步骤。

invoke

使用单个输入和配置运行图。

ainvoke

异步调用具有单个输入的图。

get_state

获取图的当前状态。

aget_state

获取图的当前状态。

get_state_history

获取图状态的历史记录。

aget_state_history

异步获取图状态的历史记录。

update_state

使用给定的值更新图的状态,就像它们来自

aupdate_state

异步地使用给定的值更新图的状态,就像它们来自

bulk_update_state

批量应用更新到图状态。需要设置一个检查点记录器。

abulk_update_state

异步批量应用更新到图状态。需要设置一个检查点记录器。

get_graph

返回计算图的可绘制表示。

aget_graph

返回计算图的可绘制表示。

get_subgraphs

获取图的子图。

aget_subgraphs

获取图的子图。

with_config

创建具有更新配置的 Pregel 对象的副本。

stream

stream(
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
    **kwargs: Unpack[DeprecatedKwargs],
) -> Iterator[dict[str, Any] | Any]

为单个输入流式传输图的步骤。

参数 描述
输入

图的输入。

类型: InputT | Command | None

配置

用于运行的配置。

类型: RunnableConfig | None 默认值: None

context

用于运行的静态上下文。

在 0.6.0 版本中添加

类型: ContextT | None 默认: None

stream_mode

流式输出的模式,默认为 self.stream_mode。选项有

  • "values":在每个步骤后发出状态中的所有值,包括中断。与函数式 API 一起使用时,值在工作流结束时发出一次。
  • "updates":仅在每个步骤后发出节点或任务返回的节点或任务名称及更新。如果在同一步骤中进行了多次更新(例如运行了多个节点),则这些更新将分别发出。
  • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
  • "messages":逐个 token 地发出 LLM 消息,并附带节点或任务内任何 LLM 调用的元数据。将以 2 元组 (LLM token, metadata) 的形式发出。
  • "checkpoints":在创建检查点时发出一个事件,格式与 get_state() 返回的格式相同。
  • "tasks":在任务开始和结束时发出事件,包括其结果和错误。

你可以将列表作为 stream_mode 参数传递,以同时流式传输多种模式。流式输出将是 (mode, data) 形式的元组。

有关更多详细信息,请参阅 LangGraph 流式处理指南

类型: StreamMode | Sequence[StreamMode] | None 默认: None

print_mode

接受与 stream_mode 相同的值,但仅将输出打印到控制台,用于调试目的。不以任何方式影响图的输出。

类型: StreamMode | Sequence[StreamMode] 默认: ()

output_keys

要流式传输的键,默认为所有非上下文通道。

类型: str | Sequence[str] | None 默认值: None

interrupt_before

在此之前中断的节点,默认为图中的所有节点。

类型: All | Sequence[str] | None 默认: None

interrupt_after

在此之后中断的节点,默认为图中的所有节点。

类型: All | Sequence[str] | None 默认: None

durability

图执行的持久性模式,默认为 "async"。选项有

  • "sync":在下一步开始前同步持久化更改。
  • "async":在下一步执行时异步持久化更改。
  • "exit":仅在图退出时持久化更改。

类型: Durability | None 默认: None

subgraphs

是否从子图内部流式传输事件,默认为 False。如果为 True,事件将以元组 (namespace, data) 的形式发出,或者如果 stream_mode 是列表,则为 (namespace, mode, data),其中 namespace 是一个包含调用子图的节点路径的元组,例如 ("parent_node:<task_id>", "child_node:<task_id>")

有关更多详细信息,请参阅 LangGraph 流式处理指南

类型: bool 默认值: False

YIELDS 描述
dict[str, Any] | Any

图中每个步骤的输出。输出形状取决于 stream_mode

astream async

astream(
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
    **kwargs: Unpack[DeprecatedKwargs],
) -> AsyncIterator[dict[str, Any] | Any]

为单个输入异步流式传输图的步骤。

参数 描述
输入

图的输入。

类型: InputT | Command | None

配置

用于运行的配置。

类型: RunnableConfig | None 默认值: None

context

用于运行的静态上下文。

在 0.6.0 版本中添加

类型: ContextT | None 默认: None

stream_mode

流式输出的模式,默认为 self.stream_mode。选项有

  • "values":在每个步骤后发出状态中的所有值,包括中断。与函数式 API 一起使用时,值在工作流结束时发出一次。
  • "updates":仅在每个步骤后发出节点或任务返回的节点或任务名称及更新。如果在同一步骤中进行了多次更新(例如运行了多个节点),则这些更新将分别发出。
  • "custom":使用 StreamWriter 从节点或任务内部发出自定义数据。
  • "messages":逐个 token 地发出 LLM 消息,并附带节点或任务内任何 LLM 调用的元数据。将以 2 元组 (LLM token, metadata) 的形式发出。
  • "debug":为每个步骤发出包含尽可能多信息的调试事件。

你可以将列表作为 stream_mode 参数传递,以同时流式传输多种模式。流式输出将是 (mode, data) 形式的元组。

有关更多详细信息,请参阅 LangGraph 流式处理指南

类型: StreamMode | Sequence[StreamMode] | None 默认: None

print_mode

接受与 stream_mode 相同的值,但仅将输出打印到控制台,用于调试目的。不以任何方式影响图的输出。

类型: StreamMode | Sequence[StreamMode] 默认: ()

output_keys

要流式传输的键,默认为所有非上下文通道。

类型: str | Sequence[str] | None 默认值: None

interrupt_before

在此之前中断的节点,默认为图中的所有节点。

类型: All | Sequence[str] | None 默认: None

interrupt_after

在此之后中断的节点,默认为图中的所有节点。

类型: All | Sequence[str] | None 默认: None

durability

图执行的持久性模式,默认为 "async"。选项有

  • "sync":在下一步开始前同步持久化更改。
  • "async":在下一步执行时异步持久化更改。
  • "exit":仅在图退出时持久化更改。

类型: Durability | None 默认: None

subgraphs

是否从子图内部流式传输事件,默认为 False。如果为 True,事件将以元组 (namespace, data) 的形式发出,或者如果 stream_mode 是列表,则为 (namespace, mode, data),其中 namespace 是一个包含调用子图的节点路径的元组,例如 ("parent_node:<task_id>", "child_node:<task_id>")

有关更多详细信息,请参阅 LangGraph 流式处理指南

类型: bool 默认值: False

YIELDS 描述
AsyncIterator[dict[str, Any] | Any]

图中每个步骤的输出。输出形状取决于 stream_mode

invoke

invoke(
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any

使用单个输入和配置运行图。

参数 描述
输入

图的输入数据。它可以是字典或任何其他类型。

类型: InputT | Command | None

配置

图运行的配置。

类型: RunnableConfig | None 默认值: None

context

用于运行的静态上下文。

在 0.6.0 版本中添加

类型: ContextT | None 默认: None

stream_mode

图运行的流模式。

类型: StreamMode 默认: 'values'

print_mode

接受与 stream_mode 相同的值,但仅将输出打印到控制台,用于调试目的。不以任何方式影响图的输出。

类型: StreamMode | Sequence[StreamMode] 默认: ()

output_keys

从图运行中检索的输出键。

类型: str | Sequence[str] | None 默认值: None

interrupt_before

在此之前中断图运行的节点。

类型: All | Sequence[str] | None 默认: None

interrupt_after

在此之后中断图运行的节点。

类型: All | Sequence[str] | None 默认: None

durability

图执行的持久性模式,默认为 "async"。选项有

  • "sync":在下一步开始前同步持久化更改。
  • "async":在下一步执行时异步持久化更改。
  • "exit":仅在图退出时持久化更改。

类型: Durability | None 默认: None

**kwargs

传递给图运行的其他关键字参数。

类型: Any 默认值: {}

返回 描述
dict[str, Any] | Any

图运行的输出。如果 stream_mode"values",它返回最新的输出。

dict[str, Any] | Any

如果 stream_mode 不是 "values",它返回一个输出块列表。

ainvoke async

ainvoke(
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any

异步调用具有单个输入的图。

参数 描述
输入

计算的输入数据。它可以是字典或任何其他类型。

类型: InputT | Command | None

配置

计算的配置。

类型: RunnableConfig | None 默认值: None

context

用于运行的静态上下文。

在 0.6.0 版本中添加

类型: ContextT | None 默认: None

stream_mode

计算的流模式。

类型: StreamMode 默认: 'values'

print_mode

接受与 stream_mode 相同的值,但仅将输出打印到控制台,用于调试目的。不以任何方式影响图的输出。

类型: StreamMode | Sequence[StreamMode] 默认: ()

output_keys

结果中要包含的输出键。

类型: str | Sequence[str] | None 默认值: None

interrupt_before

在此之前中断的节点。

类型: All | Sequence[str] | None 默认: None

interrupt_after

在此之后中断的节点。

类型: All | Sequence[str] | None 默认: None

durability

图执行的持久性模式,默认为 "async"。选项有

  • "sync":在下一步开始前同步持久化更改。
  • "async":在下一步执行时异步持久化更改。
  • "exit":仅在图退出时持久化更改。

类型: Durability | None 默认: None

**kwargs

附加的关键字参数。

类型: Any 默认值: {}

返回 描述
dict[str, Any] | Any

计算的结果。如果 stream_mode"values",它返回最新的值。

dict[str, Any] | Any

如果 stream_mode"chunks",它返回一个块列表。

get_state

get_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot

获取图的当前状态。

aget_state async

aget_state(config: RunnableConfig, *, subgraphs: bool = False) -> StateSnapshot

获取图的当前状态。

get_state_history

get_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> Iterator[StateSnapshot]

获取图状态的历史记录。

aget_state_history async

aget_state_history(
    config: RunnableConfig,
    *,
    filter: dict[str, Any] | None = None,
    before: RunnableConfig | None = None,
    limit: int | None = None,
) -> AsyncIterator[StateSnapshot]

异步获取图状态的历史记录。

update_state

update_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any | None,
    as_node: str | None = None,
    task_id: str | None = None,
) -> RunnableConfig

使用给定的值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确的话)。

aupdate_state async

aupdate_state(
    config: RunnableConfig,
    values: dict[str, Any] | Any,
    as_node: str | None = None,
    task_id: str | None = None,
) -> RunnableConfig

异步地使用给定的值更新图的状态,就像它们来自节点 as_node 一样。如果未提供 as_node,它将被设置为最后一个更新状态的节点(如果不明确的话)。

bulk_update_state

bulk_update_state(
    config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig

批量应用更新到图状态。需要设置一个检查点记录器。

参数 描述
配置

要应用更新的配置。

类型: RunnableConfig

supersteps

一个超级步骤列表,每个超级步骤包含一个要按顺序应用于图状态的更新列表。每个更新都是一个形式为 (values, as_node, task_id) 的元组,其中 task_id 是可选的。

类型: Sequence[Sequence[StateUpdate]]

引发 描述
ValueError

如果未设置检查点记录器或未提供更新。

InvalidUpdateError

如果提供了无效的更新。

返回 描述
RunnableConfig

更新后的配置。

类型: RunnableConfig

abulk_update_state async

abulk_update_state(
    config: RunnableConfig, supersteps: Sequence[Sequence[StateUpdate]]
) -> RunnableConfig

异步批量应用更新到图状态。需要设置一个检查点记录器。

参数 描述
配置

要应用更新的配置。

类型: RunnableConfig

supersteps

一个超级步骤列表,每个超级步骤包含一个要按顺序应用于图状态的更新列表。每个更新都是一个形式为 (values, as_node, task_id) 的元组,其中 task_id 是可选的。

类型: Sequence[Sequence[StateUpdate]]

引发 描述
ValueError

如果未设置检查点记录器或未提供更新。

InvalidUpdateError

如果提供了无效的更新。

返回 描述
RunnableConfig

更新后的配置。

类型: RunnableConfig

get_graph

get_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph

返回计算图的可绘制表示。

aget_graph async

aget_graph(config: RunnableConfig | None = None, *, xray: int | bool = False) -> Graph

返回计算图的可绘制表示。

get_subgraphs

get_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> Iterator[tuple[str, PregelProtocol]]

获取图的子图。

参数 描述
namespace

用于过滤子图的命名空间。

类型: str | None 默认值: None

recurse

是否递归进入子图。如果为 False,则只返回直接子图。

类型: bool 默认值: False

返回 描述
Iterator[tuple[str, PregelProtocol]]

(namespace, subgraph) 对的迭代器。

aget_subgraphs async

aget_subgraphs(
    *, namespace: str | None = None, recurse: bool = False
) -> AsyncIterator[tuple[str, PregelProtocol]]

获取图的子图。

参数 描述
namespace

用于过滤子图的命名空间。

类型: str | None 默认值: None

recurse

是否递归进入子图。如果为 False,则只返回直接子图。

类型: bool 默认值: False

返回 描述
AsyncIterator[tuple[str, PregelProtocol]]

(namespace, subgraph) 对的迭代器。

with_config

with_config(config: RunnableConfig | None = None, **kwargs: Any) -> Self

创建具有更新配置的 Pregel 对象的副本。

© . This site is unofficial and not affiliated with LangChain, Inc.