客户端 (Client)
langsmith.client ¶
用于与 LangSmith API 交互的客户端。
使用客户端来自定义 API 密钥 / 工作空间连接、SSL 证书等,以便进行跟踪。
还用于创建、读取、更新和删除 LangSmith 资源,例如运行(~跟踪跨度)、数据集、示例(~记录)、反馈(~指标)、项目(跟踪器会话/组)等。
有关详细的 API 文档,请访问:https://langsmith.langchain.ac.cn/。
| 函数 | 描述 |
|---|---|
close_session |
关闭会话。 |
convert_prompt_to_openai_format |
将提示转换为 OpenAI 格式。 |
convert_prompt_to_anthropic_format |
将提示转换为 Anthropic 格式。 |
dump_model |
根据 Pydantic 版本转储模型。 |
prep_obj_for_push |
格式化对象,使其与 Prompt Hub 兼容。 |
Client ¶
用于与 LangSmith API 交互的客户端。
| 方法 | 描述 |
|---|---|
__init__ |
初始化一个 Client 实例。 |
__repr__ |
返回实例的字符串表示形式,并带有指向 URL 的链接。 |
request_with_retries |
发送带重试的请求。 |
upload_dataframe |
将数据框作为单个示例上传到 LangSmith API。 |
upload_csv |
将 CSV 文件上传到 LangSmith API。 |
create_run |
将一次运行持久化到 LangSmith API。 |
batch_ingest_runs |
在 Langsmith 系统中批量摄入/更新插入多个运行。 |
multipart_ingest |
在 Langsmith 系统中批量摄入/更新插入多个运行。 |
update_run |
在 LangSmith API 中更新一次运行。 |
flush_compressed_traces |
强制刷新当前缓冲的压缩运行。 |
flush |
根据模式刷新队列或压缩缓冲区。 |
read_run |
从 LangSmith API 读取一次运行。 |
list_runs |
从 LangSmith API 列出运行。 |
get_run_stats |
获取查询运行的聚合统计信息。 |
get_run_url |
获取一次运行的 URL。 |
share_run |
获取一次运行的共享链接。 |
unshare_run |
删除一次运行的共享链接。 |
read_run_shared_link |
检索指定运行的共享链接。 |
run_is_shared |
获取一次运行的共享状态。 |
read_shared_run |
获取共享的运行。 |
list_shared_runs |
获取共享的运行。 |
read_dataset_shared_schema |
检索数据集的共享模式。 |
share_dataset |
获取数据集的共享链接。 |
unshare_dataset |
删除数据集的共享链接。 |
read_shared_dataset |
获取共享的数据集。 |
list_shared_examples |
获取共享的示例。 |
list_shared_projects |
列出共享的项目。 |
create_project |
在 LangSmith API 上创建一个项目。 |
update_project |
更新一个 LangSmith 项目。 |
read_project |
从 LangSmith API 读取一个项目。 |
has_project |
检查项目是否存在。 |
get_test_results |
将实验中的记录级别信息读取到 Pandas DF 中。 |
list_projects |
从 LangSmith API 列出项目。 |
delete_project |
从 LangSmith 删除一个项目。 |
create_dataset |
在 LangSmith API 中创建一个数据集。 |
has_dataset |
检查您的租户中是否存在数据集。 |
read_dataset |
从 LangSmith API 读取一个数据集。 |
diff_dataset_versions |
获取数据集两个版本之间的差异。 |
read_dataset_openai_finetuning |
以 OpenAI Jsonl 格式下载数据集,并将其加载为字典列表。 |
list_datasets |
列出 LangSmith API 上的数据集。 |
delete_dataset |
从 LangSmith API 删除一个数据集。 |
update_dataset_tag |
更新数据集的标签。 |
list_dataset_versions |
列出数据集版本。 |
read_dataset_version |
通过 as_of 或确切的标签获取数据集版本。 |
clone_public_dataset |
将公共数据集克隆到您自己的 langsmith 租户。 |
create_llm_example |
向 LLM 类型的数据集添加一个示例(行)。 |
create_chat_example |
向聊天类型的数据集添加一个示例(行)。 |
create_example_from_run |
从一次运行中向数据集添加一个示例(行)。 |
update_examples_multipart |
使用 multipart 更新示例。 |
upload_examples_multipart |
使用 multipart 上传示例。 |
upsert_examples_multipart |
更新插入示例。 |
create_examples |
在数据集中创建示例。 |
create_example |
在 LangSmith API 中创建一个数据集示例。 |
read_example |
从 LangSmith API 读取一个示例。 |
list_examples |
检索指定数据集的示例行。 |
index_dataset |
启用数据集索引。示例按其输入进行索引。 |
sync_indexed_dataset |
同步数据集索引。此操作已自动每 5 分钟进行一次,但您可以调用此函数强制同步。 |
similar_examples |
检索输入与当前输入最匹配的数据集示例。 |
update_example |
更新一个特定示例。 |
update_examples |
更新多个示例。 |
delete_example |
通过 ID 删除一个示例。 |
delete_examples |
通过 ID 删除多个示例。 |
list_dataset_splits |
获取数据集的拆分。 |
update_dataset_splits |
更新数据集的拆分。 |
evaluate_run |
评估一次运行。 |
aevaluate_run |
异步评估一次运行。 |
create_feedback |
为一次运行创建反馈。 |
update_feedback |
在 LangSmith API 中更新反馈。 |
read_feedback |
从 LangSmith API 读取反馈。 |
list_feedback |
列出 LangSmith API 上的反馈对象。 |
delete_feedback |
通过 ID 删除反馈。 |
create_feedback_from_token |
通过预签名令牌或 URL 创建反馈。 |
create_presigned_feedback_token |
创建一个预签名 URL 用于发送反馈数据。 |
create_presigned_feedback_tokens |
创建一个预签名 URL 用于发送反馈数据。 |
list_presigned_feedback_tokens |
列出一次运行的反馈摄入令牌。 |
list_annotation_queues |
列出 LangSmith API 上的标注队列。 |
create_annotation_queue |
在 LangSmith API 上创建一个标注队列。 |
read_annotation_queue |
读取具有指定队列 ID 的标注队列。 |
update_annotation_queue |
更新具有指定 queue_id 的标注队列。 |
delete_annotation_queue |
删除具有指定队列 ID 的标注队列。 |
add_runs_to_annotation_queue |
将运行添加到具有指定队列 ID 的标注队列中。 |
delete_run_from_annotation_queue |
从具有指定队列 ID 和运行 ID 的标注队列中删除一次运行。 |
get_run_from_annotation_queue |
从指定索引处的标注队列中获取一次运行。 |
create_comparative_experiment |
在 LangSmith API 上创建一个比较实验。 |
arun_on_dataset |
在数据集上异步运行链或语言模型。 |
run_on_dataset |
在数据集上运行链或语言模型。 |
like_prompt |
喜欢一个提示。 |
unlike_prompt |
不喜欢一个提示。 |
list_prompts |
带分页地列出提示。 |
get_prompt |
通过其标识符获取一个特定的提示。 |
create_prompt |
创建一个新的提示。 |
create_commit |
为现有提示创建一个提交。 |
update_prompt |
更新提示的元数据。 |
delete_prompt |
删除一个提示。 |
pull_prompt_commit |
从 LangSmith API 拉取一个提示对象。 |
list_prompt_commits |
列出给定提示的提交。 |
pull_prompt |
拉取一个提示并将其作为 LangChain PromptTemplate 返回。 |
push_prompt |
将一个提示推送到 LangSmith API。 |
cleanup |
手动触发后台线程的清理。 |
evaluate |
在给定数据集上评估目标系统。 |
aevaluate |
在给定数据集上评估一个异步目标系统。 |
get_experiment_results |
获取实验结果,包括实验会话的聚合统计数据和每个数据集示例的实验运行。 |
info property ¶
info: LangSmithInfo
获取有关 LangSmith API 的信息。
| 返回 | 描述 |
|---|---|
LangSmithInfo
|
ls_schemas.LangSmithInfo:有关 LangSmith API 的信息,如果 API 不可用则为 None。 |
__init__ ¶
__init__(
api_url: str | None = None,
*,
api_key: str | None = None,
retry_config: Retry | None = None,
timeout_ms: int | tuple[int, int] | None = None,
web_url: str | None = None,
session: Session | None = None,
auto_batch_tracing: bool = True,
anonymizer: Callable[[dict], dict] | None = None,
hide_inputs: Callable[[dict], dict] | bool | None = None,
hide_outputs: Callable[[dict], dict] | bool | None = None,
hide_metadata: Callable[[dict], dict] | bool | None = None,
process_buffered_run_ops: Callable[[Sequence[dict]], Sequence[dict]] | None = None,
run_ops_buffer_size: int | None = None,
run_ops_buffer_timeout_ms: float | None = None,
info: dict | LangSmithInfo | None = None,
api_urls: dict[str, str] | None = None,
otel_tracer_provider: TracerProvider | None = None,
otel_enabled: bool | None = None,
tracing_sampling_rate: float | None = None,
workspace_id: str | None = None,
max_batch_size_bytes: int | None = None,
) -> None
初始化一个 Client 实例。
| 参数 | 描述 |
|---|---|
api_url
|
LangSmith API 的 URL。默认为 LANGCHAIN_ENDPOINT 环境变量,如果未设置则为 https://api.smith.langchain.com。
类型: |
api_key
|
LangSmith API 的 API 密钥。默认为 LANGCHAIN_API_KEY 环境变量。
类型: |
retry_config
|
HTTPAdapter 的重试配置。
类型: |
timeout_ms
|
HTTPAdapter 的超时时间。也可以是 (连接超时, 读取超时) 的 2 元组来分别设置它们。 |
web_url
|
LangSmith Web 应用的 URL。默认从 ENDPOINT 自动推断。
类型: |
session
|
用于请求的会话。如果为 None,将创建一个新会话。
类型: |
auto_batch_tracing
|
是否自动批量跟踪。
类型: |
anonymizer
|
一个应用于屏蔽序列化运行输入和输出的函数,在发送到 API 之前执行。 |
hide_inputs
|
在使用此客户端进行跟踪时是否隐藏运行输入。如果为 True,则隐藏整个输入。如果是一个函数,则在创建运行时应用于所有运行输入。 |
hide_outputs
|
在使用此客户端进行跟踪时是否隐藏运行输出。如果为 True,则隐藏整个输出。如果是一个函数,则在创建运行时应用于所有运行输出。 |
hide_metadata
|
在使用此客户端进行跟踪时是否隐藏运行元数据。如果为 True,则隐藏整个元数据。如果是一个函数,则在创建运行时应用于所有运行元数据。 |
process_buffered_run_ops
|
一个应用于缓冲运行操作的函数,允许在将原始运行字典转换为 multipart 和压缩之前对其进行修改。这对于高吞吐量跟踪特别有用,您需要在运行发送到 API 之前对其应用速率限制的 API 或其他耗费资源的过程。请注意,缓冲区只会在达到 run_ops_buffer_size 或在 run_ops_buffer_timeout_ms 过后向缓冲区添加新运行时自动刷新 - 除非您手动调用 client.flush(),否则在这些条件之外不会刷新,因此请确保在代码退出前执行此操作。
类型: |
run_ops_buffer_size
|
在应用 process_buffered_run_ops 并发送到 API 之前,缓冲区中收集的运行操作的最大数量。在提供 process_buffered_run_ops 时是必需的。
TYPE: |
run_ops_buffer_timeout_ms
|
添加新运行时,在刷新运行操作缓冲区前等待的最大时间(毫秒)。默认为 5000。仅在提供 process_buffered_run_ops 时使用。
TYPE: |
info
|
有关 LangSmith API 的信息。如果未提供,将从 API 获取。
类型: |
api_urls
|
写 API URL 及其对应 API 密钥的字典。对多租户设置很有用。数据仅从字典中的第一个 URL 读取。然而,只有运行(POST 和 PATCH)会写入字典中的所有 URL。反馈、会话、数据集、示例、标注队列和评估结果仅写入第一个。 |
otel_tracer_provider
|
用于 OpenTelemetry 集成的可选跟踪器提供程序。如果未提供,将使用 LangSmith 特定的跟踪器提供程序。
类型: |
tracing_sampling_rate
|
跟踪的采样率。如果提供,将覆盖 LANGCHAIN_TRACING_SAMPLING_RATE 环境变量。应为 0 到 1 之间的浮点数,其中 1 表示跟踪所有内容,0 表示不跟踪任何内容。
类型: |
workspace_id
|
工作空间 ID。对于组织范围的 API 密钥是必需的。
类型: |
max_batch_size_bytes
|
一批运行的最大大小(以字节为单位)。如果未提供,则由服务器设置默认值。
TYPE: |
| 引发 | 描述 |
|---|---|
LangSmithUserError
|
如果在使用托管服务时未提供 API 密钥。如果同时提供了 api_url 和 api_urls。 |
request_with_retries ¶
request_with_retries(
method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"],
pathname: str,
*,
request_kwargs: Mapping | None = None,
stop_after_attempt: int = 1,
retry_on: Sequence[type[BaseException]] | None = None,
to_ignore: Sequence[type[BaseException]] | None = None,
handle_response: Callable[[Response, int], Any] | None = None,
_context: str = "",
**kwargs: Any,
) -> Response
发送带重试的请求。
| 参数 | 描述 |
|---|---|
method
|
HTTP 请求方法。
类型: |
pathname
|
请求 URL 的路径名。将附加到 API URL。
类型: |
request_kwargs
|
附加的请求参数。
类型: |
尝试后停止
|
要尝试的次数。
类型: |
retry_on
|
要重试的异常。除了 [LangSmithConnectionError, LangSmithAPIError] 之外。
类型: |
to_ignore
|
要忽略/传递的异常。
类型: |
handle_response
|
一个处理响应并返回是否继续重试的函数。 |
_context
|
请求的上下文。
类型: |
**kwargs
|
要传递给请求的附加关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
响应
|
requests.Response:响应对象。 |
| 引发 | 描述 |
|---|---|
LangSmithAPIError
|
如果发生服务器错误。 |
LangSmithUserError
|
如果请求失败。 |
LangSmithConnectionError
|
如果发生连接错误。 |
LangSmithError
|
如果请求失败。 |
upload_dataframe ¶
upload_dataframe(
df: DataFrame,
name: str,
input_keys: Sequence[str],
output_keys: Sequence[str],
*,
description: str | None = None,
data_type: DataType | None = kv,
) -> Dataset
将数据框作为单个示例上传到 LangSmith API。
| 参数 | 描述 |
|---|---|
df
|
要上传的数据框。
类型: |
name
|
数据集的名称。
类型: |
input_keys
|
输入键。 |
output_keys
|
输出键。 |
描述
|
数据集的描述。
类型: |
data_type
|
数据集的数据类型。
类型: |
| 返回 | 描述 |
|---|---|
数据集
|
上传的数据集。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 csv_file 不是字符串或元组。 |
示例
.. code-block:: python
from langsmith import Client
import os
import pandas as pd
client = Client()
df = pd.read_parquet("path/to/your/myfile.parquet")
input_keys = ["column1", "column2"] # replace with your input column names
output_keys = ["output1", "output2"] # replace with your output column names
dataset = client.upload_dataframe(
df=df,
input_keys=input_keys,
output_keys=output_keys,
name="My Parquet Dataset",
description="Dataset created from a parquet file",
data_type="kv", # The default
)
upload_csv ¶
upload_csv(
csv_file: str | tuple[str, BytesIO],
input_keys: Sequence[str],
output_keys: Sequence[str],
*,
name: str | None = None,
description: str | None = None,
data_type: DataType | None = kv,
) -> Dataset
将 CSV 文件上传到 LangSmith API。
| 参数 | 描述 |
|---|---|
csv_file
|
要上传的 CSV 文件。如果是一个字符串,它应该是路径。如果是一个元组,它应该包含文件名和一个 BytesIO 对象。 |
input_keys
|
输入键。 |
output_keys
|
输出键。 |
name
|
数据集的名称。
类型: |
描述
|
数据集的描述。
类型: |
data_type
|
数据集的数据类型。
类型: |
| 返回 | 描述 |
|---|---|
数据集
|
上传的数据集。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 csv_file 不是字符串或元组。 |
示例
.. code-block:: python
from langsmith import Client
import os
client = Client()
csv_file = "path/to/your/myfile.csv"
input_keys = ["column1", "column2"] # replace with your input column names
output_keys = ["output1", "output2"] # replace with your output column names
dataset = client.upload_csv(
csv_file=csv_file,
input_keys=input_keys,
output_keys=output_keys,
name="My CSV Dataset",
description="Dataset created from a CSV file",
data_type="kv", # The default
)
create_run ¶
create_run(
name: str,
inputs: dict[str, Any],
run_type: RUN_TYPE_T,
*,
project_name: str | None = None,
revision_id: str | None = None,
dangerously_allow_filesystem: bool = False,
api_key: str | None = None,
api_url: str | None = None,
**kwargs: Any,
) -> None
将一次运行持久化到 LangSmith API。
| 参数 | 描述 |
|---|---|
name
|
运行的名称。
类型: |
inputs
|
运行的输入值。 |
run_type
|
运行的类型,例如 tool、chain、llm、retriever、embedding、prompt 或 parser。
类型: |
project_name
|
运行的项目名称。
类型: |
revision_id
|
运行的修订 ID。
类型: |
api_key
|
用于此特定运行的 API 密钥。
类型: |
api_url
|
用于此特定运行的 API URL。
类型: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
None
|
None |
| 引发 | 描述 |
|---|---|
LangSmithUserError
|
如果在使用托管服务时未提供 API 密钥。 |
示例
.. code-block:: python
from langsmith import Client
import datetime
from uuid import uuid4
client = Client()
run_id = uuid4()
client.create_run(
id=run_id,
project_name=project_name,
name="test_run",
run_type="llm",
inputs={"prompt": "hello world"},
outputs={"generation": "hi there"},
start_time=datetime.datetime.now(datetime.timezone.utc),
end_time=datetime.datetime.now(datetime.timezone.utc),
hide_inputs=True,
hide_outputs=True,
)
batch_ingest_runs ¶
batch_ingest_runs(
create: Sequence[Run | RunLikeDict | dict] | None = None,
update: Sequence[Run | RunLikeDict | dict] | None = None,
*,
pre_sampled: bool = False,
) -> None
在 Langsmith 系统中批量摄入/更新插入多个运行。
| 参数 | 描述 |
|---|---|
create
|
表示要创建/发布的运行的
类型: |
更新
|
表示已创建且应更新/修补的运行的
类型: |
pre_sampled
|
运行是否已经经过采样,因此不应再次采样。默认为 False。
类型: |
| 引发 | 描述 |
|---|---|
LangsmithAPIError
|
如果 API 请求中存在错误。 |
| 返回 | 描述 |
|---|---|
None
|
None |
注意
- 运行对象必须包含 dotted_order 和 trace_id 字段才能被 API 接受。
示例
.. code-block:: python
from langsmith import Client
import datetime
from uuid import uuid4
client = Client()
_session = "__test_batch_ingest_runs"
trace_id = uuid4()
trace_id_2 = uuid4()
run_id_2 = uuid4()
current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y%m%dT%H%M%S%fZ"
)
later_time = (
datetime.datetime.now(datetime.timezone.utc) + timedelta(seconds=1)
).strftime("%Y%m%dT%H%M%S%fZ")
runs_to_create = [
{
"id": str(trace_id),
"session_name": _session,
"name": "run 1",
"run_type": "chain",
"dotted_order": f"{current_time}{str(trace_id)}",
"trace_id": str(trace_id),
"inputs": {"input1": 1, "input2": 2},
"outputs": {"output1": 3, "output2": 4},
},
{
"id": str(trace_id_2),
"session_name": _session,
"name": "run 3",
"run_type": "chain",
"dotted_order": f"{current_time}{str(trace_id_2)}",
"trace_id": str(trace_id_2),
"inputs": {"input1": 1, "input2": 2},
"error": "error",
},
{
"id": str(run_id_2),
"session_name": _session,
"name": "run 2",
"run_type": "chain",
"dotted_order": f"{current_time}{str(trace_id)}."
f"{later_time}{str(run_id_2)}",
"trace_id": str(trace_id),
"parent_run_id": str(trace_id),
"inputs": {"input1": 5, "input2": 6},
},
]
runs_to_update = [
{
"id": str(run_id_2),
"dotted_order": f"{current_time}{str(trace_id)}."
f"{later_time}{str(run_id_2)}",
"trace_id": str(trace_id),
"parent_run_id": str(trace_id),
"outputs": {"output1": 4, "output2": 5},
},
]
client.batch_ingest_runs(create=runs_to_create, update=runs_to_update)
multipart_ingest ¶
multipart_ingest(
create: Sequence[Run | RunLikeDict | dict] | None = None,
update: Sequence[Run | RunLikeDict | dict] | None = None,
*,
pre_sampled: bool = False,
dangerously_allow_filesystem: bool = False,
) -> None
在 Langsmith 系统中批量摄入/更新插入多个运行。
| 参数 | 描述 |
|---|---|
create
|
表示要创建/发布的运行的 |
更新
|
表示已创建且应更新/修补的运行的 |
pre_sampled
|
运行是否已经经过采样,因此不应再次采样。默认为 False。
类型: |
| 引发 | 描述 |
|---|---|
LangsmithAPIError
|
如果 API 请求中存在错误。 |
| 返回 | 描述 |
|---|---|
None
|
None |
注意
- 运行对象必须包含 dotted_order 和 trace_id 字段才能被 API 接受。
示例
.. code-block:: python
from langsmith import Client
import datetime
from uuid import uuid4
client = Client()
_session = "__test_batch_ingest_runs"
trace_id = uuid4()
trace_id_2 = uuid4()
run_id_2 = uuid4()
current_time = datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y%m%dT%H%M%S%fZ"
)
later_time = (
datetime.datetime.now(datetime.timezone.utc) + timedelta(seconds=1)
).strftime("%Y%m%dT%H%M%S%fZ")
runs_to_create = [
{
"id": str(trace_id),
"session_name": _session,
"name": "run 1",
"run_type": "chain",
"dotted_order": f"{current_time}{str(trace_id)}",
"trace_id": str(trace_id),
"inputs": {"input1": 1, "input2": 2},
"outputs": {"output1": 3, "output2": 4},
},
{
"id": str(trace_id_2),
"session_name": _session,
"name": "run 3",
"run_type": "chain",
"dotted_order": f"{current_time}{str(trace_id_2)}",
"trace_id": str(trace_id_2),
"inputs": {"input1": 1, "input2": 2},
"error": "error",
},
{
"id": str(run_id_2),
"session_name": _session,
"name": "run 2",
"run_type": "chain",
"dotted_order": f"{current_time}{str(trace_id)}."
f"{later_time}{str(run_id_2)}",
"trace_id": str(trace_id),
"parent_run_id": str(trace_id),
"inputs": {"input1": 5, "input2": 6},
},
]
runs_to_update = [
{
"id": str(run_id_2),
"dotted_order": f"{current_time}{str(trace_id)}."
f"{later_time}{str(run_id_2)}",
"trace_id": str(trace_id),
"parent_run_id": str(trace_id),
"outputs": {"output1": 4, "output2": 5},
},
]
client.multipart_ingest(create=runs_to_create, update=runs_to_update)
update_run ¶
update_run(
run_id: ID_TYPE,
*,
name: str | None = None,
end_time: datetime | None = None,
error: str | None = None,
inputs: dict | None = None,
outputs: dict | None = None,
events: Sequence[dict] | None = None,
extra: dict | None = None,
tags: list[str] | None = None,
attachments: Attachments | None = None,
dangerously_allow_filesystem: bool = False,
reference_example_id: str | UUID | None = None,
api_key: str | None = None,
api_url: str | None = None,
**kwargs: Any,
) -> None
在 LangSmith API 中更新一次运行。
| 参数 | 描述 |
|---|---|
run_id
|
要更新的运行的 ID。
类型: |
name
|
运行的名称。
类型: |
end_time
|
运行的结束时间。
类型: |
error
|
运行的错误消息。
类型: |
inputs
|
运行的输入值。
类型: |
outputs
|
运行的输出值。
类型: |
events
|
运行的事件。 |
extra
|
运行的额外信息。
类型: |
tags
|
运行的标签。
类型: |
attachments
|
要添加到运行中的附件字典。键是附件名称,值是包含数据和 mime 类型的 Attachment 对象。
类型: |
reference_example_id
|
作为运行输入来源的示例的 ID。用于属于实验的运行。 |
api_key
|
用于此特定运行的 API 密钥。
类型: |
api_url
|
用于此特定运行的 API URL。
类型: |
**kwargs
|
Kwargs 被忽略。
类型: |
| 返回 | 描述 |
|---|---|
None
|
None |
示例
.. code-block:: python
from langsmith import Client
import datetime
from uuid import uuid4
client = Client()
project_name = "__test_update_run"
start_time = datetime.datetime.now()
revision_id = uuid4()
run: dict = dict(
id=uuid4(),
name="test_run",
run_type="llm",
inputs={"text": "hello world"},
project_name=project_name,
api_url=os.getenv("LANGCHAIN_ENDPOINT"),
start_time=start_time,
extra={"extra": "extra"},
revision_id=revision_id,
)
# Create the run
client.create_run(**run)
run["outputs"] = {"output": ["Hi"]}
run["extra"]["foo"] = "bar"
run["name"] = "test_run_updated"
# Update the run
client.update_run(run["id"], **run)
read_run ¶
从 LangSmith API 读取一次运行。
| 参数 | 描述 |
|---|---|
run_id
|
要读取的运行的 ID。
类型: |
load_child_runs
|
是否加载嵌套的子运行。
类型: |
| 返回 | 描述 |
|---|---|
Run
|
从 LangSmith API 读取的运行。
类型: |
示例
.. code-block:: python
from langsmith import Client
# Existing run
run_id = "your-run-id"
client = Client()
stored_run = client.read_run(run_id)
list_runs ¶
list_runs(
*,
project_id: ID_TYPE | Sequence[ID_TYPE] | None = None,
project_name: str | Sequence[str] | None = None,
run_type: str | None = None,
trace_id: ID_TYPE | None = None,
reference_example_id: ID_TYPE | None = None,
query: str | None = None,
filter: str | None = None,
trace_filter: str | None = None,
tree_filter: str | None = None,
is_root: bool | None = None,
parent_run_id: ID_TYPE | None = None,
start_time: datetime | None = None,
error: bool | None = None,
run_ids: Sequence[ID_TYPE] | None = None,
select: Sequence[str] | None = None,
limit: int | None = None,
**kwargs: Any,
) -> Iterator[Run]
从 LangSmith API 列出运行。
| 参数 | 描述 |
|---|---|
project_id
|
用于筛选的项目 ID。 |
project_name
|
用于筛选的项目名称。 |
run_type
|
用于筛选的运行类型。
类型: |
trace_id
|
用于筛选的跟踪 ID。
类型: |
reference_example_id
|
用于筛选的参考示例 ID。
类型: |
query
|
用于筛选的查询字符串。
类型: |
filter
|
用于筛选的筛选器字符串。
类型: |
trace_filter
|
应用于跟踪树中根运行的筛选器。这旨在与常规的
类型: |
tree_filter
|
应用于跟踪树中其他运行(包括兄弟和子运行)的筛选器。这旨在与常规的
类型: |
is_root
|
是否按根运行进行筛选。
类型: |
parent_run_id
|
用于筛选的父运行 ID。
类型: |
start_time
|
用于筛选的开始时间。
类型: |
error
|
是否按错误状态进行筛选。
类型: |
run_ids
|
用于筛选的运行 ID。 |
select
|
要选择的字段。 |
limit
|
要返回的最大运行数。
TYPE: |
**kwargs
|
附加的关键字参数。
类型: |
| YIELDS | 描述 |
|---|---|
Run
|
运行。 |
示例
.. code-block:: python
# List all runs in a project
project_runs = client.list_runs(project_name="<your_project>")
# List LLM and Chat runs in the last 24 hours
todays_llm_runs = client.list_runs(
project_name="<your_project>",
start_time=datetime.now() - timedelta(days=1),
run_type="llm",
)
# List root traces in a project
root_runs = client.list_runs(project_name="<your_project>", is_root=1)
# List runs without errors
correct_runs = client.list_runs(project_name="<your_project>", error=False)
# List runs and only return their inputs/outputs (to speed up the query)
input_output_runs = client.list_runs(
project_name="<your_project>", select=["inputs", "outputs"]
)
# List runs by run ID
run_ids = [
"a36092d2-4ad5-4fb4-9c0d-0dba9a2ed836",
"9398e6be-964f-4aa4-8ae9-ad78cd4b7074",
]
selected_runs = client.list_runs(id=run_ids)
# List all "chain" type runs that took more than 10 seconds and had
# `total_tokens` greater than 5000
chain_runs = client.list_runs(
project_name="<your_project>",
filter='and(eq(run_type, "chain"), gt(latency, 10), gt(total_tokens, 5000))',
)
# List all runs called "extractor" whose root of the trace was assigned feedback "user_score" score of 1
good_extractor_runs = client.list_runs(
project_name="<your_project>",
filter='eq(name, "extractor")',
trace_filter='and(eq(feedback_key, "user_score"), eq(feedback_score, 1))',
)
# List all runs that started after a specific timestamp and either have "error" not equal to null or a "Correctness" feedback score equal to 0
complex_runs = client.list_runs(
project_name="<your_project>",
filter='and(gt(start_time, "2023-07-15T12:34:56Z"), or(neq(error, null), and(eq(feedback_key, "Correctness"), eq(feedback_score, 0.0))))',
)
# List all runs where `tags` include "experimental" or "beta" and `latency` is greater than 2 seconds
tagged_runs = client.list_runs(
project_name="<your_project>",
filter='and(or(has(tags, "experimental"), has(tags, "beta")), gt(latency, 2))',
)
get_run_stats ¶
get_run_stats(
*,
id: list[ID_TYPE] | None = None,
trace: ID_TYPE | None = None,
parent_run: ID_TYPE | None = None,
run_type: str | None = None,
project_names: list[str] | None = None,
project_ids: list[ID_TYPE] | None = None,
reference_example_ids: list[ID_TYPE] | None = None,
start_time: str | None = None,
end_time: str | None = None,
error: bool | None = None,
query: str | None = None,
filter: str | None = None,
trace_filter: str | None = None,
tree_filter: str | None = None,
is_root: bool | None = None,
data_source_type: str | None = None,
) -> dict[str, Any]
获取查询运行的聚合统计信息。
接受与 list_runs 类似的查询参数,并返回与查询匹配的运行的统计信息。
| 参数 | 描述 |
|---|---|
id
|
用于筛选的运行 ID 列表。
类型: |
trace
|
用于筛选的跟踪 ID。
类型: |
parent_run
|
用于筛选的父运行 ID。
类型: |
run_type
|
用于筛选的运行类型。
类型: |
project_names
|
用于筛选的项目名称列表。
类型: |
project_ids
|
用于筛选的项目 ID 列表。
类型: |
reference_example_ids
|
用于筛选的参考示例 ID 列表。
类型: |
start_time
|
用于筛选的开始时间。
类型: |
end_time
|
用于筛选的结束时间。
类型: |
error
|
按错误状态筛选。
类型: |
query
|
用于筛选的查询字符串。
类型: |
filter
|
要应用的筛选器字符串。
类型: |
trace_filter
|
要应用的跟踪筛选器字符串。
类型: |
tree_filter
|
要应用的树筛选器字符串。
类型: |
is_root
|
按根运行状态筛选。
类型: |
data_source_type
|
用于筛选的数据源类型。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
Dict[str, Any]:包含运行统计信息的字典。 |
get_run_url ¶
unshare_run ¶
read_shared_run ¶
list_shared_runs ¶
read_dataset_shared_schema ¶
read_dataset_shared_schema(
dataset_id: ID_TYPE | None = None, *, dataset_name: str | None = None
) -> DatasetShareSchema
检索数据集的共享模式。
| 参数 | 描述 |
|---|---|
dataset_id
|
数据集的 ID。必须提供
类型: |
dataset_name
|
数据集的名称。必须提供
类型: |
| 返回 | 描述 |
|---|---|
DatasetShareSchema
|
ls_schemas.DatasetShareSchema:数据集的共享模式。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果既未提供 |
share_dataset ¶
share_dataset(
dataset_id: ID_TYPE | None = None, *, dataset_name: str | None = None
) -> DatasetShareSchema
获取数据集的共享链接。
| 参数 | 描述 |
|---|---|
dataset_id
|
数据集的 ID。必须提供
类型: |
dataset_name
|
数据集的名称。必须提供
类型: |
| 返回 | 描述 |
|---|---|
DatasetShareSchema
|
ls_schemas.DatasetShareSchema:数据集的共享模式。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果既未提供 |
unshare_dataset ¶
read_shared_dataset ¶
list_shared_examples ¶
list_shared_projects ¶
list_shared_projects(
*,
dataset_share_token: str,
project_ids: list[ID_TYPE] | None = None,
name: str | None = None,
name_contains: str | None = None,
limit: int | None = None,
) -> Iterator[TracerSessionResult]
列出共享的项目。
| 参数 | 描述 |
|---|---|
dataset_share_token
|
数据集的分享令牌。
类型: |
project_ids
|
用于筛选结果的项目 ID 列表,默认为 None。
类型: |
name
|
用于筛选结果的项目名称,默认为 None。
类型: |
name_contains
|
用于在项目名称中搜索的子字符串,默认为 None。
类型: |
limit
|
要返回的最大项目数,默认为 None。
TYPE: |
| YIELDS | 描述 |
|---|---|
TracerSessionResult
|
共享的项目。 |
create_project ¶
create_project(
project_name: str,
*,
description: str | None = None,
metadata: dict | None = None,
upsert: bool = False,
project_extra: dict | None = None,
reference_dataset_id: ID_TYPE | None = None,
) -> TracerSession
在 LangSmith API 上创建一个项目。
| 参数 | 描述 |
|---|---|
project_name
|
项目的名称。
类型: |
project_extra
|
附加的项目信息。
类型: |
metadata
|
与项目关联的附加元数据。
类型: |
描述
|
项目的描述。
类型: |
upsert
|
如果项目已存在,是否更新项目。
类型: |
reference_dataset_id
|
要与项目关联的参考数据集的 ID。
类型: |
| 返回 | 描述 |
|---|---|
TracerSession
|
创建的项目。
类型: |
update_project ¶
update_project(
project_id: ID_TYPE,
*,
name: str | None = None,
description: str | None = None,
metadata: dict | None = None,
project_extra: dict | None = None,
end_time: datetime | None = None,
) -> TracerSession
更新一个 LangSmith 项目。
| 参数 | 描述 |
|---|---|
project_id
|
要更新的项目的 ID。
类型: |
name
|
要给项目的新名称。这仅在项目已分配 end_time(即已完成/关闭)时有效。
类型: |
描述
|
要给项目的新描述。
类型: |
metadata
|
与项目关联的附加元数据。
类型: |
project_extra
|
附加的项目信息。
类型: |
end_time
|
项目完成的时间。
类型: |
| 返回 | 描述 |
|---|---|
TracerSession
|
更新后的项目。
类型: |
read_project ¶
read_project(
*,
project_id: str | None = None,
project_name: str | None = None,
include_stats: bool = False,
) -> TracerSessionResult
has_project ¶
get_test_results ¶
get_test_results(
*, project_id: ID_TYPE | None = None, project_name: str | None = None
) -> DataFrame
list_projects ¶
list_projects(
project_ids: list[ID_TYPE] | None = None,
name: str | None = None,
name_contains: str | None = None,
reference_dataset_id: ID_TYPE | None = None,
reference_dataset_name: str | None = None,
reference_free: bool | None = None,
include_stats: bool | None = None,
dataset_version: str | None = None,
limit: int | None = None,
metadata: dict[str, Any] | None = None,
) -> Iterator[TracerSessionResult]
从 LangSmith API 列出项目。
| 参数 | 描述 |
|---|---|
project_ids
|
用于筛选的项目 ID 列表,默认为 None
类型: |
name
|
用于筛选的项目名称,默认为 None
类型: |
name_contains
|
用于在项目名称中搜索的字符串,默认为 None
类型: |
reference_dataset_id
|
用于筛选的数据集 ID,默认为 None
类型: |
reference_dataset_name
|
用于筛选的参考数据集名称,默认为 None
类型: |
reference_free
|
是否仅筛选未与数据集关联的项目。
类型: |
limit
|
要返回的最大项目数,默认为 None
TYPE: |
metadata
|
用于筛选的元数据。 |
| YIELDS | 描述 |
|---|---|
TracerSessionResult
|
项目列表。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果同时提供了 reference_dataset_id 和 reference_dataset_name。 |
delete_project ¶
从 LangSmith 删除一个项目。
| 参数 | 描述 |
|---|---|
project_name
|
要删除的项目名称。
类型: |
project_id
|
要删除的项目 ID。
类型: |
| 返回 | 描述 |
|---|---|
None
|
None |
| 引发 | 描述 |
|---|---|
ValueError
|
如果既未提供 project_name 也未提供 project_id。 |
create_dataset ¶
create_dataset(
dataset_name: str,
*,
description: str | None = None,
data_type: DataType = kv,
inputs_schema: dict[str, Any] | None = None,
outputs_schema: dict[str, Any] | None = None,
transformations: list[DatasetTransformation] | None = None,
metadata: dict | None = None,
) -> Dataset
在 LangSmith API 中创建一个数据集。
| 参数 | 描述 |
|---|---|
dataset_name
|
数据集的名称。
类型: |
描述
|
数据集的描述。
类型: |
data_type
|
数据集的数据类型。
类型: |
inputs_schema
|
数据集输入的模式定义。 |
outputs_schema
|
数据集输出的模式定义。 |
transformations
|
应用于数据集的转换列表。
类型: |
metadata
|
与数据集关联的附加元数据。
类型: |
| 返回 | 描述 |
|---|---|
数据集
|
创建的数据集。
类型: |
| 引发 | 描述 |
|---|---|
HTTPError
|
如果创建数据集的请求失败。 |
has_dataset ¶
read_dataset ¶
diff_dataset_versions ¶
diff_dataset_versions(
dataset_id: ID_TYPE | None = None,
*,
dataset_name: str | None = None,
from_version: str | datetime,
to_version: str | datetime,
) -> DatasetDiffInfo
获取数据集两个版本之间的差异。
| 参数 | 描述 |
|---|---|
dataset_id
|
数据集的 ID。
类型: |
dataset_name
|
数据集的名称。
类型: |
from_version
|
差异比较的起始版本。 |
to_version
|
差异比较的结束版本。 |
| 返回 | 描述 |
|---|---|
DatasetDiffInfo
|
数据集两个版本之间的差异。
类型: |
示例
.. code-block:: python
# Get the difference between two tagged versions of a dataset
from_version = "prod"
to_version = "dev"
diff = client.diff_dataset_versions(
dataset_name="my-dataset",
from_version=from_version,
to_version=to_version,
)
# Get the difference between two timestamped versions of a dataset
from_version = datetime.datetime(2024, 1, 1)
to_version = datetime.datetime(2024, 2, 1)
diff = client.diff_dataset_versions(
dataset_name="my-dataset",
from_version=from_version,
to_version=to_version,
)
read_dataset_openai_finetuning ¶
read_dataset_openai_finetuning(
dataset_id: ID_TYPE | None = None, *, dataset_name: str | None = None
) -> list
以 OpenAI Jsonl 格式下载数据集,并将其加载为字典列表。
| 参数 | 描述 |
|---|---|
dataset_id
|
要下载的数据集的 ID。
类型: |
dataset_name
|
要下载的数据集的名称。
类型: |
| 返回 | 描述 |
|---|---|
list
|
list[dict]:加载为字典列表的数据集。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果既未提供 dataset_id 也未提供 dataset_name。 |
list_datasets ¶
list_datasets(
*,
dataset_ids: list[ID_TYPE] | None = None,
data_type: str | None = None,
dataset_name: str | None = None,
dataset_name_contains: str | None = None,
metadata: dict[str, Any] | None = None,
limit: int | None = None,
) -> Iterator[Dataset]
列出 LangSmith API 上的数据集。
| 参数 | 描述 |
|---|---|
dataset_ids
|
用于筛选结果的数据集 ID 列表。
类型: |
data_type
|
用于筛选结果的数据集的数据类型。
类型: |
dataset_name
|
用于筛选结果的数据集名称。
类型: |
dataset_name_contains
|
用于在数据集名称中搜索的子字符串。
类型: |
metadata
|
用于筛选结果的元数据字典。 |
limit
|
要返回的最大数据集数。
TYPE: |
| YIELDS | 描述 |
|---|---|
数据集
|
数据集列表。 |
delete_dataset ¶
delete_dataset(
*, dataset_id: ID_TYPE | None = None, dataset_name: str | None = None
) -> None
update_dataset_tag ¶
update_dataset_tag(
*,
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
as_of: datetime,
tag: str,
) -> None
更新数据集的标签。
如果标签已分配给此数据集的其他版本,则标签将移动到新版本。as_of 参数用于确定将新标签应用于哪个版本的数据集。它必须是数据集的精确版本才能成功。您可以使用 read_dataset_version 方法查找要应用标签的精确版本。
| 参数 | 描述 |
|---|---|
dataset_id
|
要更新的数据集的 ID。
类型: |
dataset_name
|
要更新的数据集的名称。
类型: |
as_of
|
应用新标签的数据集的时间戳。
类型: |
tag
|
要应用于数据集的新标签。
类型: |
| 返回 | 描述 |
|---|---|
None
|
None |
示例
.. code-block:: python
dataset_name = "my-dataset"
# Get the version of a dataset <= a given timestamp
dataset_version = client.read_dataset_version(
dataset_name=dataset_name, as_of=datetime.datetime(2024, 1, 1)
)
# Assign that version a new tag
client.update_dataset_tags(
dataset_name="my-dataset",
as_of=dataset_version.as_of,
tag="prod",
)
list_dataset_versions ¶
list_dataset_versions(
*,
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
search: str | None = None,
limit: int | None = None,
) -> Iterator[DatasetVersion]
列出数据集版本。
| 参数 | 描述 |
|---|---|
dataset_id
|
数据集的 ID。
类型: |
dataset_name
|
数据集的名称。
类型: |
search
|
搜索查询。
类型: |
limit
|
要返回的最大版本数。
TYPE: |
| YIELDS | 描述 |
|---|---|
DatasetVersion
|
数据集版本。 |
read_dataset_version ¶
read_dataset_version(
*,
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
as_of: datetime | None = None,
tag: str | None = None,
) -> DatasetVersion
通过 as_of 或确切的标签获取数据集版本。
使用此方法解析给定时间戳或给定标签的最近版本。
| 参数 | 描述 |
|---|---|
dataset_id
|
数据集的 ID。
类型: |
dataset_name
|
数据集的名称。
类型: |
as_of
|
要检索的数据集的时间戳。
类型: |
tag
|
要检索的数据集的标签。
类型: |
| 返回 | 描述 |
|---|---|
DatasetVersion
|
数据集版本。
类型: |
示例
.. code-block:: python
# Get the latest version of a dataset
client.read_dataset_version(dataset_name="my-dataset", tag="latest")
# Get the version of a dataset <= a given timestamp
client.read_dataset_version(
dataset_name="my-dataset",
as_of=datetime.datetime(2024, 1, 1),
)
# Get the version of a dataset with a specific tag
client.read_dataset_version(dataset_name="my-dataset", tag="prod")
clone_public_dataset ¶
create_llm_example ¶
create_chat_example ¶
create_chat_example(
messages: list[Mapping[str, Any] | BaseMessageLike],
generations: Mapping[str, Any] | BaseMessageLike | None = None,
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
created_at: datetime | None = None,
) -> Example
create_example_from_run ¶
update_examples_multipart ¶
update_examples_multipart(
*,
dataset_id: ID_TYPE,
updates: list[ExampleUpdate] | None = None,
dangerously_allow_filesystem: bool = False,
) -> UpsertExamplesResponse
使用 multipart 更新示例。
.. deprecated:: 0.3.9
Use Client.update_examples instead. Will be removed in 0.4.0.
upload_examples_multipart ¶
upload_examples_multipart(
*,
dataset_id: ID_TYPE,
uploads: list[ExampleCreate] | None = None,
dangerously_allow_filesystem: bool = False,
) -> UpsertExamplesResponse
使用 multipart 上传示例。
.. deprecated:: 0.3.9
Use Client.create_examples instead. Will be removed in 0.4.0.
upsert_examples_multipart ¶
upsert_examples_multipart(
*,
upserts: list[ExampleUpsertWithAttachments] | None = None,
dangerously_allow_filesystem: bool = False,
) -> UpsertExamplesResponse
更新插入示例。
.. deprecated:: 0.3.9
Use Client.create_examples and Client.update_examples instead. Will be
removed in 0.4.0.
create_examples ¶
create_examples(
*,
dataset_name: str | None = None,
dataset_id: ID_TYPE | None = None,
examples: Sequence[ExampleCreate | dict] | None = None,
dangerously_allow_filesystem: bool = False,
max_concurrency: Annotated[int, Field(ge=1, le=3)] = 1,
**kwargs: Any,
) -> UpsertExamplesResponse | dict[str, Any]
在数据集中创建示例。
| 参数 | 描述 |
|---|---|
dataset_name
|
要在其中创建示例的数据集的名称。必须且只能指定 dataset_name 或 dataset_id 中的一个。
类型: |
dataset_id
|
要在其中创建示例的数据集的 ID。必须且只能指定 dataset_name 或 dataset_id 中的一个。
类型: |
examples
|
要创建的示例。 |
dangerously_allow_filesystem
|
是否允许从文件系统上传文件。
类型: |
**kwargs
|
旧版关键字参数。如果指定了 'examples',则不应指定此参数。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果同时提供了 'examples' 和旧版参数。 |
| 返回 | 描述 |
|---|---|
UpsertExamplesResponse | dict[str, Any]
|
LangSmith 的 JSON 响应。包括 'count' 和 'example_ids'。 |
.. versionchanged:: 0.3.11
Updated to take argument 'examples', a single list where each
element is the full example to create. This should be used instead of the
legacy 'inputs', 'outputs', etc. arguments which split each examples
attributes across arguments.
Updated to support creating examples with attachments.
示例
.. code-block:: python
from langsmith import Client
client = Client()
dataset = client.create_dataset("agent-qa")
examples = [
{
"inputs": {"question": "what's an agent"},
"outputs": {"answer": "an agent is..."},
"metadata": {"difficulty": "easy"},
},
{
"inputs": {
"question": "can you explain the agent architecture in this diagram?"
},
"outputs": {"answer": "this diagram shows..."},
"attachments": {"diagram": {"mime_type": "image/png", "data": b"..."}},
"metadata": {"difficulty": "medium"},
},
# more examples...
]
response = client.create_examples(dataset_name="agent-qa", examples=examples)
# -> {"example_ids": [...
create_example ¶
create_example(
inputs: Mapping[str, Any] | None = None,
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
created_at: datetime | None = None,
outputs: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
split: str | list[str] | None = None,
example_id: ID_TYPE | None = None,
source_run_id: ID_TYPE | None = None,
use_source_run_io: bool = False,
use_source_run_attachments: list[str] | None = None,
attachments: Attachments | None = None,
) -> Example
在 LangSmith API 中创建一个数据集示例。
示例是数据集中的行,包含模型或链的输入和预期输出(或其他参考信息)。
| 参数 | 描述 |
|---|---|
inputs
|
示例的输入值。 |
dataset_id
|
要在其中创建示例的数据集的 ID。
类型: |
dataset_name
|
要在其中创建示例的数据集的名称。
类型: |
created_at
|
示例的创建时间戳。
类型: |
outputs
|
示例的输出值。 |
metadata
|
示例的元数据。 |
split
|
示例的分割(splits),即数据集的划分,如 'train'、'test' 或 'validation'。 |
example_id
|
要创建的示例的 ID。如果未提供,将创建一个新的示例。
类型: |
source_run_id
|
与此示例关联的源运行(source run)的 ID。
类型: |
use_source_run_io
|
是否使用源运行的输入、输出和附件。
类型: |
use_source_run_attachments
|
从源运行中使用哪些附件。如果 use_source_run_io 为 True,则无论此参数如何,都将使用所有附件。
类型: |
attachments
|
示例的附件。
类型: |
| 返回 | 描述 |
|---|---|
示例
|
已创建的示例。
类型: |
read_example ¶
list_examples ¶
list_examples(
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
example_ids: Sequence[ID_TYPE] | None = None,
as_of: datetime | str | None = None,
splits: Sequence[str] | None = None,
inline_s3_urls: bool = True,
*,
offset: int = 0,
limit: int | None = None,
metadata: dict | None = None,
filter: str | None = None,
include_attachments: bool = False,
**kwargs: Any,
) -> Iterator[Example]
检索指定数据集的示例行。
| 参数 | 描述 |
|---|---|
dataset_id
|
用于筛选的数据集的 ID。默认为 None。
类型: |
dataset_name
|
用于筛选的数据集的名称。默认为 None。
类型: |
example_ids
|
用于筛选的示例 ID。默认为 None。
类型: |
as_of
|
用于检索示例的数据集版本标签或时间戳。响应中的示例将仅限于在标记(或带时间戳)的版本时存在的那些。 |
splits
|
数据集分割(splits)的列表,即数据集的划分,如 'train'、'test' 或 'validation'。仅返回指定分割中的示例。 |
inline_s3_urls
|
是否内联 S3 URL。默认为 True。
类型: |
offset
|
起始偏移量。默认为 0。
类型: |
limit
|
要返回的最大示例数。
TYPE: |
metadata
|
用于筛选的元数据字典。
类型: |
filter
|
要应用于示例的结构化筛选字符串。
类型: |
include_attachments
|
是否在响应中包含附件。默认为 False。
类型: |
**kwargs
|
其他关键字参数将被忽略。
类型: |
| YIELDS | 描述 |
|---|---|
示例
|
示例列表。 |
示例
列出数据集的所有示例
.. code-block:: python
from langsmith import Client
client = Client()
# By Dataset ID
examples = client.list_examples(
dataset_id="c9ace0d8-a82c-4b6c-13d2-83401d68e9ab"
)
# By Dataset Name
examples = client.list_examples(dataset_name="My Test Dataset")
按 ID 列出示例
.. code-block:: python
example_ids = [
"734fc6a0-c187-4266-9721-90b7a025751a",
"d6b4c1b9-6160-4d63-9b61-b034c585074f",
"4d31df4e-f9c3-4a6e-8b6c-65701c2fed13",
]
examples = client.list_examples(example_ids=example_ids)
按元数据列出示例
.. code-block:: python
examples = client.list_examples(
dataset_name=dataset_name, metadata={"foo": "bar"}
)
按结构化筛选器列出示例
.. code-block:: python
examples = client.list_examples(
dataset_name=dataset_name,
filter='and(not(has(metadata, \'{"foo": "bar"}\')), exists(metadata, "tenant_id"))',
)
index_dataset ¶
sync_indexed_dataset ¶
sync_indexed_dataset(*, dataset_id: ID_TYPE, **kwargs: Any) -> None
同步数据集索引。此操作已自动每 5 分钟进行一次,但您可以调用此函数强制同步。
| 参数 | 描述 |
|---|---|
dataset_id
|
要同步的数据集的 ID。
类型: |
| 返回 | 描述 |
|---|---|
None
|
None |
similar_examples ¶
similar_examples(
inputs: dict,
/,
*,
limit: int,
dataset_id: ID_TYPE,
filter: str | None = None,
**kwargs: Any,
) -> list[ExampleSearch]
检索输入与当前输入最匹配的数据集示例。
注意:必须为数据集启用了 few-shot 索引。请参阅 `client.index_dataset()`。
| 参数 | 描述 |
|---|---|
inputs
|
用作搜索查询的输入。必须与数据集输入模式匹配。必须是 JSON 可序列化的。
类型: |
limit
|
要返回的最大示例数。
类型: |
dataset_id
|
要搜索的数据集的 ID。
类型: |
filter
|
应用于搜索结果的筛选字符串。使用与 `list_runs()` 中的 `filter` 参数相同的语法。仅支持部分操作。默认为 None。 例如,您可以使用 `and(eq(metadata.some_tag, 'some_value'), neq(metadata.env, 'dev'))` 来筛选出 some_tag 具有 some_value 且环境不是 dev 的示例。
类型: |
**kwargs
|
作为请求体一部分传递的其他关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
list[ExampleSearch]
|
list[ExampleSearch]:ExampleSearch 对象列表。 |
示例
.. code-block:: python
from langsmith import Client
client = Client()
client.similar_examples(
{"question": "When would i use the runnable generator"},
limit=3,
dataset_id="...",
)
.. code-block:: python
[
ExampleSearch(
inputs={
"question": "How do I cache a Chat model? What caches can I use?"
},
outputs={
"answer": "You can use LangChain's caching layer for Chat Models. This can save you money by reducing the number of API calls you make to the LLM provider, if you're often requesting the same completion multiple times, and speed up your application.\n\nfrom langchain.cache import InMemoryCache\nlangchain.llm_cache = InMemoryCache()\n\n# The first time, it is not yet in cache, so it should take longer\nllm.predict('Tell me a joke')\n\nYou can also use SQLite Cache which uses a SQLite database:\n\nrm .langchain.db\n\nfrom langchain.cache import SQLiteCache\nlangchain.llm_cache = SQLiteCache(database_path=\".langchain.db\")\n\n# The first time, it is not yet in cache, so it should take longer\nllm.predict('Tell me a joke') \n"
},
metadata=None,
id=UUID("b2ddd1c4-dff6-49ae-8544-f48e39053398"),
dataset_id=UUID("01b6ce0f-bfb6-4f48-bbb8-f19272135d40"),
),
ExampleSearch(
inputs={"question": "What's a runnable lambda?"},
outputs={
"answer": "A runnable lambda is an object that implements LangChain's `Runnable` interface and runs a callbale (i.e., a function). Note the function must accept a single argument."
},
metadata=None,
id=UUID("f94104a7-2434-4ba7-8293-6a283f4860b4"),
dataset_id=UUID("01b6ce0f-bfb6-4f48-bbb8-f19272135d40"),
),
ExampleSearch(
inputs={"question": "Show me how to use RecursiveURLLoader"},
outputs={
"answer": 'The RecursiveURLLoader comes from the langchain.document_loaders.recursive_url_loader module. Here\'s an example of how to use it:\n\nfrom langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader\n\n# Create an instance of RecursiveUrlLoader with the URL you want to load\nloader = RecursiveUrlLoader(url="https://example.com")\n\n# Load all child links from the URL page\nchild_links = loader.load()\n\n# Print the child links\nfor link in child_links:\n print(link)\n\nMake sure to replace "https://example.com" with the actual URL you want to load. The load() method returns a list of child links found on the URL page. You can iterate over this list to access each child link.'
},
metadata=None,
id=UUID("0308ea70-a803-4181-a37d-39e95f138f8c"),
dataset_id=UUID("01b6ce0f-bfb6-4f48-bbb8-f19272135d40"),
),
]
update_example ¶
update_example(
example_id: ID_TYPE,
*,
inputs: dict[str, Any] | None = None,
outputs: Mapping[str, Any] | None = None,
metadata: dict | None = None,
split: str | list[str] | None = None,
dataset_id: ID_TYPE | None = None,
attachments_operations: AttachmentsOperations | None = None,
attachments: Attachments | None = None,
) -> dict[str, Any]
更新一个特定示例。
| 参数 | 描述 |
|---|---|
example_id
|
要更新的示例的 ID。
类型: |
inputs
|
要更新的输入值。 |
outputs
|
要更新的输出值。 |
metadata
|
要更新的元数据。
类型: |
split
|
要更新的数据集分割(split),如 'train'、'test' 或 'validation'。 |
dataset_id
|
要更新的数据集的 ID。
类型: |
attachments_operations
|
要执行的附件操作。
类型: |
attachments
|
要添加到示例的附件。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
Dict[str, Any]:更新后的示例。 |
update_examples ¶
update_examples(
*,
dataset_name: str | None = None,
dataset_id: ID_TYPE | None = None,
updates: Sequence[ExampleUpdate | dict] | None = None,
dangerously_allow_filesystem: bool = False,
**kwargs: Any,
) -> dict[str, Any]
更新多个示例。
示例应全部属于同一数据集。
| 参数 | 描述 |
|---|---|
dataset_name
|
要更新的数据集的名称。应且只能指定 'dataset_name' 或 'dataset_id' 中的一个。
类型: |
dataset_id
|
要更新的数据集的 ID。应且只能指定 'dataset_name' 或 'dataset_id' 中的一个。
类型: |
updates
|
示例更新。将覆盖任何指定的字段,而不更新任何未指定的字段。 |
dangerously_allow_filesystem
|
是否允许使用文件系统路径作为附件。
类型: |
**kwargs
|
旧版关键字参数。如果指定了 'updates',则不应指定此参数。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
LangSmith 的 JSON 响应。包括 'message'、'count' 和 'example_ids'。 |
.. versionchanged:: 0.3.9
Updated to ...
示例
.. code-block:: python
from langsmith import Client
client = Client()
dataset = client.create_dataset("agent-qa")
examples = [
{
"inputs": {"question": "what's an agent"},
"outputs": {"answer": "an agent is..."},
"metadata": {"difficulty": "easy"},
},
{
"inputs": {
"question": "can you explain the agent architecture in this diagram?"
},
"outputs": {"answer": "this diagram shows..."},
"attachments": {"diagram": {"mime_type": "image/png", "data": b"..."}},
"metadata": {"difficulty": "medium"},
},
# more examples...
]
response = client.create_examples(dataset_name="agent-qa", examples=examples)
example_ids = response["example_ids"]
updates = [
{
"id": example_ids[0],
"inputs": {"question": "what isn't an agent"},
"outputs": {"answer": "an agent is not..."},
},
{
"id": example_ids[1],
"attachments_operations": [
{"rename": {"diagram": "agent_diagram"}, "retain": []}
],
},
]
response = client.update_examples(dataset_name="agent-qa", updates=updates)
# -> {"example_ids": [...
delete_example ¶
list_dataset_splits ¶
update_dataset_splits ¶
update_dataset_splits(
*,
dataset_id: ID_TYPE | None = None,
dataset_name: str | None = None,
split_name: str,
example_ids: list[ID_TYPE],
remove: bool = False,
) -> None
更新数据集的拆分。
| 参数 | 描述 |
|---|---|
dataset_id
|
要更新的数据集的 ID。
类型: |
dataset_name
|
要更新的数据集的名称。
类型: |
split_name
|
要更新的分割的名称。
类型: |
example_ids
|
要从分割中添加或删除的示例的 ID。
类型: |
remove
|
如果为 True,则从分割中删除示例。如果为 False,则将示例添加到分割中。默认为 False。
类型: |
| 返回 | 描述 |
|---|---|
None
|
None |
evaluate_run ¶
evaluate_run(
run: Run | RunBase | str | UUID,
evaluator: RunEvaluator,
*,
source_info: dict[str, Any] | None = None,
reference_example: Example | str | dict | UUID | None = None,
load_child_runs: bool = False,
) -> EvaluationResult
评估一次运行。
| 参数 | 描述 |
|---|---|
run
|
要评估的运行(run)。
类型: |
evaluator
|
要使用的评估器。
类型: |
source_info
|
关于评估来源的附加信息,将作为反馈元数据记录。 |
reference_example
|
用作评估参考的示例。如果未提供,将使用运行的参考示例。 |
load_child_runs
|
在解析运行 ID 时是否加载子运行。
类型: |
| 返回 | 描述 |
|---|---|
Feedback
|
由评估创建的反馈对象。
类型: |
aevaluate_run async ¶
aevaluate_run(
run: Run | str | UUID,
evaluator: RunEvaluator,
*,
source_info: dict[str, Any] | None = None,
reference_example: Example | str | dict | UUID | None = None,
load_child_runs: bool = False,
) -> EvaluationResult
异步评估一次运行。
| 参数 | 描述 |
|---|---|
run
|
要评估的运行(run)。
类型: |
evaluator
|
要使用的评估器。
类型: |
source_info
|
关于评估来源的附加信息,将作为反馈元数据记录。 |
reference_example
|
用作评估参考的示例。如果未提供,将使用运行的参考示例。 |
| 返回 | 描述 |
|---|---|
EvaluationResult
|
由评估创建的评估结果对象。
类型: |
create_feedback ¶
create_feedback(
run_id: ID_TYPE | None = None,
key: str = "unnamed",
*,
score: float | int | bool | None = None,
value: str | dict | None = None,
trace_id: ID_TYPE | None = None,
correction: dict | None = None,
comment: str | None = None,
source_info: dict[str, Any] | None = None,
feedback_source_type: FeedbackSourceType | str = API,
source_run_id: ID_TYPE | None = None,
feedback_id: ID_TYPE | None = None,
feedback_config: FeedbackConfig | None = None,
stop_after_attempt: int = 10,
project_id: ID_TYPE | None = None,
comparative_experiment_id: ID_TYPE | None = None,
feedback_group_id: ID_TYPE | None = None,
extra: dict | None = None,
error: bool | None = None,
**kwargs: Any,
) -> Feedback
为一次运行创建反馈。
注意:要启用在后台批量上传反馈,您必须指定 trace_id。我们强烈建议在对延迟敏感的环境中使用此功能。
| 参数 | 描述 |
|---|---|
key
|
反馈指标的名称。
类型: |
score
|
对此运行在指标或方面的评分。 |
value
|
此反馈的显示值或非数值。 |
run_id
|
要提供反馈的运行(run)的 ID。必须至少指定 run_id、trace_id 或 project_id 中的一个。
类型: |
trace_id
|
要提供反馈的运行(由 run_id 指定)的追踪(即根父运行)的 ID。如果 run_id 和 trace_id 相同,则只需指定 trace_id。注意:trace_id 是反馈摄取进行批量和后台处理所必需的。
类型: |
correction
|
此运行的正确基准事实。
类型: |
comment
|
关于此反馈的评论,例如对分数的辩解或 LLM 裁判的思维链轨迹。
类型: |
source_info
|
关于此反馈来源的信息。 |
feedback_source_type
|
反馈来源的类型,例如模型(用于模型生成的反馈)或 API。 |
source_run_id
|
如果类型为“模型”,则为生成此反馈的运行的 ID。
类型: |
feedback_id
|
要创建的反馈的 ID。如果未提供,将生成一个随机的 UUID。
类型: |
feedback_config
|
指定如何解释此键的反馈的配置。示例包括连续(带有最小/最大界限)、分类或自由形式。
类型: |
尝试后停止
|
放弃前重试请求的次数。
类型: |
project_id
|
要提供反馈的项目(或实验)的 ID。这用于为实验创建摘要指标。如果指定了 project_id,则不能指定 run_id 或 trace_id,反之亦然。
类型: |
comparative_experiment_id
|
如果此反馈是作为比较实验的一部分记录的,则此项将反馈与该实验关联。
类型: |
feedback_group_id
|
在记录偏好、对运行进行排名或其他比较性反馈时,此项用于将反馈组合在一起。
类型: |
extra
|
反馈的元数据。
类型: |
**kwargs
|
附加的关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
Feedback
|
已创建的反馈对象。
类型: |
示例
.. code-block:: python
from langsmith import trace, traceable, Client
@traceable
def foo(x):
return {"y": x * 2}
@traceable
def bar(y):
return {"z": y - 1}
client = Client()
inputs = {"x": 1}
with trace(name="foobar", inputs=inputs) as root_run:
result = foo(**inputs)
result = bar(**result)
root_run.outputs = result
trace_id = root_run.id
child_runs = root_run.child_runs
# Provide feedback for a trace (a.k.a. a root run)
client.create_feedback(
key="user_feedback",
score=1,
trace_id=trace_id,
)
# Provide feedback for a child run
foo_run_id = [run for run in child_runs if run.name == "foo"][0].id
client.create_feedback(
key="correctness",
score=0,
run_id=foo_run_id,
# trace_id= is optional but recommended to enable batched and backgrounded
# feedback ingestion.
trace_id=trace_id,
)
update_feedback ¶
list_feedback ¶
list_feedback(
*,
run_ids: Sequence[ID_TYPE] | None = None,
feedback_key: Sequence[str] | None = None,
feedback_source_type: Sequence[FeedbackSourceType] | None = None,
limit: int | None = None,
**kwargs: Any,
) -> Iterator[Feedback]
列出 LangSmith API 上的反馈对象。
| 参数 | 描述 |
|---|---|
run_ids
|
用于筛选的运行 ID。 |
feedback_key
|
用于筛选的反馈键。示例:'correctness'。查询对所有反馈键执行并集操作。 |
feedback_source_type
|
反馈来源的类型,例如模型或 API。
类型: |
limit
|
要返回的最大反馈数量。
TYPE: |
**kwargs
|
附加的关键字参数。
类型: |
| YIELDS | 描述 |
|---|---|
Feedback
|
反馈对象。 |
delete_feedback ¶
create_feedback_from_token ¶
create_feedback_from_token(
token_or_url: str | UUID,
score: float | int | bool | None = None,
*,
value: float | int | bool | str | dict | None = None,
correction: dict | None = None,
comment: str | None = None,
metadata: dict | None = None,
) -> None
通过预签名令牌或 URL 创建反馈。
| 参数 | 描述 |
|---|---|
token_or_url
|
用于创建反馈的令牌或 URL。 |
score
|
反馈的分数。默认为 None。 |
value
|
反馈的值。默认为 None。 |
correction
|
反馈的修正。默认为 None。
类型: |
comment
|
反馈的评论。默认为 None。
类型: |
metadata
|
反馈的附加元数据。默认为 None。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果源 API URL 无效。 |
| 返回 | 描述 |
|---|---|
None
|
None |
create_presigned_feedback_token ¶
create_presigned_feedback_token(
run_id: ID_TYPE,
feedback_key: str,
*,
expiration: datetime | timedelta | None = None,
feedback_config: FeedbackConfig | None = None,
feedback_id: ID_TYPE | None = None,
) -> FeedbackIngestToken
创建一个预签名 URL 用于发送反馈数据。
这对于为基于浏览器的客户端提供一种直接向 LangSmith 上传反馈数据而无需访问 API 密钥的方式非常有用。
| 参数 | 描述 |
|---|---|
run_id
|
运行的 ID。
类型: |
feedback_key
|
要创建的反馈的键。
类型: |
expiration
|
预签名 URL 的过期时间。可以是 datetime 或从现在开始的 timedelta 偏移量。默认为 3 小时。 |
feedback_config
|
如果首次创建 feedback_key,这定义了应如何解释该指标,例如连续分数(带有可选界限)或分类值的分布。
类型: |
feedback_id
|
要创建的反馈的 ID。如果未提供,将创建一个新的反馈。
类型: |
| 返回 | 描述 |
|---|---|
FeedbackIngestToken
|
用于上传反馈数据的预签名 URL。 |
create_presigned_feedback_tokens ¶
create_presigned_feedback_tokens(
run_id: ID_TYPE,
feedback_keys: Sequence[str],
*,
expiration: datetime | timedelta | None = None,
feedback_configs: Sequence[FeedbackConfig | None] | None = None,
) -> Sequence[FeedbackIngestToken]
创建一个预签名 URL 用于发送反馈数据。
这对于为基于浏览器的客户端提供一种直接向 LangSmith 上传反馈数据而无需访问 API 密钥的方式非常有用。
| 参数 | 描述 |
|---|---|
run_id
|
运行的 ID。
类型: |
feedback_keys
|
要创建的反馈的键。 |
expiration
|
预签名 URL 的过期时间。可以是 datetime 或从现在开始的 timedelta 偏移量。默认为 3 小时。 |
feedback_configs
|
如果首次创建 feedback_key,这定义了应如何解释该指标,例如连续分数(带有可选界限)或分类值的分布。
类型: |
| 返回 | 描述 |
|---|---|
Sequence[FeedbackIngestToken]
|
Sequence[FeedbackIngestToken]:用于上传反馈数据的预签名 URL。 |
list_presigned_feedback_tokens ¶
list_presigned_feedback_tokens(
run_id: ID_TYPE, *, limit: int | None = None
) -> Iterator[FeedbackIngestToken]
列出一次运行的反馈摄入令牌。
| 参数 | 描述 |
|---|---|
run_id
|
用于筛选的运行(run)的 ID。
类型: |
limit
|
要返回的最大令牌数。
TYPE: |
| YIELDS | 描述 |
|---|---|
FeedbackIngestToken
|
反馈摄取令牌。 |
list_annotation_queues ¶
list_annotation_queues(
*,
queue_ids: list[ID_TYPE] | None = None,
name: str | None = None,
name_contains: str | None = None,
limit: int | None = None,
) -> Iterator[AnnotationQueue]
列出 LangSmith API 上的标注队列。
| 参数 | 描述 |
|---|---|
queue_ids
|
用于筛选的队列 ID。
类型: |
name
|
用于筛选的队列名称。
类型: |
name_contains
|
队列名称应包含的子字符串。
类型: |
limit
|
要返回的最大队列数。
TYPE: |
| YIELDS | 描述 |
|---|---|
AnnotationQueue
|
标注队列。 |
create_annotation_queue ¶
create_annotation_queue(
*,
name: str,
description: str | None = None,
queue_id: ID_TYPE | None = None,
rubric_instructions: str | None = None,
) -> AnnotationQueueWithDetails
read_annotation_queue ¶
read_annotation_queue(queue_id: ID_TYPE) -> AnnotationQueue
读取具有指定队列 ID 的标注队列。
| 参数 | 描述 |
|---|---|
queue_id
|
要读取的标注队列的 ID。
类型: |
| 返回 | 描述 |
|---|---|
AnnotationQueue
|
标注队列对象。
类型: |
update_annotation_queue ¶
delete_annotation_queue ¶
add_runs_to_annotation_queue ¶
add_runs_to_annotation_queue(queue_id: ID_TYPE, *, run_ids: list[ID_TYPE]) -> None
delete_run_from_annotation_queue ¶
get_run_from_annotation_queue ¶
get_run_from_annotation_queue(
queue_id: ID_TYPE, *, index: int
) -> RunWithAnnotationQueueInfo
create_comparative_experiment ¶
create_comparative_experiment(
name: str,
experiments: Sequence[ID_TYPE],
*,
reference_dataset: ID_TYPE | None = None,
description: str | None = None,
created_at: datetime | None = None,
metadata: dict[str, Any] | None = None,
id: ID_TYPE | None = None,
) -> ComparativeExperiment
在 LangSmith API 上创建一个比较实验。
这些实验比较了共享数据集上 2 个或更多实验的结果。
| 参数 | 描述 |
|---|---|
name
|
比较实验的名称。
类型: |
experiments
|
要比较的实验的 ID。 |
reference_dataset
|
这些实验所比较的数据集的 ID。
类型: |
描述
|
比较实验的描述。
类型: |
created_at
|
比较实验的创建时间。
类型: |
metadata
|
比较实验的附加元数据。 |
id
|
比较实验的 ID。
类型: |
| 返回 | 描述 |
|---|---|
ComparativeExperiment
|
已创建的比较实验对象。 |
arun_on_dataset async ¶
arun_on_dataset(
dataset_name: str,
llm_or_chain_factory: Any,
*,
evaluation: Any | None = None,
concurrency_level: int = 5,
project_name: str | None = None,
project_metadata: dict[str, Any] | None = None,
dataset_version: datetime | str | None = None,
verbose: bool = False,
input_mapper: Callable[[dict], Any] | None = None,
revision_id: str | None = None,
**kwargs: Any,
) -> dict[str, Any]
在数据集上异步运行链或语言模型。
.. deprecated:: 0.1.0
此方法已弃用。请改用 :func:langsmith.aevaluate。
run_on_dataset ¶
run_on_dataset(
dataset_name: str,
llm_or_chain_factory: Any,
*,
evaluation: Any | None = None,
concurrency_level: int = 5,
project_name: str | None = None,
project_metadata: dict[str, Any] | None = None,
dataset_version: datetime | str | None = None,
verbose: bool = False,
input_mapper: Callable[[dict], Any] | None = None,
revision_id: str | None = None,
**kwargs: Any,
) -> dict[str, Any]
在数据集上运行链或语言模型。
.. deprecated:: 0.1.0
此方法已弃用。请改用 :func:langsmith.aevaluate。
like_prompt ¶
unlike_prompt ¶
list_prompts ¶
list_prompts(
*,
limit: int = 100,
offset: int = 0,
is_public: bool | None = None,
is_archived: bool | None = False,
sort_field: PromptSortField = updated_at,
sort_direction: Literal["desc", "asc"] = "desc",
query: str | None = None,
) -> ListPromptsResponse
带分页地列出提示。
| 参数 | 描述 |
|---|---|
limit
|
要返回的最大提示数。默认为 100。
类型: |
offset
|
要跳过的提示数。默认为 0。
类型: |
is_public
|
根据提示是否公开进行筛选。
类型: |
is_archived
|
根据提示是否已归档进行筛选。
类型: |
sort_field
|
用于排序的字段。默认为 "updated_at"。
类型: |
sort_direction
|
排序顺序。默认为 "desc"。
类型: |
query
|
通过搜索查询筛选提示。
类型: |
| 返回 | 描述 |
|---|---|
ListPromptsResponse
|
一个包含以下内容的响应对象 |
ListPromptsResponse
|
提示列表。 |
get_prompt ¶
create_prompt ¶
create_prompt(
prompt_identifier: str,
*,
description: str | None = None,
readme: str | None = None,
tags: Sequence[str] | None = None,
is_public: bool = False,
) -> Prompt
创建一个新的提示。
不附加提示对象,仅创建一个空提示。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
提示的标识符。标识符应采用 owner/name:hash、name:hash、owner/name 或 name 的格式
类型: |
描述
|
提示的描述。
类型: |
readme
|
提示的自述文件。
类型: |
tags
|
提示的标签列表。 |
is_public
|
提示是否应公开。默认为 False。
类型: |
| 返回 | 描述 |
|---|---|
Prompt
|
已创建的提示对象。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果当前租户不是所有者。 |
HTTPError
|
如果服务器请求失败。 |
create_commit ¶
create_commit(
prompt_identifier: str, object: Any, *, parent_commit_hash: str | None = None
) -> str
为现有提示创建一个提交。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
提示的标识符。
类型: |
object
|
要提交的 LangChain 对象。
类型: |
parent_commit_hash
|
父提交的哈希值。默认为最新提交。
类型: |
| 返回 | 描述 |
|---|---|
str
|
提示提交的 URL。
类型: |
| 引发 | 描述 |
|---|---|
HTTPError
|
如果服务器请求失败。 |
ValueError
|
如果提示不存在。 |
update_prompt ¶
update_prompt(
prompt_identifier: str,
*,
description: str | None = None,
readme: str | None = None,
tags: Sequence[str] | None = None,
is_public: bool | None = None,
is_archived: bool | None = None,
) -> dict[str, Any]
更新提示的元数据。
要更新提示的内容,请改用 push_prompt 或 create_commit。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
要更新的提示的标识符。
类型: |
描述
|
提示的新描述。
类型: |
readme
|
提示的新自述文件。
类型: |
tags
|
提示的新标签列表。 |
is_public
|
提示的新公开状态。
类型: |
is_archived
|
提示的新归档状态。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, Any]
|
Dict[str, Any]:服务器返回的已更新的提示数据。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果 prompt_identifier 为空。 |
HTTPError
|
如果服务器请求失败。 |
delete_prompt ¶
delete_prompt(prompt_identifier: str) -> None
删除一个提示。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
要删除的提示的标识符。
类型: |
| 返回 | 描述 |
|---|---|
bool
|
如果提示已成功删除,则为 True,否则为 False。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果当前租户不是提示的所有者。 |
pull_prompt_commit ¶
pull_prompt_commit(
prompt_identifier: str, *, include_model: bool | None = False
) -> PromptCommit
从 LangSmith API 拉取一个提示对象。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
提示的标识符。
类型: |
| 返回 | 描述 |
|---|---|
PromptCommit
|
提示对象。
类型: |
| 引发 | 描述 |
|---|---|
ValueError
|
如果没有找到提示的提交。 |
list_prompt_commits ¶
list_prompt_commits(
prompt_identifier: str,
*,
limit: int | None = None,
offset: int = 0,
include_model: bool = False,
) -> Iterator[ListedPromptCommit]
列出给定提示的提交。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
提示的标识符,格式为 'owner/repo_name'。
类型: |
limit
|
要返回的最大提交数。如果为 None,则返回所有提交。默认为 None。
TYPE: |
offset
|
在开始返回结果之前要跳过的提交数。默认为 0。
类型: |
include_model
|
是否在提交数据中包含模型信息。默认为 False。
类型: |
| YIELDS | 描述 |
|---|---|
ListedPromptCommit
|
每个提交的 ListedPromptCommit 对象。 |
注意
此方法使用分页来检索提交。如果需要,它将进行多次 API 调用以检索所有提交或达到指定的限制。
pull_prompt ¶
push_prompt ¶
push_prompt(
prompt_identifier: str,
*,
object: Any | None = None,
parent_commit_hash: str = "latest",
is_public: bool | None = None,
description: str | None = None,
readme: str | None = None,
tags: Sequence[str] | None = None,
) -> str
将一个提示推送到 LangSmith API。
可用于更新提示元数据或提示内容。
如果提示不存在,它将被创建。如果提示存在,它将被更新。
| 参数 | 描述 |
|---|---|
prompt_identifier
|
提示的标识符。
类型: |
object
|
要推送的 LangChain 对象。
类型: |
parent_commit_hash
|
父提交哈希。默认为 "latest"。
类型: |
is_public
|
提示是否应公开。如果为 None(默认),则对现有提示保持当前的可见性状态。对于新提示,None 默认为私有。设置为 True 以公开,或设置为 False 以设为私有。
类型: |
描述
|
提示的描述。默认为空字符串。
类型: |
readme
|
提示的自述文件。默认为空字符串。
类型: |
tags
|
提示的标签列表。默认为空列表。 |
| 返回 | 描述 |
|---|---|
str
|
提示的 URL。
类型: |
evaluate ¶
evaluate(
target: TARGET_T | Runnable | EXPERIMENT_T | tuple[EXPERIMENT_T, EXPERIMENT_T],
/,
data: DATA_T | None = None,
evaluators: Sequence[EVALUATOR_T] | Sequence[COMPARATIVE_EVALUATOR_T] | None = None,
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] | None = None,
metadata: dict | None = None,
experiment_prefix: str | None = None,
description: str | None = None,
max_concurrency: int | None = 0,
num_repetitions: int = 1,
blocking: bool = True,
experiment: EXPERIMENT_T | None = None,
upload_results: bool = True,
error_handling: Literal["log", "ignore"] = "log",
**kwargs: Any,
) -> ExperimentResults | ComparativeExperimentResults
在给定数据集上评估目标系统。
| 参数 | 描述 |
|---|---|
target
|
要评估的目标系统或实验。可以是一个接受 dict 并返回 dict 的函数、一个 langchain Runnable、一个现有的实验 ID 或一个由两个实验 ID 组成的元组。
类型: |
data
|
要评估的数据集。可以是数据集名称、示例列表或示例生成器。
类型: |
evaluators
|
要在每个示例上运行的评估器列表。评估器签名取决于目标类型。默认为 None。
类型: |
summary_evaluators
|
要在整个数据集上运行的摘要评估器列表。如果比较两个现有实验,则不应指定此项。默认为 None。
类型: |
metadata
|
要附加到实验的元数据。默认为 None。
类型: |
experiment_prefix
|
为实验名称提供的前缀。默认为 None。
类型: |
描述
|
实验的自由格式文本描述。
类型: |
max_concurrency
|
要运行的最大并发评估数。如果为 None,则不设限制。如果为 0,则无并发。默认为 0。
类型: |
blocking
|
是否阻塞直到评估完成。默认为 True。
类型: |
num_repetitions
|
运行评估的次数。数据集中的每个项目都将运行和评估这么多次。默认为 1。
类型: |
experiment
|
要扩展的现有实验。如果提供,则忽略 experiment_prefix。仅供高级使用。如果目标是现有实验或两个实验的元组,则不应指定此项。
类型: |
upload_results
|
是否将结果上传到 LangSmith。默认为 True。
类型: |
error_handling
|
如何处理单个运行错误。'log' 会将带有错误消息的运行作为实验的一部分进行追踪,'ignore' 将完全不将该运行计为实验的一部分。
类型: |
**kwargs
|
要传递给评估器的其他关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
ExperimentResults
|
如果目标是函数、Runnable 或现有实验。
类型: |
ComparativeExperimentResults
|
如果目标是两个现有实验的元组。
类型: |
示例
准备数据集
.. code-block:: python
from langsmith import Client
client = Client()
dataset = client.clone_public_dataset(
"https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
)
dataset_name = "Evaluate Examples"
基本用法
.. code-block:: python
def accuracy(outputs: dict, reference_outputs: dict) -> dict:
# Row-level evaluator for accuracy.
pred = outputs["response"]
expected = reference_outputs["answer"]
return {"score": expected.lower() == pred.lower()}
.. code-block:: python
def precision(outputs: list[dict], reference_outputs: list[dict]) -> dict:
# Experiment-level evaluator for precision.
# TP / (TP + FP)
predictions = [out["response"].lower() for out in outputs]
expected = [ref["answer"].lower() for ref in reference_outputs]
# yes and no are the only possible answers
tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
return {"score": tp / (tp + fp)}
def predict(inputs: dict) -> dict:
# This can be any function or just an API call to your app.
return {"response": "Yes"}
results = client.evaluate(
predict,
data=dataset_name,
evaluators=[accuracy],
summary_evaluators=[precision],
experiment_prefix="My Experiment",
description="Evaluating the accuracy of a simple prediction model.",
metadata={
"my-prompt-version": "abcd-1234",
},
)
仅对示例的一个子集进行评估
.. code-block:: python
experiment_name = results.experiment_name
examples = client.list_examples(dataset_name=dataset_name, limit=5)
results = client.evaluate(
predict,
data=examples,
evaluators=[accuracy],
summary_evaluators=[precision],
experiment_prefix="My Experiment",
description="Just testing a subset synchronously.",
)
流式传输每个预测以便更轻松、更主动地进行调试。
.. code-block:: python
results = client.evaluate(
predict,
data=dataset_name,
evaluators=[accuracy],
summary_evaluators=[precision],
description="I don't even have to block!",
blocking=False,
)
for i, result in enumerate(results): # doctest: +ELLIPSIS
pass
使用 `evaluate` API 和现成的 LangChain 评估器
.. code-block:: python
from langsmith.evaluation import LangChainStringEvaluator
from langchain.chat_models import init_chat_model
def prepare_criteria_data(run: Run, example: Example):
return {
"prediction": run.outputs["output"],
"reference": example.outputs["answer"],
"input": str(example.inputs),
}
results = client.evaluate(
predict,
data=dataset_name,
evaluators=[
accuracy,
LangChainStringEvaluator("embedding_distance"),
LangChainStringEvaluator(
"labeled_criteria",
config={
"criteria": {
"usefulness": "The prediction is useful if it is correct"
" and/or asks a useful followup question."
},
"llm": init_chat_model("gpt-4o"),
},
prepare_data=prepare_criteria_data,
),
],
description="Evaluating with off-the-shelf LangChain evaluators.",
summary_evaluators=[precision],
)
查看实验的评估结果:... 评估 LangChain 对象
.. code-block:: python
from langchain_core.runnables import chain as as_runnable
@as_runnable
def nested_predict(inputs):
return {"response": "Yes"}
@as_runnable
def lc_predict(inputs):
return nested_predict.invoke(inputs)
results = client.evaluate(
lc_predict,
data=dataset_name,
evaluators=[accuracy],
description="This time we're evaluating a LangChain object.",
summary_evaluators=[precision],
)
比较性评估
.. code-block:: python
results = client.evaluate(
# The target is a tuple of the experiment IDs to compare
target=(
"12345678-1234-1234-1234-123456789012",
"98765432-1234-1234-1234-123456789012",
),
evaluators=[accuracy],
summary_evaluators=[precision],
)
评估现有实验
.. code-block:: python
results = client.evaluate(
# The target is the ID of the experiment we are evaluating
target="12345678-1234-1234-1234-123456789012",
evaluators=[accuracy],
summary_evaluators=[precision],
)
.. versionadded:: 0.2.0
aevaluate async ¶
aevaluate(
target: ATARGET_T | AsyncIterable[dict] | Runnable | str | UUID | TracerSession,
/,
data: DATA_T | AsyncIterable[Example] | Iterable[Example] | None = None,
evaluators: Sequence[EVALUATOR_T | AEVALUATOR_T] | None = None,
summary_evaluators: Sequence[SUMMARY_EVALUATOR_T] | None = None,
metadata: dict | None = None,
experiment_prefix: str | None = None,
description: str | None = None,
max_concurrency: int | None = 0,
num_repetitions: int = 1,
blocking: bool = True,
experiment: TracerSession | str | UUID | None = None,
upload_results: bool = True,
error_handling: Literal["log", "ignore"] = "log",
**kwargs: Any,
) -> AsyncExperimentResults
在给定数据集上评估一个异步目标系统。
| 参数 | 描述 |
|---|---|
target
|
要评估的目标系统或实验。可以是一个接受 dict 并返回 dict 的异步函数、一个 langchain Runnable、一个现有的实验 ID 或一个由两个实验 ID 组成的元组。
类型: |
data
|
要评估的数据集。可以是数据集名称、示例列表、示例的异步生成器或示例的异步可迭代对象。
类型: |
evaluators
|
要在每个示例上运行的评估器列表。默认为 None。
类型: |
summary_evaluators
|
要在整个数据集上运行的摘要评估器列表。默认为 None。
类型: |
metadata
|
要附加到实验的元数据。默认为 None。
类型: |
experiment_prefix
|
为实验名称提供的前缀。默认为 None。
类型: |
描述
|
实验的描述。
类型: |
max_concurrency
|
要运行的最大并发评估数。如果为 None,则不设限制。如果为 0,则无并发。默认为 0。
类型: |
num_repetitions
|
运行评估的次数。数据集中的每个项目都将运行和评估这么多次。默认为 1。
类型: |
blocking
|
是否阻塞直到评估完成。默认为 True。
类型: |
experiment
|
要扩展的现有实验。如果提供,则忽略 experiment_prefix。仅供高级使用。
类型: |
upload_results
|
是否将结果上传到 LangSmith。默认为 True。
类型: |
error_handling
|
如何处理单个运行错误。'log' 会将带有错误消息的运行作为实验的一部分进行追踪,'ignore' 将完全不将该运行计为实验的一部分。
类型: |
**kwargs
|
要传递给评估器的其他关键字参数。
类型: |
| 返回 | 描述 |
|---|---|
AsyncExperimentResults
|
AsyncIterator[ExperimentResultRow]:实验结果的异步迭代器。 |
环境
- LANGSMITH_TEST_CACHE:如果设置,API 调用将被缓存到磁盘,以在测试期间节省时间和成本。建议将缓存文件提交到您的代码仓库以加快 CI/CD 运行。需要安装 'langsmith[vcr]' 包。
示例
准备数据集
.. code-block:: python
import asyncio
from langsmith import Client
client = Client()
dataset = client.clone_public_dataset(
"https://smith.langchain.com/public/419dcab2-1d66-4b94-8901-0357ead390df/d"
)
dataset_name = "Evaluate Examples"
基本用法
.. code-block:: python
def accuracy(outputs: dict, reference_outputs: dict) -> dict:
# Row-level evaluator for accuracy.
pred = outputs["resposen"]
expected = reference_outputs["answer"]
return {"score": expected.lower() == pred.lower()}
def precision(outputs: list[dict], reference_outputs: list[dict]) -> dict:
# Experiment-level evaluator for precision.
# TP / (TP + FP)
predictions = [out["response"].lower() for out in outputs]
expected = [ref["answer"].lower() for ref in reference_outputs]
# yes and no are the only possible answers
tp = sum([p == e for p, e in zip(predictions, expected) if p == "yes"])
fp = sum([p == "yes" and e == "no" for p, e in zip(predictions, expected)])
return {"score": tp / (tp + fp)}
async def apredict(inputs: dict) -> dict:
# This can be any async function or just an API call to your app.
await asyncio.sleep(0.1)
return {"response": "Yes"}
results = asyncio.run(
client.aevaluate(
apredict,
data=dataset_name,
evaluators=[accuracy],
summary_evaluators=[precision],
experiment_prefix="My Experiment",
description="Evaluate the accuracy of the model asynchronously.",
metadata={
"my-prompt-version": "abcd-1234",
},
)
)
使用异步生成器仅对示例的一个子集进行评估
.. code-block:: python
async def example_generator():
examples = client.list_examples(dataset_name=dataset_name, limit=5)
for example in examples:
yield example
results = asyncio.run(
client.aevaluate(
apredict,
data=example_generator(),
evaluators=[accuracy],
summary_evaluators=[precision],
experiment_prefix="My Subset Experiment",
description="Evaluate a subset of examples asynchronously.",
)
)
流式传输每个预测以便更轻松、更主动地进行调试。
.. code-block:: python
results = asyncio.run(
client.aevaluate(
apredict,
data=dataset_name,
evaluators=[accuracy],
summary_evaluators=[precision],
experiment_prefix="My Streaming Experiment",
description="Streaming predictions for debugging.",
blocking=False,
)
)
async def aenumerate(iterable):
async for elem in iterable:
print(elem)
asyncio.run(aenumerate(results))
无并发运行
.. code-block:: python
results = asyncio.run(
client.aevaluate(
apredict,
data=dataset_name,
evaluators=[accuracy],
summary_evaluators=[precision],
experiment_prefix="My Experiment Without Concurrency",
description="This was run without concurrency.",
max_concurrency=0,
)
)
使用异步评估器
.. code-block:: python
async def helpfulness(outputs: dict) -> dict:
# Row-level evaluator for helpfulness.
await asyncio.sleep(5) # Replace with your LLM API call
return {"score": outputs["output"] == "Yes"}
results = asyncio.run(
client.aevaluate(
apredict,
data=dataset_name,
evaluators=[helpfulness],
summary_evaluators=[precision],
experiment_prefix="My Helpful Experiment",
description="Applying async evaluators example.",
)
)
评估现有实验
.. code-block:: python
results = asyncio.run(
client.aevaluate(
# The target is the ID of the experiment we are evaluating
target="419dcab2-1d66-4b94-8901-0357ead390df",
evaluators=[accuracy, helpfulness],
summary_evaluators=[precision],
)
)
.. versionadded:: 0.2.0
get_experiment_results ¶
get_experiment_results(
name: str | None = None,
project_id: UUID | None = None,
preview: bool = False,
comparative_experiment_id: UUID | None = None,
filters: dict[UUID, list[str]] | None = None,
limit: int | None = None,
) -> ExperimentResults
获取实验结果,包括实验会话的聚合统计数据和每个数据集示例的实验运行。
实验结果可能在实验创建后不会立即可用。
| 参数 | 描述 |
|---|---|
name
|
实验名称。
类型: |
project_id
|
实验的追踪项目 ID,也称为 session_id,可以在 LS 实验页面的 URL 中找到
类型: |
preview
|
是否仅返回轻量级预览数据。当为 True 时,从 S3 存储中获取 inputs_preview/outputs_preview 摘要,而不是完整的输入/输出。速度更快,带宽更少。
类型: |
comparative_experiment_id
|
用于成对比较实验结果的可选比较实验 UUID。
类型: |
filters
|
应用于结果的可选筛选器 |
limit
|
要返回的最大结果数
TYPE: |
| 返回 | 描述 |
|---|---|
ExperimentResults
|
ExperimentResults 包含: - feedback_stats:组合的反馈统计信息,包括会话级反馈 - run_stats:聚合的运行统计信息(延迟、令牌、成本等) - examples_with_runs:ExampleWithRuns 的迭代器 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果找不到给定 session_id 的项目 |
示例
.. code-block:: python
client = Client()
results = client.get_experiment_results(
project_id="037ae90f-f297-4926-b93c-37d8abf6899f",
)
for example_with_runs in results["examples_with_runs"]:
print(example_with_runs.dict())
# Access aggregated experiment statistics
print(f"Total runs: {results['run_stats']['run_count']}")
print(f"Total cost: {results['run_stats']['total_cost']}")
print(f"P50 latency: {results['run_stats']['latency_p50']}")
# Access feedback statistics
print(f"Feedback stats: {results['feedback_stats']}")
convert_prompt_to_openai_format ¶
convert_prompt_to_openai_format(
messages: Any, model_kwargs: dict[str, Any] | None = None
) -> dict
将提示转换为 OpenAI 格式。
需要安装 `langchain_openai` 包。
| 参数 | 描述 |
|---|---|
messages
|
要转换的消息。
类型: |
model_kwargs
|
模型配置参数,包括 `stop` 和任何其他必需的参数。默认为 None。 |
| 返回 | 描述 |
|---|---|
dict
|
OpenAI 格式的提示。
类型: |
| 引发 | 描述 |
|---|---|
ImportError
|
如果未安装 `langchain_openai` 包。 |
LangSmithError
|
如果在转换过程中出现错误。 |