跳转到内容

函数式 API

langgraph.func

函数 描述
任务 (task)

使用 task 装饰器定义一个 LangGraph 任务。

entrypoint

基类:Generic[ContextT]

使用 entrypoint 装饰器定义一个 LangGraph 工作流。

函数签名

被装饰的函数必须接受一个**单一参数**,作为函数的输入。此输入参数可以是任何类型。若要向函数传递**多个参数**,请使用字典。

可注入参数

被装饰的函数可以请求访问将在运行时自动注入的额外参数。这些参数包括:

参数 描述
配置 一个配置对象(即 RunnableConfig),用于存放运行时配置值。
previous 给定线程的上一个返回值(仅当提供了检查点时可用)。
runtime 一个 Runtime 对象,包含当前运行的信息,包括上下文、存储和写入器。

entrypoint 装饰器可以应用于同步函数或异步函数。

状态管理

previous 参数可用于访问同一线程 ID 上 entrypoint 上一次调用的返回值。此值仅在提供了检查点时可用。

如果您希望 previous 与返回值不同,可以使用 entrypoint.final 对象在返回一个值的同时,将另一个不同的值保存到检查点中。

参数 描述
checkpointer

指定一个检查点,以创建一个可以在多次运行之间持久化其状态的工作流。

类型: BaseCheckpointSaver | None 默认值: None

store

一个通用的键值存储。某些实现可能通过可选的 index 配置支持语义搜索功能。

类型: BaseStore | None 默认值: None

cache

用于缓存工作流结果的缓存。

类型: BaseCache | None 默认值: None

context_schema

指定将传递给工作流的上下文对象的模式。

类型: type[ContextT] | None 默认值: None

cache_policy

用于缓存工作流结果的缓存策略。

类型: CachePolicy | None 默认值: None

retry_policy

在工作流失败时使用的重试策略(或策略列表)。

类型: RetryPolicy | Sequence[RetryPolicy] | None 默认值: None

config_schema 已弃用

config_schema 参数在 v0.6.0 中已弃用,并将在 v2.0.0 中移除支持。请改用 context_schema 来指定运行范围内的上下文模式。

使用 entrypoint 和 tasks
import time

from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

@task
def compose_essay(topic: str) -> str:
    time.sleep(1.0)  # Simulate slow operation
    return f"An essay about {topic}"

@entrypoint(checkpointer=InMemorySaver())
def review_workflow(topic: str) -> dict:
    """Manages the workflow for generating and reviewing an essay.

    The workflow includes:
    1. Generating an essay about the given topic.
    2. Interrupting the workflow for human review of the generated essay.

    Upon resuming the workflow, compose_essay task will not be re-executed
    as its result is cached by the checkpointer.

    Args:
        topic: The subject of the essay.

    Returns:
        dict: A dictionary containing the generated essay and the human review.
    """
    essay_future = compose_essay(topic)
    essay = essay_future.result()
    human_review = interrupt({
        "question": "Please provide a review",
        "essay": essay
    })
    return {
        "essay": essay,
        "review": human_review,
    }

# Example configuration for the workflow
config = {
    "configurable": {
        "thread_id": "some_thread"
    }
}

# Topic for the essay
topic = "cats"

# Stream the workflow to generate the essay and await human review
for result in review_workflow.stream(topic, config):
    print(result)

# Example human review provided after the interrupt
human_review = "This essay is great."

# Resume the workflow with the provided human review
for result in review_workflow.stream(Command(resume=human_review), config):
    print(result)
访问上一个返回值

当启用检查点时,函数可以访问同一线程 ID 上上一次调用的返回值。

from typing import Optional

from langgraph.checkpoint.memory import MemorySaver

from langgraph.func import entrypoint


@entrypoint(checkpointer=InMemorySaver())
def my_workflow(input_data: str, previous: Optional[str] = None) -> str:
    return "world"


config = {"configurable": {"thread_id": "some_thread"}}
my_workflow.invoke("hello", config)
使用 entrypoint.final 保存值

entrypoint.final 对象允许您在返回一个值的同时,将另一个不同的值保存到检查点中。只要使用相同的线程 ID,这个值就可以在下一次调用 entrypoint 时通过 previous 参数访问。

from typing import Any

from langgraph.checkpoint.memory import MemorySaver

from langgraph.func import entrypoint


@entrypoint(checkpointer=InMemorySaver())
def my_workflow(
    number: int,
    *,
    previous: Any = None,
) -> entrypoint.final[int, int]:
    previous = previous or 0
    # This will return the previous value to the caller, saving
    # 2 * number to the checkpoint, which will be used in the next invocation
    # for the `previous` parameter.
    return entrypoint.final(value=previous, save=2 * number)


config = {"configurable": {"thread_id": "some_thread"}}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 3 * 2 from the previous invocation)
方法 描述
__init__

初始化 entrypoint 装饰器。

__call__

将一个函数转换为 Pregel 图。

final dataclass

基类:Generic[R, S]

可以从 entrypoint 返回的一种原生类型。

这种原生类型允许将一个与 entrypoint 返回值不同的值保存到检查点。

解耦返回值和保存值
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint


@entrypoint(checkpointer=InMemorySaver())
def my_workflow(
    number: int,
    *,
    previous: Any = None,
) -> entrypoint.final[int, int]:
    previous = previous or 0
    # This will return the previous value to the caller, saving
    # 2 * number to the checkpoint, which will be used in the next invocation
    # for the `previous` parameter.
    return entrypoint.final(value=previous, save=2 * number)


config = {"configurable": {"thread_id": "1"}}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 3 * 2 from the previous invocation)
value instance-attribute
value: R

要返回的值。即使值为 None,也总会返回一个值。

save instance-attribute
save: S

下一个检查点的状态值。

即使值为 None,也总会保存一个值。

__init__

__init__(
    checkpointer: BaseCheckpointSaver | None = None,
    store: BaseStore | None = None,
    cache: BaseCache | None = None,
    context_schema: type[ContextT] | None = None,
    cache_policy: CachePolicy | None = None,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    **kwargs: Unpack[DeprecatedKwargs],
) -> None

初始化 entrypoint 装饰器。

__call__

__call__(func: Callable[..., Any]) -> Pregel

将一个函数转换为 Pregel 图。

参数 描述
func

要转换的函数。支持同步和异步函数。

类型: Callable[..., Any]

返回 描述
Pregel

一个 Pregel 图。

task

task(
    __func_or_none__: Callable[P, Awaitable[T]] | Callable[P, T] | None = None,
    *,
    name: str | None = None,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] | None = None,
    cache_policy: CachePolicy[Callable[P, str | bytes]] | None = None,
    **kwargs: Unpack[DeprecatedKwargs],
) -> (
    Callable[[Callable[P, Awaitable[T]] | Callable[P, T]], _TaskFunction[P, T]]
    | _TaskFunction[P, T]
)

使用 task 装饰器定义一个 LangGraph 任务。

异步函数需要 Python 3.11 或更高版本

task 装饰器支持同步和异步函数。要使用异步函数,请确保您使用的是 Python 3.11 或更高版本。

任务只能在 entrypointStateGraph 内部调用。任务的调用方式与普通函数类似,但有以下区别:

  • 当启用检查点时,函数的输入和输出必须是可序列化的。
  • 被装饰的函数只能在 entrypoint 或 StateGraph 内部调用。
  • 调用该函数会产生一个 future。这使得任务的并行化变得容易。
参数 描述
name

任务的可选名称。如果未提供,将使用函数名。

类型: str | None 默认值: None

retry_policy

在任务失败时使用的可选重试策略(或策略列表)。

类型: RetryPolicy | Sequence[RetryPolicy] | None 默认值: None

cache_policy

用于任务的可选缓存策略。这允许缓存任务结果。

类型: CachePolicy[Callable[P, str | bytes]] | None 默认值: None

返回 描述
Callable[[Callable[P, Awaitable[T]] | Callable[P, T]], _TaskFunction[P, T]] | _TaskFunction[P, T]

用作装饰器时是一个可调用函数。

同步任务
from langgraph.func import entrypoint, task


@task
def add_one(a: int) -> int:
    return a + 1


@entrypoint()
def add_one(numbers: list[int]) -> list[int]:
    futures = [add_one(n) for n in numbers]
    results = [f.result() for f in futures]
    return results


# Call the entrypoint
add_one.invoke([1, 2, 3])  # Returns [2, 3, 4]
异步任务
import asyncio
from langgraph.func import entrypoint, task


@task
async def add_one(a: int) -> int:
    return a + 1


@entrypoint()
async def add_one(numbers: list[int]) -> list[int]:
    futures = [add_one(n) for n in numbers]
    return asyncio.gather(*futures)


# Call the entrypoint
await add_one.ainvoke([1, 2, 3])  # Returns [2, 3, 4]
© . This site is unofficial and not affiliated with LangChain, Inc.