langchain-google-genai¶
注意
此软件包参考尚未完全迁移到 v1。
langchain_google_genai ¶
LangChain Google Generative AI 集成.
该模块将 Google 的生成式 AI 模型(特别是 Gemini 系列)与 LangChain 框架集成。它提供了与聊天模型交互和生成嵌入的类,利用了 Google 先进的 AI 功能。
聊天模型
ChatGoogleGenerativeAI 类是与 Google Gemini 聊天模型交互的主要接口。它允许用户使用指定的 Gemini 模型发送和接收消息,适用于各种对话式 AI 应用。
大语言模型(LLM)
GoogleGenerativeAI 类是与 Google Gemini LLM 交互的主要接口。它允许用户使用指定的 Gemini 模型生成文本。
嵌入
GoogleGenerativeAIEmbeddings 类提供了使用 Google 模型生成嵌入的功能。这些嵌入可用于一系列 NLP 任务,包括语义分析、相似性比较等。
安装
要安装该软件包,请使用 pip
.. code-block:: python pip install -U langchain-google-genai
使用聊天模型
在使用所需的 API 密钥设置好您的环境后,您就可以与 Google Gemini 模型进行交互了。
.. code-block:: python
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-2.5-pro")
llm.invoke("Sing a ballad of LangChain.")
使用大语言模型(LLM)
该软件包还支持使用 Google 的模型生成文本。
.. code-block:: python
from langchain_google_genai import GoogleGenerativeAI
llm = GoogleGenerativeAI(model="gemini-2.5-pro")
llm.invoke("Once upon a time, a library called LangChain")
嵌入生成
该软件包还支持使用 Google 的模型创建嵌入,这对于文本相似性和其他 NLP 应用非常有用。
.. code-block:: python
from langchain_google_genai import GoogleGenerativeAIEmbeddings
embeddings = GoogleGenerativeAIEmbeddings(model="models/gemini-embedding-001")
embeddings.embed_query("hello, world!")
ChatGoogleGenerativeAI ¶
基类:_BaseGoogleGenerativeAI, BaseChatModel
Google AI 聊天模型集成。
实例化
要使用,您必须拥有以下任一
1. The ``GOOGLE_API_KEY`` environment variable set with your API key, or
2. Pass your API key using the ``google_api_key`` kwarg to the
ChatGoogleGenerativeAI constructor.
.. code-block:: python
from langchain_google_genai import ChatGoogleGenerativeAI
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")
llm.invoke("Write me a ballad about LangChain")
调用
.. code-block:: python
messages = [
("system", "Translate the user sentence to French."),
("human", "I love programming."),
]
llm.invoke(messages)
.. code-block:: python
AIMessage(
content="J'adore programmer. \\n",
response_metadata={
"prompt_feedback": {"block_reason": 0, "safety_ratings": []},
"finish_reason": "STOP",
"safety_ratings": [
{
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HATE_SPEECH",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HARASSMENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
],
},
id="run-56cecc34-2e54-4b52-a974-337e47008ad2-0",
usage_metadata={
"input_tokens": 18,
"output_tokens": 5,
"total_tokens": 23,
},
)
流
.. code-block:: python
for chunk in llm.stream(messages):
print(chunk)
.. code-block:: python
AIMessageChunk(
content="J",
response_metadata={"finish_reason": "STOP", "safety_ratings": []},
id="run-e905f4f4-58cb-4a10-a960-448a2bb649e3",
usage_metadata={
"input_tokens": 18,
"output_tokens": 1,
"total_tokens": 19,
},
)
AIMessageChunk(
content="'adore programmer. \\n",
response_metadata={
"finish_reason": "STOP",
"safety_ratings": [
{
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HATE_SPEECH",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HARASSMENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
],
},
id="run-e905f4f4-58cb-4a10-a960-448a2bb649e3",
usage_metadata={
"input_tokens": 18,
"output_tokens": 5,
"total_tokens": 23,
},
)
.. code-block:: python
stream = llm.stream(messages)
full = next(stream)
for chunk in stream:
full += chunk
full
.. code-block:: python
AIMessageChunk(
content="J'adore programmer. \\n",
response_metadata={
"finish_reason": "STOPSTOP",
"safety_ratings": [
{
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HATE_SPEECH",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HARASSMENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
],
},
id="run-3ce13a42-cd30-4ad7-a684-f1f0b37cdeec",
usage_metadata={
"input_tokens": 36,
"output_tokens": 6,
"total_tokens": 42,
},
)
异步
.. code-block:: python
await llm.ainvoke(messages)
# stream:
# async for chunk in (await llm.astream(messages))
# batch:
# await llm.abatch([messages])
上下文缓存
上下文缓存允许您存储和重用内容(例如 PDF、图像)以加快处理速度。cached_content 参数接受通过 Google Generative AI API 创建的缓存名称。以下是两个示例:直接缓存单个文件,以及使用 Part 缓存多个文件。
单个文件示例:缓存单个文件并查询它。
.. code-block:: python
from google import genai
from google.genai import types
import time
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
client = genai.Client()
# Upload file
file = client.files.upload(file="./example_file")
while file.state.name == "PROCESSING":
time.sleep(2)
file = client.files.get(name=file.name)
# Create cache
model = "models/gemini-2.5-flash"
cache = client.caches.create(
model=model,
config=types.CreateCachedContentConfig(
display_name="Cached Content",
system_instruction=(
"You are an expert content analyzer, and your job is to answer "
"the user's query based on the file you have access to."
),
contents=[file],
ttl="300s",
),
)
# Query with LangChain
llm = ChatGoogleGenerativeAI(
model=model,
cached_content=cache.name,
)
message = HumanMessage(content="Summarize the main points of the content.")
llm.invoke([message])
多个文件示例:使用 Part 缓存两个文件并一起查询它们。
.. code-block:: python
from google import genai
from google.genai.types import CreateCachedContentConfig, Content, Part
import time
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage
client = genai.Client()
# Upload files
file_1 = client.files.upload(file="./file1")
while file_1.state.name == "PROCESSING":
time.sleep(2)
file_1 = client.files.get(name=file_1.name)
file_2 = client.files.upload(file="./file2")
while file_2.state.name == "PROCESSING":
time.sleep(2)
file_2 = client.files.get(name=file_2.name)
# Create cache with multiple files
contents = [
Content(
role="user",
parts=[
Part.from_uri(file_uri=file_1.uri, mime_type=file_1.mime_type),
Part.from_uri(file_uri=file_2.uri, mime_type=file_2.mime_type),
],
)
]
model = "gemini-2.5-flash"
cache = client.caches.create(
model=model,
config=CreateCachedContentConfig(
display_name="Cached Contents",
system_instruction=(
"You are an expert content analyzer, and your job is to answer "
"the user's query based on the files you have access to."
),
contents=contents,
ttl="300s",
),
)
# Query with LangChain
llm = ChatGoogleGenerativeAI(
model=model,
cached_content=cache.name,
)
message = HumanMessage(
content="Provide a summary of the key information across both files."
)
llm.invoke([message])
工具调用
.. code-block:: python
from pydantic import BaseModel, Field
class GetWeather(BaseModel):
'''Get the current weather in a given location'''
location: str = Field(
..., description="The city and state, e.g. San Francisco, CA"
)
class GetPopulation(BaseModel):
'''Get the current population in a given location'''
location: str = Field(
..., description="The city and state, e.g. San Francisco, CA"
)
llm_with_tools = llm.bind_tools([GetWeather, GetPopulation])
ai_msg = llm_with_tools.invoke(
"Which city is hotter today and which is bigger: LA or NY?"
)
ai_msg.tool_calls
.. code-block:: python
[
{
"name": "GetWeather",
"args": {"location": "Los Angeles, CA"},
"id": "c186c99f-f137-4d52-947f-9e3deabba6f6",
},
{
"name": "GetWeather",
"args": {"location": "New York City, NY"},
"id": "cebd4a5d-e800-4fa5-babd-4aa286af4f31",
},
{
"name": "GetPopulation",
"args": {"location": "Los Angeles, CA"},
"id": "4f92d897-f5e4-4d34-a3bc-93062c92591e",
},
{
"name": "GetPopulation",
"args": {"location": "New York City, NY"},
"id": "634582de-5186-4e4b-968b-f192f0a93678",
},
]
将搜索与 Gemini 2 结合使用
.. code-block:: python
from google.ai.generativelanguage_v1beta.types import Tool as GenAITool
llm = ChatGoogleGenerativeAI(model="gemini-2.5-flash")
resp = llm.invoke(
"When is the next total solar eclipse in US?",
tools=[GenAITool(google_search={})],
)
结构化输出
.. code-block:: python
from typing import Optional
from pydantic import BaseModel, Field
class Joke(BaseModel):
'''Joke to tell user.'''
setup: str = Field(description="The setup of the joke")
punchline: str = Field(description="The punchline to the joke")
rating: Optional[int] = Field(
description="How funny the joke is, from 1 to 10"
)
# Default method uses function calling
structured_llm = llm.with_structured_output(Joke)
# For more reliable output, use json_schema with native responseSchema
structured_llm_json = llm.with_structured_output(Joke, method="json_schema")
structured_llm_json.invoke("Tell me a joke about cats")
.. code-block:: python
Joke(
setup="Why are cats so good at video games?",
punchline="They have nine lives on the internet",
rating=None,
)
支持两种方法用于结构化输出
method="function_calling"(默认):使用工具调用来提取结构化数据。与所有模型兼容。method="json_schema":使用 Gemini 的原生结构化输出与 responseSchema。更可靠,但需要 Gemini 1.5+ 模型。为向后兼容,method="json_mode"也可用,但这是一种用词不当。
推荐使用 json_schema 方法,因为它直接约束模型的生成过程,而不是依赖于后处理的工具调用,从而具有更好的可靠性。
图像输入
.. code-block:: python
import base64
import httpx
from langchain_core.messages import HumanMessage
image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
image_data = base64.b64encode(httpx.get(image_url).content).decode("utf-8")
message = HumanMessage(
content=[
{"type": "text", "text": "describe the weather in this image"},
{
"type": "image_url",
"image_url": {"url": f"data:image/jpeg;base64,{image_data}"},
},
]
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
"The weather in this image appears to be sunny and pleasant. The sky is a
bright blue with scattered white clouds, suggesting fair weather. The lush
green grass and trees indicate a warm and possibly slightly breezy day.
There are no signs of rain or storms."
PDF 输入
.. code-block:: python
import base64
from langchain_core.messages import HumanMessage
pdf_bytes = open("/path/to/your/test.pdf", "rb").read()
pdf_base64 = base64.b64encode(pdf_bytes).decode("utf-8")
message = HumanMessage(
content=[
{"type": "text", "text": "describe the document in a sentence"},
{
"type": "file",
"source_type": "base64",
"mime_type": "application/pdf",
"data": pdf_base64,
},
]
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
"This research paper describes a system developed for SemEval-2025 Task 9,
which aims to automate the detection of food hazards from recall reports,
addressing the class imbalance problem by leveraging LLM-based data
augmentation techniques and transformer-based models to improve
performance."
视频输入
.. code-block:: python
import base64
from langchain_core.messages import HumanMessage
video_bytes = open("/path/to/your/video.mp4", "rb").read()
video_base64 = base64.b64encode(video_bytes).decode("utf-8")
message = HumanMessage(
content=[
{
"type": "text",
"text": "describe what's in this video in a sentence",
},
{
"type": "file",
"source_type": "base64",
"mime_type": "video/mp4",
"data": video_base64,
},
]
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
"Tom and Jerry, along with a turkey, engage in a chaotic Thanksgiving-themed
adventure involving a corn-on-the-cob chase, maze antics, and a disastrous
attempt to prepare a turkey dinner."
您也可以直接传递 YouTube URL
.. code-block:: python
from langchain_core.messages import HumanMessage
message = HumanMessage(
content=[
{"type": "text", "text": "summarize the video in 3 sentences."},
{
"type": "media",
"file_uri": "https://www.youtube.com/watch?v=9hE5-98ZeCg",
"mime_type": "video/mp4",
},
]
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
"The video is a demo of multimodal live streaming in Gemini 2.0. The
narrator is sharing his screen in AI Studio and asks if the AI can see it.
The AI then reads text that is highlighted on the screen, defines the word
“multimodal,” and summarizes everything that was seen and heard."
音频输入
.. code-block:: python
import base64
from langchain_core.messages import HumanMessage
audio_bytes = open("/path/to/your/audio.mp3", "rb").read()
audio_base64 = base64.b64encode(audio_bytes).decode("utf-8")
message = HumanMessage(
content=[
{"type": "text", "text": "summarize this audio in a sentence"},
{
"type": "file",
"source_type": "base64",
"mime_type": "audio/mp3",
"data": audio_base64,
},
]
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
"In this episode of the Made by Google podcast, Stephen Johnson and Simon
Tokumine discuss NotebookLM, a tool designed to help users understand
complex material in various modalities, with a focus on its unexpected uses,
the development of audio overviews, and the implementation of new features
like mind maps and source discovery."
文件上传(基于 URI):您还可以将文件上传到 Google 的服务器,并通过 URI 引用它们。这适用于 PDF、图像、视频和音频文件。
.. code-block:: python
import time
from google import genai
from langchain_core.messages import HumanMessage
client = genai.Client()
myfile = client.files.upload(file="/path/to/your/sample.pdf")
while myfile.state.name == "PROCESSING":
time.sleep(2)
myfile = client.files.get(name=myfile.name)
message = HumanMessage(
content=[
{"type": "text", "text": "What is in the document?"},
{
"type": "media",
"file_uri": myfile.uri,
"mime_type": "application/pdf",
},
]
)
ai_msg = llm.invoke([message])
ai_msg.content
.. code-block:: python
"This research paper assesses and mitigates multi-turn jailbreak
vulnerabilities in large language models using the Crescendo attack study,
evaluating attack success rates and mitigation strategies like prompt
hardening and LLM-as-guardrail."
令牌使用情况
.. code-block:: python
ai_msg = llm.invoke(messages)
ai_msg.usage_metadata
.. code-block:: python
{"input_tokens": 18, "output_tokens": 5, "total_tokens": 23}
响应元数据 .. code-block:: python
ai_msg = llm.invoke(messages)
ai_msg.response_metadata
.. code-block:: python
{
"prompt_feedback": {"block_reason": 0, "safety_ratings": []},
"finish_reason": "STOP",
"safety_ratings": [
{
"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HATE_SPEECH",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_HARASSMENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
{
"category": "HARM_CATEGORY_DANGEROUS_CONTENT",
"probability": "NEGLIGIBLE",
"blocked": False,
},
],
}
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
set_verbose |
如果 verbose 是 |
generate_prompt |
将一系列提示传递给模型并返回模型生成的内容。 |
agenerate_prompt |
异步地将一系列提示传递并返回模型生成的内容。 |
get_token_ids |
返回文本中 token 的有序 ID。 |
get_num_tokens_from_messages |
获取消息中的 token 数量。 |
generate |
将一系列提示传递给模型并返回模型生成的内容。 |
agenerate |
异步地将一系列提示传递给模型并返回生成的内容。 |
dict |
返回 LLM 的字典。 |
__init__ |
用于参数验证。 |
is_lc_serializable |
这个类是否可序列化? |
build_extra |
根据传入的额外参数构建额外的关键字参数(kwargs)。 |
validate_environment |
验证参数并将其传递给 google-generativeai 包。 |
invoke |
重写 invoke 以添加 code_execution 参数。 |
get_num_tokens |
获取文本中存在的词元(token)数量。使用模型的词元分析器。 |
with_structured_output |
返回与给定模式匹配的格式化输出的模型包装器。 |
bind_tools |
将类工具(tool-like)对象绑定到此聊天模型。 |
cache 类属性 实例属性 ¶
是否缓存响应。
- 如果为
True,将使用全局缓存。 - 如果为
False,将不使用缓存 - 如果为
None,如果设置了全局缓存,则使用全局缓存,否则不使用缓存。 - 如果是
BaseCache的实例,将使用提供的缓存。
目前不支持模型的流式方法的缓存。
verbose 类属性 实例属性 ¶
是否打印响应文本。
metadata 类属性 实例属性 ¶
添加到运行跟踪中的元数据。
custom_get_token_ids 类属性 实例属性 ¶
用于计算 token 的可选编码器。
rate_limiter 类属性 实例属性 ¶
rate_limiter: BaseRateLimiter | None = Field(default=None, exclude=True)
一个可选的速率限制器,用于限制请求数量。
disable_streaming 类属性 实例属性 ¶
是否为此模型禁用流式传输。
如果绕过流式传输,则 stream/astream/astream_events 将转而调用 invoke/ainvoke。
- 如果为
True,将始终绕过流式传输情况。 - 如果为
'tool_calling',仅当模型被调用时带有tools关键字参数时,才会绕过流式传输情况。换句话说,仅当提供 tools 参数时,LangChain 才会自动切换到非流式行为 (invoke)。这提供了两全其美的方案。 - 如果为
False(默认值),则在可用时始终使用流式传输情况。
此标志的主要原因是,代码可能是使用 stream 编写的,而用户可能希望将给定模型换成另一个实现不完全支持流式传输的模型。
output_version 类属性 实例属性 ¶
要存储在消息内容中的 AIMessage 输出格式的版本。
AIMessage.content_blocks 将懒解析 content 的内容为标准格式。此标志可用于额外将标准格式存储在消息内容中,例如,用于序列化目的。
支持的值
'v0':内容中特定于提供商的格式(可以使用content_blocks进行懒解析)'v1':内容中的标准化格式(与content_blocks一致)
合作伙伴包(例如 langchain-openai)也可以使用此字段以向后兼容的方式推出新的内容格式。
在版本 1.0 中添加
profile 属性 ¶
返回模型的性能分析信息。
此属性将依赖于 langchain-model-profiles 包来检索聊天模型的能力,例如上下文窗口大小和支持的功能。
| 引发 | 描述 |
|---|---|
ImportError
|
如果未安装 |
| 返回 | 描述 |
|---|---|
ModelProfile
|
一个 |
model 类属性 实例属性 ¶
model: str = Field(
...,
description="The name of the model to use.\nExamples:\n - gemini-2.5-flash\n - models/text-bison-001",
)
要使用的模型名称。
google_api_key 类属性 实例属性 ¶
google_api_key: SecretStr | None = Field(
alias="api_key", default_factory=secret_from_env("GOOGLE_API_KEY", default=None)
)
Google AI API 密钥。如果未指定,将从环境变量 GOOGLE_API_KEY 读取。
max_output_tokens 类属性 实例属性 ¶
候选内容中包含的最大词元数。必须大于零。如果未设置,将使用模型的默认值,该值因模型而异。有关模型特定限制,请参阅 https://ai.google.dev/gemini-api/docs/models。
max_retries 类属性 实例属性 ¶
生成时进行的最大重试次数。如果未设置,将默认为 6。
safety_settings 类属性 实例属性 ¶
safety_settings: dict[HarmCategory, HarmBlockThreshold] | None = None
用于所有生成的默认安全设置。
例如
.. code-block:: python from google.generativeai.types.safety_types import HarmBlockThreshold, HarmCategory
safety_settings = {
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
}
convert_system_message_to_human 类属性 实例属性 ¶
convert_system_message_to_human: bool = False
是否将任何前导的 SystemMessage 合并到后续的 HumanMessage 中。
Gemini 不支持系统消息;任何不支持的消息都将引发错误。
response_mime_type 类属性 实例属性 ¶
response_mime_type: str | None = None
可选。生成的候选文本的输出响应 mimetype。仅在 Gemini 1.5 及更高版本的模型中支持。
支持的 mimetype
'text/plain':(默认)文本输出。'application/json':候选内容中的 JSON 响应。'text/x.enum':纯文本中的枚举。
模型还需要被提示输出适当的响应类型,否则行为是未定义的。这是一个预览功能。
response_schema 类属性 实例属性 ¶
可选。强制输出一个模式。字典的格式应遵循 Open API 模式。
cached_content 类属性 实例属性 ¶
cached_content: str | None = None
用作服务预测上下文的缓存内容的名称。
注意:仅在显式缓存中使用,用户可以控制缓存(例如,缓存什么内容)并享受有保障的成本节省。格式:cachedContents/{cachedContent}。
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
ainvoke 异步 ¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AIMessage
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch 异步 ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed 异步 ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> Iterator[AIMessageChunk]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream 异步 ¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[AIMessageChunk]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log 异步 ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events 异步 ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform 异步 ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
get_lc_namespace 类方法 ¶
lc_id 类方法 ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose ¶
generate_prompt ¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> LLMResult
将一系列提示传递给模型并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate_prompt 异步 ¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
get_token_ids ¶
get_num_tokens_from_messages ¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
获取消息中的 token 数量。
用于检查输入是否适合模型的上下文窗口。
注意
get_num_tokens_from_messages 的基本实现忽略了工具模式。
| 参数 | 描述 |
|---|---|
messages
|
要进行分词的消息输入。
类型: |
工具
|
如果提供,则为要转换为工具模式的 dict、
类型: |
| 返回 | 描述 |
|---|---|
int
|
所有消息中词元数量的总和。 |
generate ¶
generate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any,
) -> LLMResult
将一系列提示传递给模型并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
messages
|
消息列表的列表。
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要应用的标签。 |
metadata
|
要应用的元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate 异步 ¶
agenerate(
messages: list[list[BaseMessage]],
stop: list[str] | None = None,
callbacks: Callbacks = None,
*,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
run_name: str | None = None,
run_id: UUID | None = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递给模型并返回生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
messages
|
消息列表的列表。
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要应用的标签。 |
metadata
|
要应用的元数据。 |
运行名称
|
运行的名称。
类型: |
run_id
|
运行的 ID。
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
is_lc_serializable 类方法 ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
invoke ¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
code_execution: bool | None = None,
stop: list[str] | None = None,
**kwargs: Any,
) -> AIMessage
重写 invoke 以添加 code_execution 参数。
支持的模型:gemini-1.5-pro、gemini-1.5-flash、gemini-2.0-flash 和 gemini-2.0-pro。启用后,模型可以执行代码来解决问题。
get_num_tokens ¶
with_structured_output ¶
with_structured_output(
schema: dict | Type[BaseModel],
method: Literal["function_calling", "json_mode", "json_schema"]
| None = "function_calling",
*,
include_raw: bool = False,
**kwargs: Any,
) -> Runnable[LanguageModelInput, dict | BaseModel]
返回与给定模式匹配的格式化输出的模型包装器。
| 参数 | 描述 |
|---|---|
模式
|
输出模式。可以作为以下形式传入:
如果 有关在指定 Pydantic 或 |
包含原始数据
|
如果为 最终输出总是一个带有键
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果有任何不支持的 |
NotImplementedError
|
如果模型没有实现 |
| 返回 | 描述 |
|---|---|
Runnable[LanguageModelInput, Dict | BaseModel]
|
一个接受与 如果
|
示例:Pydantic 模式 (include_raw=False)
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(AnswerWithJustification)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> AnswerWithJustification(
# answer='They weigh the same',
# justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'
# )
示例:Pydantic 模式 (include_raw=True)
from pydantic import BaseModel
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(
AnswerWithJustification, include_raw=True
)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'raw': AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_Ao02pnFYXD6GN1yzc0uXPsvF', 'function': {'arguments': '{"answer":"They weigh the same.","justification":"Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ."}', 'name': 'AnswerWithJustification'}, 'type': 'function'}]}),
# 'parsed': AnswerWithJustification(answer='They weigh the same.', justification='Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume or density of the objects may differ.'),
# 'parsing_error': None
# }
示例:dict 模式 (include_raw=False)
from pydantic import BaseModel
from langchain_core.utils.function_calling import convert_to_openai_tool
class AnswerWithJustification(BaseModel):
'''An answer to the user question along with justification for the answer.'''
answer: str
justification: str
dict_schema = convert_to_openai_tool(AnswerWithJustification)
model = ChatModel(model="model-name", temperature=0)
structured_model = model.with_structured_output(dict_schema)
structured_model.invoke(
"What weighs more a pound of bricks or a pound of feathers"
)
# -> {
# 'answer': 'They weigh the same',
# 'justification': 'Both a pound of bricks and a pound of feathers weigh one pound. The weight is the same, but the volume and density of the two substances differ.'
# }
行为在 0.2.26 版本中已更改
添加了对 TypedDict 类的支持。
bind_tools ¶
bind_tools(
tools: Sequence[dict[str, Any] | type | Callable[..., Any] | BaseTool | Tool],
tool_config: dict | _ToolConfigDict | None = None,
*,
tool_choice: _ToolChoiceType | bool | None = None,
**kwargs: Any,
) -> Runnable[LanguageModelInput, AIMessage]
将类工具(tool-like)对象绑定到此聊天模型。
假设模型与 google-generativeAI 工具调用 API 兼容。
| 参数 | 描述 |
|---|---|
工具
|
要绑定到此聊天模型的一系列工具定义。可以是 Pydantic 模型、可调用对象或 BaseTool。Pydantic 模型、可调用对象和 BaseTool 将自动转换为其模式字典表示。现在支持在其参数中使用联合类型的工具,并将其转换为
类型: |
**kwargs
|
任何要传递给 :class:
类型: |
GoogleGenerativeAIEmbeddings ¶
基类:BaseModel, Embeddings
Google Generative AI 嵌入.
要使用,您必须拥有以下任一
1. The ``GOOGLE_API_KEY`` environment variable set with your API key, or
2. Pass your API key using the google_api_key kwarg to the
GoogleGenerativeAIEmbeddings constructor.
示例
.. code-block:: python
from langchain_google_genai import GoogleGenerativeAIEmbeddings
embeddings = GoogleGenerativeAIEmbeddings(model="gemini-embedding-001")
embeddings.embed_query("What's our Q1 revenue?")
| 方法 | 描述 |
|---|---|
validate_environment |
验证参数并将其传递给 google-generativeai 包。 |
embed_documents |
使用 |
embed_query |
使用 |
aembed_documents |
使用 |
aembed_query |
使用 |
embed_documents ¶
embed_documents(
texts: list[str],
*,
batch_size: int = _DEFAULT_BATCH_SIZE,
task_type: str | None = None,
titles: list[str] | None = None,
output_dimensionality: int | None = None,
) -> list[list[float]]
使用 批量端点 __ 嵌入字符串列表。
Google Generative AI 目前将最大批量大小设置为 100 个字符串。
| 参数 | 描述 |
|---|---|
texts
|
List[str] 要嵌入的字符串列表。 |
batch_size
|
[int] 要发送到模型的嵌入批量大小
类型: |
task_type
|
类型: |
titles
|
一个可选的标题列表,用于提供的文本。仅在 TaskType 为 |
output_dimensionality
|
可选的
TYPE: |
| 返回 | 描述 |
|---|---|
list[list[float]]
|
嵌入列表,每个文本一个。 |
embed_query ¶
aembed_documents 异步 ¶
aembed_documents(
texts: list[str],
*,
batch_size: int = _DEFAULT_BATCH_SIZE,
task_type: str | None = None,
titles: list[str] | None = None,
output_dimensionality: int | None = None,
) -> list[list[float]]
使用 批量端点 __ 嵌入字符串列表。
Google Generative AI 目前将最大批量大小设置为 100 个字符串。
| 参数 | 描述 |
|---|---|
texts
|
List[str] 要嵌入的字符串列表。 |
batch_size
|
[int] 要发送到模型的嵌入批量大小
类型: |
task_type
|
类型: |
titles
|
一个可选的标题列表,用于提供的文本。仅在 TaskType 为 |
output_dimensionality
|
可选的
TYPE: |
| 返回 | 描述 |
|---|---|
list[list[float]]
|
嵌入列表,每个文本一个。 |
AqaInput ¶
AqaOutput ¶
GenAIAqa ¶
基类:RunnableSerializable[AqaInput, AqaOutput]
Google 的归因问答服务。
给定用户的查询和段落列表,Google 的服务器将返回一个基于所提供段落列表的回答。它不会基于参数化记忆来生成回答。
| 属性 | 描述 |
|---|---|
answer_style |
仅关键字参数。有关详细信息,请参阅
类型: |
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
__init__ |
构建一个 Google Generative AI AQA 模型。 |
invoke |
使用提供的段落生成一个有根据的响应。 |
lc_secrets 属性 ¶
构造函数参数名称到密钥 ID 的映射。
例如,{"openai_api_key": "OPENAI_API_KEY"}
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
ainvoke async ¶
ainvoke(input: Input, config: RunnableConfig | None = None, **kwargs: Any) -> Output
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> list[Output]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: Input, config: RunnableConfig | None = None, **kwargs: Any | None
) -> AsyncIterator[Output]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
__init__ ¶
__init__(
*,
answer_style: int = ABSTRACTIVE,
safety_settings: list[SafetySetting] | None = None,
temperature: float | None = None,
**kwargs: Any,
) -> None
构建一个 Google Generative AI AQA 模型。
所有参数都是可选的。
| 参数 | 描述 |
|---|---|
answer_style
|
请参阅
类型: |
safety_settings
|
请参阅
类型: |
temperature
|
0.0 到 1.0。
类型: |
invoke ¶
invoke(
input: AqaInput, config: RunnableConfig | None = None, **kwargs: Any
) -> AqaOutput
使用提供的段落生成一个有根据的响应。
GoogleVectorStore ¶
Bases: VectorStore
Google GenerativeAI 向量存储。
目前,它在服务器端计算嵌入向量。
示例:将文本添加到现有语料库。
store = GoogleVectorStore(corpus_id="123")
store.add_documents(documents, document_id="456")
示例:创建一个新语料库。
store = GoogleVectorStore.create_corpus(
corpus_id="123", display_name="My Google corpus")
示例:查询语料库以获取相关段落。
store.as_retriever() .get_relevant_documents("Who caught the gingerbread man?")
示例:向语料库请求有根据的回复!
aqa = store.as_aqa()
response = aqa.invoke("Who caught the gingerbread man?")
print(response.answer)
print(response.attributed_passages)
print(response.answerability_probability)
您还可以在 Google 的文档级别进行操作。
示例:将文本添加到现有的 Google 向量存储文档中。
doc_store = GoogleVectorStore(corpus_id="123", document_id="456")
doc_store.add_documents(documents)
示例:创建一个新的 Google 向量存储文档。
doc_store = GoogleVectorStore.create_document(
corpus_id="123", document_id="456", display_name="My Google document")
示例:查询 Google 文档。
doc_store.as_retriever() .get_relevant_documents("Who caught the gingerbread man?")
有关更多详细信息,请参阅该类的方法。
| 方法 | 描述 |
|---|---|
get_by_ids |
根据 ID 获取文档。 |
aget_by_ids |
通过 ID 异步获取文档。 |
aadd_texts |
通过嵌入异步运行更多文本并添加到 `VectorStore`。 |
add_documents |
在 `VectorStore` 中添加或更新文档。 |
aadd_documents |
异步运行更多文档通过嵌入并添加到 `VectorStore`。 |
search |
使用指定的搜索类型返回与查询最相似的文档。 |
asearch |
异步返回与查询最相似的文档,使用指定的搜索类型。 |
asimilarity_search_with_score |
异步运行带距离的相似性搜索。 |
similarity_search_with_relevance_scores |
返回文档和在 `[0, 1]` 范围内的相关性分数。 |
asimilarity_search_with_relevance_scores |
异步返回文档和在 `[0, 1]` 范围内的相关性分数。 |
asimilarity_search |
异步返回与查询最相似的文档。 |
similarity_search_by_vector |
返回与嵌入向量最相似的文档。 |
asimilarity_search_by_vector |
异步返回与嵌入向量最相似的文档。 |
max_marginal_relevance_search |
返回使用最大边际相关性选择的文档。 |
amax_marginal_relevance_search |
异步返回使用最大边际相关性选择的文档。 |
max_marginal_relevance_search_by_vector |
返回使用最大边际相关性选择的文档。 |
amax_marginal_relevance_search_by_vector |
异步返回使用最大边际相关性选择的文档。 |
from_documents |
返回从文档和嵌入初始化的 `VectorStore`。 |
afrom_documents |
异步返回从文档和嵌入初始化的 `VectorStore`。 |
afrom_texts |
异步返回从文本和嵌入初始化的 `VectorStore`。 |
as_retriever |
返回从此 `VectorStore` 初始化的 `VectorStoreRetriever`。 |
__init__ |
返回现有的 Google Semantic Retriever 语料库或文档。 |
create_corpus |
创建一个 Google Semantic Retriever 语料库。 |
create_document |
创建一个 Google Semantic Retriever 文档。 |
from_texts |
返回包含指定文本的现有文档的向量存储。 |
add_texts |
将文本添加到向量存储中。 |
similarity_search |
在向量存储中搜索相关文本。 |
similarity_search_with_score |
运行带距离的相似性搜索。 |
delete |
删除数据块。 |
adelete |
异步删除数据块。 |
as_aqa |
构建一个 Google Generative AI AQA 引擎。 |
get_by_ids ¶
aget_by_ids async ¶
aadd_texts async ¶
aadd_texts(
texts: Iterable[str],
metadatas: list[dict] | None = None,
*,
ids: list[str] | None = None,
**kwargs: Any,
) -> list[str]
通过嵌入异步运行更多文本并添加到 `VectorStore`。
| 参数 | 描述 |
|---|---|
texts
|
要添加到 `VectorStore` 的字符串的可迭代对象。 |
metadatas
|
与文本关联的元数据可选列表。 |
ids
|
可选列表 |
**kwargs
|
`VectorStore` 特定参数。
类型: |
| 返回 | 描述 |
|---|---|
list[str]
|
将文本添加到 `VectorStore` 后返回的 ID 列表。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果元数据的数量与文本的数量不匹配。 |
ValueError
|
如果 ID 的数量与文本的数量不匹配。 |
add_documents ¶
aadd_documents async ¶
search ¶
使用指定的搜索类型返回与查询最相似的文档。
| 参数 | 描述 |
|---|---|
query
|
输入文本。
类型: |
search_type
|
要执行的搜索类型。可以是 `'similarity'`、`'mmr'` 或 `'similarity_score_threshold'`。
类型: |
**kwargs
|
传递给搜索方法的参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Document]
|
与查询最相似的 `Document` 对象列表。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 `search_type` 不是 `'similarity'`、`'mmr'` 或 `'similarity_score_threshold'` 之一。 |
asearch async ¶
异步返回与查询最相似的文档,使用指定的搜索类型。
| 参数 | 描述 |
|---|---|
query
|
输入文本。
类型: |
search_type
|
要执行的搜索类型。可以是 `'similarity'`、`'mmr'` 或 `'similarity_score_threshold'`。
类型: |
**kwargs
|
传递给搜索方法的参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Document]
|
与查询最相似的 `Document` 对象列表。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 `search_type` 不是 `'similarity'`、`'mmr'` 或 `'similarity_score_threshold'` 之一。 |
asimilarity_search_with_score async ¶
similarity_search_with_relevance_scores ¶
asimilarity_search_with_relevance_scores async ¶
asimilarity_search async ¶
similarity_search_by_vector ¶
asimilarity_search_by_vector async ¶
max_marginal_relevance_search ¶
max_marginal_relevance_search(
query: str, k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any
) -> list[Document]
返回使用最大边际相关性选择的文档。
最大边际相关性优化查询相似度与所选文档之间的多样性。
| 参数 | 描述 |
|---|---|
query
|
用于查找相似文档的文本。
类型: |
k
|
要返回的 `Document` 对象数量。
TYPE: |
fetch_k
|
要获取并传递给 MMR 算法的 `Document` 对象数量。
类型: |
lambda_mult
|
一个介于 `0` 和 `1` 之间的数字,决定了结果之间的多样性程度,其中 `0` 对应最大多样性,`1` 对应最小多样性。
类型: |
**kwargs
|
传递给搜索方法的参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Document]
|
通过最大边际相关性选择的 `Document` 对象列表。 |
amax_marginal_relevance_search async ¶
amax_marginal_relevance_search(
query: str, k: int = 4, fetch_k: int = 20, lambda_mult: float = 0.5, **kwargs: Any
) -> list[Document]
异步返回使用最大边际相关性选择的文档。
最大边际相关性优化查询相似度与所选文档之间的多样性。
| 参数 | 描述 |
|---|---|
query
|
用于查找相似文档的文本。
类型: |
k
|
要返回的 `Document` 对象数量。
TYPE: |
fetch_k
|
要获取并传递给 MMR 算法的 `Document` 对象数量。
类型: |
lambda_mult
|
一个介于 `0` 和 `1` 之间的数字,决定了结果之间的多样性程度,其中 `0` 对应最大多样性,`1` 对应最小多样性。
类型: |
**kwargs
|
传递给搜索方法的参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Document]
|
通过最大边际相关性选择的 `Document` 对象列表。 |
max_marginal_relevance_search_by_vector ¶
max_marginal_relevance_search_by_vector(
embedding: list[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]
返回使用最大边际相关性选择的文档。
最大边际相关性优化查询相似度与所选文档之间的多样性。
| 参数 | 描述 |
|---|---|
embedding
|
用于查找相似文档的嵌入。 |
k
|
要返回的 `Document` 对象数量。
TYPE: |
fetch_k
|
要获取并传递给 MMR 算法的 `Document` 对象数量。
类型: |
lambda_mult
|
一个介于 `0` 和 `1` 之间的数字,决定了结果之间的多样性程度,其中 `0` 对应最大多样性,`1` 对应最小多样性。
类型: |
**kwargs
|
传递给搜索方法的参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Document]
|
通过最大边际相关性选择的 `Document` 对象列表。 |
amax_marginal_relevance_search_by_vector async ¶
amax_marginal_relevance_search_by_vector(
embedding: list[float],
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
**kwargs: Any,
) -> list[Document]
异步返回使用最大边际相关性选择的文档。
最大边际相关性优化查询相似度与所选文档之间的多样性。
| 参数 | 描述 |
|---|---|
embedding
|
用于查找相似文档的嵌入。 |
k
|
要返回的 `Document` 对象数量。
TYPE: |
fetch_k
|
要获取并传递给 MMR 算法的 `Document` 对象数量。
类型: |
lambda_mult
|
一个介于 `0` 和 `1` 之间的数字,决定了结果之间的多样性程度,其中 `0` 对应最大多样性,`1` 对应最小多样性。
类型: |
**kwargs
|
传递给搜索方法的参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Document]
|
通过最大边际相关性选择的 `Document` 对象列表。 |
from_documents classmethod ¶
from_documents(documents: list[Document], embedding: Embeddings, **kwargs: Any) -> Self
返回从文档和嵌入初始化的 `VectorStore`。
| 参数 | 描述 |
|---|---|
documents
|
要添加到 `VectorStore` 的 `Document` 对象列表。 |
embedding
|
要使用的嵌入函数。
TYPE: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
Self
|
从文档和嵌入初始化的 `VectorStore`。 |
afrom_documents async classmethod ¶
afrom_documents(
documents: list[Document], embedding: Embeddings, **kwargs: Any
) -> Self
异步返回从文档和嵌入初始化的 `VectorStore`。
| 参数 | 描述 |
|---|---|
documents
|
要添加到 `VectorStore` 的 `Document` 对象列表。 |
embedding
|
要使用的嵌入函数。
TYPE: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
Self
|
从文档和嵌入初始化的 `VectorStore`。 |
afrom_texts async classmethod ¶
as_retriever ¶
as_retriever(**kwargs: Any) -> VectorStoreRetriever
返回从此 `VectorStore` 初始化的 `VectorStoreRetriever`。
| 参数 | 描述 |
|---|---|
**kwargs
|
传递给搜索函数的关键字参数。可以包括
类型: |
| 返回 | 描述 |
|---|---|
VectorStoreRetriever
|
`VectorStore` 的检索器类。 |
示例
# Retrieve more documents with higher diversity
# Useful if your dataset has many similar documents
docsearch.as_retriever(
search_type="mmr", search_kwargs={"k": 6, "lambda_mult": 0.25}
)
# Fetch more documents for the MMR algorithm to consider
# But only return the top 5
docsearch.as_retriever(search_type="mmr", search_kwargs={"k": 5, "fetch_k": 50})
# Only retrieve documents that have a relevance score
# Above a certain threshold
docsearch.as_retriever(
search_type="similarity_score_threshold",
search_kwargs={"score_threshold": 0.8},
)
# Only get the single most similar document from the dataset
docsearch.as_retriever(search_kwargs={"k": 1})
# Use a filter to only retrieve documents from a specific paper
docsearch.as_retriever(
search_kwargs={"filter": {"paper_title": "GPT-4 Technical Report"}}
)
__init__ ¶
返回现有的 Google Semantic Retriever 语料库或文档。
如果只提供了语料库 ID,则向量存储将在该语料库中的所有文档上操作。
如果提供了文档 ID,则向量存储将仅在该文档上操作。
| 引发 | 描述 |
|---|---|
DoesNotExistsException
|
如果这些 ID 在 Google 服务器上不匹配任何内容。在这种情况下,请考虑使用 |
create_corpus classmethod ¶
create_corpus(
corpus_id: str | None = None, display_name: str | None = None
) -> GoogleVectorStore
创建一个 Google Semantic Retriever 语料库。
| 参数 | 描述 |
|---|---|
corpus_id
|
用于创建新语料库的 ID。如果未提供,Google 服务器将提供一个。
类型: |
display_name
|
新语料库的标题。如果未提供,Google 服务器将提供一个。
类型: |
| 返回 | 描述 |
|---|---|
GoogleVectorStore
|
一个指向新创建的语料库的向量存储实例。 |
create_document classmethod ¶
create_document(
corpus_id: str,
document_id: str | None = None,
display_name: str | None = None,
metadata: dict[str, Any] | None = None,
) -> GoogleVectorStore
创建一个 Google Semantic Retriever 文档。
| 参数 | 描述 |
|---|---|
corpus_id
|
现有语料库的 ID。
类型: |
document_id
|
用于创建新的 Google Semantic Retriever 文档的 ID。如果未提供,Google 服务器将提供一个。
类型: |
display_name
|
新文档的标题。如果未提供,Google 服务器将提供一个。
类型: |
| 返回 | 描述 |
|---|---|
GoogleVectorStore
|
一个指向新创建的 |
GoogleVectorStore
|
文档的向量存储实例。 |
from_texts classmethod ¶
from_texts(
texts: list[str],
embedding: Embeddings | None = None,
metadatas: list[dict[str, Any]] | None = None,
*,
corpus_id: str | None = None,
document_id: str | None = None,
**kwargs: Any,
) -> GoogleVectorStore
返回包含指定文本的现有文档的向量存储。
| 参数 | 描述 |
|---|---|
corpus_id
|
必需。必须是一个现有的语料库。
类型: |
document_id
|
必需。必须是一个现有的文档。
类型: |
texts
|
要加载到向量存储中的文本。 |
| 返回 | 描述 |
|---|---|
GoogleVectorStore
|
一个指向指定的 Google Semantic Retriever 的向量存储 |
GoogleVectorStore
|
文档。 |
| 引发 | 描述 |
|---|---|
DoesNotExistsException
|
如果这些 ID 在 Google 服务器上不匹配任何内容。 |
add_texts ¶
similarity_search ¶
similarity_search(
query: str, k: int = 4, filter: dict[str, Any] | None = None, **kwargs: Any
) -> list[Document]
在向量存储中搜索相关文本。
similarity_search_with_score ¶
similarity_search_with_score(
query: str, k: int = 4, filter: dict[str, Any] | None = None, **kwargs: Any
) -> list[tuple[Document, float]]
运行带距离的相似性搜索。
delete ¶
删除数据块。
请注意,“ids” 不是语料库 ID 或文档 ID。而是 add_texts 返回的实体名称。
| 返回 | 描述 |
|---|---|
bool | None
|
如果成功,则为 True。否则,您无论如何都会收到一个异常。 |
adelete async ¶
异步删除数据块。
请注意,“ids” 不是语料库 ID 或文档 ID。而是 add_texts 返回的实体名称。
| 返回 | 描述 |
|---|---|
bool | None
|
如果成功,则为 True。否则,您无论如何都会收到一个异常。 |
as_aqa ¶
as_aqa(
*,
answer_style: int = 1,
safety_settings: list[Any] | None = None,
temperature: float | None = None,
) -> Runnable[str, AqaOutput]
GoogleGenerativeAI ¶
基类:_BaseGoogleGenerativeAI, BaseLLM
Google GenerativeAI 模型。
示例
.. code-block:: python
from langchain_google_genai import GoogleGenerativeAI
llm = GoogleGenerativeAI(model="gemini-2.5-pro")
| 方法 | 描述 |
|---|---|
get_name |
获取 |
get_input_schema |
获取可用于验证 |
get_input_jsonschema |
获取表示 |
get_output_schema |
获取可用于验证 |
get_output_jsonschema |
获取表示 |
config_schema |
此 |
get_config_jsonschema |
获取表示 |
get_graph |
返回此 |
get_prompts |
返回此 |
__or__ |
Runnable "or" 运算符。 |
__ror__ |
Runnable "reverse-or" 运算符。 |
pipe |
管道连接 |
pick |
从此 |
assign |
向此 |
invoke |
将单个输入转换为输出。 |
ainvoke |
将单个输入转换为输出。 |
batch |
默认实现使用线程池执行器并行运行 invoke。 |
batch_as_completed |
在输入列表上并行运行 |
abatch |
默认实现使用 |
abatch_as_completed |
在输入列表上并行运行 |
stream |
|
astream |
|
astream_log |
流式传输 |
astream_events |
生成事件流。 |
transform |
将输入转换为输出。 |
atransform |
将输入转换为输出。 |
bind |
将参数绑定到 |
with_config |
将配置绑定到 |
with_listeners |
将生命周期侦听器绑定到 |
with_alisteners |
将异步生命周期侦听器绑定到 |
with_types |
将输入和输出类型绑定到 |
with_retry |
创建一个新的 |
map |
返回一个新的 |
with_fallbacks |
向 |
as_tool |
从 |
is_lc_serializable |
这个类是否可序列化? |
get_lc_namespace |
获取 LangChain 对象的命名空间。 |
lc_id |
为此类返回一个用于序列化目的的唯一标识符。 |
to_json |
将 |
to_json_not_implemented |
序列化一个“未实现”的对象。 |
configurable_fields |
在运行时配置特定的 |
configurable_alternatives |
为可在运行时设置的 |
set_verbose |
如果 verbose 是 |
generate_prompt |
将一系列提示传递给模型并返回模型生成的内容。 |
agenerate_prompt |
异步地将一系列提示传递并返回模型生成的内容。 |
with_structured_output |
此类未实现。 |
get_token_ids |
返回文本中 token 的有序 ID。 |
get_num_tokens_from_messages |
获取消息中的 token 数量。 |
generate |
向模型传递一系列提示并返回生成结果。 |
agenerate |
异步地将一系列提示传递给模型并返回生成的内容。 |
__str__ |
返回对象的字符串表示形式以供打印。 |
dict |
返回 LLM 的字典。 |
save |
保存 LLM。 |
__init__ |
用于参数验证。 |
validate_environment |
验证参数并将其传递给 google-generativeai 包。 |
get_num_tokens |
获取文本中存在的 token 数量。 |
cache class-attribute instance-attribute ¶
是否缓存响应。
- 如果为
True,将使用全局缓存。 - 如果为
False,将不使用缓存 - 如果为
None,如果设置了全局缓存,则使用全局缓存,否则不使用缓存。 - 如果是
BaseCache的实例,将使用提供的缓存。
目前不支持模型的流式方法的缓存。
verbose class-attribute instance-attribute ¶
是否打印响应文本。
callbacks class-attribute instance-attribute ¶
callbacks: Callbacks = Field(default=None, exclude=True)
添加到运行跟踪中的回调。
tags class-attribute instance-attribute ¶
添加到运行跟踪中的标签。
metadata class-attribute instance-attribute ¶
添加到运行跟踪中的元数据。
custom_get_token_ids class-attribute instance-attribute ¶
用于计算 token 的可选编码器。
model class-attribute instance-attribute ¶
model: str = Field(
...,
description="The name of the model to use.\nExamples:\n - gemini-2.5-flash\n - models/text-bison-001",
)
要使用的模型名称。
google_api_key class-attribute instance-attribute ¶
google_api_key: SecretStr | None = Field(
alias="api_key", default_factory=secret_from_env("GOOGLE_API_KEY", default=None)
)
Google AI API 密钥。如果未指定,将从环境变量 GOOGLE_API_KEY 读取。
credentials class-attribute instance-attribute ¶
credentials: Any = None
要使用的默认自定义凭据 (google.auth.credentials.Credentials)
temperature class-attribute instance-attribute ¶
temperature: float = 0.7
使用此温度进行推理。必须在 [0.0, 2.0] 范围内。如果未设置,将默认为 0.7。
top_p class-attribute instance-attribute ¶
top_p: float | None = None
使用核心采样进行解码:考虑概率总和至少为 top_p 的最小词元集。必须在 [0.0, 1.0] 范围内。
top_k class-attribute instance-attribute ¶
top_k: int | None = None
使用 top-k 采样进行解码:考虑 top_k 个最可能词元的集合。必须为正数。
max_output_tokens class-attribute instance-attribute ¶
候选内容中包含的最大词元数。必须大于零。如果未设置,将使用模型的默认值,该值因模型而异。有关模型特定限制,请参阅 https://ai.google.dev/gemini-api/docs/models。
max_retries class-attribute instance-attribute ¶
生成时进行的最大重试次数。如果未设置,将默认为 6。
timeout class-attribute instance-attribute ¶
等待响应的最大秒数。
safety_settings class-attribute instance-attribute ¶
safety_settings: dict[HarmCategory, HarmBlockThreshold] | None = None
用于所有生成的默认安全设置。
例如
.. code-block:: python from google.generativeai.types.safety_types import HarmBlockThreshold, HarmCategory
safety_settings = {
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_MEDIUM_AND_ABOVE,
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_LOW_AND_ABOVE,
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
}
get_name ¶
get_input_schema ¶
get_input_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输入的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输入模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输入模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输入的 Pydantic 模型。 |
get_input_jsonschema ¶
get_input_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输入的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
get_output_schema ¶
get_output_schema(config: RunnableConfig | None = None) -> type[BaseModel]
获取可用于验证 Runnable 输出的 Pydantic 模型。
利用 configurable_fields 和 configurable_alternatives 方法的 Runnable 对象将具有一个动态输出模式,该模式取决于调用 Runnable 时使用的配置。
此方法允许获取特定配置的输出模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
type[BaseModel]
|
一个可用于验证输出的 Pydantic 模型。 |
get_output_jsonschema ¶
get_output_jsonschema(config: RunnableConfig | None = None) -> dict[str, Any]
获取表示 Runnable 输出的 JSON 模式。
| 参数 | 描述 |
|---|---|
配置
|
生成模式时使用的配置。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
表示 |
示例
在 0.3.0 版本中新增。
config_schema ¶
get_config_jsonschema ¶
get_prompts ¶
get_prompts(config: RunnableConfig | None = None) -> list[BasePromptTemplate]
返回此 Runnable 使用的提示列表。
__or__ ¶
__or__(
other: Runnable[Any, Other]
| Callable[[Iterator[Any]], Iterator[Other]]
| Callable[[AsyncIterator[Any]], AsyncIterator[Other]]
| Callable[[Any], Other]
| Mapping[str, Runnable[Any, Other] | Callable[[Any], Other] | Any],
) -> RunnableSerializable[Input, Other]
Runnable "or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
__ror__ ¶
__ror__(
other: Runnable[Other, Any]
| Callable[[Iterator[Other]], Iterator[Any]]
| Callable[[AsyncIterator[Other]], AsyncIterator[Any]]
| Callable[[Other], Any]
| Mapping[str, Runnable[Other, Any] | Callable[[Other], Any] | Any],
) -> RunnableSerializable[Other, Output]
Runnable "reverse-or" 运算符。
将此 Runnable 与另一个对象组合以创建 RunnableSequence。
| 参数 | 描述 |
|---|---|
other
|
另一个
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Other, Output]
|
一个新的 |
pipe ¶
pipe(
*others: Runnable[Any, Other] | Callable[[Any], Other], name: str | None = None
) -> RunnableSerializable[Input, Other]
管道连接 Runnable 对象。
将此 Runnable 与类 Runnable 对象组合以构成一个 RunnableSequence。
等同于 RunnableSequence(self, *others) 或 self | others[0] | ...
示例
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
def mul_two(x: int) -> int:
return x * 2
runnable_1 = RunnableLambda(add_one)
runnable_2 = RunnableLambda(mul_two)
sequence = runnable_1.pipe(runnable_2)
# Or equivalently:
# sequence = runnable_1 | runnable_2
# sequence = RunnableSequence(first=runnable_1, last=runnable_2)
sequence.invoke(1)
await sequence.ainvoke(1)
# -> 4
sequence.batch([1, 2, 3])
await sequence.abatch([1, 2, 3])
# -> [4, 6, 8]
| 参数 | 描述 |
|---|---|
*其他
|
其他要组合的 |
name
|
生成的
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Other]
|
一个新的 |
pick ¶
从此 Runnable 的输出 dict 中选择键。
选择单个键
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
chain = RunnableMap(str=as_str, json=as_json)
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3]}
json_only_chain = chain.pick("json")
json_only_chain.invoke("[1, 2, 3]")
# -> [1, 2, 3]
选择键列表
from typing import Any
import json
from langchain_core.runnables import RunnableLambda, RunnableMap
as_str = RunnableLambda(str)
as_json = RunnableLambda(json.loads)
def as_bytes(x: Any) -> bytes:
return bytes(x, "utf-8")
chain = RunnableMap(str=as_str, json=as_json, bytes=RunnableLambda(as_bytes))
chain.invoke("[1, 2, 3]")
# -> {"str": "[1, 2, 3]", "json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
json_and_bytes_chain = chain.pick(["json", "bytes"])
json_and_bytes_chain.invoke("[1, 2, 3]")
# -> {"json": [1, 2, 3], "bytes": b"[1, 2, 3]"}
| 参数 | 描述 |
|---|---|
keys
|
从输出字典中选择的一个键或键列表。 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
assign ¶
assign(
**kwargs: Runnable[dict[str, Any], Any]
| Callable[[dict[str, Any]], Any]
| Mapping[str, Runnable[dict[str, Any], Any] | Callable[[dict[str, Any]], Any]],
) -> RunnableSerializable[Any, Any]
向此 Runnable 的 dict 输出分配新字段。
from langchain_community.llms.fake import FakeStreamingListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import SystemMessagePromptTemplate
from langchain_core.runnables import Runnable
from operator import itemgetter
prompt = (
SystemMessagePromptTemplate.from_template("You are a nice assistant.")
+ "{question}"
)
model = FakeStreamingListLLM(responses=["foo-lish"])
chain: Runnable = prompt | model | {"str": StrOutputParser()}
chain_with_assign = chain.assign(hello=itemgetter("str") | model)
print(chain_with_assign.input_schema.model_json_schema())
# {'title': 'PromptInput', 'type': 'object', 'properties':
{'question': {'title': 'Question', 'type': 'string'}}}
print(chain_with_assign.output_schema.model_json_schema())
# {'title': 'RunnableSequenceOutput', 'type': 'object', 'properties':
{'str': {'title': 'Str',
'type': 'string'}, 'hello': {'title': 'Hello', 'type': 'string'}}}
| 参数 | 描述 |
|---|---|
**kwargs
|
一个键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Any, Any]
|
一个新的 |
invoke ¶
invoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> str
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
ainvoke async ¶
ainvoke(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> str
将单个输入转换为输出。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
调用
类型: |
| 返回 | 描述 |
|---|---|
输出
|
|
batch ¶
batch(
inputs: list[LanguageModelInput],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any,
) -> list[str]
默认实现使用线程池执行器并行运行 invoke。
批处理的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
batch_as_completed ¶
batch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> Iterator[tuple[int, Output | Exception]]
在输入列表上并行运行 invoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
tuple[int, Output | Exception]
|
由输入索引和 |
abatch async ¶
abatch(
inputs: list[LanguageModelInput],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any,
) -> list[str]
默认实现使用 asyncio.gather 并行运行 ainvoke。
batch 的默认实现对于 IO 密集型的 runnable 效果很好。
如果子类能够更有效地进行批处理,则必须重写此方法;例如,如果底层的 Runnable 使用支持批处理模式的 API。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
list[Output]
|
来自 |
abatch_as_completed async ¶
abatch_as_completed(
inputs: Sequence[Input],
config: RunnableConfig | Sequence[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any | None,
) -> AsyncIterator[tuple[int, Output | Exception]]
在输入列表上并行运行 ainvoke。
在结果完成时生成它们。
| 参数 | 描述 |
|---|---|
inputs
|
类型: |
配置
|
调用
类型: |
返回异常
|
是否返回异常而不是引发它们。
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[tuple[int, Output | Exception]]
|
一个由输入索引和 |
stream ¶
stream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> Iterator[str]
stream 的默认实现,它调用 invoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
astream async ¶
astream(
input: LanguageModelInput,
config: RunnableConfig | None = None,
*,
stop: list[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[str]
astream 的默认实现,它调用 ainvoke。
如果子类支持流式输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
astream_log async ¶
astream_log(
input: Any,
config: RunnableConfig | None = None,
*,
diff: bool = True,
with_streamed_output_list: bool = True,
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
流式传输 Runnable 的所有输出,如回调系统所报告。
这包括 LLM、检索器、工具等的所有内部运行。
输出以 Log 对象的形式流式传输,其中包括一个 Jsonpatch 操作列表,描述了运行状态在每一步中如何变化,以及运行的最终状态。
可以按顺序应用 Jsonpatch 操作来构造状态。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
差异
|
是生成每一步之间的差异还是当前状态。
类型: |
带流式输出列表
|
是否生成
类型: |
包含名称
|
仅包含具有这些名称的日志。 |
包含类型
|
仅包含具有这些类型的日志。 |
包含标签
|
仅包含具有这些标签的日志。 |
排除名称
|
排除具有这些名称的日志。 |
排除类型
|
排除具有这些类型的日志。 |
排除标签
|
排除具有这些标签的日志。 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[RunLogPatch] | AsyncIterator[RunLog]
|
一个 |
astream_events async ¶
astream_events(
input: Any,
config: RunnableConfig | None = None,
*,
version: Literal["v1", "v2"] = "v2",
include_names: Sequence[str] | None = None,
include_types: Sequence[str] | None = None,
include_tags: Sequence[str] | None = None,
exclude_names: Sequence[str] | None = None,
exclude_types: Sequence[str] | None = None,
exclude_tags: Sequence[str] | None = None,
**kwargs: Any,
) -> AsyncIterator[StreamEvent]
生成事件流。
用于创建一个 StreamEvent 的迭代器,提供有关 Runnable 进度的实时信息,包括来自中间结果的 StreamEvent。
一个 StreamEvent 是一个具有以下模式的字典
event:事件名称的格式为:on_[runnable_type]_(start|stream|end)。name:生成事件的Runnable的名称。run_id:与发出事件的Runnable的给定执行相关联的随机生成的 ID。作为父Runnable执行的一部分被调用的子Runnable被分配其自己的唯一 ID。parent_ids:生成事件的父可运行对象的 ID。根Runnable将有一个空列表。父 ID 的顺序是从根到直接父级。仅适用于 API 的 v2 版本。API 的 v1 版本将返回一个空列表。tags:生成事件的Runnable的标签。metadata:生成事件的Runnable的元数据。data:与事件关联的数据。此字段的内容取决于事件的类型。有关更多详细信息,请参见下表。
下表说明了各种链可能发出的某些事件。为简洁起见,已从表中省略了元数据字段。链定义已包含在表之后。
注意
此参考表适用于模式的 v2 版本。
| 事件 | name | chunk | 输入 | output |
|---|---|---|---|---|
on_chat_model_start |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
||
on_chat_model_stream |
'[model name]' |
AIMessageChunk(content="hello") |
||
on_chat_model_end |
'[model name]' |
{"messages": [[SystemMessage, HumanMessage]]} |
AIMessageChunk(content="hello world") |
|
on_llm_start |
'[model name]' |
{'input': 'hello'} |
||
on_llm_stream |
'[model name]' |
'你好' |
||
on_llm_end |
'[model name]' |
'你好,人类!' |
||
on_chain_start |
'format_docs' |
|||
on_chain_stream |
'format_docs' |
'hello world!, goodbye world!' |
||
on_chain_end |
'format_docs' |
[Document(...)] |
'hello world!, goodbye world!' |
|
on_tool_start |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_tool_end |
'some_tool' |
{"x": 1, "y": "2"} |
||
on_retriever_start |
'[retriever name]' |
{"query": "hello"} |
||
on_retriever_end |
'[retriever name]' |
{"query": "hello"} |
[Document(...), ..] |
|
on_prompt_start |
'[template_name]' |
{"question": "hello"} |
||
on_prompt_end |
'[template_name]' |
{"question": "hello"} |
ChatPromptValue(messages: [SystemMessage, ...]) |
除了标准事件外,用户还可以分派自定义事件(见下例)。
自定义事件将仅在 API 的 v2 版本中出现!
自定义事件具有以下格式
| 属性 | 类型 | 描述 |
|---|---|---|
name |
str |
用户为事件定义的名称。 |
data |
任意 |
与事件关联的数据。这可以是任何东西,但我们建议使其可 JSON 序列化。 |
以下是与上面显示的标准事件相关的声明
format_docs:
def format_docs(docs: list[Document]) -> str:
'''Format the docs.'''
return ", ".join([doc.page_content for doc in docs])
format_docs = RunnableLambda(format_docs)
some_tool:
prompt:
template = ChatPromptTemplate.from_messages(
[
("system", "You are Cat Agent 007"),
("human", "{question}"),
]
).with_config({"run_name": "my_template", "tags": ["my_template"]})
例如
from langchain_core.runnables import RunnableLambda
async def reverse(s: str) -> str:
return s[::-1]
chain = RunnableLambda(func=reverse)
events = [event async for event in chain.astream_events("hello", version="v2")]
# Will produce the following events
# (run_id, and parent_ids has been omitted for brevity):
[
{
"data": {"input": "hello"},
"event": "on_chain_start",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"chunk": "olleh"},
"event": "on_chain_stream",
"metadata": {},
"name": "reverse",
"tags": [],
},
{
"data": {"output": "olleh"},
"event": "on_chain_end",
"metadata": {},
"name": "reverse",
"tags": [],
},
]
from langchain_core.callbacks.manager import (
adispatch_custom_event,
)
from langchain_core.runnables import RunnableLambda, RunnableConfig
import asyncio
async def slow_thing(some_input: str, config: RunnableConfig) -> str:
"""Do something that takes a long time."""
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 1 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
await adispatch_custom_event(
"progress_event",
{"message": "Finished step 2 of 3"},
config=config # Must be included for python < 3.10
)
await asyncio.sleep(1) # Placeholder for some slow operation
return "Done"
slow_thing = RunnableLambda(slow_thing)
async for event in slow_thing.astream_events("some_input", version="v2"):
print(event)
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
版本
|
要使用的模式版本,可以是
类型: |
包含名称
|
仅包括来自具有匹配名称的 |
包含类型
|
仅包括来自具有匹配类型的 |
包含标签
|
仅包括来自具有匹配标签的 |
排除名称
|
排除来自具有匹配名称的 |
排除类型
|
排除来自具有匹配类型的 |
排除标签
|
排除来自具有匹配标签的 |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[StreamEvent]
|
|
| 引发 | 描述 |
|---|---|
NotImplementedError
|
如果版本不是 |
transform ¶
transform(
input: Iterator[Input], config: RunnableConfig | None = None, **kwargs: Any | None
) -> Iterator[Output]
将输入转换为输出。
transform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
输出
|
|
atransform async ¶
atransform(
input: AsyncIterator[Input],
config: RunnableConfig | None = None,
**kwargs: Any | None,
) -> AsyncIterator[Output]
将输入转换为输出。
atransform 的默认实现,它缓冲输入并调用 astream。
如果子类可以在输入仍在生成时开始产生输出,则必须重写此方法。
| 参数 | 描述 |
|---|---|
输入
|
类型: |
配置
|
用于
类型: |
**kwargs
|
要传递给
类型: |
| YIELDS | 描述 |
|---|---|
AsyncIterator[Output]
|
|
bind ¶
将参数绑定到 Runnable,返回一个新的 Runnable。
当链中的 Runnable 需要一个不在前一个 Runnable 的输出中或未包含在用户输入中的参数时非常有用。
| 参数 | 描述 |
|---|---|
**kwargs
|
要绑定到
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了参数的新 |
示例
from langchain_ollama import ChatOllama
from langchain_core.output_parsers import StrOutputParser
model = ChatOllama(model="llama3.1")
# Without bind
chain = model | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two three four five.'
# With bind
chain = model.bind(stop=["three"]) | StrOutputParser()
chain.invoke("Repeat quoted words exactly: 'One two three four five.'")
# Output is 'One two'
with_config ¶
with_config(
config: RunnableConfig | None = None, **kwargs: Any
) -> Runnable[Input, Output]
将配置绑定到 Runnable,返回一个新的 Runnable。
| 参数 | 描述 |
|---|---|
配置
|
要绑定到
类型: |
**kwargs
|
要传递给
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了配置的新 |
with_listeners ¶
with_listeners(
*,
on_start: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
on_end: Callable[[Run], None] | Callable[[Run, RunnableConfig], None] | None = None,
on_error: Callable[[Run], None]
| Callable[[Run, RunnableConfig], None]
| None = None,
) -> Runnable[Input, Output]
将生命周期侦听器绑定到 Runnable,返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda
from langchain_core.tracers.schemas import Run
import time
def test_runnable(time_to_sleep: int):
time.sleep(time_to_sleep)
def fn_start(run_obj: Run):
print("start_time:", run_obj.start_time)
def fn_end(run_obj: Run):
print("end_time:", run_obj.end_time)
chain = RunnableLambda(test_runnable).with_listeners(
on_start=fn_start, on_end=fn_end
)
chain.invoke(2)
with_alisteners ¶
with_alisteners(
*,
on_start: AsyncListener | None = None,
on_end: AsyncListener | None = None,
on_error: AsyncListener | None = None,
) -> Runnable[Input, Output]
将异步生命周期侦听器绑定到 Runnable。
返回一个新的 Runnable。
Run 对象包含有关运行的信息,包括其 id、type、input、output、error、start_time、end_time 以及添加到运行中的任何标签或元数据。
| 参数 | 描述 |
|---|---|
开始时
|
在
类型: |
结束时
|
在
类型: |
出错时
|
如果
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个绑定了监听器的新 |
示例
from langchain_core.runnables import RunnableLambda, Runnable
from datetime import datetime, timezone
import time
import asyncio
def format_t(timestamp: float) -> str:
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat()
async def test_runnable(time_to_sleep: int):
print(f"Runnable[{time_to_sleep}s]: starts at {format_t(time.time())}")
await asyncio.sleep(time_to_sleep)
print(f"Runnable[{time_to_sleep}s]: ends at {format_t(time.time())}")
async def fn_start(run_obj: Runnable):
print(f"on start callback starts at {format_t(time.time())}")
await asyncio.sleep(3)
print(f"on start callback ends at {format_t(time.time())}")
async def fn_end(run_obj: Runnable):
print(f"on end callback starts at {format_t(time.time())}")
await asyncio.sleep(2)
print(f"on end callback ends at {format_t(time.time())}")
runnable = RunnableLambda(test_runnable).with_alisteners(
on_start=fn_start,
on_end=fn_end
)
async def concurrent_runs():
await asyncio.gather(runnable.ainvoke(2), runnable.ainvoke(3))
asyncio.run(concurrent_runs())
Result:
on start callback starts at 2025-03-01T07:05:22.875378+00:00
on start callback starts at 2025-03-01T07:05:22.875495+00:00
on start callback ends at 2025-03-01T07:05:25.878862+00:00
on start callback ends at 2025-03-01T07:05:25.878947+00:00
Runnable[2s]: starts at 2025-03-01T07:05:25.879392+00:00
Runnable[3s]: starts at 2025-03-01T07:05:25.879804+00:00
Runnable[2s]: ends at 2025-03-01T07:05:27.881998+00:00
on end callback starts at 2025-03-01T07:05:27.882360+00:00
Runnable[3s]: ends at 2025-03-01T07:05:28.881737+00:00
on end callback starts at 2025-03-01T07:05:28.882428+00:00
on end callback ends at 2025-03-01T07:05:29.883893+00:00
on end callback ends at 2025-03-01T07:05:30.884831+00:00
with_types ¶
with_retry ¶
with_retry(
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
exponential_jitter_params: ExponentialJitterParams | None = None,
stop_after_attempt: int = 3,
) -> Runnable[Input, Output]
创建一个新的 Runnable,它在发生异常时重试原始的 Runnable。
| 参数 | 描述 |
|---|---|
如果异常类型则重试
|
一个用于重试的异常类型元组。
类型: |
指数等待抖动
|
是否在两次重试之间的等待时间中添加抖动。
类型: |
尝试后停止
|
放弃前尝试的最大次数。
类型: |
指数抖动参数
|
类型: |
| 返回 | 描述 |
|---|---|
Runnable[Input, Output]
|
一个新的 Runnable,它会在发生异常时重试原始的 Runnable。 |
示例
from langchain_core.runnables import RunnableLambda
count = 0
def _lambda(x: int) -> None:
global count
count = count + 1
if x == 1:
raise ValueError("x is 1")
else:
pass
runnable = RunnableLambda(_lambda)
try:
runnable.with_retry(
stop_after_attempt=2,
retry_if_exception_type=(ValueError,),
).invoke(1)
except ValueError:
pass
assert count == 2
map ¶
with_fallbacks ¶
with_fallbacks(
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,),
exception_key: str | None = None,
) -> RunnableWithFallbacks[Input, Output]
向 Runnable 添加回退机制,返回一个新的 Runnable。
新的 Runnable 将在失败时先尝试原始的 Runnable,然后按顺序尝试每个备选方案。
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
示例
from typing import Iterator
from langchain_core.runnables import RunnableGenerator
def _generate_immediate_error(input: Iterator) -> Iterator[str]:
raise ValueError()
yield ""
def _generate(input: Iterator) -> Iterator[str]:
yield from "foo bar"
runnable = RunnableGenerator(_generate_immediate_error).with_fallbacks(
[RunnableGenerator(_generate)]
)
print("".join(runnable.stream({}))) # foo bar
| 参数 | 描述 |
|---|---|
备用方案
|
如果原始 |
要处理的异常
|
一个要处理的异常类型元组。
类型: |
异常键
|
如果指定了
类型: |
| 返回 | 描述 |
|---|---|
RunnableWithFallbacks[Input, Output]
|
一个新的 |
as_tool ¶
as_tool(
args_schema: type[BaseModel] | None = None,
*,
name: str | None = None,
description: str | None = None,
arg_types: dict[str, type] | None = None,
) -> BaseTool
从 Runnable 创建一个 BaseTool。
as_tool 将从一个 Runnable 实例化一个 BaseTool,该工具具有名称、描述和 args_schema。在可能的情况下,模式会从 runnable.get_input_schema 中推断。或者(例如,如果 Runnable 接受一个字典作为输入,并且特定的字典键没有类型),模式可以通过 args_schema 直接指定。你也可以传递 arg_types 来仅指定必需的参数及其类型。
| 参数 | 描述 |
|---|---|
参数模式
|
工具的模式。 |
name
|
工具的名称。
类型: |
描述
|
工具的描述。
类型: |
参数类型
|
一个从参数名称到类型的字典。 |
| 返回 | 描述 |
|---|---|
BaseTool
|
一个 |
类型化字典输入
from typing_extensions import TypedDict
from langchain_core.runnables import RunnableLambda
class Args(TypedDict):
a: int
b: list[int]
def f(x: Args) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool()
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 args_schema 指定模式
from typing import Any
from pydantic import BaseModel, Field
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
class FSchema(BaseModel):
"""Apply a function to an integer and list of integers."""
a: int = Field(..., description="Integer")
b: list[int] = Field(..., description="List of ints")
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(FSchema)
as_tool.invoke({"a": 3, "b": [1, 2]})
dict 输入,通过 arg_types 指定模式
from typing import Any
from langchain_core.runnables import RunnableLambda
def f(x: dict[str, Any]) -> str:
return str(x["a"] * max(x["b"]))
runnable = RunnableLambda(f)
as_tool = runnable.as_tool(arg_types={"a": int, "b": list[int]})
as_tool.invoke({"a": 3, "b": [1, 2]})
字符串输入
is_lc_serializable classmethod ¶
is_lc_serializable() -> bool
这个类是否可序列化?
根据设计,即使一个类继承自 Serializable,它默认也是不可序列化的。这是为了防止意外序列化不应被序列化的对象。
| 返回 | 描述 |
|---|---|
bool
|
类是否可序列化。默认为 |
get_lc_namespace classmethod ¶
lc_id classmethod ¶
为此类返回一个用于序列化目的的唯一标识符。
唯一标识符是一个描述对象路径的字符串列表。
例如,对于类 langchain.llms.openai.OpenAI,id 是 ["langchain", "llms", "openai", "OpenAI"]。
to_json ¶
将 Runnable 序列化为 JSON。
| 返回 | 描述 |
|---|---|
SerializedConstructor | SerializedNotImplemented
|
一个 |
to_json_not_implemented ¶
序列化一个“未实现”的对象。
| 返回 | 描述 |
|---|---|
SerializedNotImplemented
|
|
configurable_fields ¶
configurable_fields(
**kwargs: AnyConfigurableField,
) -> RunnableSerializable[Input, Output]
在运行时配置特定的 Runnable 字段。
| 参数 | 描述 |
|---|---|
**kwargs
|
一个要配置的
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果在 |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了字段的新 |
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatOpenAI(max_tokens=20).configurable_fields(
max_tokens=ConfigurableField(
id="output_token_number",
name="Max tokens in the output",
description="The maximum number of tokens in the output",
)
)
# max_tokens = 20
print("max_tokens_20: ", model.invoke("tell me something about chess").content)
# max_tokens = 200
print(
"max_tokens_200: ",
model.with_config(configurable={"output_token_number": 200})
.invoke("tell me something about chess")
.content,
)
configurable_alternatives ¶
configurable_alternatives(
which: ConfigurableField,
*,
default_key: str = "default",
prefix_keys: bool = False,
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]],
) -> RunnableSerializable[Input, Output]
为可在运行时设置的 Runnable 对象配置备选项。
| 参数 | 描述 |
|---|---|
哪个
|
将用于选择备选项的
类型: |
默认键
|
如果未选择备选项,则使用的默认键。
类型: |
前缀键
|
是否用
类型: |
**kwargs
|
一个从键到
类型: |
| 返回 | 描述 |
|---|---|
RunnableSerializable[Input, Output]
|
一个配置了备选项的新 |
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables.utils import ConfigurableField
from langchain_openai import ChatOpenAI
model = ChatAnthropic(
model_name="claude-sonnet-4-5-20250929"
).configurable_alternatives(
ConfigurableField(id="llm"),
default_key="anthropic",
openai=ChatOpenAI(),
)
# uses the default model ChatAnthropic
print(model.invoke("which organization created you?").content)
# uses ChatOpenAI
print(
model.with_config(configurable={"llm": "openai"})
.invoke("which organization created you?")
.content
)
set_verbose ¶
generate_prompt ¶
generate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
**kwargs: Any,
) -> LLMResult
将一系列提示传递给模型并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate_prompt async ¶
agenerate_prompt(
prompts: list[PromptValue],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递并返回模型生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
类型: |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
with_structured_output ¶
with_structured_output(
schema: dict | type, **kwargs: Any
) -> Runnable[LanguageModelInput, dict | BaseModel]
此类未实现。
get_token_ids ¶
get_num_tokens_from_messages ¶
get_num_tokens_from_messages(
messages: list[BaseMessage], tools: Sequence | None = None
) -> int
获取消息中的 token 数量。
用于检查输入是否适合模型的上下文窗口。
注意
get_num_tokens_from_messages 的基本实现忽略了工具模式。
| 参数 | 描述 |
|---|---|
messages
|
要进行分词的消息输入。
类型: |
工具
|
如果提供,则为要转换为工具模式的 dict、
类型: |
| 返回 | 描述 |
|---|---|
int
|
所有消息中词元数量的总和。 |
generate ¶
generate(
prompts: list[str],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
*,
tags: list[str] | list[list[str]] | None = None,
metadata: dict[str, Any] | list[dict[str, Any]] | None = None,
run_name: str | list[str] | None = None,
run_id: UUID | list[UUID | None] | None = None,
**kwargs: Any,
) -> LLMResult
向模型传递一系列提示并返回生成结果。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
字符串提示列表。 |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要与每个提示关联的标签列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
metadata
|
要与每个提示关联的元数据字典列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
运行名称
|
要与每个提示关联的运行名称列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
run_id
|
要与每个提示关联的运行 ID 列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果提示不是列表。 |
ValueError
|
如果 `callbacks`、`tags`、`metadata` 或 `run_name`(如果提供)的长度与提示的长度不匹配。 |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |
agenerate async ¶
agenerate(
prompts: list[str],
stop: list[str] | None = None,
callbacks: Callbacks | list[Callbacks] | None = None,
*,
tags: list[str] | list[list[str]] | None = None,
metadata: dict[str, Any] | list[dict[str, Any]] | None = None,
run_name: str | list[str] | None = None,
run_id: UUID | list[UUID | None] | None = None,
**kwargs: Any,
) -> LLMResult
异步地将一系列提示传递给模型并返回生成的内容。
对于提供批量 API 的模型,此方法应利用批量调用。
当你想要
- 利用批量调用,
- 需要从模型获得比最高生成值更多的输出,
- 正在构建与底层语言模型类型无关的链(例如,纯文本补全模型与聊天模型)。
| 参数 | 描述 |
|---|---|
prompts
|
字符串提示列表。 |
停止
|
生成时使用的停止词。模型输出在首次出现这些子字符串中的任何一个时被截断。 |
回调
|
要传递的
类型: |
tags
|
要与每个提示关联的标签列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
metadata
|
要与每个提示关联的元数据字典列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
运行名称
|
要与每个提示关联的运行名称列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
run_id
|
要与每个提示关联的运行 ID 列表。如果提供,列表的长度必须与提示列表的长度匹配。 |
**kwargs
|
任意附加的关键字参数。这些通常会传递给模型提供商的 API 调用。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 `callbacks`、`tags`、`metadata` 或 `run_name`(如果提供)的长度与提示的长度不匹配。 |
| 返回 | 描述 |
|---|---|
LLMResult
|
一个 |