跳转到内容

中间件

langchain.agents.middleware

使用 Middleware 插件与 Agents 的入口。

参考文档

此页面包含 Middleware 的参考文档。有关使用 Middleware 的概念性指南、教程和示例,请参阅文档

描述
ContextEditingMiddleware

自动修剪工具结果以管理上下文大小。

HumanInTheLoopMiddleware

“人在环路”(Human in the loop)中间件。

LLMToolSelectorMiddleware

在调用主模型之前,使用 LLM 选择相关工具。

LLMToolEmulator

使用 LLM 模拟指定的工具,而不是执行它们。

ModelCallLimitMiddleware

跟踪模型调用次数并强制执行限制。

ModelFallbackMiddleware

发生错误时自动回退到备用模型。

PIIMiddleware

在代理对话中检测和处理个人可识别信息 (PII)。

PIIDetectionError

当配置为在检测到敏感值时阻止时引发。

SummarizationMiddleware

当令牌限制接近时,总结对话历史。

ToolCallLimitMiddleware

跟踪工具调用次数并强制执行限制。

AgentMiddleware

代理的中间件基类。

AgentState

代理的状态模式。

ClearToolUsesEdit

当超出令牌限制时清除工具输出的配置。

InterruptOnConfig

需要“人在环路”的操作的配置。

ModelRequest

代理的模型请求信息。

ModelResponse

模型执行的响应,包括消息和可选的结构化输出。

ModelRequest

代理的模型请求信息。

ContextEditingMiddleware

Bases: AgentMiddleware

自动修剪工具结果以管理上下文大小。

当总输入令牌数超过配置的阈值时,该中间件会应用一系列编辑。目前支持 `ClearToolUsesEdit` 策略,这与 Anthropic 的 `clear_tool_uses_20250919` 行为一致。

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

before_model

before_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

after_model

after_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

__init__

__init__(
    *,
    edits: Iterable[ContextEdit] | None = None,
    token_count_method: Literal["approximate", "model"] = "approximate",
) -> None

初始化上下文编辑中间件实例。

参数 描述
edits

要应用的编辑策略序列。默认为单个 `ClearToolUsesEdit`,以模仿 Anthropic 的默认行为。

类型: Iterable[ContextEdit] | None 默认: None

token_count_method

是使用近似令牌计数(更快,不太准确)还是由聊天模型实现的精确计数(可能更慢,更准确)。

类型: Literal['approximate', 'model'] 默认: 'approximate'

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

在通过处理器调用模型之前应用上下文编辑。

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

在通过处理器调用模型之前应用上下文编辑(异步版本)。

HumanInTheLoopMiddleware

Bases: AgentMiddleware

“人在环路”(Human in the loop)中间件。

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

before_model

before_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

__init__

__init__(
    interrupt_on: dict[str, bool | InterruptOnConfig],
    *,
    description_prefix: str = "Tool execution requires approval",
) -> None

初始化“人在环路”中间件。

参数 描述
interrupt_on

工具名称到允许操作的映射。如果某个工具没有条目,则默认自动批准。

  • True 表示允许所有决策:批准、编辑和拒绝。
  • False 表示该工具被自动批准。
  • InterruptOnConfig 表示此工具允许的具体决策。InterruptOnConfig 可以包含一个 `description` 字段(`str` 或 `Callable`)用于自定义中断描述的格式。

类型: dict[str, bool | InterruptOnConfig]

description_prefix

构建操作请求时使用的前缀。这用于提供有关工具调用和所请求操作的上下文。如果工具在其 `InterruptOnConfig` 中有 `description`,则不使用此项。

类型: str 默认: 'Tool execution requires approval'

after_model

after_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None

在 `AIMessage` 后为相关工具调用触发中断流程。

LLMToolSelectorMiddleware

Bases: AgentMiddleware

在调用主模型之前,使用 LLM 选择相关工具。

当一个代理有许多可用工具时,此中间件会将它们筛选为仅与用户查询最相关的工具。这减少了令牌使用量,并帮助主模型专注于正确的工具。

示例

限制为 3 个工具

from langchain.agents.middleware import LLMToolSelectorMiddleware

middleware = LLMToolSelectorMiddleware(max_tools=3)

agent = create_agent(
    model="openai:gpt-4o",
    tools=[tool1, tool2, tool3, tool4, tool5],
    middleware=[middleware],
)

使用较小的模型进行选择

middleware = LLMToolSelectorMiddleware(model="openai:gpt-4o-mini", max_tools=2)

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

before_model

before_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

after_model

after_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

__init__

__init__(
    *,
    model: str | BaseChatModel | None = None,
    system_prompt: str = DEFAULT_SYSTEM_PROMPT,
    max_tools: int | None = None,
    always_include: list[str] | None = None,
) -> None

初始化工具选择器。

参数 描述
model

用于选择的模型。如果未提供,则使用代理的主模型。可以是一个模型标识符字符串或 BaseChatModel 实例。

类型: str | BaseChatModel | None 默认: None

system_prompt

给选择模型的指令。

类型: str 默认: DEFAULT_SYSTEM_PROMPT

max_tools

要选择的最大工具数。如果模型选择的工具超过此数,将只使用前 max_tools 个。如果未指定,则无限制。

TYPE: int | None DEFAULT: None

always_include

无论选择如何,总是包含的工具名称。这些不计入 max_tools 限制。

类型: list[str] | None 默认值: None

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

在通过处理器调用模型之前,根据 LLM 的选择筛选工具。

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

在通过处理器调用模型之前,根据 LLM 的选择筛选工具。

LLMToolEmulator

Bases: AgentMiddleware

使用 LLM 模拟指定的工具,而不是执行它们。

该中间件允许为测试目的选择性地模拟工具。默认情况下(当 tools=None 时),所有工具都会被模拟。您可以通过传递工具名称列表或 BaseTool 实例来指定要模拟的工具。

示例

模拟所有工具(默认行为)

from langchain.agents.middleware import LLMToolEmulator

middleware = LLMToolEmulator()

agent = create_agent(
    model="openai:gpt-4o",
    tools=[get_weather, get_user_location, calculator],
    middleware=[middleware],
)

按名称模拟特定工具

middleware = LLMToolEmulator(tools=["get_weather", "get_user_location"])

使用自定义模型进行模拟

middleware = LLMToolEmulator(
    tools=["get_weather"], model="anthropic:claude-sonnet-4-5-20250929"
)

通过传递工具实例来模拟特定工具

middleware = LLMToolEmulator(tools=[get_weather, get_user_location])

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

before_model

before_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

after_model

after_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

__init__

__init__(
    *,
    tools: list[str | BaseTool] | None = None,
    model: str | BaseChatModel | None = None,
) -> None

初始化工具模拟器。

参数 描述
工具

要模拟的工具名称(str)或 BaseTool 实例列表。如果为 None(默认值),将模拟所有工具。如果为空列表,则不模拟任何工具。

类型: list[str | BaseTool] | None 默认: None

model

用于模拟的模型。默认为“anthropic:claude-sonnet-4-5-20250929”。可以是一个模型标识符字符串或 BaseChatModel 实例。

类型: str | BaseChatModel | None 默认: None

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

如果工具应该被模拟,则使用 LLM 模拟工具执行。

参数 描述
request

可能需要模拟的工具调用请求。

类型: ToolCallRequest

handler

执行工具的回调函数(可多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

如果工具应被模拟,则返回带有模拟响应的 ToolMessage,

ToolMessage | Command

否则调用处理器进行正常执行。

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

wrap_tool_call 的异步版本。

如果工具应该被模拟,则使用 LLM 模拟工具执行。

参数 描述
request

可能需要模拟的工具调用请求。

类型: ToolCallRequest

handler

执行工具的异步回调函数(可多次调用)。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

如果工具应被模拟,则返回带有模拟响应的 ToolMessage,

ToolMessage | Command

否则调用处理器进行正常执行。

ModelCallLimitMiddleware

Bases: AgentMiddleware[ModelCallLimitState, Any]

跟踪模型调用次数并强制执行限制。

此中间件监控代理执行期间的模型调用次数,并在达到指定限制时终止代理。它支持线程级和运行级调用计数,并具有可配置的退出行为。

线程级:中间件跟踪模型调用次数,并在代理的多次运行(调用)中保持调用计数。

运行级:中间件跟踪在代理的单次运行(调用)期间的模型调用次数。

示例
from langchain.agents.middleware.call_tracking import ModelCallLimitMiddleware
from langchain.agents import create_agent

# Create middleware with limits
call_tracker = ModelCallLimitMiddleware(thread_limit=10, run_limit=5, exit_behavior="end")

agent = create_agent("openai:gpt-4o", middleware=[call_tracker])

# Agent will automatically jump to end when limits are exceeded
result = await agent.invoke({"messages": [HumanMessage("Help me with a task")]})

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

state_schema 类属性 实例属性

state_schema = ModelCallLimitState

传递给中间件节点的状态模式。

__init__

__init__(
    *,
    thread_limit: int | None = None,
    run_limit: int | None = None,
    exit_behavior: Literal["end", "error"] = "end",
) -> None

初始化调用跟踪中间件。

参数 描述
thread_limit

每个线程允许的最大模型调用次数。None 表示无限制。

TYPE: int | None DEFAULT: None

run_limit

每次运行允许的最大模型调用次数。None 表示无限制。

TYPE: int | None DEFAULT: None

exit_behavior

超出限制时的操作。 - "end": 跳转到代理执行的末尾,并注入一条人工 AI 消息,指示已超出限制。 - "error": 抛出 `ModelCallLimitExceededError`

类型: Literal['end', 'error'] 默认: 'end'

引发 描述
ValueError

如果两个限制都为 `None` 或者 `exit_behavior` 无效。

before_model

before_model(state: ModelCallLimitState, runtime: Runtime) -> dict[str, Any] | None

在进行模型调用前检查模型调用限制。

参数 描述
state

包含调用计数的当前代理状态。

类型: ModelCallLimitState

runtime

langgraph 运行时。

类型: Runtime

返回 描述
dict[str, Any] | None

如果超出限制且 exit_behavior 为 "end",则返回

dict[str, Any] | None

一个 Command 以跳转到末尾,并附带超出限制的消息。否则返回 None。

引发 描述
ModelCallLimitExceededError

如果超出限制且 exit_behavior 为 "error"。

after_model

after_model(state: ModelCallLimitState, runtime: Runtime) -> dict[str, Any] | None

在模型调用后增加模型调用计数。

参数 描述
state

当前代理状态。

类型: ModelCallLimitState

runtime

langgraph 运行时。

类型: Runtime

返回 描述
dict[str, Any] | None

带有增加的调用计数的状态更新。

ModelFallbackMiddleware

Bases: AgentMiddleware

发生错误时自动回退到备用模型。

按顺序使用备用模型重试失败的模型调用,直到成功或所有模型都用尽。主模型在 create_agent() 中指定。

示例
from langchain.agents.middleware.model_fallback import ModelFallbackMiddleware
from langchain.agents import create_agent

fallback = ModelFallbackMiddleware(
    "openai:gpt-4o-mini",  # Try first on error
    "anthropic:claude-sonnet-4-5-20250929",  # Then this
)

agent = create_agent(
    model="openai:gpt-4o",  # Primary model
    middleware=[fallback],
)

# If primary fails: tries gpt-4o-mini, then claude-sonnet-4-5-20250929
result = await agent.invoke({"messages": [HumanMessage("Hello")]})

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

before_model

before_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

after_model

after_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

__init__

__init__(
    first_model: str | BaseChatModel, *additional_models: str | BaseChatModel
) -> None

初始化模型回退中间件。

参数 描述
first_model

第一个备用模型(字符串名称或实例)。

类型: str | BaseChatModel

*additional_models

按顺序排列的其他备用模型。

类型: str | BaseChatModel 默认: ()

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

出现错误时按顺序尝试备用模型。

参数 描述
request

初始模型请求。

类型: ModelRequest

handler

执行模型的回调函数。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

来自成功模型调用的 AIMessage。

引发 描述
Exception

如果所有模型都失败,则重新抛出最后一个异常。

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

出现错误时按顺序尝试备用模型(异步版本)。

参数 描述
request

初始模型请求。

类型: ModelRequest

handler

执行模型的异步回调函数。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

来自成功模型调用的 AIMessage。

引发 描述
Exception

如果所有模型都失败,则重新抛出最后一个异常。

PIIMiddleware

Bases: AgentMiddleware

在代理对话中检测和处理个人可识别信息 (PII)。

该中间件检测常见的 PII 类型并应用可配置的策略来处理它们。它可以在用户输入和代理输出中检测电子邮件、信用卡、IP 地址、MAC 地址和 URL。

内置 PII 类型
  • email: 电子邮件地址
  • credit_card:信用卡号(使用 Luhn 算法验证)
  • ip: IP 地址(使用标准库验证)
  • mac_address: MAC 地址
  • url: URL (包括 http/https 和裸 URL)
策略
  • block:检测到 PII 时引发异常
  • redact:将 PII 替换为 [REDACTED_TYPE] 占位符
  • mask:部分屏蔽 PII(例如,信用卡的 `****-****-****-1234`)
  • hash:将 PII 替换为确定性哈希值(例如,`<email_hash:a1b2c3d4>`)

策略选择指南

策略 是否保留身份? 最适用于
block 不适用 完全避免 PII
redact 通用合规性,日志清理
mask 人类可读性,客户服务界面
hash 是(假名) 分析,调试
示例
from langchain.agents.middleware import PIIMiddleware
from langchain.agents import create_agent

# Redact all emails in user input
agent = create_agent(
    "openai:gpt-5",
    middleware=[
        PIIMiddleware("email", strategy="redact"),
    ],
)

# Use different strategies for different PII types
agent = create_agent(
    "openai:gpt-4o",
    middleware=[
        PIIMiddleware("credit_card", strategy="mask"),
        PIIMiddleware("url", strategy="redact"),
        PIIMiddleware("ip", strategy="hash"),
    ],
)

# Custom PII type with regex
agent = create_agent(
    "openai:gpt-5",
    middleware=[
        PIIMiddleware("api_key", detector=r"sk-[a-zA-Z0-9]{32}", strategy="block"),
    ],
)

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

__init__

__init__(
    pii_type: Literal["email", "credit_card", "ip", "mac_address", "url"] | str,
    *,
    strategy: Literal["block", "redact", "mask", "hash"] = "redact",
    detector: Callable[[str], list[PIIMatch]] | str | None = None,
    apply_to_input: bool = True,
    apply_to_output: bool = False,
    apply_to_tool_results: bool = False,
) -> None

初始化 PII 检测中间件。

参数 描述
pii_type

要检测的 PII 类型。可以是内置类型(`email`, `credit_card`, `ip`, `mac_address`, `url`)或自定义类型名称。

类型: Literal['email', 'credit_card', 'ip', 'mac_address', 'url'] | str

strategy

如何处理检测到的 PII

  • block: 当检测到 PII 时引发 PIIDetectionError
  • redact:替换为 `[REDACTED_TYPE]` 占位符
  • mask:部分屏蔽 PII(显示最后几个字符)
  • hash:替换为确定性哈希值(格式:`<type_hash:digest>`)

类型: Literal['block', 'redact', 'mask', 'hash'] 默认: 'redact'

detector

自定义检测器函数或正则表达式模式。

  • 如果为 `Callable`:接受内容字符串并返回 PIIMatch 对象列表的函数
  • 如果为 `str`:匹配 PII 的正则表达式模式
  • 如果为 `None`:使用 pii_type 的内置检测器

类型: Callable[[str], list[PIIMatch]] | str | None 默认: None

apply_to_input

是否在模型调用前检查用户消息。

类型: bool 默认值: True

apply_to_output

是否在模型调用后检查 AI 消息。

类型: bool 默认值: False

apply_to_tool_results

是否在工具执行后检查工具结果消息。

类型: bool 默认值: False

引发 描述
ValueError

如果 pii_type 不是内置类型且未提供检测器。

name 属性

name: str

中间件的名称。

before_model

before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None

在模型调用前检查用户消息和工具结果中是否包含 PII。

参数 描述
state

当前代理状态。

类型: AgentState

runtime

langgraph 运行时。

类型: Runtime

返回 描述
dict[str, Any] | None

根据策略处理 PII 后的更新状态,如果未检测到 PII,则为 None。

引发 描述
PIIDetectionError

如果检测到 PII 且策略为 "block"。

after_model

after_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None

在模型调用后检查AI消息中的个人身份信息(PII)。

参数 描述
state

当前代理状态。

类型: AgentState

runtime

langgraph 运行时。

类型: Runtime

返回 描述
dict[str, Any] | None

根据策略处理 PII 后的更新状态,如果未检测到 PII,则为 None。

引发 描述
PIIDetectionError

如果检测到 PII 且策略为 "block"。

PIIDetectionError

基类:Exception

当配置为在检测到敏感值时阻止时引发。

__init__

__init__(pii_type: str, matches: Sequence[PIIMatch]) -> None

使用匹配上下文初始化异常。

参数 描述
pii_type

检测到的敏感类型的名称。

类型: str

matches

为该类型检测到的所有匹配项。

类型: Sequence[PIIMatch]

SummarizationMiddleware

Bases: AgentMiddleware

当令牌限制接近时,总结对话历史。

此中间件监控消息的令牌(token)计数,并在达到阈值时自动总结较早的消息,同时保留最近的消息,并通过确保 AI/工具消息对保持在一起以维持上下文的连续性。

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

after_model

after_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

__init__

__init__(
    model: str | BaseChatModel,
    max_tokens_before_summary: int | None = None,
    messages_to_keep: int = _DEFAULT_MESSAGES_TO_KEEP,
    token_counter: TokenCounter = count_tokens_approximately,
    summary_prompt: str = DEFAULT_SUMMARY_PROMPT,
    summary_prefix: str = SUMMARY_PREFIX,
) -> None

初始化摘要中间件。

参数 描述
model

用于生成摘要的语言模型。

类型: str | BaseChatModel

max_tokens_before_summary

触发摘要的令牌阈值。如果为 None,则禁用摘要功能。

TYPE: int | None DEFAULT: None

messages_to_keep

摘要后要保留的最近消息数量。

类型: int 默认值: _DEFAULT_MESSAGES_TO_KEEP

token_counter

用于计算消息中令牌数量的函数。

类型: TokenCounter 默认值: count_tokens_approximately

summary_prompt

用于生成摘要的提示模板。

类型: str 默认值: DEFAULT_SUMMARY_PROMPT

summary_prefix

包含摘要时添加到系统消息的前缀。

类型: str 默认值: SUMMARY_PREFIX

before_model

before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None

在模型调用前处理消息,可能会触发摘要。

ToolCallLimitMiddleware

基类:AgentMiddleware[ToolCallLimitState, Any]

跟踪工具调用次数并强制执行限制。

此中间件监控代理执行期间工具调用的次数,并在达到指定限制时可以终止代理。它支持线程级别和运行级别的调用计数,并具有可配置的退出行为。

线程级别:中间件跟踪工具调用的总数,并在代理的多次运行(调用)中持久化调用计数。

运行级别:中间件跟踪代理单次运行(调用)期间工具调用的次数。

示例
from langchain.agents.middleware.tool_call_limit import ToolCallLimitMiddleware
from langchain.agents import create_agent

# Limit all tool calls globally
global_limiter = ToolCallLimitMiddleware(thread_limit=20, run_limit=10, exit_behavior="end")

# Limit a specific tool
search_limiter = ToolCallLimitMiddleware(
    tool_name="search", thread_limit=5, run_limit=3, exit_behavior="end"
)

# Use both in the same agent
agent = create_agent("openai:gpt-4o", middleware=[global_limiter, search_limiter])

result = await agent.invoke({"messages": [HumanMessage("Help me with a task")]})

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

state_schema 类属性 实例属性

state_schema = ToolCallLimitState

传递给中间件节点的状态模式。

__init__

__init__(
    *,
    tool_name: str | None = None,
    thread_limit: int | None = None,
    run_limit: int | None = None,
    exit_behavior: Literal["end", "error"] = "end",
) -> None

初始化工具调用限制中间件。

参数 描述
tool_name

要限制的特定工具的名称。如果为 None,则限制适用于所有工具。默认为 None

类型: str | None 默认值: None

thread_limit

每个线程允许的最大工具调用次数。None 表示无限制。默认为 None

TYPE: int | None DEFAULT: None

run_limit

每次运行允许的最大工具调用次数。None 表示无限制。默认为 None

TYPE: int | None DEFAULT: None

exit_behavior

超出限制时应采取的操作。- "end":跳至代理执行的末尾,并注入一条人工 AI 消息,指示已超出限制。- "error":引发一个 ToolCallLimitExceededError。默认为 "end"。

类型: Literal['end', 'error'] 默认: 'end'

引发 描述
ValueError

如果两个限制都为 `None` 或者 `exit_behavior` 无效。

name 属性

name: str

中间件实例的名称。

如果指定了工具名称,则包含该名称,以允许此中间件的多个实例具有不同的工具名称。

before_model

before_model(state: ToolCallLimitState, runtime: Runtime) -> dict[str, Any] | None

在进行模型调用之前检查工具调用限制。

参数 描述
state

包含工具调用计数的当前代理状态。

类型: ToolCallLimitState

runtime

langgraph 运行时。

类型: Runtime

返回 描述
dict[str, Any] | None

如果超出限制且 exit_behavior 为 "end",则返回

dict[str, Any] | None

一个 Command 以跳转到末尾,并附带超出限制的消息。否则返回 None。

引发 描述
ToolCallLimitExceededError

如果超出限制且 exit_behavior 为 "error"。

after_model

after_model(state: ToolCallLimitState, runtime: Runtime) -> dict[str, Any] | None

在模型调用后(当进行工具调用时)增加工具调用计数。

参数 描述
state

当前代理状态。

类型: ToolCallLimitState

runtime

langgraph 运行时。

类型: Runtime

返回 描述
dict[str, Any] | None

如果进行了工具调用,状态更新会增加工具调用计数。

AgentMiddleware

基类:Generic[StateT, ContextT]

代理的中间件基类。

继承此子类并实现任何已定义的方法,以自定义代理在主代理循环中各步骤之间的行为。

state_schema 类属性 实例属性

state_schema: type[StateT] = cast('type[StateT]', AgentState)

传递给中间件节点的状态模式。

tools 实例属性

tools: list[BaseTool]

由中间件注册的额外工具。

name 属性

name: str

中间件实例的名称。

默认为类名,但可以为自定义命名而重写。

before_agent

before_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的逻辑。

abefore_agent 异步

abefore_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行开始前运行的异步逻辑。

before_model

before_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的逻辑。

abefore_model 异步

abefore_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用前运行的异步逻辑。

after_model

after_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的逻辑。

aafter_model 异步

aafter_model(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在模型被调用后运行的异步逻辑。

wrap_model_call

wrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], ModelResponse]
) -> ModelCallResult

通过处理器回调拦截和控制模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], ModelResponse]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

def wrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return handler(request)
        except Exception:
            if attempt == 2:
                raise

重写响应

def wrap_model_call(self, request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=f"[{ai_msg.content}]")],
        structured_response=response.structured_response,
    )

错误时回退

def wrap_model_call(self, request, handler):
    try:
        return handler(request)
    except Exception:
        return ModelResponse(result=[AIMessage(content="Service unavailable")])

缓存/短路

def wrap_model_call(self, request, handler):
    if cached := get_cache(request):
        return cached  # Short-circuit with cached result
    response = handler(request)
    save_cache(request, response)
    return response

简单的 AIMessage 返回(自动转换)

def wrap_model_call(self, request, handler):
    response = handler(request)
    # Can return AIMessage directly for simple cases
    return AIMessage(content="Simplified response")

awrap_model_call 异步

awrap_model_call(
    request: ModelRequest, handler: Callable[[ModelRequest], Awaitable[ModelResponse]]
) -> ModelCallResult

通过处理器回调拦截和控制异步模型执行。

处理器回调执行模型请求并返回一个 `ModelResponse`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

要执行的模型请求(包括状态和运行时)。

类型: ModelRequest

handler

执行模型请求并返回 `ModelResponse` 的异步回调函数。调用此函数以执行模型。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ModelRequest], Awaitable[ModelResponse]]

返回 描述
ModelCallResult

ModelCallResult

示例

出错时重试

async def awrap_model_call(self, request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

after_agent

after_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的逻辑。

aafter_agent 异步

aafter_agent(state: StateT, runtime: Runtime[ContextT]) -> dict[str, Any] | None

在代理执行完成后运行的异步逻辑。

wrap_tool_call

wrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command

拦截工具执行以进行重试、监控或修改。

多个中间件会自动组合(先定义的为最外层)。除非在 `ToolNode` 上配置了 `handle_tool_errors`,否则异常会传播。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具的可调用对象(可以多次调用)。

类型: Callable[[ToolCallRequest], ToolMessage | Command]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

在执行前修改请求

def wrap_tool_call(self, request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

出错时重试(多次调用处理器)

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

基于响应的条件性重试

def wrap_tool_call(self, request, handler):
    for attempt in range(3):
        result = handler(request)
        if isinstance(result, ToolMessage) and result.status != "error":
            return result
        if attempt < 2:
            continue
        return result

awrap_tool_call 异步

awrap_tool_call(
    request: ToolCallRequest,
    handler: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]],
) -> ToolMessage | Command

通过处理器回调拦截和控制异步工具执行。

处理器回调执行工具调用并返回一个 `ToolMessage` 或 `Command`。中间件可以多次调用处理器以实现重试逻辑,跳过调用以进行短路,或修改请求/响应。多个中间件组合时,列表中的第一个为最外层。

参数 描述
request

工具调用请求,包含调用 `dict`、`BaseTool`、状态和运行时。通过 `request.state` 访问状态,通过 `request.runtime` 访问运行时。

类型: ToolCallRequest

handler

执行工具并返回 `ToolMessage` 或 `Command` 的异步可调用对象。调用此对象以执行工具。可以为重试逻辑多次调用。可以跳过调用以进行短路。

类型: Callable[[ToolCallRequest], Awaitable[ToolMessage | Command]]

返回 描述
ToolMessage | Command

ToolMessageCommand (最终结果)。

处理器可调用对象可以为重试逻辑多次调用。对处理器的每次调用都是独立且无状态的。

示例

出错时异步重试

async def awrap_tool_call(self, request, handler):
    for attempt in range(3):
        try:
            result = await handler(request)
            if is_valid(result):
                return result
        except Exception:
            if attempt == 2:
                raise
    return result

async def awrap_tool_call(self, request, handler):
    if cached := await get_cache_async(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = await handler(request)
    await save_cache_async(request, result)
    return result

AgentState

基类:TypedDict, Generic[ResponseT]

代理的状态模式。

ClearToolUsesEdit 数据类

基类:ContextEdit

当超出令牌限制时清除工具输出的配置。

trigger 类属性 实例属性

trigger: int = 100000

触发编辑的令牌计数。

clear_at_least 类属性 实例属性

clear_at_least: int = 0

编辑运行时要回收的最小令牌数。

keep 类属性 实例属性

keep: int = 3

必须保留的最近工具结果的数量。

clear_tool_inputs 类属性 实例属性

clear_tool_inputs: bool = False

是否清除 AI 消息上原始工具调用的参数。

exclude_tools 类属性 实例属性

exclude_tools: Sequence[str] = ()

要排除在清除之外的工具名称列表。

placeholder 类属性 实例属性

placeholder: str = DEFAULT_TOOL_PLACEHOLDER

为已清除的工具输出插入的占位符文本。

apply

apply(messages: list[AnyMessage], *, count_tokens: TokenCounter) -> None

应用清除工具使用策略。

InterruptOnConfig

基类:TypedDict

需要“人在环路”的操作的配置。

这是在 HumanInTheLoopMiddleware.__init__ 方法中使用的配置格式。

allowed_decisions 实例属性

allowed_decisions: list[DecisionType]

此操作允许的决策。

description 实例属性

description: NotRequired[str | _DescriptionFactory]

附加到人工输入请求的描述。

可以是

  • 描述批准请求的静态字符串
  • 根据代理状态、运行时和工具调用信息动态生成描述的可调用对象
示例
# Static string description
config = ToolConfig(
    allowed_decisions=["approve", "reject"],
    description="Please review this tool execution"
)

# Dynamic callable description
def format_tool_description(
    tool_call: ToolCall,
    state: AgentState,
    runtime: Runtime
) -> str:
    import json
    return (
        f"Tool: {tool_call['name']}\n"
        f"Arguments:\n{json.dumps(tool_call['args'], indent=2)}"
    )

config = InterruptOnConfig(
    allowed_decisions=["approve", "edit", "reject"],
    description=format_tool_description
)

args_schema 实例属性

args_schema: NotRequired[dict[str, Any]]

如果允许编辑,则为与操作关联的参数的 JSON 模式。

ModelRequest 数据类

代理的模型请求信息。

override

override(**overrides: Unpack[_ModelRequestOverrides]) -> ModelRequest

用给定的覆盖项替换请求,生成一个新的请求。

返回一个新的 ModelRequest 实例,其中指定的属性已被替换。这遵循不可变模式,使原始请求保持不变。

参数 描述
**overrides

用于覆盖属性的关键字参数。支持的键:- model: BaseChatModel 实例 - system_prompt: 可选的系统提示字符串 - messages: 消息列表 - tool_choice: 工具选择配置 - tools: 可用工具列表 - response_format: 响应格式规范 - model_settings: 其他模型设置

类型: Unpack[_ModelRequestOverrides] 默认值: {}

返回 描述
ModelRequest

应用了指定覆盖的新 ModelRequest 实例。

示例

# Create a new request with different model
new_request = request.override(model=different_model)

# Override multiple attributes
new_request = request.override(system_prompt="New instructions", tool_choice="auto")

ModelResponse 数据类

模型执行的响应,包括消息和可选的结构化输出。

结果通常包含单个 AIMessage,但如果模型使用工具进行结构化输出,则可能包含一个额外的 ToolMessage。

result 实例属性

result: list[BaseMessage]

来自模型执行的消息列表。

structured_response 类属性 实例属性

structured_response: Any = None

如果指定了 response_format,则为解析的结构化输出,否则为 None。

before_model

before_model(
    func: _CallableWithStateAndRuntime[StateT, ContextT] | None = None,
    *,
    state_schema: type[StateT] | None = None,
    tools: list[BaseTool] | None = None,
    can_jump_to: list[JumpTo] | None = None,
    name: str | None = None,
) -> (
    Callable[
        [_CallableWithStateAndRuntime[StateT, ContextT]],
        AgentMiddleware[StateT, ContextT],
    ]
    | AgentMiddleware[StateT, ContextT]
)

用于动态创建带有 before_model 钩子的中间件的装饰器。

参数 描述
func

要装饰的函数。必须接受:state: StateT, runtime: Runtime[ContextT] - 状态和运行时上下文

类型: _CallableWithStateAndRuntime[StateT, ContextT] | None 默认值: None

state_schema

可选的自定义状态模式类型。如果未提供,则使用默认的 AgentState 模式。

类型: type[StateT] | None 默认值: None

工具

要在此中间件中注册的可选附加工具列表。

类型: list[BaseTool] | None 默认值: None

can_jump_to

条件边的可选有效跳转目标列表。有效值为:"tools""model""end"

类型: list[JumpTo] | None 默认值: None

name

生成的中间件类的可选名称。如果未提供,则使用被装饰函数的名称。

类型: str | None 默认值: None

返回 描述
Callable[[_CallableWithStateAndRuntime[StateT, ContextT]], AgentMiddleware[StateT, ContextT]] | AgentMiddleware[StateT, ContextT]

如果直接提供了 func,则为 AgentMiddleware 实例;否则为一个

Callable[[_CallableWithStateAndRuntime[StateT, ContextT]], AgentMiddleware[StateT, ContextT]] | AgentMiddleware[StateT, ContextT]

可以应用于其包装的函数的装饰器函数。

被装饰的函数应返回
  • dict[str, Any] - 要合并到代理状态中的状态更新
  • Command - 用于控制流程的命令(例如,跳转到不同的节点)
  • None - 无状态更新或流程控制

示例

基本用法

@before_model
def log_before_model(state: AgentState, runtime: Runtime) -> None:
    print(f"About to call model with {len(state['messages'])} messages")

带条件跳转

@before_model(can_jump_to=["end"])
def conditional_before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    if some_condition(state):
        return {"jump_to": "end"}
    return None

带自定义状态模式

@before_model(state_schema=MyCustomState)
def custom_before_model(state: MyCustomState, runtime: Runtime) -> dict[str, Any]:
    return {"custom_field": "updated_value"}

after_model

after_model(
    func: _CallableWithStateAndRuntime[StateT, ContextT] | None = None,
    *,
    state_schema: type[StateT] | None = None,
    tools: list[BaseTool] | None = None,
    can_jump_to: list[JumpTo] | None = None,
    name: str | None = None,
) -> (
    Callable[
        [_CallableWithStateAndRuntime[StateT, ContextT]],
        AgentMiddleware[StateT, ContextT],
    ]
    | AgentMiddleware[StateT, ContextT]
)

用于动态创建带有 after_model 钩子的中间件的装饰器。

参数 描述
func

要装饰的函数。必须接受:state: StateT, runtime: Runtime[ContextT] - 状态和运行时上下文

类型: _CallableWithStateAndRuntime[StateT, ContextT] | None 默认值: None

state_schema

可选的自定义状态模式类型。如果未提供,则使用默认的 AgentState 模式。

类型: type[StateT] | None 默认值: None

工具

要在此中间件中注册的可选附加工具列表。

类型: list[BaseTool] | None 默认值: None

can_jump_to

条件边的可选有效跳转目标列表。有效值为:"tools""model""end"

类型: list[JumpTo] | None 默认值: None

name

生成的中间件类的可选名称。如果未提供,则使用被装饰函数的名称。

类型: str | None 默认值: None

返回 描述
Callable[[_CallableWithStateAndRuntime[StateT, ContextT]], AgentMiddleware[StateT, ContextT]] | AgentMiddleware[StateT, ContextT]

如果提供了 func,则为 AgentMiddleware 实例;否则为一个装饰器

Callable[[_CallableWithStateAndRuntime[StateT, ContextT]], AgentMiddleware[StateT, ContextT]] | AgentMiddleware[StateT, ContextT]

可以应用于函数的函数。

被装饰的函数应返回
  • dict[str, Any] - 要合并到代理状态中的状态更新
  • Command - 用于控制流程的命令(例如,跳转到不同的节点)
  • None - 无状态更新或流程控制

示例

用于记录模型响应的基本用法

@after_model
def log_latest_message(state: AgentState, runtime: Runtime) -> None:
    print(state["messages"][-1].content)

带自定义状态模式

@after_model(state_schema=MyCustomState, name="MyAfterModelMiddleware")
def custom_after_model(state: MyCustomState, runtime: Runtime) -> dict[str, Any]:
    return {"custom_field": "updated_after_model"}

wrap_model_call

wrap_model_call(
    func: _CallableReturningModelResponse[StateT, ContextT] | None = None,
    *,
    state_schema: type[StateT] | None = None,
    tools: list[BaseTool] | None = None,
    name: str | None = None,
) -> (
    Callable[
        [_CallableReturningModelResponse[StateT, ContextT]],
        AgentMiddleware[StateT, ContextT],
    ]
    | AgentMiddleware[StateT, ContextT]
)

从函数创建带有 wrap_model_call 钩子的中间件。

将带有处理程序回调的函数转换为中间件,该中间件可以拦截模型调用、实现重试逻辑、处理错误并重写响应。

参数 描述
func

接受(request, handler)的函数,它调用 handler(request) 来执行模型并返回 ModelResponseAIMessage。Request 包含状态和运行时。

类型: _CallableReturningModelResponse[StateT, ContextT] | None 默认值: None

state_schema

自定义状态模式。默认为 AgentState

类型: type[StateT] | None 默认值: None

工具

要在此中间件中注册的附加工具。

类型: list[BaseTool] | None 默认值: None

name

中间件类名。默认为函数名。

类型: str | None 默认值: None

返回 描述
Callable[[_CallableReturningModelResponse[StateT, ContextT]], AgentMiddleware[StateT, ContextT]] | AgentMiddleware[StateT, ContextT]

如果提供了 func,则为 AgentMiddleware 实例,否则为一个装饰器。

示例

基本重试逻辑

@wrap_model_call
def retry_on_error(request, handler):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return handler(request)
        except Exception:
            if attempt == max_retries - 1:
                raise

模型回退

@wrap_model_call
def fallback_model(request, handler):
    # Try primary model
    try:
        return handler(request)
    except Exception:
        pass

    # Try fallback model
    request.model = fallback_model_instance
    return handler(request)

重写响应内容(完整的 ModelResponse)

@wrap_model_call
def uppercase_responses(request, handler):
    response = handler(request)
    ai_msg = response.result[0]
    return ModelResponse(
        result=[AIMessage(content=ai_msg.content.upper())],
        structured_response=response.structured_response,
    )

简单的 AIMessage 返回(自动转换)

@wrap_model_call
def simple_response(request, handler):
    # AIMessage is automatically converted to ModelResponse
    return AIMessage(content="Simple response")

wrap_tool_call

wrap_tool_call(
    func: _CallableReturningToolResponse | None = None,
    *,
    tools: list[BaseTool] | None = None,
    name: str | None = None,
) -> Callable[[_CallableReturningToolResponse], AgentMiddleware] | AgentMiddleware

从函数创建带有 wrap_tool_call 钩子的中间件。

将带有处理程序回调的函数转换为中间件,该中间件可以拦截工具调用、实现重试逻辑、监控执行并修改响应。

参数 描述
func

接受(request, handler)的函数,它调用 handler(request) 来执行工具并返回最终的 ToolMessageCommand。可以是同步或异步的。

类型: _CallableReturningToolResponse | None 默认值: None

工具

要在此中间件中注册的附加工具。

类型: list[BaseTool] | None 默认值: None

name

中间件类名。默认为函数名。

类型: str | None 默认值: None

返回 描述
Callable[[_CallableReturningToolResponse], AgentMiddleware] | AgentMiddleware

如果提供了 func,则为 AgentMiddleware 实例,否则为一个装饰器。

示例

重试逻辑

@wrap_tool_call
def retry_on_error(request, handler):
    max_retries = 3
    for attempt in range(max_retries):
        try:
            return handler(request)
        except Exception:
            if attempt == max_retries - 1:
                raise

异步重试逻辑

@wrap_tool_call
async def async_retry(request, handler):
    for attempt in range(3):
        try:
            return await handler(request)
        except Exception:
            if attempt == 2:
                raise

修改请求

@wrap_tool_call
def modify_args(request, handler):
    request.tool_call["args"]["value"] *= 2
    return handler(request)

使用缓存结果进行短路

@wrap_tool_call
def with_cache(request, handler):
    if cached := get_cache(request):
        return ToolMessage(content=cached, tool_call_id=request.tool_call["id"])
    result = handler(request)
    save_cache(request, result)
    return result

ModelRequest 数据类

代理的模型请求信息。

override

override(**overrides: Unpack[_ModelRequestOverrides]) -> ModelRequest

用给定的覆盖项替换请求,生成一个新的请求。

返回一个新的 ModelRequest 实例,其中指定的属性已被替换。这遵循不可变模式,使原始请求保持不变。

参数 描述
**overrides

用于覆盖属性的关键字参数。支持的键:- model: BaseChatModel 实例 - system_prompt: 可选的系统提示字符串 - messages: 消息列表 - tool_choice: 工具选择配置 - tools: 可用工具列表 - response_format: 响应格式规范 - model_settings: 其他模型设置

类型: Unpack[_ModelRequestOverrides] 默认值: {}

返回 描述
ModelRequest

应用了指定覆盖的新 ModelRequest 实例。

示例

# Create a new request with different model
new_request = request.override(model=different_model)

# Override multiple attributes
new_request = request.override(system_prompt="New instructions", tool_choice="auto")
© . This site is unofficial and not affiliated with LangChain, Inc.