langchain-tavily¶
langchain_tavily ¶
TavilyCrawl ¶
基类: BaseTool
该工具可向 Tavily Crawl API 发送请求,并支持动态设置参数。
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
工具的输入模式。 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
__init_subclass__ |
在子类创建期间验证工具类定义。 |
run |
运行工具。 |
arun |
异步运行工具。 |
__init__ |
初始化工具。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
return_direct class-attribute instance-attribute ¶
return_direct: bool = False
是否直接返回工具的输出。
将其设置为 True 意味着在调用工具后,AgentExecutor 将停止循环。
callbacks class-attribute instance-attribute ¶
callbacks: Callbacks = Field(default=None, exclude=True)
在工具执行期间调用的回调。
tags class-attribute instance-attribute ¶
与工具关联的可选标签列表。
这些标签将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
metadata class-attribute instance-attribute ¶
与工具关联的可选元数据。
此元数据将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
handle_validation_error class-attribute instance-attribute ¶
handle_validation_error: (
bool | str | Callable[[ValidationError | ValidationError], str] | None
) = False
处理抛出的 ValidationError 的内容。
response_format class-attribute instance-attribute ¶
response_format: Literal['content', 'content_and_artifact'] = 'content'
工具响应格式。
如果为 `'content'`,则工具的输出被解释为 `ToolMessage` 的内容。如果为 `'content_and_artifact'`,则输出应为与 `ToolMessage` 的 `(content, artifact)` 对应的二元组。
is_single_input property ¶
is_single_input: bool
检查工具是否只接受单个输入参数。
| 返回 | 描述 |
|---|---|
bool
|
如果工具只有一个输入参数,则为 |
tool_call_schema property ¶
获取工具调用的模式,不包括注入的参数。
| 返回 | 描述 |
|---|---|
ArgsSchema
|
应用于语言模型工具调用的模式。 |
description class-attribute instance-attribute ¶
description: str = "A powerful web crawler that initiates a structured web crawl starting from a specified \n base URL. The crawler uses a BFS Depth: refering to the number of link hops from the root URL. \n A page directly linked from the root is at BFS depth 1, regardless of its URL structure.\n You can control how deep and wide it goes, and guide it to focus on specific sections of the site.\n "
用于告诉模型如何/何时/为何使用该工具。
您可以提供 few-shot 示例作为描述的一部分。
args_schema class-attribute instance-attribute ¶
用于验证和解析工具输入参数的 Pydantic 模型类。
参数模式应该是以下之一
pydantic.BaseModel的子类。- 如果在 pydantic 2 中访问 v1 命名空间,则为
pydantic.v1.BaseModel的子类 - 一个 JSON schema 字典
handle_tool_error class-attribute instance-attribute ¶
handle_tool_error: bool = True
处理抛出的 ToolException 的内容。
max_depth class-attribute instance-attribute ¶
max_depth: int | None = None
爬取的最大深度。定义爬虫可以从基础 URL 探索多远。
max_depth 必须大于 0
默认值为 1
max_breadth class-attribute instance-attribute ¶
max_breadth: int | None = None
每个树层级(即每个页面)要跟随的最大链接数。
max_breadth 必须大于 0
默认值为 20
limit class-attribute instance-attribute ¶
limit: int | None = None
爬虫在停止前将处理的总链接数。
limit 必须大于 0
默认值为 50
instructions class-attribute instance-attribute ¶
instructions: str | None = None
给爬虫的自然语言指令。
例如:"Python SDK"
select_paths class-attribute instance-attribute ¶
用于仅选择具有特定路径模式的 URL 的正则表达式模式。
例如:["/api/v1.*"]
select_domains class-attribute instance-attribute ¶
用于仅从特定域名或子域名中选择 URL 的正则表达式模式。
例如:["^docs.example.com$"]
exclude_paths class-attribute instance-attribute ¶
用于排除具有特定路径模式的 URL 的正则表达式模式,例如:[/private/., /admin/.]
exclude_domains class-attribute instance-attribute ¶
用于排除特定域名或子域名免于爬取的正则表达式模式,例如:[^private.example.com$]
allow_external class-attribute instance-attribute ¶
allow_external: bool | None = None
是否允许跟踪指向外部域名的链接。
默认值为 False
include_images class-attribute instance-attribute ¶
include_images: bool | None = None
是否在爬取结果中包含图像。
默认值为 False
categories class-attribute instance-attribute ¶
categories: (
list[
Literal[
"Careers",
"Blogs",
"Documentation",
"About",
"Pricing",
"Community",
"Developers",
"Contact",
"Media",
]
]
| None
) = None
使用预定义类别(如“文档”、“博客”等)过滤 URL。
extract_depth class-attribute instance-attribute ¶
extract_depth: Literal['basic', 'advanced'] | None = None
高级提取会检索更多数据,包括表格和嵌入内容,成功率更高,但可能会增加延迟。
默认值为 basic
format class-attribute instance-attribute ¶
format: str | None = None
提取的网页内容的格式。markdown 返回 markdown 格式的内容。text 返回纯文本,可能会增加延迟。
默认值为 markdown
include_favicon class-attribute instance-attribute ¶
include_favicon: bool | None = None
是否为每个结果包含网站图标的 URL。
默认值为 False。
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init_subclass__ ¶
__init_subclass__(**kwargs: Any) -> None
在子类创建期间验证工具类定义。
| 参数 | 描述 |
|---|---|
**kwargs
|
传递给父类的额外关键字参数。
类型: |
| 引发 | 描述 |
|---|---|
SchemaAnnotationError
|
如果 |
run ¶
run(
tool_input: str | dict[str, Any],
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调(事件处理程序)的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
arun async ¶
arun(
tool_input: str | dict,
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
异步运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
TavilyExtract ¶
基类: BaseTool
该工具可向 Tavily Extract API 发送请求,并支持动态设置参数。
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
工具的输入模式。 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
__init_subclass__ |
在子类创建期间验证工具类定义。 |
run |
运行工具。 |
arun |
异步运行工具。 |
__init__ |
初始化工具。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
return_direct class-attribute instance-attribute ¶
return_direct: bool = False
是否直接返回工具的输出。
将其设置为 True 意味着在调用工具后,AgentExecutor 将停止循环。
callbacks class-attribute instance-attribute ¶
callbacks: Callbacks = Field(default=None, exclude=True)
在工具执行期间调用的回调。
tags class-attribute instance-attribute ¶
与工具关联的可选标签列表。
这些标签将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
metadata class-attribute instance-attribute ¶
与工具关联的可选元数据。
此元数据将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
handle_validation_error class-attribute instance-attribute ¶
handle_validation_error: (
bool | str | Callable[[ValidationError | ValidationError], str] | None
) = False
处理抛出的 ValidationError 的内容。
response_format class-attribute instance-attribute ¶
response_format: Literal['content', 'content_and_artifact'] = 'content'
工具响应格式。
如果为 `'content'`,则工具的输出被解释为 `ToolMessage` 的内容。如果为 `'content_and_artifact'`,则输出应为与 `ToolMessage` 的 `(content, artifact)` 对应的二元组。
is_single_input property ¶
is_single_input: bool
检查工具是否只接受单个输入参数。
| 返回 | 描述 |
|---|---|
bool
|
如果工具只有一个输入参数,则为 |
tool_call_schema property ¶
获取工具调用的模式,不包括注入的参数。
| 返回 | 描述 |
|---|---|
ArgsSchema
|
应用于语言模型工具调用的模式。 |
description class-attribute instance-attribute ¶
description: str = "Extracts comprehensive content from web pages based on provided URLs. This tool retrieves raw text of a web page, with an option to include images. It supports two extraction depths: 'basic' for standard text extraction and 'advanced' for a more comprehensive extraction with higher success rate. Ideal for use cases such as content curation, data ingestion for NLP models, and automated information retrieval, this endpoint seamlessly integrates into your content processing pipeline. Input should be a list of one or more URLs."
用于告诉模型如何/何时/为何使用该工具。
您可以提供 few-shot 示例作为描述的一部分。
args_schema class-attribute instance-attribute ¶
用于验证和解析工具输入参数的 Pydantic 模型类。
参数模式应该是以下之一
pydantic.BaseModel的子类。- 如果在 pydantic 2 中访问 v1 命名空间,则为
pydantic.v1.BaseModel的子类 - 一个 JSON schema 字典
handle_tool_error class-attribute instance-attribute ¶
handle_tool_error: bool = True
处理抛出的 ToolException 的内容。
extract_depth class-attribute instance-attribute ¶
extract_depth: Literal['basic', 'advanced'] | None = None
提取过程的深度。“advanced”提取比“basic”提取检索更多数据,成功率更高,但可能会增加延迟。
默认值为“basic”
include_images class-attribute instance-attribute ¶
include_images: bool | None = None
在响应中包含从 URL 中提取的图像列表。
默认值为 False
format class-attribute instance-attribute ¶
format: str | None = None
提取的网页内容的格式。markdown 返回 markdown 格式的内容。text 返回纯文本,可能会增加延迟。
默认值为“markdown”
include_favicon class-attribute instance-attribute ¶
include_favicon: bool | None = None
是否为每个结果包含网站图标的 URL。
默认值为 False。
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init_subclass__ ¶
__init_subclass__(**kwargs: Any) -> None
在子类创建期间验证工具类定义。
| 参数 | 描述 |
|---|---|
**kwargs
|
传递给父类的额外关键字参数。
类型: |
| 引发 | 描述 |
|---|---|
SchemaAnnotationError
|
如果 |
run ¶
run(
tool_input: str | dict[str, Any],
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调(事件处理程序)的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
arun async ¶
arun(
tool_input: str | dict,
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
异步运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
TavilyMap ¶
基类: BaseTool
该工具可向 Tavily Map API 发送请求,并支持动态设置参数。
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
工具的输入模式。 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
__init_subclass__ |
在子类创建期间验证工具类定义。 |
run |
运行工具。 |
arun |
异步运行工具。 |
__init__ |
初始化工具。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
return_direct class-attribute instance-attribute ¶
return_direct: bool = False
是否直接返回工具的输出。
将其设置为 True 意味着在调用工具后,AgentExecutor 将停止循环。
callbacks class-attribute instance-attribute ¶
callbacks: Callbacks = Field(default=None, exclude=True)
在工具执行期间调用的回调。
tags class-attribute instance-attribute ¶
与工具关联的可选标签列表。
这些标签将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
metadata class-attribute instance-attribute ¶
与工具关联的可选元数据。
此元数据将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
handle_validation_error class-attribute instance-attribute ¶
handle_validation_error: (
bool | str | Callable[[ValidationError | ValidationError], str] | None
) = False
处理抛出的 ValidationError 的内容。
response_format class-attribute instance-attribute ¶
response_format: Literal['content', 'content_and_artifact'] = 'content'
工具响应格式。
如果为 `'content'`,则工具的输出被解释为 `ToolMessage` 的内容。如果为 `'content_and_artifact'`,则输出应为与 `ToolMessage` 的 `(content, artifact)` 对应的二元组。
is_single_input property ¶
is_single_input: bool
检查工具是否只接受单个输入参数。
| 返回 | 描述 |
|---|---|
bool
|
如果工具只有一个输入参数,则为 |
tool_call_schema property ¶
获取工具调用的模式,不包括注入的参数。
| 返回 | 描述 |
|---|---|
ArgsSchema
|
应用于语言模型工具调用的模式。 |
description class-attribute instance-attribute ¶
description: str = '"A powerful web mapping tool that creates a structured map of website URLs, allowing \n you to discover and analyze site structure, content organization, and navigation paths. \n Perfect for site audits, content discovery, and understanding website architecture.\n '
用于告诉模型如何/何时/为何使用该工具。
您可以提供 few-shot 示例作为描述的一部分。
args_schema class-attribute instance-attribute ¶
用于验证和解析工具输入参数的 Pydantic 模型类。
参数模式应该是以下之一
pydantic.BaseModel的子类。- 如果在 pydantic 2 中访问 v1 命名空间,则为
pydantic.v1.BaseModel的子类 - 一个 JSON schema 字典
handle_tool_error class-attribute instance-attribute ¶
handle_tool_error: bool = True
处理抛出的 ToolException 的内容。
max_depth class-attribute instance-attribute ¶
max_depth: int | None = None
爬取的最大深度。定义爬虫可以从基础 URL 探索多远。
max_depth 必须大于 0
默认值为 1
max_breadth class-attribute instance-attribute ¶
max_breadth: int | None = None
每个树层级(即每个页面)要跟随的最大链接数。
max_breadth 必须大于 0
默认值为 20
limit class-attribute instance-attribute ¶
limit: int | None = None
爬虫在停止前将处理的总链接数。
limit 必须大于 0
默认值为 50
instructions class-attribute instance-attribute ¶
instructions: str | None = None
给爬虫的自然语言指令。
例如:"Python SDK"
select_paths class-attribute instance-attribute ¶
用于仅选择具有特定路径模式的 URL 的正则表达式模式。
例如:["/api/v1.*"]
select_domains class-attribute instance-attribute ¶
用于仅从特定域名或子域名中选择 URL 的正则表达式模式。
例如:["^docs.example.com$"]
exclude_paths class-attribute instance-attribute ¶
用于排除具有特定路径模式的 URL 的正则表达式模式,例如:[/private/., /admin/.]
exclude_domains class-attribute instance-attribute ¶
用于从映射中排除特定域名或子域名的正则表达式模式,例如:[^private.example.com$]
allow_external class-attribute instance-attribute ¶
allow_external: bool | None = None
是否允许跟踪指向外部域名的链接。
默认值为 False
categories class-attribute instance-attribute ¶
categories: (
list[
Literal[
"Careers",
"Blogs",
"Documentation",
"About",
"Pricing",
"Community",
"Developers",
"Contact",
"Media",
]
]
| None
) = None
使用预定义类别(如“文档”、“博客”等)过滤 URL。
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init_subclass__ ¶
__init_subclass__(**kwargs: Any) -> None
在子类创建期间验证工具类定义。
| 参数 | 描述 |
|---|---|
**kwargs
|
传递给父类的额外关键字参数。
类型: |
| 引发 | 描述 |
|---|---|
SchemaAnnotationError
|
如果 |
run ¶
run(
tool_input: str | dict[str, Any],
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调(事件处理程序)的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
arun async ¶
arun(
tool_input: str | dict,
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
异步运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
TavilySearch ¶
基类: BaseTool
该工具用于查询 Tavily Search API 并返回 json 格式的数据。
设置
安装 langchain-tavily 并设置环境变量 TAVILY_API_KEY。
.. code-block:: bash
pip install -U langchain-tavily
export TAVILY_API_KEY="your-api-key"
实例化
.. code-block:: python
from langchain_tavily import TavilySearch
tool = TavilySearch(
max_results=1,
topic="general",
# include_answer=False,
# include_raw_content=False,
# include_images=False,
# include_image_descriptions=False,
# search_depth="basic",
# time_range="day",
# include_domains=None,
# exclude_domains=None,
# country=None
# include_favicon=False
)
使用参数直接调用
.. code-block:: python
tool.invoke({"query": "What happened at the last wimbledon"})
.. code-block:: json
{
'query': 'What happened at the last wimbledon',
'follow_up_questions': None,
'answer': None,
'images': [],
'results': [{'title': "Andy Murray pulls out of the men's singles draw at his last Wimbledon",
'url': 'https://www.nbcnews.com/news/sports/andy-murray-wimbledon-tennis-singles-draw-rcna159912',
'content': "NBC News Now LONDON — Andy Murray, one of the last decade's most successful ..."
'score': 0.6755297,
'raw_content': None
}],
'response_time': 1.31
}
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
工具的输入模式。 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
__init_subclass__ |
在子类创建期间验证工具类定义。 |
run |
运行工具。 |
arun |
异步运行工具。 |
__init__ |
初始化工具。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
return_direct class-attribute instance-attribute ¶
return_direct: bool = False
是否直接返回工具的输出。
将其设置为 True 意味着在调用工具后,AgentExecutor 将停止循环。
callbacks class-attribute instance-attribute ¶
callbacks: Callbacks = Field(default=None, exclude=True)
在工具执行期间调用的回调。
tags class-attribute instance-attribute ¶
与工具关联的可选标签列表。
这些标签将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
metadata class-attribute instance-attribute ¶
与工具关联的可选元数据。
此元数据将与对此工具的每次调用相关联,并作为参数传递给在 callbacks 中定义的处理程序。
您可以使用它们来(例如)通过其用例识别工具的特定实例。
handle_validation_error class-attribute instance-attribute ¶
handle_validation_error: (
bool | str | Callable[[ValidationError | ValidationError], str] | None
) = False
处理抛出的 ValidationError 的内容。
response_format class-attribute instance-attribute ¶
response_format: Literal['content', 'content_and_artifact'] = 'content'
工具响应格式。
如果为 `'content'`,则工具的输出被解释为 `ToolMessage` 的内容。如果为 `'content_and_artifact'`,则输出应为与 `ToolMessage` 的 `(content, artifact)` 对应的二元组。
is_single_input property ¶
is_single_input: bool
检查工具是否只接受单个输入参数。
| 返回 | 描述 |
|---|---|
bool
|
如果工具只有一个输入参数,则为 |
tool_call_schema property ¶
获取工具调用的模式,不包括注入的参数。
| 返回 | 描述 |
|---|---|
ArgsSchema
|
应用于语言模型工具调用的模式。 |
description class-attribute instance-attribute ¶
description: str = "A search engine optimized for comprehensive, accurate, and trusted results. Useful for when you need to answer questions about current events. It not only retrieves URLs and snippets, but offers advanced search depths, domain management, time range filters, and image search, this tool delivers real-time, accurate, and citation-backed results.Input should be a search query."
用于告诉模型如何/何时/为何使用该工具。
您可以提供 few-shot 示例作为描述的一部分。
args_schema class-attribute instance-attribute ¶
用于验证和解析工具输入参数的 Pydantic 模型类。
参数模式应该是以下之一
pydantic.BaseModel的子类。- 如果在 pydantic 2 中访问 v1 命名空间,则为
pydantic.v1.BaseModel的子类 - 一个 JSON schema 字典
handle_tool_error class-attribute instance-attribute ¶
handle_tool_error: bool = True
处理抛出的 ToolException 的内容。
auto_parameters class-attribute instance-attribute ¶
auto_parameters: bool | None = None
当启用 auto_parameters 时,Tavily 会根据您的查询内容和意图自动配置搜索参数。您仍然可以手动设置其他参数,您明确设置的值将覆盖自动配置的值。参数 include_answer、include_raw_content 和 max_results 必须始终手动设置,因为它们直接影响响应的大小。注意:当 search_depth 很可能改善结果时,它可能会被自动设置为 `advanced`。这会使每次请求消耗 2 个 API 积分。为避免额外费用,您可以明确地将 search_depth 设置为 `basic`。
默认值为 False。
include_domains class-attribute instance-attribute ¶
一个域名列表,用于将这些域名特别包含在搜索结果中
默认值为 None
exclude_domains class-attribute instance-attribute ¶
一个域名列表,用于将这些域名从搜索结果中特别排除
默认值为 None
search_depth class-attribute instance-attribute ¶
search_depth: Literal['basic', 'advanced'] | None = None
搜索的深度。可以是 'basic' 或 'advanced'
默认值为 "basic"
include_images class-attribute instance-attribute ¶
include_images: bool | None = None
在响应中包含与查询相关的图片列表
默认值为 False
time_range class-attribute instance-attribute ¶
time_range: Literal['day', 'week', 'month', 'year'] | None = None
从当前日期回溯的时间范围,用于筛选结果
默认值为 None
topic class-attribute instance-attribute ¶
topic: Literal['general', 'news', 'finance'] | None = None
搜索的类别。可以是 "general"、"news" 或 "finance"。
默认值为 "general"。
include_answer class-attribute instance-attribute ¶
在搜索结果中包含对原始查询的简短回答。
默认值为 False。
include_raw_content class-attribute instance-attribute ¶
包含每个搜索结果经过清理和解析的 HTML 内容。"markdown" 返回 Markdown 格式的搜索结果内容。"text" 返回结果中的纯文本,可能会增加延迟。
默认值为 "markdown"。
include_image_descriptions class-attribute instance-attribute ¶
include_image_descriptions: bool | None = None
为搜索结果中的每张图片包含描述性文本。
默认值为 False。
country class-attribute instance-attribute ¶
country: str | None = None
提升来自特定国家/地区的搜索结果。这将优先显示所选国家/地区的内容。仅当 topic 为 general 时可用。
要查看支持的国家/地区,请访问我们的文档 https://docs.tavily.com/documentation/api-reference/endpoint/search 默认值为 None。
include_favicon class-attribute instance-attribute ¶
include_favicon: bool | None = None
是否为每个结果包含网站图标的 URL。
默认值为 False。
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init_subclass__ ¶
__init_subclass__(**kwargs: Any) -> None
在子类创建期间验证工具类定义。
| 参数 | 描述 |
|---|---|
**kwargs
|
传递给父类的额外关键字参数。
类型: |
| 引发 | 描述 |
|---|---|
SchemaAnnotationError
|
如果 |
run ¶
run(
tool_input: str | dict[str, Any],
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调(事件处理程序)的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |
arun async ¶
arun(
tool_input: str | dict,
verbose: bool | None = None,
start_color: str | None = "green",
color: str | None = "green",
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
config: RunnableConfig | None = None,
tool_call_id: str | None = None,
**kwargs: Any,
) -> Any
异步运行工具。
| 参数 | 描述 |
|---|---|
tool_input
|
工具的输入。 |
verbose
|
是否记录工具的进度。
类型: |
start_color
|
启动工具时使用的颜色。
类型: |
color
|
结束工具时使用的颜色。
类型: |
回调
|
在工具执行期间调用的回调。
类型: |
tags
|
与工具关联的可选标签列表。 |
metadata
|
与工具关联的可选元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
配置
|
工具的配置。
类型: |
tool_call_id
|
工具调用的 ID。
类型: |
**kwargs
|
要传递给工具回调的关键字参数
类型: |
| 返回 | 描述 |
|---|---|
任意
|
工具的输出。 |
| 引发 | 描述 |
|---|---|
ToolException
|
如果工具执行期间发生错误。 |