可运行对象
langchain_core.runnables.base.Runnable ¶
Bases: ABC, Generic[Input, Output]
一个可以被调用、批处理、流式处理、转换和组合的工作单元。
关键方法¶
invoke/ainvoke:将单个输入转换为输出。batch/abatch:高效地将多个输入转换为多个输出。stream/astream:在单个输入产生输出时,以流的形式输出。astream_log:以流的形式输出结果以及选定的中间结果。
内置优化
-
批处理 (Batch):默认情况下,批处理运行使用线程池执行器并行调用 invoke()。可重写此方法以优化批处理。
-
异步 (Async):带 'a' 后缀的方法是异步的。默认情况下,它们使用 asyncio 的线程池执行同步对应方法。可重写此方法以实现原生异步。
所有方法都接受一个可选的 config 参数,该参数可用于配置执行、添加标签和元数据以进行追踪和调试等。
Runnable 通过 input_schema 属性、output_schema 属性和 config_schema 方法,公开有关其输入、输出和配置的结构化信息。
组合¶
Runnable 对象可以组合在一起,以声明式的方式创建链。
以这种方式构建的任何链都将自动支持同步、异步、批处理和流式处理。
主要的组合原语是 RunnableSequence 和 RunnableParallel。
RunnableSequence 按顺序调用一系列 runnable,其中一个 Runnable 的输出作为下一个的输入。使用 | 操作符或通过将 runnable 列表传递给 RunnableSequence 来构造。
RunnableParallel 并发调用 runnable,为每个 runnable 提供相同的输入。在序列中使用字典字面量或通过将字典传递给 RunnableParallel 来构造。
例如,
from langchain_core.runnables import RunnableLambda
# A RunnableSequence constructed using the `|` operator
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(1) # 4
sequence.batch([1, 2, 3]) # [4, 6, 8]
# A sequence that contains a RunnableParallel constructed using a dict literal
sequence = RunnableLambda(lambda x: x + 1) | {
"mul_2": RunnableLambda(lambda x: x * 2),
"mul_5": RunnableLambda(lambda x: x * 5),
}
sequence.invoke(1) # {'mul_2': 4, 'mul_5': 10}
标准方法¶
所有 Runnable 都公开了可用于修改其行为的附加方法(例如,添加重试策略、添加生命周期监听器、使其可配置等)。
这些方法适用于任何 Runnable,包括通过组合其他 Runnable 构建的 Runnable 链。有关详细信息,请参阅各个方法。
例如,
from langchain_core.runnables import RunnableLambda
import random
def add_one(x: int) -> int:
return x + 1
def buggy_double(y: int) -> int:
"""Buggy code that will fail 70% of the time"""
if random.random() > 0.3:
print('This code failed, and will probably be retried!') # noqa: T201
raise ValueError('Triggered buggy code')
return y * 2
sequence = (
RunnableLambda(add_one) |
RunnableLambda(buggy_double).with_retry( # Retry on failure
stop_after_attempt=10,
wait_exponential_jitter=False
)
)
print(sequence.input_schema.model_json_schema()) # Show inferred input schema
print(sequence.output_schema.model_json_schema()) # Show inferred output schema
print(sequence.invoke(2)) # invoke the sequence (note the retry above!!)
调试与追踪¶
随着链越来越长,能够看到中间结果以调试和追踪链会变得很有用。
您可以将全局调试标志设置为 True 以启用所有链的调试输出
或者,您可以将现有的或自定义的回调函数传递给任何给定的链
from langchain_core.tracers import ConsoleCallbackHandler
chain.invoke(..., config={"callbacks": [ConsoleCallbackHandler()]})
要获得一个用户界面(以及更多功能),请查看 LangSmith。
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke 抽象方法 ¶
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke 异步 ¶
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch 异步 ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed 异步 ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream 异步 ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log 异步 ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events 异步 ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform 异步 ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
langchain_core.runnables.base.RunnableBinding ¶
Bases: RunnableBindingBase[Input, Output]
用附加功能包装一个 Runnable。
一个 RunnableBinding 可以被认为是一个“runnable 装饰器”,它保留了 Runnable 的基本特性,即批处理、流式处理和异步支持,同时添加了额外的功能。
任何继承自 Runnable 的类都可以绑定到一个 RunnableBinding。Runnable 公开了一套标准方法,用于创建 RunnableBindings 或 RunnableBindings 的子类(例如,RunnableRetry、RunnableWithFallbacks),这些方法可以添加额外的功能。
这些方法包括
bind:绑定在运行时传递给底层Runnable的 kwargs。with_config:绑定在运行时传递给底层Runnable的配置。with_listeners:将生命周期监听器绑定到底层Runnable。with_types:覆盖底层Runnable的输入和输出类型。with_retry:将重试策略绑定到底层Runnable。with_fallbacks:将回退策略绑定到底层Runnable。
示例:bind:绑定在运行时传递给底层 Runnable 的 kwargs。
```python
# Create a Runnable binding that invokes the chat model with the
# additional kwarg `stop=['-']` when running it.
from langchain_community.chat_models import ChatOpenAI
model = ChatOpenAI()
model.invoke('Say "Parrot-MAGIC"', stop=["-"]) # Should return `Parrot`
# Using it the easy way via `bind` method which returns a new
# RunnableBinding
runnable_binding = model.bind(stop=["-"])
runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
```
Can also be done by instantiating a `RunnableBinding` directly (not
recommended):
```python
from langchain_core.runnables import RunnableBinding
runnable_binding = RunnableBinding(
bound=model,
kwargs={"stop": ["-"]}, # <-- Note the additional kwargs
)
runnable_binding.invoke('Say "Parrot-MAGIC"') # Should return `Parrot`
```
| 方法 | 描述 |
|---|---|
bind |
将额外的 kwargs 绑定到一个 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
with_alisteners |
将异步生命周期侦听器绑定到 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
__init__ |
从一个 |
is_lc_serializable |
返回 True,因为这个类是可序列化的。 |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
lc_secrets 属性 ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
kwargs 类属性 实例属性 ¶
运行时传递给底层 Runnable 的 kwargs。
例如,当调用 Runnable 绑定时,底层的 Runnable 将被使用相同的输入和这些额外的 kwargs 来调用。
config_factories 类属性 实例属性 ¶
config_factories: list[Callable[[RunnableConfig], RunnableConfig]] = Field(
default_factory=list
)
要绑定到底层 Runnable 的配置工厂。
custom_input_type 类属性 实例属性 ¶
custom_input_type: Any | None = None
使用自定义类型覆盖底层 Runnable 的输入类型。
该类型可以是 Pydantic 模型,或类型注解(例如,list[str])。
custom_output_type 类属性 实例属性 ¶
custom_output_type: Any | None = None
使用自定义类型覆盖底层 Runnable 的输出类型。
该类型可以是 Pydantic 模型,或类型注解(例如,list[str])。
bind ¶
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time,以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
with_types ¶
with_retry ¶
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
invoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke 异步 ¶
ainvoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch 异步 ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed 异步 ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream 异步 ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log 异步 ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events 异步 ¶
astream_events(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform 异步 ¶
atransform(
input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
__init__ ¶
__init__(
*,
bound: Runnable[Input, Output],
kwargs: Mapping[str, Any] | None = None,
config: RunnableConfig | None = None,
config_factories: list[Callable[[RunnableConfig], RunnableConfig]] | None = None,
custom_input_type: type[Input] | BaseModel | None = None,
custom_output_type: type[Output] | BaseModel | None = None,
**other_kwargs: Any,
) -> None
从一个 Runnable 和 kwargs 创建一个 RunnableBinding。
| 参数 | 描述 |
|---|---|
bound
|
此
类型: |
kwargs
|
在运行底层 |
配置
|
绑定到底层
类型: |
config_factories
|
在绑定到底层
类型: |
custom_input_type
|
指定以使用自定义类型覆盖底层 |
custom_output_type
|
指定以使用自定义类型覆盖底层 |
**other_kwargs
|
解包到基类中。
类型: |
get_lc_namespace 类方法 ¶
lc_id 类方法 ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
langchain_core.runnables.base.RunnableGenerator ¶
Bases: Runnable[Input, Output]
运行生成器函数的 Runnable。
RunnableGenerator 可以直接实例化,也可以通过在序列中使用生成器来创建。
RunnableGenerator 可用于实现自定义行为,例如自定义输出解析器,同时保留流式处理能力。给定一个签名为 Iterator[A] -> Iterator[B] 的生成器函数,将其包装在 RunnableGenerator 中,可以使其在从上一步骤流式输入数据块时立即发出输出数据块。
注意
如果一个生成器函数的签名为 A -> Iterator[B],即它需要在发出数据块之前完成上一步的输入(例如,大多数 LLM 需要完整的提示才能开始生成),那么可以将其包装在 RunnableLambda 中。
下面是一个展示 RunnableGenerator 基本机制的示例
```python
from typing import Any, AsyncIterator, Iterator
from langchain_core.runnables import RunnableGenerator
def gen(input: Iterator[Any]) -> Iterator[str]:
for token in ["Have", " a", " nice", " day"]:
yield token
runnable = RunnableGenerator(gen)
runnable.invoke(None) # "Have a nice day"
list(runnable.stream(None)) # ["Have", " a", " nice", " day"]
runnable.batch([None, None]) # ["Have a nice day", "Have a nice day"]
# Async version:
async def agen(input: AsyncIterator[Any]) -> AsyncIterator[str]:
for token in ["Have", " a", " nice", " day"]:
yield token
runnable = RunnableGenerator(agen)
await runnable.ainvoke(None) # "Have a nice day"
[p async for p in runnable.astream(None)] # ["Have", " a", " nice", " day"]
```
RunnableGenerator 使得在流式处理上下文中实现自定义行为变得容易。下面我们展示一个示例
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableGenerator, RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
model = ChatOpenAI()
chant_chain = (
ChatPromptTemplate.from_template("Give me a 3 word chant about {topic}")
| model
| StrOutputParser()
)
def character_generator(input: Iterator[str]) -> Iterator[str]:
for token in input:
if "," in token or "." in token:
yield "👏" + token
else:
yield token
runnable = chant_chain | character_generator
assert type(runnable.last) is RunnableGenerator
"".join(runnable.stream({"topic": "waste"})) # Reduce👏, Reuse👏, Recycle👏.
# Note that RunnableLambda can be used to delay streaming of one step in a
# sequence until the previous step is finished:
def reverse_generator(input: str) -> Iterator[str]:
# Yield characters of input in reverse order.
for character in input[::-1]:
yield character
runnable = chant_chain | RunnableLambda(reverse_generator)
"".join(runnable.stream({"topic": "waste"})) # ".elcycer ,esuer ,ecudeR"
```
| 方法 | 描述 |
|---|---|
__init__ |
初始化一个 |
get_input_schema |
获取可用于验证 |
get_output_schema |
获取可用于验证 |
transform |
将输入转换为输出。 |
stream |
|
invoke |
将单个输入转换为输出。 |
atransform |
将输入转换为输出。 |
astream |
|
ainvoke |
将单个输入转换为输出。 |
get_name |
获取 |
get_input_jsonschema |
获取表示 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
astream_log |
流式传输 |
astream_events |
生成事件流。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
__init__ ¶
__init__(
transform: Callable[[Iterator[Input]], Iterator[Output]]
| Callable[[AsyncIterator[Input]], AsyncIterator[Output]],
atransform: Callable[[AsyncIterator[Input]], AsyncIterator[Output]] | None = None,
*,
name: str | None = None,
) -> None
初始化一个 RunnableGenerator。
| 参数 | 描述 |
|---|---|
transform
|
转换函数。
类型: |
atransform
|
异步转换函数。
类型: |
name
|
类型: |
| 引发 | 描述 |
|---|---|
TypeError
|
如果转换不是一个生成器函数。 |
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
invoke ¶
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
atransform ¶
atransform(
input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
ainvoke 异步 ¶
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
get_name ¶
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch 异步 ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed 异步 ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
astream_log 异步 ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events 异步 ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
langchain_core.runnables.base.RunnableLambda ¶
Bases: Runnable[Input, Output]
RunnableLambda 将一个 Python 可调用对象转换为 Runnable。
将一个可调用对象包装在 RunnableLambda 中,使得该可调用对象可以在同步或异步上下文中使用。
RunnableLambda 可以像任何其他 Runnable 一样进行组合,并提供与 LangChain 追踪的无缝集成。
RunnableLambda 最适合不需要支持流式处理的代码。如果您需要支持流式处理(即能够操作输入块并产生输出块),请改用 RunnableGenerator。
请注意,如果一个 RunnableLambda 返回一个 Runnable 的实例,该实例将在执行期间被调用(或流式处理)。
示例
# This is a RunnableLambda
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
runnable.invoke(1) # returns 2
runnable.batch([1, 2, 3]) # returns [2, 3, 4]
# Async is supported by default by delegating to the sync implementation
await runnable.ainvoke(1) # returns 2
await runnable.abatch([1, 2, 3]) # returns [2, 3, 4]
# Alternatively, can provide both synd and sync implementations
async def add_one_async(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one, afunc=add_one_async)
runnable.invoke(1) # Uses add_one
await runnable.ainvoke(1) # Uses add_one_async
| 方法 | 描述 |
|---|---|
__init__ |
从一个可调用对象、一个异步可调用对象或两者创建一个 |
get_input_schema |
此 |
get_output_schema |
获取可用于验证 |
get_graph |
返回此 |
__repr__ |
返回此 |
invoke |
同步调用此 |
ainvoke |
异步调用此 |
transform |
将输入转换为输出。 |
stream |
|
atransform |
将输入转换为输出。 |
astream |
|
get_name |
获取 |
get_input_jsonschema |
获取表示 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
astream_log |
流式传输 |
astream_events |
生成事件流。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
deps cached property ¶
__init__ ¶
__init__(
func: Callable[[Input], Iterator[Output]]
| Callable[[Input], Runnable[Input, Output]]
| Callable[[Input], Output]
| Callable[[Input, RunnableConfig], Output]
| Callable[[Input, CallbackManagerForChainRun], Output]
| Callable[[Input, CallbackManagerForChainRun, RunnableConfig], Output]
| Callable[[Input], Awaitable[Output]]
| Callable[[Input], AsyncIterator[Output]]
| Callable[[Input, RunnableConfig], Awaitable[Output]]
| Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]]
| Callable[
[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]
],
afunc: Callable[[Input], Awaitable[Output]]
| Callable[[Input], AsyncIterator[Output]]
| Callable[[Input, RunnableConfig], Awaitable[Output]]
| Callable[[Input, AsyncCallbackManagerForChainRun], Awaitable[Output]]
| Callable[
[Input, AsyncCallbackManagerForChainRun, RunnableConfig], Awaitable[Output]
]
| None = None,
name: str | None = None,
) -> None
从一个可调用对象、一个异步可调用对象或两者创建一个 RunnableLambda。
接受同步和异步两种变体,以便为同步和异步执行提供高效的实现。
| 参数 | 描述 |
|---|---|
func
|
同步或异步可调用对象
类型: |
afunc
|
一个异步可调用对象,接收一个输入并返回一个输出。
类型: |
name
|
类型: |
| 引发 | 描述 |
|---|---|
TypeError
|
如果 `func` 不是一个可调用类型。 |
TypeError
|
如果同时提供了 `func` 和 `afunc`。 |
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
此 Runnable 输入的 Pydantic 模式。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
此 `Runnable` 的输入模式。 |
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
invoke ¶
invoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
同步调用此 Runnable。
| 参数 | 描述 |
|---|---|
输入
|
此 `Runnable` 的输入。
类型: |
配置
|
要使用的配置。
类型: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
输出
|
此 `Runnable` 的输出。 |
| 引发 | 描述 |
|---|---|
TypeError
|
如果 `Runnable` 是一个协程函数。 |
ainvoke async ¶
ainvoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
异步调用此 Runnable。
| 参数 | 描述 |
|---|---|
输入
|
此 `Runnable` 的输入。
类型: |
配置
|
要使用的配置。
类型: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
输出
|
此 `Runnable` 的输出。 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
get_name ¶
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
langchain_core.runnables.base.RunnableParallel ¶
基类:RunnableSerializable[Input, dict[str, Any]]
并行运行一个 `Runnable` 映射的 Runnable。
返回其输出的映射。
RunnableParallel 是与 RunnableSequence 并列的两个主要组合原语之一。它并发调用多个 Runnable,并为每个 Runnable 提供相同的输入。
可以直接实例化 `RunnableParallel`,也可以在序列中使用字典字面量来创建。
这里有一个使用函数来说明 `RunnableParallel` 用法的简单示例。
```python
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
def mul_three(x: int) -> int:
return x * 3
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
runnable_3 = RunnableLambda(mul_three)
sequence = runnable_1 | { # this dict is coerced to a RunnableParallel
"mul_two": runnable_2,
"mul_three": runnable_3,
}
# Or equivalently:
# sequence = runnable_1 | RunnableParallel(
# {"mul_two": runnable_2, "mul_three": runnable_3}
# )
# Also equivalently:
# sequence = runnable_1 | RunnableParallel(
# mul_two=runnable_2,
# mul_three=runnable_3,
# )
sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
```
`RunnableParallel` 可以轻松地并行运行 `Runnable`。在下面的示例中,我们同时从两个不同的 `Runnable` 对象流式传输输出。
```python
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnableParallel
from langchain_openai import ChatOpenAI
model = ChatOpenAI()
joke_chain = (
ChatPromptTemplate.from_template("tell me a joke about {topic}") | model
)
poem_chain = (
ChatPromptTemplate.from_template("write a 2-line poem about {topic}")
| model
)
runnable = RunnableParallel(joke=joke_chain, poem=poem_chain)
# Display stream
output = {key: "" for key, _ in runnable.output_schema()}
for chunk in runnable.stream({"topic": "bear"}):
for key in chunk:
output[key] = output[key] + chunk[key].content
print(output) # noqa: T201
```
| 方法 | 描述 |
|---|---|
__init__ |
创建一个 `RunnableParallel`。 |
is_lc_serializable |
返回 True,因为这个类是可序列化的。 |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
get_name |
获取 |
get_input_schema |
获取 `Runnable` 的输入模式。 |
get_output_schema |
获取 `Runnable` 的输出模式。 |
get_graph |
获取 `Runnable` 的图表示。 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
transform |
将输入转换为输出。 |
stream |
|
atransform |
将输入转换为输出。 |
astream |
|
get_input_jsonschema |
获取表示 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
astream_log |
流式传输 |
astream_events |
生成事件流。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
config_specs property ¶
config_specs: list[ConfigurableFieldSpec]
获取 `Runnable` 的配置规范。
| 返回 | 描述 |
|---|---|
list[ConfigurableFieldSpec]
|
`Runnable` 的配置规范。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
__init__ ¶
__init__(
steps__: Mapping[
str,
Runnable[Input, Any]
| Callable[[Input], Any]
| Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
]
| None = None,
**kwargs: Runnable[Input, Any]
| Callable[[Input], Any]
| Mapping[str, Runnable[Input, Any] | Callable[[Input], Any]],
) -> None
创建一个 `RunnableParallel`。
| 参数 | 描述 |
|---|---|
steps__
|
要包含的步骤。
类型: |
**kwargs
|
要包含的额外步骤。
类型: |
get_lc_namespace classmethod ¶
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取 `Runnable` 的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
`Runnable` 的输入模式。 |
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取 `Runnable` 的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
`Runnable` 的输出模式。 |
get_graph ¶
get_graph(config: RunnableConfig | None = None) -> Graph
获取 `Runnable` 的图表示。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
Graph
|
`Runnable` 的图表示。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果一个 `Runnable` 没有首节点或尾节点。 |
invoke ¶
invoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any
) -> dict[str, Any]
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
ainvoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> dict[str, Any]
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> Iterator[dict[str, Any]]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[dict[str, Any]]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input], config: RunnableConfig | None = None, **kwargs: Any
) -> AsyncIterator[dict[str, Any]]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[dict[str, Any]]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
langchain_core.runnables.base.RunnableSequence ¶
基类:RunnableSerializable[Input, Output]
`Runnable` 对象的序列,其中一个的输出是下一个的输入。
`RunnableSequence` 是 LangChain 中最重要的组合运算符,因为它几乎在每个链中都会使用。
可以直接实例化 `RunnableSequence`,或者更常见地是使用 `|` 运算符,其中左或右操作数(或两者)必须是 `Runnable`。
任何 `RunnableSequence` 都会自动支持同步、异步和批处理。
对于 I/O 绑定的 `Runnable`,`batch` 和 `abatch` 的默认实现利用了线程池和 asyncio gather,因此会比简单调用 `invoke` 或 `ainvoke` 更快。
批处理是通过按顺序在 `RunnableSequence` 的每个组件上调用批处理方法来实现的。
一个 `RunnableSequence` 会保留其组件的流式处理属性,因此如果序列的所有组件都实现了 `transform` 方法——该方法实现了将流式输入映射到流式输出的逻辑——那么该序列就能够将输入流式传输到输出!
如果序列中的任何组件没有实现 transform,那么流式处理将仅在该组件运行后开始。如果有多个阻塞组件,流式处理将在最后一个组件之后开始。
注意
默认情况下,`RunnableLambdas` 不支持 `transform`!因此,如果你需要使用 `RunnableLambdas`,请注意它们在 `RunnableSequence` 中的位置(如果你需要使用 `stream`/`astream` 方法)。
如果你需要任意逻辑并需要流式处理,你可以子类化 Runnable,并为你需要的任何逻辑实现 `transform`。
这里有一个使用简单函数来说明 `RunnableSequence` 用法的简单示例。
```python
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1 | runnable_2
# Or equivalently:
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
```
这是一个使用流式处理由 LLM 生成的 JSON 输出的示例。
```python
from langchain_core.output_parsers.json import SimpleJsonOutputParser
from langchain_openai import ChatOpenAI
prompt = PromptTemplate.from_template(
"In JSON format, give me a list of {topic} and their "
"corresponding names in French, Spanish and in a "
"Cat Language."
)
model = ChatOpenAI()
chain = prompt | model | SimpleJsonOutputParser()
async for chunk in chain.astream({"topic": "colors"}):
print("-") # noqa: T201
print(chunk, sep="", flush=True) # noqa: T201
```
| 方法 | 描述 |
|---|---|
__init__ |
创建一个新的 `RunnableSequence`。 |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
is_lc_serializable |
返回 True,因为这个类是可序列化的。 |
get_input_schema |
获取 `Runnable` 的输入模式。 |
get_output_schema |
获取 `Runnable` 的输出模式。 |
get_graph |
获取 `Runnable` 的图表示。 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
abatch |
默认实现使用 |
transform |
将输入转换为输出。 |
stream |
|
atransform |
将输入转换为输出。 |
astream |
|
get_name |
获取 |
get_input_jsonschema |
获取表示 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_prompts |
返回此 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
batch_as_completed |
在输入列表上并行运行 |
abatch_as_completed |
在输入列表上并行运行 |
astream_log |
流式传输 |
astream_events |
生成事件流。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
middle class-attribute instance-attribute ¶
序列中的中间 `Runnable`。
steps property ¶
config_specs property ¶
config_specs: list[ConfigurableFieldSpec]
获取 `Runnable` 的配置规范。
| 返回 | 描述 |
|---|---|
list[ConfigurableFieldSpec]
|
`Runnable` 的配置规范。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
__init__ ¶
__init__(
*steps: RunnableLike,
name: str | None = None,
first: Runnable[Any, Any] | None = None,
middle: list[Runnable[Any, Any]] | None = None,
last: Runnable[Any, Any] | None = None,
) -> None
创建一个新的 `RunnableSequence`。
| 参数 | 描述 |
|---|---|
steps
|
要包含在序列中的步骤。
类型: |
name
|
类型: |
first
|
序列中的第一个 `Runnable`。 |
middle
|
序列中的中间 `Runnable` 对象。 |
last
|
序列中的最后一个 Runnable。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果序列的步骤少于 2 个。 |
get_lc_namespace classmethod ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取 `Runnable` 的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
`Runnable` 的输入模式。 |
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取 `Runnable` 的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
`Runnable` 的输出模式。 |
get_graph ¶
get_graph(config: RunnableConfig | None = None) -> Graph
获取 `Runnable` 的图表示。
| 参数 | 描述 |
|---|---|
配置
|
要使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
Graph
|
`Runnable` 的图表示。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果一个 `Runnable` 没有首节点或尾节点。 |
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
invoke ¶
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
ainvoke(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
get_name ¶
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
langchain_core.runnables.base.RunnableSerializable ¶
基类:Serializable, Runnable[Input, Output]
可以序列化为 JSON 的 Runnable。
| 方法 | 描述 |
|---|---|
to_json |
将 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
__init__ |
|
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
lc_secrets property ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke abstractmethod ¶
invoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
langchain_core.runnables.RunnableConfig ¶
基类:TypedDict
Runnable 的配置。
metadata instance-attribute ¶
此次调用及任何子调用的元数据(例如,一个链调用一个 LLM)。键应为字符串,值应为可 JSON 序列化的。
callbacks instance-attribute ¶
此次调用及任何子调用的回调函数(例如,一个链调用一个 LLM)。标签会传递给所有回调函数,元数据会传递给 handle*Start 回调函数。
max_concurrency instance-attribute ¶
max_concurrency: int | None
最大并行调用数。如果未提供,则默认为 ThreadPoolExecutor 的默认值。
configurable instance-attribute ¶
先前在此 Runnable 或子 Runnable 上通过 configurable_fields 或 configurable_alternatives 设为可配置的属性的运行时值。请检查 output_schema 以获取已设为可配置的属性的描述。