langchain-ibm¶
langchain_ibm ¶
ChatWatsonx ¶
IBM watsonx.ai 聊天模型集成。
设置
要使用此功能,您应该安装 langchain_ibm python 包,并将环境变量 WATSONX_API_KEY 设置为您的 API 密钥,或者将其作为名为 api_key 的参数传递给构造函数。
已弃用
apikey 和 WATSONX_APIKEY 已弃用,并将在 2.0.0 版本中移除。请改用 api_key 和 WATSONX_API_KEY。
实例化
使用所需参数创建模型实例。例如
from langchain_ibm import ChatWatsonx
from ibm_watsonx_ai.foundation_models.schema import TextChatParameters
parameters = TextChatParameters(
top_p=1, temperature=0.5, max_completion_tokens=None
)
model = ChatWatsonx(
model_id="meta-llama/llama-3-3-70b-instruct",
url="https://us-south.ml.cloud.ibm.com",
project_id="*****",
params=parameters,
# api_key="*****"
)
调用
从模型生成响应
messages = [
(
"system",
"You are a helpful translator. Translate the user sentence to French.",
),
("human", "I love programming."),
]
model.invoke(messages)
结果为 AIMessage 响应
AIMessage(
content="J'adore programmer.",
additional_kwargs={},
response_metadata={
"token_usage": {
"completion_tokens": 7,
"prompt_tokens": 30,
"total_tokens": 37,
},
"model_name": "ibm/granite-3-3-8b-instruct",
"system_fingerprint": "",
"finish_reason": "stop",
},
id="chatcmpl-529352c4-93ba-4801-8f1d-a3b4e3935194---daed91fb74d0405f200db1e63da9a48a---7a3ef799-4413-47e4-b24c-85d267e37fa2",
usage_metadata={"input_tokens": 30, "output_tokens": 7, "total_tokens": 37},
)
流
从模型流式传输响应
结果为包含部分内容的 AIMessageChunk 对象序列
AIMessageChunk(content="", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775")
AIMessageChunk(content="J", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775")
AIMessageChunk(content="'", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775")
AIMessageChunk(content="ad", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775")
AIMessageChunk(content="or", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775")
AIMessageChunk(
content=" programmer", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775"
)
AIMessageChunk(content=".", id="run--e48a38d3-1500-4b5e-870c-6313e8cff775")
AIMessageChunk(
content="",
response_metadata={
"finish_reason": "stop",
"model_name": "ibm/granite-3-3-8b-instruct",
},
id="run--e48a38d3-1500-4b5e-870c-6313e8cff775",
)
AIMessageChunk(
content="",
id="run--e48a38d3-1500-4b5e-870c-6313e8cff775",
usage_metadata={"input_tokens": 30, "output_tokens": 7, "total_tokens": 37},
)
要收集完整的消息,您可以连接这些块
AIMessageChunk(
content="J'adore programmer.",
response_metadata={
"finish_reason": "stop",
"model_name": "ibm/granite-3-3-8b-instruct",
},
id="chatcmpl-88a48b71-c149-4a0c-9c02-d6b97ca5dc6c---b7ba15879a8c5283b1e8a3b8db0229f0---0037ca4f-8a74-4f84-a46c-ab3fd1294f24",
usage_metadata={"input_tokens": 30, "output_tokens": 7, "total_tokens": 37},
)
异步
invoke、stream 和 batch 的异步等效方法也可用
# Invoke
await model.ainvoke(messages)
# Stream
async for chunk in model.astream(messages):
print(chunk.text)
# Batch
await model.abatch([messages])
结果为 AIMessage 响应
AIMessage(
content="J'adore programmer.",
additional_kwargs={},
response_metadata={
"token_usage": {
"completion_tokens": 7,
"prompt_tokens": 30,
"total_tokens": 37,
},
"model_name": "ibm/granite-3-3-8b-instruct",
"system_fingerprint": "",
"finish_reason": "stop",
},
id="chatcmpl-5bef2d81-ef56-463b-a8fa-c2bcc2a3c348---821e7750d18925f2b36226db66667e26---6396c786-9da9-4468-883e-11ed90a05937",
usage_metadata={"input_tokens": 30, "output_tokens": 7, "total_tokens": 37},
)
对于批量调用,结果为一个 list[AIMessage]。
工具调用
from pydantic import BaseModel, Field
class GetWeather(BaseModel):
'''Get the current weather in a given location'''
location: str = Field(
..., description="The city and state, e.g. San Francisco, CA"
)
class GetPopulation(BaseModel):
'''Get the current population in a given location'''
location: str = Field(
..., description="The city and state, e.g. San Francisco, CA"
)
model_with_tools = model.bind_tools(
[GetWeather, GetPopulation]
# strict = True # Enforce tool args schema is respected
)
ai_msg = model_with_tools.invoke(
"Which city is hotter today and which is bigger: LA or NY?"
)
ai_msg.tool_calls
[
{
"name": "GetWeather",
"args": {"location": "Los Angeles, CA"},
"id": "chatcmpl-tool-59632abcee8f48a18a5f3a81422b917b",
"type": "tool_call",
},
{
"name": "GetWeather",
"args": {"location": "New York, NY"},
"id": "chatcmpl-tool-c6f3b033b4594918bb53f656525b0979",
"type": "tool_call",
},
{
"name": "GetPopulation",
"args": {"location": "Los Angeles, CA"},
"id": "chatcmpl-tool-175a23281e4747ea81cbe472b8e47012",
"type": "tool_call",
},
{
"name": "GetPopulation",
"args": {"location": "New York, NY"},
"id": "chatcmpl-tool-e1ccc534835945aebab708eb5e685bf7",
"type": "tool_call",
},
]
推理输出
from langchain_ibm import ChatWatsonx
from ibm_watsonx_ai.foundation_models.schema import TextChatParameters
parameters = TextChatParameters(
include_reasoning=True, reasoning_effort="medium"
)
model = ChatWatsonx(
model_id="openai/gpt-oss-120b",
url="https://us-south.ml.cloud.ibm.com",
project_id="*****",
params=parameters,
# api_key="*****"
)
response = model.invoke("What is 3^3?")
# Response text
print(f"Output: {response.content}")
# Reasoning summaries
print(f"Reasoning: {response.additional_kwargs['reasoning_content']}")
0.3.19 版本新增:更新了 AIMessage 格式
langchain-ibm >= 0.3.19 允许用户设置推理输出参数,并将推理摘要的输出格式化到 additional_kwargs 字段中。
结构化输出
from pydantic import BaseModel, Field
class Joke(BaseModel):
'''Joke to tell user.'''
setup: str = Field(description="The setup of the joke")
punchline: str = Field(description="The punchline to the joke")
rating: int | None = Field(description="How funny the joke is, 1 to 10")
structured_model = model.with_structured_output(Joke)
structured_model.invoke("Tell me a joke about cats")
Joke(
setup="Why was the cat sitting on the computer?",
punchline="To keep an eye on the mouse!",
rating=None,
)
更多信息请参见 with_structured_output。
JSON 模式
图像输入
import base64
import httpx
from langchain.messages import HumanMessage
image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
image_data = base64.b64encode(httpx.get(image_url).content).decode("utf-8")
message = HumanMessage(
content=[
{"type": "text", "text": "describe the weather in this image"},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
},
]
)
ai_msg = model.invoke([message])
ai_msg.content
令牌使用情况
Logprobs
logprobs_model = model.bind(logprobs=True)
ai_msg = logprobs_model.invoke(messages)
ai_msg.response_metadata["logprobs"]
{
'content': [
{
'token': 'J',
'logprob': -0.0017940393
},
{
'token': "'",
'logprob': -1.7523613e-05
},
{
'token': 'ad',
'logprob': -0.16112353
},
{
'token': 'ore',
'logprob': -0.0003091811
},
{
'token': ' programmer',
'logprob': -0.24849245
},
{
'token': '.',
'logprob': -2.5033638e-05
},
{
'token': '<|end_of_text|>',
'logprob': -7.080781e-05
}
]
}
响应元数据
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
__init__ |
|
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
set_verbose |
如果 verbose 是 |
generate_prompt |
将一系列提示传递给模型并返回模型生成的内容。 |
agenerate_prompt |
异步地将一系列提示传递并返回模型生成的内容。 |
get_token_ids |
返回文本中 token 的有序 ID。 |
get_num_tokens |
获取文本中存在的 token 数量。 |
get_num_tokens_from_messages |
获取消息中的 token 数量。 |
generate |
将一系列提示传递给模型并返回模型生成的内容。 |
agenerate |
异步地将一系列提示传递给模型并返回生成的内容。 |
dict |
返回 LLM 的字典。 |
is_lc_serializable |
是 lc 可序列化的。 |
validate_environment |
验证凭据和 python 包是否存在于环境中。 |
bind_tools |
将类工具(tool-like)对象绑定到此聊天模型。 |
with_structured_output |
返回与给定模式匹配的格式化输出的模型包装器。 |
cache 类属性 实例属性 ¶
是否缓存响应。
- 如果为
True,将使用全局缓存。 - 如果为
False,将不使用缓存 - 如果为
None,如果设置了全局缓存,则使用全局缓存,否则不使用缓存。 - 如果是
BaseCache的实例,将使用提供的缓存。
目前不支持模型的流式方法的缓存。
verbose 类属性 实例属性 ¶
是否打印响应文本。
metadata 类属性 实例属性 ¶
添加到运行跟踪中的元数据。
custom_get_token_ids 类属性 实例属性 ¶
用于计算 token 的可选编码器。
rate_limiter 类属性 实例属性 ¶
rate_limiter: BaseRateLimiter | None = Field(default=None, exclude=True)
一个可选的速率限制器,用于限制请求数量。
disable_streaming 类属性 实例属性 ¶
是否为此模型禁用流式传输。
如果绕过流式传输,则 stream/astream/astream_events 将转而调用 invoke/ainvoke。
- 如果为
True,将始终绕过流式传输情况。 - 如果为
'tool_calling',仅当模型被调用时带有tools关键字参数时,才会绕过流式传输情况。换句话说,仅当提供 tools 参数时,LangChain 才会自动切换到非流式行为 (invoke)。这提供了两全其美的方案。 - 如果为
False(默认值),则在可用时始终使用流式传输情况。
此标志的主要原因是,代码可能是使用 stream 编写的,而用户可能希望将给定模型换成另一个实现不完全支持流式传输的模型。
output_version 类属性 实例属性 ¶
要存储在消息内容中的 AIMessage 输出格式的版本。
AIMessage.content_blocks 将懒解析 content 的内容为标准格式。此标志可用于额外将标准格式存储在消息内容中,例如,用于序列化目的。
支持的值
'v0':内容中特定于提供商的格式(可以使用content_blocks进行懒解析)'v1':内容中的标准化格式(与content_blocks一致)
合作伙伴包(例如 langchain-openai)也可以使用此字段以向后兼容的方式推出新的内容格式。
在版本 1.0 中添加
profile 属性 ¶
返回模型的性能分析信息。
此属性将依赖于 langchain-model-profiles 包来检索聊天模型的能力,例如上下文窗口大小和支持的功能。
| 引发 | 描述 |
|---|---|
ImportError
|
如果未安装 |
| 返回 | 描述 |
|---|---|
ModelProfile
|
一个 |
model 类属性 实例属性 ¶
model: str | None = None
要使用的基础模型的名称或别名。当使用 IBM 的 watsonx.ai 模型网关(公开预览版)时,您可以通过一个单一的、与 OpenAI 兼容的接口指定任何受支持的第三方模型——OpenAI、Anthropic、NVIDIA、Cerebras 或 IBM 自己的 Granite 系列。模型必须通过网关明确配置(选择加入),以确保安全、与供应商无关的访问和无需重新配置的轻松切换。
有关配置和使用的更多详细信息,请参见 IBM watsonx Model Gateway 文档
url 类属性 实例属性 ¶
Watson Machine Learning 或 CPD 实例的 URL。
api_key 类属性 实例属性 ¶
api_key: SecretStr | None = Field(
default_factory=secret_from_env_multi(
names_priority=["WATSONX_API_KEY", "WATSONX_APIKEY"],
deprecated={"WATSONX_APIKEY"},
),
serialization_alias="api_key",
validation_alias=AliasChoices("api_key", "apikey"),
description="API key to the Watson Machine Learning or CPD instance.",
)
Watson Machine Learning 或 CPD 实例的 API 密钥。
token 类属性 实例属性 ¶
token: SecretStr | None = Field(
alias="token", default_factory=secret_from_env("WATSONX_TOKEN", default=None)
)
CPD 实例的令牌。
password 类属性 实例属性 ¶
password: SecretStr | None = Field(
alias="password", default_factory=secret_from_env("WATSONX_PASSWORD", default=None)
)
CPD 实例的密码。
username 类属性 实例属性 ¶
username: SecretStr | None = Field(
alias="username", default_factory=secret_from_env("WATSONX_USERNAME", default=None)
)
CPD 实例的用户名。
instance_id 类属性 实例属性 ¶
instance_id: SecretStr | None = Field(
alias="instance_id",
default_factory=secret_from_env("WATSONX_INSTANCE_ID", default=None),
)
CPD 实例的 instance_id。
params 类属性 实例属性 ¶
params: dict | TextChatParameters | None = None
在请求生成期间要使用的模型参数。
注意
如果在 params 属性和关键字参数中提供了相同的聊天生成参数,则会引发 ValueError。
frequency_penalty 类属性 实例属性 ¶
frequency_penalty: float | None = None
正值会根据新词元在文本中已出现的频率对其进行惩罚,从而降低模型逐字重复同一行的可能性。
top_logprobs 类属性 实例属性 ¶
top_logprobs: int | None = None
一个整数,指定在每个词元位置返回的最可能词元的数量,每个词元都带有关联的对数概率。如果使用此参数,则必须将 logprobs 选项设置为 true。
max_tokens 类属性 实例属性 ¶
max_tokens: int | None = None
在聊天补全中可以生成的最大词元数。输入词元和生成词元的总长度受模型上下文长度的限制。此值现已弃用,建议使用“max_completion_tokens”参数。
max_completion_tokens 类属性 实例属性 ¶
max_completion_tokens: int | None = None
在聊天补全中可以生成的最大词元数。输入词元和生成词元的总长度受模型上下文长度的限制。
presence_penalty 类属性 实例属性 ¶
presence_penalty: float | None = None
正值会根据新词元是否已出现在文本中对其进行惩罚,从而增加模型谈论新话题的可能性。
temperature 类属性 实例属性 ¶
temperature: float | None = None
要使用的采样温度。较高的值(如 0.8)会使输出更随机,而较低的值(如 0.2)会使其更具重点和确定性。
我们通常建议更改此参数或 top_p,但不要同时更改两者。
top_p 类属性 实例属性 ¶
top_p: float | None = None
一种替代温度采样的核采样方法,模型会考虑具有 top_p 概率质量的词元的结果。因此,0.1 意味着只考虑构成前 10% 概率质量的词元。
我们通常建议更改此参数或 temperature,但不要同时更改两者。
verify 类属性 实例属性 ¶
您可以将以下之一作为 verify 传递:* CA_BUNDLE 文件的路径 * 包含受信任 CA 证书的目录路径 * True - 将采用默认的信任库路径 * False - 不进行验证
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AIMessage
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke 异步 ¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AIMessage
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch 异步 ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed 异步 ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> Iterator[AIMessageChunk]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream 异步 ¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[AIMessageChunk]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log 异步 ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events 异步 ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform 异步 ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
get_lc_namespace 类方法 ¶
lc_id 类方法 ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose ¶
generate_prompt ¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> LLMResult
将一系列提示传递给模型并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate_prompt 异步 ¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
get_token_ids ¶
get_num_tokens ¶
get_num_tokens_from_messages ¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
获取消息中的 token 数量。
用于检查输入是否适合模型的上下文窗口。
注意
get_num_tokens_from_messages 的基本实现忽略了工具模式。
| 参数 | 描述 |
|---|---|
messages
|
要进行分词的消息输入。
类型: |
工具
|
如果提供,则为要转换为工具模式的 dict、
类型: |
| 返回 | 描述 |
|---|---|
int
|
所有消息中词元数量的总和。 |
generate ¶
generate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any,
) -> LLMResult
将一系列提示传递给模型并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
messages
|
消息列表的列表。
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要应用的标签。 |
metadata
|
要应用的元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate 异步 ¶
agenerate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递给模型并返回生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
messages
|
消息列表的列表。
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要应用的标签。 |
metadata
|
要应用的元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
bind_tools ¶
bind_tools(
tools: Sequence[dict[str, Any] | type | Callable | BaseTool],
*,
tool_choice: dict | str | bool | None = None,
strict: bool | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]
将类工具(tool-like)对象绑定到此聊天模型。
| 参数 | 描述 |
|---|---|
工具
|
要绑定到此聊天模型的工具定义列表。可以是字典、pydantic 模型、可调用对象或 BaseTool。Pydantic 模型、可调用对象和 BaseTool 将自动转换为其模式字典表示形式。 |
工具选择
|
要求模型调用的工具。选项包括
|
strict
|
如果为
类型: |
kwargs
|
任何其他参数都直接传递给
类型: |
with_structured_output ¶
with_structured_output(
schema: dict | type | None = None,
*,
method: Literal[
"function_calling", "json_mode", "json_schema"
] = "function_calling",
include_raw: bool = False,
strict: bool | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, dict | BaseModel]
返回与给定模式匹配的格式化输出的模型包装器。
| 参数 | 描述 |
|---|---|
模式
|
输出模式。可以作为以下形式传入:
如果 |
method
|
用于引导模型生成的方法,可选值之一:
类型: |
包含原始数据
|
如果为 False,则仅返回解析后的结构化输出。如果在模型输出解析过程中发生错误,则会引发该错误。如果为 True,则会返回原始模型响应(BaseMessage)和解析后的模型响应。如果在输出解析过程中发生错误,也会捕获并返回该错误。最终输出始终是一个包含键
类型: |
strict
|
类型: |
kwargs
|
其他关键字参数
类型: |
| 返回 | 描述 |
|---|---|
Runnable[LanguageModelInput, dict | BaseModel]
|
一个 Runnable,其输入与 |
Runnable[LanguageModelInput, dict | BaseModel]
|
如果 |
Runnable[LanguageModelInput, dict | BaseModel]
|
|
Runnable[LanguageModelInput, dict | BaseModel]
|
|
Runnable[LanguageModelInput, dict | BaseModel]
|
|
示例:schema=Pydantic 类,method='function_calling',include_raw=True
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatWatsonx(...)
structured_model = model.with_structured_output(
AnswerWithJustification, include_raw=True
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
{
"raw": AIMessage(
content="",
additional_kwargs={
"tool_calls": [
{
"id": "call_Ao02pnFYXD6GN1yzc0uXPsvF",
"function": {
"arguments": '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}',
"name": "AnswerWithJustification",
},
"type": "function",
}
]
},
),
"parsed": AnswerWithJustification(
answer="They weigh the same.",
justification="Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.",
),
"parsing_error": None,
}
示例:schema=JSON 模式,method='function_calling',include_raw=False
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
dict_schema = convert_to_openai_tool(AnswerWithJustification)
model = ChatWatsonx(...)
structured_model = model.with_structured_output(dict_schema)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
示例:schema=Pydantic 类,method='json_schema',include_raw=True
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatWatsonx(...)
structured_model = model.with_structured_output(
AnswerWithJustification, method="json_schema", include_raw=True
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
{
"raw": AIMessage(
content="",
additional_kwargs={
"tool_calls": [
{
"id": "chatcmpl-tool-bfbd6f6dd33b438990c5ddf277485971",
"type": "function",
"function": {
"name": "AnswerWithJustification",
"arguments": '{"answer": "They weigh the same", "justification": "A pound is a unit of weight or mass, so both a pound of bricks and a pound of feathers weigh the same amount, one pound."}',
},
}
]
},
response_metadata={
"token_usage": {
"completion_tokens": 45,
"prompt_tokens": 275,
"total_tokens": 320,
},
"model_name": "meta-llama/llama-3-3-70b-instruct",
"system_fingerprint": "",
"finish_reason": "stop",
},
id="chatcmpl-461ca5bd-1982-412c-b886-017c483bf481---8c18b06eead65ae4691364798787bda7---71896588-efa5-439f-a25f-d1abfe289f5a",
tool_calls=[
{
"name": "AnswerWithJustification",
"args": {
"answer": "They weigh the same",
"justification": "A pound is a unit of weight or mass, so both a pound of bricks and a pound of feathers weigh the same amount, one pound.",
},
"id": "chatcmpl-tool-bfbd6f6dd33b438990c5ddf277485971",
"type": "tool_call",
}
],
usage_metadata={
"input_tokens": 275,
"output_tokens": 45,
"total_tokens": 320,
},
),
"parsed": AnswerWithJustification(
answer="They weigh the same",
justification="A pound is a unit of weight or mass, so both a pound of bricks and a pound of feathers weigh the same amount, one pound.",
),
"parsing_error": None,
}
示例:schema=function 模式,method='json_schema',include_raw=False
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
function__schema = {
"name": "AnswerWithJustification",
"description": "An answer to the user question along with justification for the answer.",
"parameters": {
"type": "object",
"properties": {
"answer": {"type": "string"},
"justification": {
"description": "A justification for the answer.",
"type": "string",
},
},
"required": ["answer"],
},
}
model = ChatWatsonx(...)
structured_model = model.with_structured_output(
function_schema, method="json_schema", include_raw=False
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
示例:schema=Pydantic 类,method='json_mode',include_raw=True
from langchain_ibm import ChatWatsonx
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
answer: str
justification: str
model = ChatWatsonx(...)
structured_model = model.with_structured_output(
AnswerWithJustification, method="json_mode", include_raw=True
)
structured_model.invoke(
"Answer the following question. "
"Make sure to return a JSON blob with keys 'answer' and 'justification'.\n\n"
"What's heavier a pound of bricks or a pound of feathers?"
)
{
"raw": AIMessage(
content='{\n "answer": "They are both the same weight.",\n "justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight." \n}'
),
"parsed": AnswerWithJustification(
answer="They are both the same weight.",
justification="Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight.",
),
"parsing_error": None,
}
示例:schema=None,method='json_mode',include_raw=True
from langchain_ibm import ChatWatsonx
model = ChatWatsonx(...)
structured_model = model.with_structured_output(
method="json_mode", include_raw=True
)
structured_model.invoke(
"Answer the following question. "
"Make sure to return a JSON blob with keys 'answer' and 'justification'.\n\n"
"What's heavier a pound of bricks or a pound of feathers?"
)
{
"raw": AIMessage(
content='{\n "answer": "They are both the same weight.",\n "justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight." \n}'
),
"parsed": {
"answer": "They are both the same weight.",
"justification": "Both a pound of bricks and a pound of feathers weigh one pound. The difference lies in the volume and density of the materials, not the weight.",
},
"parsing_error": None,
}
WatsonxEmbeddings ¶
基类:BaseModel, Embeddings
IBM watsonx.ai 嵌入模型集成。
设置
要使用此功能,您应该安装 langchain_ibm python 包,并将环境变量 WATSONX_API_KEY 设置为您的 API 密钥,或者将其作为名为 api_key 的参数传递给构造函数。
已弃用
apikey 和 WATSONX_APIKEY 已弃用,并将在 2.0.0 版本中移除。请改用 api_key 和 WATSONX_API_KEY。
实例化
嵌入单个文本
嵌入多个文本
异步
| 方法 | 描述 |
|---|---|
validate_environment |
验证凭据和 python 包是否存在于环境中。 |
embed_documents |
嵌入搜索文档。 |
aembed_documents |
异步嵌入搜索文档。 |
embed_query |
嵌入查询文本。 |
aembed_query |
异步嵌入查询文本。 |
model 类属性 实例属性 ¶
model: str | None = None
要使用的基础模型的名称或别名。当使用 IBM 的 watsonx.ai 模型网关(公开预览版)时,您可以通过一个单一的、与 OpenAI 兼容的接口指定任何受支持的第三方模型——OpenAI、Anthropic、NVIDIA、Cerebras 或 IBM 自己的 Granite 系列。模型必须通过网关明确配置(选择加入),以确保安全、与供应商无关的访问和无需重新配置的轻松切换。
有关配置和使用的更多详细信息,请参见 IBM watsonx Model Gateway 文档
url 类属性 实例属性 ¶
Watson Machine Learning 或 CPD 实例的 URL。
api_key 类属性 实例属性 ¶
api_key: SecretStr | None = Field(
default_factory=secret_from_env_multi(
names_priority=["WATSONX_API_KEY", "WATSONX_APIKEY"],
deprecated={"WATSONX_APIKEY"},
),
serialization_alias="api_key",
validation_alias=AliasChoices("api_key", "apikey"),
description="API key to the Watson Machine Learning or CPD instance.",
)
Watson Machine Learning 或 CPD 实例的 API 密钥。
token 类属性 实例属性 ¶
token: SecretStr | None = Field(
alias="token", default_factory=secret_from_env("WATSONX_TOKEN", default=None)
)
CPD 实例的令牌。
password 类属性 实例属性 ¶
password: SecretStr | None = Field(
alias="password", default_factory=secret_from_env("WATSONX_PASSWORD", default=None)
)
CPD 实例的密码。
username 类属性 实例属性 ¶
username: SecretStr | None = Field(
alias="username", default_factory=secret_from_env("WATSONX_USERNAME", default=None)
)
CPD 实例的用户名。
instance_id 类属性 实例属性 ¶
instance_id: SecretStr | None = Field(
alias="instance_id",
default_factory=secret_from_env("WATSONX_INSTANCE_ID", default=None),
)
CPD 实例的 instance_id。
verify 类属性 实例属性 ¶
您可以将以下之一作为 verify 传递:* CA_BUNDLE 文件的路径 * 包含受信任 CA 证书的目录路径 * True - 将采用默认的信任库路径 * False - 不进行验证
aembed_documents 异步 ¶
异步嵌入搜索文档。
WatsonxLLM ¶
基类: BaseLLM
IBM watsonx.ai 大语言模型类。
设置
要使用大语言模型,您需要安装 langchain_ibm python 包,并将环境变量 WATSONX_API_KEY 设置为您的 API 密钥,或者将其作为名为 api_key 的参数传递给构造函数。
已弃用
apikey 和 WATSONX_APIKEY 已弃用,并将在 2.0.0 版本中移除。请改用 api_key 和 WATSONX_API_KEY。
实例化
from langchain_ibm import WatsonxLLM
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames
parameters = {
GenTextParamsMetaNames.DECODING_METHOD: "sample",
GenTextParamsMetaNames.MAX_NEW_TOKENS: 100,
GenTextParamsMetaNames.MIN_NEW_TOKENS: 1,
GenTextParamsMetaNames.TEMPERATURE: 0.5,
GenTextParamsMetaNames.TOP_K: 50,
GenTextParamsMetaNames.TOP_P: 1,
}
model = WatsonxLLM(
model_id="google/flan-t5-xl",
url="https://us-south.ml.cloud.ibm.com",
project_id="*****",
params=parameters,
# api_key="*****"
)
调用
流
异步
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
__init__ |
|
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
set_verbose |
如果 verbose 是 |
generate_prompt |
将一系列提示传递给模型并返回模型生成的内容。 |
agenerate_prompt |
异步地将一系列提示传递并返回模型生成的内容。 |
with_structured_output |
此类未实现。 |
get_num_tokens_from_messages |
获取消息中的 token 数量。 |
generate |
向模型传递一系列提示并返回生成结果。 |
agenerate |
异步地将一系列提示传递给模型并返回生成的内容。 |
__str__ |
返回对象的字符串表示形式以供打印。 |
dict |
返回 LLM 的字典。 |
save |
保存 LLM。 |
is_lc_serializable |
是 lc 可序列化的。 |
validate_environment |
验证凭据和 python 包是否存在于环境中。 |
get_num_tokens |
获取词元数量。 |
get_token_ids |
获取词元 ID。 |
cache 类属性 实例属性 ¶
是否缓存响应。
- 如果为
True,将使用全局缓存。 - 如果为
False,将不使用缓存 - 如果为
None,如果设置了全局缓存,则使用全局缓存,否则不使用缓存。 - 如果是
BaseCache的实例,将使用提供的缓存。
目前不支持模型的流式方法的缓存。
verbose 类属性 实例属性 ¶
是否打印响应文本。
metadata 类属性 实例属性 ¶
添加到运行跟踪中的元数据。
custom_get_token_ids 类属性 实例属性 ¶
用于计算 token 的可选编码器。
model 类属性 实例属性 ¶
model: str | None = None
要使用的基础模型的名称或别名。当使用 IBM 的 watsonx.ai 模型网关(公开预览版)时,您可以通过一个单一的、与 OpenAI 兼容的接口指定任何受支持的第三方模型——OpenAI、Anthropic、NVIDIA、Cerebras 或 IBM 自己的 Granite 系列。模型必须通过网关明确配置(选择加入),以确保安全、与供应商无关的访问和无需重新配置的轻松切换。
有关配置和使用的更多详细信息,请参见 IBM watsonx Model Gateway 文档
url 类属性 实例属性 ¶
Watson Machine Learning 或 CPD 实例的 URL。
api_key 类属性 实例属性 ¶
api_key: SecretStr | None = Field(
default_factory=secret_from_env_multi(
names_priority=["WATSONX_API_KEY", "WATSONX_APIKEY"],
deprecated={"WATSONX_APIKEY"},
),
serialization_alias="api_key",
validation_alias=AliasChoices("api_key", "apikey"),
description="API key to the Watson Machine Learning or CPD instance.",
)
Watson Machine Learning 或 CPD 实例的 API 密钥。
token 类属性 实例属性 ¶
token: SecretStr | None = Field(
alias="token", default_factory=secret_from_env("WATSONX_TOKEN", default=None)
)
CPD 实例的令牌。
password 类属性 实例属性 ¶
password: SecretStr | None = Field(
alias="password", default_factory=secret_from_env("WATSONX_PASSWORD", default=None)
)
CPD 实例的密码。
username 类属性 实例属性 ¶
username: SecretStr | None = Field(
alias="username", default_factory=secret_from_env("WATSONX_USERNAME", default=None)
)
CPD 实例的用户名。
instance_id class-attribute instance-attribute ¶
instance_id: SecretStr | None = Field(
alias="instance_id",
default_factory=secret_from_env("WATSONX_INSTANCE_ID", default=None),
)
CPD 实例的 instance_id。
verify class-attribute instance-attribute ¶
您可以将以下之一作为 verify 传递:* CA_BUNDLE 文件的路径 * 包含受信任 CA 证书的目录路径 * True - 将采用默认的信任库路径 * False - 不进行验证
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> str
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> str
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[LanguageModelInput],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any,
) -> list[str]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[LanguageModelInput],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any,
) -> list[str]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> Iterator[str]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[str]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose ¶
generate_prompt ¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
**kwargs: Any,
) -> LLMResult
将一系列提示传递给模型并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate_prompt async ¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
with_structured_output ¶
with_structured_output(
schema: dict | type, **kwargs: Any
) -> Runnable[LanguageModelInput, dict | BaseModel]
此类未实现。
get_num_tokens_from_messages ¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
获取消息中的 token 数量。
用于检查输入是否适合模型的上下文窗口。
注意
get_num_tokens_from_messages 的基本实现忽略了工具模式。
| 参数 | 描述 |
|---|---|
messages
|
要进行分词的消息输入。
类型: |
工具
|
如果提供,则为要转换为工具模式的 dict、
类型: |
| 返回 | 描述 |
|---|---|
int
|
所有消息中词元数量的总和。 |
generate ¶
generate(
prompts: list[str],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
*,
tags: list[str] | list[list[str]] | None = None,
metadata: dict[str, Any] | list[dict[str, Any]] | None = None,
run_name: str | list[str] | None = None,
run_id: UUID | list[UUID | None] | None = None,
**kwargs: Any,
) -> LLMResult
向模型传递一系列提示并返回生成结果。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
字符串提示列表。 |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要与每个提示关联的标签列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
metadata
|
要与每个提示关联的元数据字典列表。如果提供,列表的长度必须与提示列表的长度匹配。
TYPE: |
运行名称
|
要与每个提示关联的运行名称列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
run_id
|
要与每个提示关联的运行 ID 列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果提示不是列表。 |
ValueError
|
如果 `callbacks`、`tags`、`metadata` 或 `run_name`(如果提供)的长度与提示的长度不匹配。 |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate async ¶
agenerate(
prompts: list[str],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
*,
tags: list[str] | list[list[str]] | None = None,
metadata: dict[str, Any] | list[dict[str, Any]] | None = None,
run_name: str | list[str] | None = None,
run_id: UUID | list[UUID | None] | None = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递给模型并返回生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
字符串提示列表。 |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要与每个提示关联的标签列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
metadata
|
要与每个提示关联的元数据字典列表。如果提供,列表的长度必须与提示列表的长度匹配。
TYPE: |
运行名称
|
要与每个提示关联的运行名称列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
run_id
|
要与每个提示关联的运行 ID 列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 `callbacks`、`tags`、`metadata` 或 `run_name`(如果提供)的长度与提示的长度不匹配。 |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
save ¶
WatsonxRerank ¶
基类:BaseDocumentCompressor
使用 watsonx Rerank API 的文档压缩器。
设置
要使用此功能,您应该安装 langchain_ibm python 包,并将环境变量 WATSONX_API_KEY 设置为您的 API 密钥,或者将其作为名为 api_key 的参数传递给构造函数。
已弃用
apikey 和 WATSONX_APIKEY 已弃用,并将在 2.0.0 版本中移除。请改用 api_key 和 WATSONX_API_KEY。
实例化
from langchain_ibm import WatsonxRerank
from ibm_watsonx_ai.foundation_models.schema import RerankParameters
parameters = RerankParameters(truncate_input_tokens=20)
ranker = WatsonxRerank(
model_id="cross-encoder/ms-marco-minilm-l-12-v2",
url="https://us-south.ml.cloud.ibm.com",
project_id="*****",
params=parameters,
# api_key="*****"
)
重新排序
query = "red cat chasing a laser pointer"
documents = [
"A red cat darts across the living room, pouncing on a red laser dot.",
"Two dogs play fetch in the park with a tennis ball.",
"The tabby cat naps on a sunny windowsill all afternoon.",
"A recipe for tuna casserole with crispy breadcrumbs.",
]
ranker.rerank(documents=documents, query=query)
| 方法 | 描述 |
|---|---|
acompress_documents |
异步地根据查询上下文压缩检索到的文档。 |
validate_environment |
验证凭据和 python 包是否存在于环境中。 |
重新排序 |
重新排序文档。 |
compress_documents |
使用 watsonx 的 rerank API 压缩文档。 |
url class-attribute instance-attribute ¶
Watson Machine Learning 或 CPD 实例的 URL。
api_key class-attribute instance-attribute ¶
api_key: SecretStr | None = Field(
default_factory=secret_from_env_multi(
names_priority=["WATSONX_API_KEY", "WATSONX_APIKEY"],
deprecated={"WATSONX_APIKEY"},
),
serialization_alias="api_key",
validation_alias=AliasChoices("api_key", "apikey"),
description="API key to the Watson Machine Learning or CPD instance.",
)
Watson Machine Learning 或 CPD 实例的 API 密钥。
token class-attribute instance-attribute ¶
token: SecretStr | None = Field(
alias="token", default_factory=secret_from_env("WATSONX_TOKEN", default=None)
)
CPD 实例的令牌。
password class-attribute instance-attribute ¶
password: SecretStr | None = Field(
alias="password", default_factory=secret_from_env("WATSONX_PASSWORD", default=None)
)
CPD 实例的密码。
username class-attribute instance-attribute ¶
username: SecretStr | None = Field(
alias="username", default_factory=secret_from_env("WATSONX_USERNAME", default=None)
)
CPD 实例的用户名。
instance_id class-attribute instance-attribute ¶
instance_id: SecretStr | None = Field(
alias="instance_id",
default_factory=secret_from_env("WATSONX_INSTANCE_ID", default=None),
)
CPD 实例的 instance_id。
params class-attribute instance-attribute ¶
params: dict | RerankParameters | None = None
在请求生成期间要使用的模型参数。
verify class-attribute instance-attribute ¶
您可以将以下之一作为 verify 传递:* CA_BUNDLE 文件的路径 * 包含受信任 CA 证书的目录路径 * True - 将采用默认的信任库路径 * False - 不进行验证
acompress_documents async ¶
rerank ¶
rerank(
documents: Sequence[str | Document | dict], query: str, **kwargs: Any
) -> list[dict[str, Any]]
重新排序文档。