LangSmith 部署 SDK 参考¶
(曾用名 LangGraph 平台)
langgraph_sdk.client ¶
LangGraph 客户端实现连接到 LangGraph API。
该模块提供了异步(get_client(url="https://:2024")) 或 LangGraphClient)和同步(get_sync_client(url="https://:2024")) 或 SyncLanggraphClient)客户端,用于与 LangGraph API 的核心资源(如 Assistants、Threads、Runs 和 Cron jobs)以及其持久化文档存储(Store)进行交互。
| 函数 | 描述 |
|---|---|
get_client |
创建并配置一个 LangGraphClient。 |
get_sync_client |
获取一个同步的 LangGraphClient 实例。 |
LangGraphClient ¶
LangGraph API 的顶级客户端。
| 属性 | 描述 |
|---|---|
assistants |
管理图的版本化配置。
|
threads |
处理(可能)多轮的交互,例如对话线程。
|
runs |
控制图的单个调用。
|
crons |
管理计划任务。
|
store |
与持久化的共享数据存储进行交互。
|
| 方法 | 描述 |
|---|---|
__aenter__ |
进入异步上下文管理器。 |
__aexit__ |
退出异步上下文管理器。 |
aclose |
关闭底层的 HTTP 客户端。 |
__aexit__ async ¶
__aexit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
退出异步上下文管理器。
HttpClient ¶
处理对 LangGraph API 的异步请求。
在提供的 httpx 客户端之上增加了额外的错误消息和内容处理功能。
| 属性 | 描述 |
|---|---|
client |
底层的 HTTPX 异步客户端。
类型: |
| 方法 | 描述 |
|---|---|
get |
发送一个 |
post |
发送一个 |
put |
发送一个 |
patch |
发送一个 |
delete |
发送一个 |
request_reconnect |
发送一个请求,该请求会自动重新连接到 Location 头部指定的地址。 |
stream |
使用 SSE 流式传输结果。 |
get async ¶
get(
path: str,
*,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 GET 请求。
post async ¶
post(
path: str,
*,
json: dict[str, Any] | list | None,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 POST 请求。
put async ¶
put(
path: str,
*,
json: dict,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 PUT 请求。
patch async ¶
patch(
path: str,
*,
json: dict,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 PATCH 请求。
delete async ¶
delete(
path: str,
*,
json: Any | None = None,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> None
发送一个 DELETE 请求。
request_reconnect async ¶
request_reconnect(
path: str,
method: str,
*,
json: dict[str, Any] | None = None,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
reconnect_limit: int = 5,
) -> Any
发送一个请求,该请求会自动重新连接到 Location 头部指定的地址。
AssistantsClient ¶
用于在 LangGraph 中管理助手的客户端。
此类提供与助手交互的方法,助手是图的版本化配置。
示例
| 方法 | 描述 |
|---|---|
get |
通过 ID 获取一个助手。 |
get_graph |
通过 ID 获取一个助手的图。 |
get_schemas |
通过 ID 获取一个助手的模式(schemas)。 |
get_subgraphs |
通过 ID 获取一个助手的模式(schemas)。 |
create |
创建一个新的助手。 |
更新 |
更新一个助手。 |
delete |
删除一个助手。 |
search |
搜索助手。 |
count |
统计匹配筛选条件的助手数量。 |
get_versions |
列出一个助手的所有版本。 |
set_latest |
更改一个助手的版本。 |
get async ¶
get(
assistant_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
通过 ID 获取一个助手。
| 参数 | 描述 |
|---|---|
assistant_id
|
要获取的助手的 ID。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Assistant
|
助手对象。
类型: |
用法示例
----------------------------------------------------
{
'assistant_id': 'my_assistant_id',
'graph_id': 'agent',
'created_at': '2024-06-25T17:10:33.109781+00:00',
'updated_at': '2024-06-25T17:10:33.109781+00:00',
'config': {},
'metadata': {'created_by': 'system'},
'version': 1,
'name': 'my_assistant'
}
get_graph async ¶
get_graph(
assistant_id: str,
*,
xray: int | bool = False,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> dict[str, list[dict[str, Any]]]
通过 ID 获取一个助手的图。
| 参数 | 描述 |
|---|---|
assistant_id
|
要获取其图的助手的 ID。
类型: |
xray
|
包含子图的图表示。如果提供一个整数值,则只包含深度小于或等于该值的子图。 |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Graph
|
助手的图信息,格式为 JSON。 |
用法示例
client = get_client(url="https://:2024")
graph_info = await client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas async ¶
get_schemas(
assistant_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> GraphSchema
通过 ID 获取一个助手的模式(schemas)。
| 参数 | 描述 |
|---|---|
assistant_id
|
要获取其模式的助手的 ID。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
GraphSchema
|
助手的图模式。
类型: |
用法示例
client = get_client(url="https://:2024")
schema = await client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'context_schema':
{
'title': 'Context',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs async ¶
get_subgraphs(
assistant_id: str,
namespace: str | None = None,
recurse: bool = False,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Subgraphs
create async ¶
create(
graph_id: str | None,
config: Config | None = None,
*,
context: Context | None = None,
metadata: Json = None,
assistant_id: str | None = None,
if_exists: OnConflictBehavior | None = None,
name: str | None = None,
headers: Mapping[str, str] | None = None,
description: str | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
创建一个新的助手。
当图是可配置的,并且您想基于不同配置创建不同助手时很有用。
| 参数 | 描述 |
|---|---|
graph_id
|
助手应使用的图的 ID。图 ID通常在您的 langgraph.json 配置文件中设置。
类型: |
配置
|
用于图的配置。
类型: |
metadata
|
要添加到助手的元数据。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
assistant_id
|
要使用的助手 ID,如果未提供,将默认为随机的 UUID。
类型: |
if_exists
|
如何处理重复创建。内部默认为 'raise'。必须是 'raise'(如果重复则引发错误)或 'do_nothing'(返回现有助手)。
类型: |
name
|
助手的名称。内部默认为 'Untitled'。
类型: |
headers
|
请求中包含的可选自定义头部。 |
描述
|
助手的可选描述。description 字段适用于 langgraph-api 服务器版本 >=0.0.45。
类型: |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Assistant
|
已创建的助手。
类型: |
update async ¶
update(
assistant_id: str,
*,
graph_id: str | None = None,
config: Config | None = None,
context: Context | None = None,
metadata: Json = None,
name: str | None = None,
headers: Mapping[str, str] | None = None,
description: str | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
更新一个助手。
使用此方法指向不同的图、更新配置或更改助手的元数据。
| 参数 | 描述 |
|---|---|
assistant_id
|
要更新的助手。
类型: |
graph_id
|
助手应使用的图的 ID。图 ID 通常在您的 langgraph.json 配置文件中设置。如果为 `None`,助手将继续指向同一个图。
类型: |
配置
|
用于图的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
metadata
|
与现有助手元数据合并的元数据。
类型: |
name
|
助手的新名称。
类型: |
headers
|
请求中包含的可选自定义头部。 |
描述
|
助手的可选描述。description 字段适用于 langgraph-api 服务器版本 >=0.0.45。
类型: |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Assistant
|
已更新的助手。 |
delete async ¶
search async ¶
search(
*,
metadata: Json = None,
graph_id: str | None = None,
limit: int = 10,
offset: int = 0,
sort_by: AssistantSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[AssistantSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Assistant]
搜索助手。
| 参数 | 描述 |
|---|---|
metadata
|
用于筛选的元数据。对每个键值对进行精确匹配筛选。
类型: |
graph_id
|
用于筛选的图 ID。图 ID 通常在您的 langgraph.json 配置文件中设置。
类型: |
limit
|
要返回的最大结果数。
类型: |
offset
|
要跳过的结果数。
类型: |
sort_by
|
用于排序的字段。
类型: |
sort_order
|
排序顺序。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Assistant]
|
助手列表。 |
count async ¶
get_versions async ¶
get_versions(
assistant_id: str,
metadata: Json = None,
limit: int = 10,
offset: int = 0,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[AssistantVersion]
set_latest async ¶
set_latest(
assistant_id: str,
version: int,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
ThreadsClient ¶
用于在 LangGraph 中管理线程的客户端。
线程在多次交互/调用(即 runs)中维持图的状态。它会累积并持久化图的状态,从而允许在图的单独调用之间保持连续性。
示例
| 方法 | 描述 |
|---|---|
get |
通过 ID 获取一个线程。 |
create |
创建一个新线程。 |
更新 |
更新一个线程。 |
delete |
删除一个线程。 |
search |
搜索线程。 |
count |
统计匹配筛选条件的线程数量。 |
copy |
复制一个线程。 |
get_state |
获取一个线程的状态。 |
update_state |
更新一个线程的状态。 |
get_history |
获取一个线程的状态历史。 |
join_stream |
获取一个线程的事件流。 |
get async ¶
get(
thread_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Thread
create async ¶
create(
*,
metadata: Json = None,
thread_id: str | None = None,
if_exists: OnConflictBehavior | None = None,
supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
graph_id: str | None = None,
ttl: int | Mapping[str, Any] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Thread
创建一个新线程。
| 参数 | 描述 |
|---|---|
metadata
|
要添加到线程的元数据。
类型: |
thread_id
|
线程的 ID。如果为 `None`,ID 将是一个随机生成的 UUID。
类型: |
if_exists
|
如何处理重复创建。内部默认为 'raise'。必须是 'raise'(如果重复则引发错误)或 'do_nothing'(返回现有线程)。
类型: |
supersteps
|
在创建线程时应用一系列超步骤(supersteps),每个超步骤包含一系列更新。每个更新都有 `values` 或 `command` 以及 `as_node`。用于在部署之间复制线程。
类型: |
graph_id
|
与线程关联的可选图 ID。
类型: |
ttl
|
线程的可选生存时间(分钟)。您可以传入一个整数(分钟)或一个包含 `ttl` 键和可选 `strategy` 键(默认为 "delete")的映射。 |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Thread
|
已创建的线程。 |
update async ¶
update(
thread_id: str,
*,
metadata: Mapping[str, Any],
ttl: int | Mapping[str, Any] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Thread
delete async ¶
search async ¶
search(
*,
metadata: Json = None,
values: Json = None,
ids: Sequence[str] | None = None,
status: ThreadStatus | None = None,
limit: int = 10,
offset: int = 0,
sort_by: ThreadSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[ThreadSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Thread]
搜索线程。
| 参数 | 描述 |
|---|---|
metadata
|
用于筛选的线程元数据。
类型: |
values
|
用于筛选的状态值。
类型: |
ids
|
用于筛选的线程 ID 列表。 |
status
|
用于筛选的线程状态。必须是 'idle'、'busy'、'interrupted' 或 'error' 之一。
类型: |
limit
|
返回的线程数量限制。
类型: |
offset
|
在线程表中开始搜索的偏移量。
类型: |
sort_by
|
排序字段。
类型: |
sort_order
|
排序顺序。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Thread]
|
匹配搜索参数的线程列表。 |
count async ¶
copy async ¶
get_state async ¶
get_state(
thread_id: str,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
*,
subgraphs: bool = False,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ThreadState
获取一个线程的状态。
| 参数 | 描述 |
|---|---|
thread_id
|
要获取其状态的线程的 ID。
类型: |
checkpoint
|
要获取其状态的检查点。
类型: |
checkpoint_id
|
(已弃用)要获取其状态的检查点 ID。
类型: |
subgraphs
|
包含子图状态。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
ThreadState
|
状态所属的线程。 |
用法示例
client = get_client(url="https://:2024)
thread_state = await client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
}
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state async ¶
update_state(
thread_id: str,
values: dict[str, Any] | Sequence[dict] | None,
*,
as_node: str | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse
更新一个线程的状态。
| 参数 | 描述 |
|---|---|
thread_id
|
要更新的线程的 ID。
类型: |
values
|
用于更新状态的值。 |
as_node
|
如同此节点刚刚执行过一样更新状态。
类型: |
checkpoint
|
要更新其状态的检查点。
类型: |
checkpoint_id
|
(已弃用)要更新其状态的检查点 ID。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
ThreadUpdateStateResponse
|
更新线程状态后的响应。 |
用法示例
client = get_client(url="https://:2024)
response = await client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history async ¶
get_history(
thread_id: str,
*,
limit: int = 10,
before: str | Checkpoint | None = None,
metadata: Mapping[str, Any] | None = None,
checkpoint: Checkpoint | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[ThreadState]
获取一个线程的状态历史。
| 参数 | 描述 |
|---|---|
thread_id
|
要获取其状态历史的线程的 ID。
类型: |
checkpoint
|
返回此子图的状态。如果为空,则默认为根图。
类型: |
limit
|
要返回的最大状态数。
类型: |
before
|
返回此检查点之前的状态。
类型: |
metadata
|
通过元数据键值对筛选状态。 |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
list[ThreadState]
|
线程的状态历史。 |
join_stream async ¶
join_stream(
thread_id: str,
*,
last_event_id: str | None = None,
stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> AsyncIterator[StreamPart]
获取一个线程的事件流。
| 参数 | 描述 |
|---|---|
thread_id
|
要获取其流的线程的 ID。
类型: |
last_event_id
|
要获取的最后一个事件的 ID。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
AsyncIterator[StreamPart]
|
流部分的迭代器。 |
RunsClient ¶
用于在 LangGraph 中管理运行(runs)的客户端。
一次运行是助手的单次调用,可以带有可选的输入、配置、上下文和元数据。此客户端管理运行,运行可以是有状态的(在线程上)或无状态的。
示例
| 方法 | 描述 |
|---|---|
stream |
创建一个运行并流式传输结果。 |
create |
创建一个后台运行。 |
create_batch |
创建一批无状态的后台运行。 |
wait |
创建一个运行,等待其完成并返回最终状态。 |
list |
列出运行。 |
get |
获取一个运行。 |
cancel |
获取一个运行。 |
join |
阻塞直到一个运行完成。返回线程的最终状态。 |
join_stream |
实时流式传输一个运行的输出,直到运行完成。 |
delete |
删除一个运行。 |
stream ¶
stream(
thread_id: str | None,
assistant_id: str,
*,
input: Mapping[str, Any] | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
feedback_keys: Sequence[str] | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> AsyncIterator[StreamPart]
创建一个运行并流式传输结果。
| 参数 | 描述 |
|---|---|
thread_id
|
分配给线程的线程 ID。如果为 `None`,将创建一个无状态的运行。
类型: |
assistant_id
|
要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
输入
|
图的输入。 |
command
|
要执行的命令。不能与 input 一起使用。
类型: |
stream_mode
|
要使用的流模式。
类型: |
stream_subgraphs
|
是否从子图流式传输输出。
类型: |
stream_resumable
|
流是否被认为是可恢复的。如果为 true,即使在断开连接后,流也可以恢复并完整地重播。
类型: |
metadata
|
分配给运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint
|
要从中恢复的检查点。
类型: |
checkpoint_during
|
(已弃用)是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
feedback_keys
|
分配给运行的反馈键。 |
on_disconnect
|
要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
类型: |
on_completion
|
是删除还是保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
类型: |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
if_not_exists
|
如何处理缺失的线程。默认为 'reject'。必须是 'reject'(如果缺失则引发错误)或 'create'(创建新线程)。
类型: |
after_seconds
|
在开始运行前等待的秒数。用于安排未来的运行。
TYPE: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
on_run_created
|
创建运行时调用的回调函数。
类型: |
durability
|
用于运行的持久性。值为 "sync"、"async" 或 "exit"。 "async" 表示检查点在下一个图步骤执行时异步持久化,取代 checkpoint_during=True "sync" 表示检查点在图步骤执行后同步持久化,取代 checkpoint_during=False "exit" 表示检查点仅在运行退出时持久化,不保存中间步骤。
类型: |
| 返回 | 描述 |
|---|---|
AsyncIterator[StreamPart]
|
流结果的异步迭代器。 |
用法示例
client = get_client(url="https://:2024)
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
context={"model_name": "anthropic"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
create async ¶
create(
thread_id: str | None,
assistant_id: str,
*,
input: Mapping[str, Any] | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
on_completion: OnCompletionBehavior | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> Run
创建一个后台运行。
| 参数 | 描述 |
|---|---|
thread_id
|
分配给线程的线程 ID。如果为 `None`,将创建一个无状态的运行。
类型: |
assistant_id
|
要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
输入
|
图的输入。 |
command
|
要执行的命令。不能与 input 一起使用。
类型: |
stream_mode
|
要使用的流模式。
类型: |
stream_subgraphs
|
是否从子图流式传输输出。
类型: |
stream_resumable
|
流是否被认为是可恢复的。如果为 true,即使在断开连接后,流也可以恢复并完整地重播。
类型: |
metadata
|
分配给运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint
|
要从中恢复的检查点。
类型: |
checkpoint_during
|
(已弃用)是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
on_completion
|
是删除还是保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
类型: |
if_not_exists
|
如何处理缺失的线程。默认为 'reject'。必须是 'reject'(如果缺失则引发错误)或 'create'(创建新线程)。
类型: |
after_seconds
|
在开始运行前等待的秒数。用于安排未来的运行。
TYPE: |
headers
|
请求中包含的可选自定义头部。 |
on_run_created
|
创建运行时调用的可选回调函数。
类型: |
durability
|
用于运行的持久性。值为 "sync"、"async" 或 "exit"。 "async" 表示检查点在下一个图步骤执行时异步持久化,取代 checkpoint_during=True "sync" 表示检查点在图步骤执行后同步持久化,取代 checkpoint_during=False "exit" 表示检查点仅在运行退出时持久化,不保存中间步骤。
类型: |
| 返回 | 描述 |
|---|---|
Run
|
已创建的后台运行。 |
用法示例
background_run = await client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
context={"model_name": "openai"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'assistant_id': 'my_assistant_id'
},
},
'context':
{
'model_name': 'openai'
}
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch async ¶
create_batch(
payloads: list[RunCreate],
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Run]
创建一批无状态的后台运行。
wait async ¶
wait(
thread_id: str | None,
assistant_id: str,
*,
input: Mapping[str, Any] | None = None,
command: Command | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
raise_error: bool = True,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> list[dict] | dict[str, Any]
创建一个运行,等待其完成并返回最终状态。
| 参数 | 描述 |
|---|---|
thread_id
|
要在其上创建运行的线程 ID。如果为 `None`,将创建一个无状态的运行。
类型: |
assistant_id
|
要运行的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
输入
|
图的输入。 |
command
|
要执行的命令。不能与 input 一起使用。
类型: |
metadata
|
分配给运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint
|
要从中恢复的检查点。
类型: |
checkpoint_during
|
(已弃用)是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
on_disconnect
|
要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
类型: |
on_completion
|
是删除还是保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
if_not_exists
|
如何处理缺失的线程。默认为 'reject'。必须是 'reject'(如果缺失则引发错误)或 'create'(创建新线程)。
类型: |
after_seconds
|
在开始运行前等待的秒数。用于安排未来的运行。
TYPE: |
headers
|
请求中包含的可选自定义头部。 |
on_run_created
|
创建运行时调用的可选回调函数。
类型: |
durability
|
用于运行的持久性。值为 "sync"、"async" 或 "exit"。 "async" 表示检查点在下一个图步骤执行时异步持久化,取代 checkpoint_during=True "sync" 表示检查点在图步骤执行后同步持久化,取代 checkpoint_during=False "exit" 表示检查点仅在运行退出时持久化,不保存中间步骤。
类型: |
| 返回 | 描述 |
|---|---|
list[dict] | dict[str, Any]
|
运行的输出。 |
用法示例
client = get_client(url="https://:2024")
final_state_of_run = await client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
context={"model_name": "anthropic"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list async ¶
list(
thread_id: str,
*,
limit: int = 10,
offset: int = 0,
status: RunStatus | None = None,
select: list[RunSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Run]
get async ¶
get(
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Run
cancel async ¶
cancel(
thread_id: str,
run_id: str,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
获取一个运行。
| 参数 | 描述 |
|---|---|
thread_id
|
要取消的线程 ID。
类型: |
run_id
|
要取消的运行 ID。
类型: |
wait
|
是否等待运行完成。
类型: |
action
|
取消运行时要采取的操作。可能的值为 `interrupt` 或 `rollback`。默认为 `interrupt`。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
None
|
|
join async ¶
join(
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> dict
join_stream ¶
join_stream(
thread_id: str,
run_id: str,
*,
cancel_on_disconnect: bool = False,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
last_event_id: str | None = None,
) -> AsyncIterator[StreamPart]
实时流式传输一个运行的输出,直到运行完成。输出不进行缓冲,因此在此调用之前产生的任何输出都将不会在这里接收到。
| 参数 | 描述 |
|---|---|
thread_id
|
要加入的线程 ID。
类型: |
run_id
|
要加入的运行 ID。
类型: |
cancel_on_disconnect
|
当流断开连接时是否取消运行。
类型: |
stream_mode
|
要使用的流模式。必须是创建运行时传递的流模式的子集。后台运行默认具有所有流模式的并集。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
last_event_id
|
用于流的最后一个事件 ID。
类型: |
| 返回 | 描述 |
|---|---|
AsyncIterator[StreamPart]
|
部分流。 |
delete async ¶
delete(
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
CronClient ¶
用于在 LangGraph 中管理周期性运行(cron 作业)的客户端。
一次运行是助手的单次调用,可以带有可选的输入、配置和上下文。此客户端允许安排周期性运行以自动发生。
用法示例
功能可用性
crons 客户端功能并非在所有许可证上都受支持。请查看相关许可证文档以获取有关功能可用性的最新详细信息。
| 方法 | 描述 |
|---|---|
create_for_thread |
为线程创建一个 cron 作业。 |
create |
创建一个 cron 运行。 |
delete |
删除一个 cron。 |
search |
获取 cron 作业列表。 |
count |
统计匹配筛选条件的 cron 作业数量。 |
create_for_thread async ¶
create_for_thread(
thread_id: str,
assistant_id: str,
*,
schedule: str,
input: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | list[str] | None = None,
interrupt_after: All | list[str] | None = None,
webhook: str | None = None,
multitask_strategy: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Run
为线程创建一个 cron 作业。
| 参数 | 描述 |
|---|---|
thread_id
|
要在其上运行 cron 作业的线程 ID。
类型: |
assistant_id
|
用于 cron 作业的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
schedule
|
执行此作业的 cron 时间表。
类型: |
输入
|
图的输入。 |
metadata
|
分配给 cron 作业运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint_during
|
是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Run
|
cron 运行。 |
用法示例
client = get_client(url="https://:2024")
cron_run = await client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
context={"model_name": "openai"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create async ¶
create(
assistant_id: str,
*,
schedule: str,
input: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | list[str] | None = None,
interrupt_after: All | list[str] | None = None,
webhook: str | None = None,
multitask_strategy: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Run
创建一个 cron 运行。
| 参数 | 描述 |
|---|---|
assistant_id
|
用于 cron 作业的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
schedule
|
执行此作业的 cron 时间表。
类型: |
输入
|
图的输入。 |
metadata
|
分配给 cron 作业运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint_during
|
是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Run
|
cron 运行。 |
用法示例
client = get_client(url="https://:2024")
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
context={"model_name": "openai"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete async ¶
search async ¶
search(
*,
assistant_id: str | None = None,
thread_id: str | None = None,
limit: int = 10,
offset: int = 0,
sort_by: CronSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[CronSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Cron]
获取 cron 作业列表。
| 参数 | 描述 |
|---|---|
assistant_id
|
要搜索的助手 ID 或图名称。
类型: |
thread_id
|
要搜索的线程 ID。
类型: |
limit
|
要返回的最大结果数。
类型: |
offset
|
要跳过的结果数。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
list[Cron]
|
搜索返回的 cron 作业列表。 |
用法示例
client = get_client(url="https://:2024")
cron_jobs = await client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
StoreClient ¶
用于与图的共享存储进行交互的客户端。
存储(Store)提供了一个键值存储系统,用于在图执行之间持久化数据,从而允许有状态操作和跨线程的数据共享。
示例
| 方法 | 描述 |
|---|---|
put_item |
存储或更新一个项目。 |
get_item |
检索单个项目。 |
delete_item |
删除一个项目。 |
search_items |
在命名空间前缀内搜索项目。 |
list_namespaces |
列出具有可选匹配条件的命名空间。 |
put_item async ¶
put_item(
namespace: Sequence[str],
/,
key: str,
value: Mapping[str, Any],
index: Literal[False] | list[str] | None = None,
ttl: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
get_item async ¶
get_item(
namespace: Sequence[str],
/,
key: str,
*,
refresh_ttl: bool | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Item
检索单个项目。
| 参数 | 描述 |
|---|---|
key
|
项目的唯一标识符。
类型: |
namespace
|
表示命名空间路径的可选字符串列表。 |
refresh_ttl
|
是否在此读取操作上刷新 TTL。如果为 `None`,则使用存储的默认行为。
类型: |
| 返回 | 描述 |
|---|---|
Item
|
检索到的项目。
类型: |
headers
|
请求中包含的可选自定义头部。
类型: |
params
|
请求中包含的可选查询参数。
类型: |
delete_item async ¶
delete_item(
namespace: Sequence[str],
/,
key: str,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
删除一个项目。
| 参数 | 描述 |
|---|---|
key
|
项目的唯一标识符。
类型: |
namespace
|
表示命名空间路径的可选字符串列表。 |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
None
|
|
search_items async ¶
search_items(
namespace_prefix: Sequence[str],
/,
filter: Mapping[str, Any] | None = None,
limit: int = 10,
offset: int = 0,
query: str | None = None,
refresh_ttl: bool | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> SearchItemsResponse
在命名空间前缀内搜索项目。
| 参数 | 描述 |
|---|---|
namespace_prefix
|
表示命名空间前缀的字符串列表。 |
filter
|
用于筛选结果的可选键值对字典。 |
limit
|
返回的最大项目数(默认为 10)。
类型: |
offset
|
返回结果前要跳过的项目数(默认为 0)。
类型: |
query
|
用于自然语言搜索的可选查询。
类型: |
refresh_ttl
|
是否刷新此搜索返回的项目的 TTL。如果为 `None`,则使用存储的默认行为。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
SearchItemsResponse
|
匹配搜索条件的项目列表。 |
用法示例
client = get_client(url="https://:2024")
items = await client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces async ¶
list_namespaces(
prefix: list[str] | None = None,
suffix: list[str] | None = None,
max_depth: int | None = None,
limit: int = 100,
offset: int = 0,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ListNamespaceResponse
列出具有可选匹配条件的命名空间。
| 参数 | 描述 |
|---|---|
prefix
|
表示用于筛选命名空间的前缀的可选字符串列表。 |
后缀
|
表示用于筛选命名空间的后缀的可选字符串列表。 |
max_depth
|
指定要返回的命名空间最大深度的可选整数。
TYPE: |
limit
|
返回的最大命名空间数(默认为 100)。
类型: |
offset
|
返回结果前要跳过的命名空间数(默认为 0)。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
ListNamespaceResponse
|
匹配条件的命名空间列表。 |
用法示例
client = get_client(url="https://:2024")
namespaces = await client.store.list_namespaces(
prefix=["documents"],
max_depth=3,
limit=10,
offset=0
)
print(namespaces)
----------------------------------------------------------------
[
["documents", "user123", "reports"],
["documents", "user456", "invoices"],
...
]
SyncLangGraphClient ¶
用于与 LangGraph API 交互的同步客户端。
此类提供对 LangGraph API 端点的同步访问,用于管理助手、线程、运行、cron 作业和数据存储。
| 方法 | 描述 |
|---|---|
__enter__ |
进入同步上下文管理器。 |
__exit__ |
退出同步上下文管理器。 |
close |
关闭底层的 HTTP 客户端。 |
__exit__ ¶
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
退出同步上下文管理器。
SyncHttpClient ¶
处理对 LangGraph API 的同步请求。
在底层 httpx 客户端之上提供错误消息和内容处理增强功能,其接口镜像了 HttpClient,但用于同步使用。
| 属性 | 描述 |
|---|---|
client |
底层的 HTTPX 同步客户端。
类型: |
| 方法 | 描述 |
|---|---|
get |
发送一个 |
post |
发送一个 |
put |
发送一个 |
patch |
发送一个 |
delete |
发送一个 |
request_reconnect |
发送一个请求,该请求会自动重新连接到 Location 头部指定的地址。 |
stream |
使用 SSE 流式传输请求结果。 |
get ¶
get(
path: str,
*,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 GET 请求。
post ¶
post(
path: str,
*,
json: dict[str, Any] | list | None,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 POST 请求。
put ¶
put(
path: str,
*,
json: dict,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 PUT 请求。
patch ¶
patch(
path: str,
*,
json: dict,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> Any
发送一个 PATCH 请求。
delete ¶
delete(
path: str,
*,
json: Any | None = None,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
) -> None
发送一个 DELETE 请求。
request_reconnect ¶
request_reconnect(
path: str,
method: str,
*,
json: dict[str, Any] | None = None,
params: QueryParamTypes | None = None,
headers: Mapping[str, str] | None = None,
on_response: Callable[[Response], None] | None = None,
reconnect_limit: int = 5,
) -> Any
发送一个请求,该请求会自动重新连接到 Location 头部指定的地址。
SyncAssistantsClient ¶
用于同步管理 LangGraph 中助手的客户端。
此类提供与助手交互的方法,助手是图的版本化配置。
示例
| 方法 | 描述 |
|---|---|
get |
通过 ID 获取一个助手。 |
get_graph |
通过 ID 获取一个助手的图。 |
get_schemas |
通过 ID 获取一个助手的模式(schemas)。 |
get_subgraphs |
通过 ID 获取一个助手的模式(schemas)。 |
create |
创建一个新的助手。 |
更新 |
更新一个助手。 |
delete |
删除一个助手。 |
search |
搜索助手。 |
count |
统计匹配筛选条件的助手数量。 |
get_versions |
列出一个助手的所有版本。 |
set_latest |
更改一个助手的版本。 |
get ¶
get(
assistant_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
get_graph ¶
get_graph(
assistant_id: str,
*,
xray: int | bool = False,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> dict[str, list[dict[str, Any]]]
通过 ID 获取一个助手的图。
| 参数 | 描述 |
|---|---|
assistant_id
|
要获取其图的助手的 ID。
类型: |
xray
|
包含子图的图表示。如果提供一个整数值,则只包含深度小于或等于该值的子图。 |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
dict[str, list[dict[str, Any]]]
|
助手的图信息,格式为 JSON。 |
用法示例
client = get_sync_client(url="https://:2024")
graph_info = client.assistants.get_graph(
assistant_id="my_assistant_id"
)
print(graph_info)
--------------------------------------------------------------------------------------------------------------------------
{
'nodes':
[
{'id': '__start__', 'type': 'schema', 'data': '__start__'},
{'id': '__end__', 'type': 'schema', 'data': '__end__'},
{'id': 'agent','type': 'runnable','data': {'id': ['langgraph', 'utils', 'RunnableCallable'],'name': 'agent'}},
],
'edges':
[
{'source': '__start__', 'target': 'agent'},
{'source': 'agent','target': '__end__'}
]
}
get_schemas ¶
get_schemas(
assistant_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> GraphSchema
通过 ID 获取一个助手的模式(schemas)。
| 参数 | 描述 |
|---|---|
assistant_id
|
要获取其模式的助手的 ID。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
GraphSchema
|
助手的图模式。
类型: |
用法示例
client = get_sync_client(url="https://:2024")
schema = client.assistants.get_schemas(
assistant_id="my_assistant_id"
)
print(schema)
----------------------------------------------------------------------------------------------------------------------------
{
'graph_id': 'agent',
'state_schema':
{
'title': 'LangGraphInput',
'$ref': '#/definitions/AgentState',
'definitions':
{
'BaseMessage':
{
'title': 'BaseMessage',
'description': 'Base abstract Message class. Messages are the inputs and outputs of ChatModels.',
'type': 'object',
'properties':
{
'content':
{
'title': 'Content',
'anyOf': [
{'type': 'string'},
{'type': 'array','items': {'anyOf': [{'type': 'string'}, {'type': 'object'}]}}
]
},
'additional_kwargs':
{
'title': 'Additional Kwargs',
'type': 'object'
},
'response_metadata':
{
'title': 'Response Metadata',
'type': 'object'
},
'type':
{
'title': 'Type',
'type': 'string'
},
'name':
{
'title': 'Name',
'type': 'string'
},
'id':
{
'title': 'Id',
'type': 'string'
}
},
'required': ['content', 'type']
},
'AgentState':
{
'title': 'AgentState',
'type': 'object',
'properties':
{
'messages':
{
'title': 'Messages',
'type': 'array',
'items': {'$ref': '#/definitions/BaseMessage'}
}
},
'required': ['messages']
}
}
},
'config_schema':
{
'title': 'Configurable',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
},
'context_schema':
{
'title': 'Context',
'type': 'object',
'properties':
{
'model_name':
{
'title': 'Model Name',
'enum': ['anthropic', 'openai'],
'type': 'string'
}
}
}
}
get_subgraphs ¶
create ¶
create(
graph_id: str | None,
config: Config | None = None,
*,
context: Context | None = None,
metadata: Json = None,
assistant_id: str | None = None,
if_exists: OnConflictBehavior | None = None,
name: str | None = None,
headers: Mapping[str, str] | None = None,
description: str | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
创建一个新的助手。
当图是可配置的,并且您想基于不同配置创建不同助手时很有用。
| 参数 | 描述 |
|---|---|
graph_id
|
助手应使用的图的 ID。图 ID通常在您的 langgraph.json 配置文件中设置。
类型: |
配置
|
用于图的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
metadata
|
要添加到助手的元数据。
类型: |
assistant_id
|
要使用的助手 ID,如果未提供,将默认为随机的 UUID。
类型: |
if_exists
|
如何处理重复创建。内部默认为 'raise'。必须是 'raise'(如果重复则引发错误)或 'do_nothing'(返回现有助手)。
类型: |
name
|
助手的名称。内部默认为 'Untitled'。
类型: |
headers
|
请求中包含的可选自定义头部。 |
描述
|
助手的可选描述。description 字段适用于 langgraph-api 服务器版本 >=0.0.45。
类型: |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Assistant
|
已创建的助手。 |
update ¶
update(
assistant_id: str,
*,
graph_id: str | None = None,
config: Config | None = None,
context: Context | None = None,
metadata: Json = None,
name: str | None = None,
headers: Mapping[str, str] | None = None,
description: str | None = None,
params: QueryParamTypes | None = None,
) -> Assistant
更新一个助手。
使用此方法指向不同的图、更新配置或更改助手的元数据。
| 参数 | 描述 |
|---|---|
assistant_id
|
要更新的助手。
类型: |
graph_id
|
助手应使用的图的 ID。图 ID 通常在您的 langgraph.json 配置文件中设置。如果为 `None`,助手将继续指向同一个图。
类型: |
配置
|
用于图的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
metadata
|
与现有助手元数据合并的元数据。
类型: |
name
|
助手的新名称。
类型: |
headers
|
请求中包含的可选自定义头部。 |
描述
|
助手的可选描述。description 字段适用于 langgraph-api 服务器版本 >=0.0.45。
类型: |
| 返回 | 描述 |
|---|---|
Assistant
|
已更新的助手。 |
delete ¶
search ¶
search(
*,
metadata: Json = None,
graph_id: str | None = None,
limit: int = 10,
offset: int = 0,
sort_by: AssistantSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[AssistantSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Assistant]
count ¶
get_versions ¶
get_versions(
assistant_id: str,
metadata: Json = None,
limit: int = 10,
offset: int = 0,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[AssistantVersion]
SyncThreadsClient ¶
用于同步管理 LangGraph 中线程的客户端。
此类提供创建、检索和管理线程的方法,线程代表对话或有状态的交互。
示例
| 方法 | 描述 |
|---|---|
get |
通过 ID 获取一个线程。 |
create |
创建一个新线程。 |
更新 |
更新一个线程。 |
delete |
删除一个线程。 |
search |
搜索线程。 |
count |
统计匹配筛选条件的线程数量。 |
copy |
复制一个线程。 |
get_state |
获取一个线程的状态。 |
update_state |
更新一个线程的状态。 |
get_history |
获取一个线程的状态历史。 |
join_stream |
获取一个线程的事件流。 |
get ¶
get(
thread_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Thread
create ¶
create(
*,
metadata: Json = None,
thread_id: str | None = None,
if_exists: OnConflictBehavior | None = None,
supersteps: Sequence[dict[str, Sequence[dict[str, Any]]]] | None = None,
graph_id: str | None = None,
ttl: int | Mapping[str, Any] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Thread
创建一个新线程。
| 参数 | 描述 |
|---|---|
metadata
|
要添加到线程的元数据。
类型: |
thread_id
|
线程的 ID。如果为 `None`,ID 将是一个随机生成的 UUID。
类型: |
if_exists
|
如何处理重复创建。内部默认为 'raise'。必须是 'raise'(如果重复则引发错误)或 'do_nothing'(返回现有线程)。
类型: |
supersteps
|
在创建线程时应用一系列超步骤(supersteps),每个超步骤包含一系列更新。每个更新都有 `values` 或 `command` 以及 `as_node`。用于在部署之间复制线程。
类型: |
graph_id
|
与线程关联的可选图 ID。
类型: |
ttl
|
线程的可选生存时间(分钟)。您可以传入一个整数(分钟)或一个包含 `ttl` 键和可选 `strategy` 键(默认为 "delete")的映射。 |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
Thread
|
已创建的 |
update ¶
update(
thread_id: str,
*,
metadata: Mapping[str, Any],
ttl: int | Mapping[str, Any] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Thread
delete ¶
search ¶
search(
*,
metadata: Json = None,
values: Json = None,
ids: Sequence[str] | None = None,
status: ThreadStatus | None = None,
limit: int = 10,
offset: int = 0,
sort_by: ThreadSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[ThreadSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Thread]
搜索线程。
| 参数 | 描述 |
|---|---|
metadata
|
用于筛选的线程元数据。
类型: |
values
|
用于筛选的状态值。
类型: |
ids
|
用于筛选的线程 ID 列表。 |
status
|
用于筛选的线程状态。必须是 'idle'、'busy'、'interrupted' 或 'error' 之一。
类型: |
limit
|
返回的线程数量限制。
类型: |
offset
|
在线程表中开始搜索的偏移量。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
list[Thread]
|
匹配搜索参数的线程列表。 |
count ¶
copy ¶
get_state ¶
get_state(
thread_id: str,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
*,
subgraphs: bool = False,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ThreadState
获取一个线程的状态。
| 参数 | 描述 |
|---|---|
thread_id
|
要获取其状态的线程的 ID。
类型: |
checkpoint
|
要获取其状态的检查点。
类型: |
subgraphs
|
包含子图状态。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
ThreadState
|
状态所属的线程。 |
用法示例
client = get_sync_client(url="https://:2024")
thread_state = client.threads.get_state(
thread_id="my_thread_id",
checkpoint_id="my_checkpoint_id"
)
print(thread_state)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'values': {
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
},
'next': [],
'checkpoint':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1'
}
'metadata':
{
'step': 1,
'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2',
'source': 'loop',
'writes':
{
'agent':
{
'messages': [
{
'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b',
'name': None,
'type': 'ai',
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'example': False,
'tool_calls': [],
'usage_metadata': None,
'additional_kwargs': {},
'response_metadata': {},
'invalid_tool_calls': []
}
]
}
},
'user_id': None,
'graph_id': 'agent',
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'created_by': 'system',
'assistant_id': 'fe096781-5601-53d2-b2f6-0d3403f7e9ca'},
'created_at': '2024-07-25T15:35:44.184703+00:00',
'parent_config':
{
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-d80d-6fa7-8000-9300467fad0f'
}
}
update_state ¶
update_state(
thread_id: str,
values: dict[str, Any] | Sequence[dict] | None,
*,
as_node: str | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ThreadUpdateStateResponse
更新一个线程的状态。
| 参数 | 描述 |
|---|---|
thread_id
|
要更新的线程的 ID。
类型: |
values
|
用于更新状态的值。 |
as_node
|
如同此节点刚刚执行过一样更新状态。
类型: |
checkpoint
|
要更新其状态的检查点。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
ThreadUpdateStateResponse
|
更新线程状态后的响应。 |
用法示例
response = await client.threads.update_state(
thread_id="my_thread_id",
values={"messages":[{"role": "user", "content": "hello!"}]},
as_node="my_node",
)
print(response)
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
{
'checkpoint': {
'thread_id': 'e2496803-ecd5-4e0c-a779-3226296181c2',
'checkpoint_ns': '',
'checkpoint_id': '1ef4a9b8-e6fb-67b1-8001-abd5184439d1',
'checkpoint_map': {}
}
}
get_history ¶
get_history(
thread_id: str,
*,
limit: int = 10,
before: str | Checkpoint | None = None,
metadata: Mapping[str, Any] | None = None,
checkpoint: Checkpoint | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[ThreadState]
获取一个线程的状态历史。
| 参数 | 描述 |
|---|---|
thread_id
|
要获取其状态历史的线程的 ID。
类型: |
checkpoint
|
返回此子图的状态。如果为空,则默认为根图。
类型: |
limit
|
要返回的最大状态数。
类型: |
before
|
返回此检查点之前的状态。
类型: |
metadata
|
通过元数据键值对筛选状态。 |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
list[ThreadState]
|
|
join_stream ¶
join_stream(
thread_id: str,
*,
stream_mode: ThreadStreamMode | Sequence[ThreadStreamMode] = "run_modes",
last_event_id: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Iterator[StreamPart]
获取一个线程的事件流。
| 参数 | 描述 |
|---|---|
thread_id
|
要获取其流的线程的 ID。
类型: |
last_event_id
|
要获取的最后一个事件的 ID。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
Iterator[StreamPart]
|
流部分的迭代器。 |
SyncRunsClient ¶
用于同步管理 LangGraph 中运行的客户端。
此类提供创建、检索和管理运行的方法,运行代表图的单个执行。
示例
| 方法 | 描述 |
|---|---|
stream |
创建一个运行并流式传输结果。 |
create |
创建一个后台运行。 |
create_batch |
创建一批无状态的后台运行。 |
wait |
创建一个运行,等待其完成并返回最终状态。 |
list |
列出运行。 |
get |
获取一个运行。 |
cancel |
获取一个运行。 |
join |
阻塞直到一个运行完成。返回线程的最终状态。 |
join_stream |
实时流式传输一个运行的输出,直到运行完成。 |
delete |
删除一个运行。 |
stream ¶
stream(
thread_id: str | None,
assistant_id: str,
*,
input: Mapping[str, Any] | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
feedback_keys: Sequence[str] | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> Iterator[StreamPart]
创建一个运行并流式传输结果。
| 参数 | 描述 |
|---|---|
thread_id
|
分配给线程的线程 ID。如果为 `None`,将创建一个无状态的运行。
类型: |
assistant_id
|
要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
输入
|
图的输入。 |
command
|
要执行的命令。
类型: |
stream_mode
|
要使用的流模式。
类型: |
stream_subgraphs
|
是否从子图流式传输输出。
类型: |
stream_resumable
|
流是否被认为是可恢复的。如果为 true,即使在断开连接后,流也可以恢复并完整地重播。
类型: |
metadata
|
分配给运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint
|
要从中恢复的检查点。
类型: |
checkpoint_during
|
(已弃用)是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
feedback_keys
|
分配给运行的反馈键。 |
on_disconnect
|
要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
类型: |
on_completion
|
是删除还是保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
类型: |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
if_not_exists
|
如何处理缺失的线程。默认为 'reject'。必须是 'reject'(如果缺失则引发错误)或 'create'(创建新线程)。
类型: |
after_seconds
|
在开始运行前等待的秒数。用于安排未来的运行。
TYPE: |
headers
|
请求中包含的可选自定义头部。 |
on_run_created
|
创建运行时调用的可选回调函数。
类型: |
durability
|
用于运行的持久性。值为 "sync"、"async" 或 "exit"。 "async" 表示检查点在下一个图步骤执行时异步持久化,取代 checkpoint_during=True "sync" 表示检查点在图步骤执行后同步持久化,取代 checkpoint_during=False "exit" 表示检查点仅在运行退出时持久化,不保存中间步骤。
类型: |
| 返回 | 描述 |
|---|---|
Iterator[StreamPart]
|
流结果的迭代器。 |
用法示例
client = get_sync_client(url="https://:2024")
async for chunk in client.runs.stream(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
stream_mode=["values","debug"],
metadata={"name":"my_run"},
context={"model_name": "anthropic"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
feedback_keys=["my_feedback_key_1","my_feedback_key_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
):
print(chunk)
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
StreamPart(event='metadata', data={'run_id': '1ef4a9b8-d7da-679a-a45a-872054341df2'})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}]})
StreamPart(event='values', data={'messages': [{'content': 'how are you?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'fe0a5778-cfe9-42ee-b807-0adaa1873c10', 'example': False}, {'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.", 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'ai', 'name': None, 'id': 'run-159b782c-b679-4830-83c6-cef87798fe8b', 'example': False, 'tool_calls': [], 'invalid_tool_calls': [], 'usage_metadata': None}]})
StreamPart(event='end', data=None)
create ¶
create(
thread_id: str | None,
assistant_id: str,
*,
input: Mapping[str, Any] | None = None,
command: Command | None = None,
stream_mode: StreamMode | Sequence[StreamMode] = "values",
stream_subgraphs: bool = False,
stream_resumable: bool = False,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
on_completion: OnCompletionBehavior | None = None,
after_seconds: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> Run
创建一个后台运行。
| 参数 | 描述 |
|---|---|
thread_id
|
分配给线程的线程 ID。如果为 `None`,将创建一个无状态的运行。
类型: |
assistant_id
|
要从中流式传输的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
输入
|
图的输入。 |
command
|
要执行的命令。
类型: |
stream_mode
|
要使用的流模式。
类型: |
stream_subgraphs
|
是否从子图流式传输输出。
类型: |
stream_resumable
|
流是否被认为是可恢复的。如果为 true,即使在断开连接后,流也可以恢复并完整地重播。
类型: |
metadata
|
分配给运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint
|
要从中恢复的检查点。
类型: |
checkpoint_during
|
(已弃用)是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
on_completion
|
是删除还是保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
类型: |
if_not_exists
|
如何处理缺失的线程。默认为 'reject'。必须是 'reject'(如果缺失则引发错误)或 'create'(创建新线程)。
类型: |
after_seconds
|
在开始运行前等待的秒数。用于安排未来的运行。
TYPE: |
headers
|
请求中包含的可选自定义头部。 |
on_run_created
|
创建运行时调用的可选回调函数。
类型: |
durability
|
用于运行的持久性。值为 "sync"、"async" 或 "exit"。 "async" 表示检查点在下一个图步骤执行时异步持久化,取代 checkpoint_during=True "sync" 表示检查点在图步骤执行后同步持久化,取代 checkpoint_during=False "exit" 表示检查点仅在运行退出时持久化,不保存中间步骤。
类型: |
| 返回 | 描述 |
|---|---|
Run
|
已创建的后台 |
用法示例
client = get_sync_client(url="https://:2024")
background_run = client.runs.create(
thread_id="my_thread_id",
assistant_id="my_assistant_id",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
context={"model_name": "openai"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(background_run)
--------------------------------------------------------------------------------
{
'run_id': 'my_run_id',
'thread_id': 'my_thread_id',
'assistant_id': 'my_assistant_id',
'created_at': '2024-07-25T15:35:42.598503+00:00',
'updated_at': '2024-07-25T15:35:42.598503+00:00',
'metadata': {},
'status': 'pending',
'kwargs':
{
'input':
{
'messages': [
{
'role': 'user',
'content': 'how are you?'
}
]
},
'config':
{
'metadata':
{
'created_by': 'system'
},
'configurable':
{
'run_id': 'my_run_id',
'user_id': None,
'graph_id': 'agent',
'thread_id': 'my_thread_id',
'checkpoint_id': None,
'assistant_id': 'my_assistant_id'
}
},
'context':
{
'model_name': 'openai'
},
'webhook': "https://my.fake.webhook.com",
'temporary': False,
'stream_mode': ['values'],
'feedback_keys': None,
'interrupt_after': ["node_to_stop_after_1","node_to_stop_after_2"],
'interrupt_before': ["node_to_stop_before_1","node_to_stop_before_2"]
},
'multitask_strategy': 'interrupt'
}
create_batch ¶
create_batch(
payloads: list[RunCreate],
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Run]
创建一批无状态的后台运行。
wait ¶
wait(
thread_id: str | None,
assistant_id: str,
*,
input: Mapping[str, Any] | None = None,
command: Command | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint_during: bool | None = None,
checkpoint: Checkpoint | None = None,
checkpoint_id: str | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
webhook: str | None = None,
on_disconnect: DisconnectMode | None = None,
on_completion: OnCompletionBehavior | None = None,
multitask_strategy: MultitaskStrategy | None = None,
if_not_exists: IfNotExists | None = None,
after_seconds: int | None = None,
raise_error: bool = True,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
on_run_created: Callable[[RunCreateMetadata], None] | None = None,
durability: Durability | None = None,
) -> list[dict] | dict[str, Any]
创建一个运行,等待其完成并返回最终状态。
| 参数 | 描述 |
|---|---|
thread_id
|
要在其上创建运行的线程 ID。如果为 `None`,将创建一个无状态的运行。
类型: |
assistant_id
|
要运行的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
输入
|
图的输入。 |
command
|
要执行的命令。
类型: |
metadata
|
分配给运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint
|
要从中恢复的检查点。
类型: |
checkpoint_during
|
(已弃用)是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
on_disconnect
|
要使用的断开连接模式。必须是 'cancel' 或 'continue' 之一。
类型: |
on_completion
|
是删除还是保留为无状态运行创建的线程。必须是 'delete' 或 'keep' 之一。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
if_not_exists
|
如何处理缺失的线程。默认为 'reject'。必须是 'reject'(如果缺失则引发错误)或 'create'(创建新线程)。
类型: |
after_seconds
|
在开始运行前等待的秒数。用于安排未来的运行。
TYPE: |
raise_error
|
如果运行失败是否引发错误。
类型: |
headers
|
请求中包含的可选自定义头部。 |
on_run_created
|
创建运行时调用的可选回调函数。
类型: |
durability
|
用于运行的持久性。值为 "sync"、"async" 或 "exit"。 "async" 表示检查点在下一个图步骤执行时异步持久化,取代 checkpoint_during=True "sync" 表示检查点在图步骤执行后同步持久化,取代 checkpoint_during=False "exit" 表示检查点仅在运行退出时持久化,不保存中间步骤。
类型: |
| 返回 | 描述 |
|---|---|
list[dict] | dict[str, Any]
|
|
用法示例
final_state_of_run = client.runs.wait(
thread_id=None,
assistant_id="agent",
input={"messages": [{"role": "user", "content": "how are you?"}]},
metadata={"name":"my_run"},
context={"model_name": "anthropic"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
print(final_state_of_run)
-------------------------------------------------------------------------------------------------------------------------------------------
{
'messages': [
{
'content': 'how are you?',
'additional_kwargs': {},
'response_metadata': {},
'type': 'human',
'name': None,
'id': 'f51a862c-62fe-4866-863b-b0863e8ad78a',
'example': False
},
{
'content': "I'm doing well, thanks for asking! I'm an AI assistant created by Anthropic to be helpful, honest, and harmless.",
'additional_kwargs': {},
'response_metadata': {},
'type': 'ai',
'name': None,
'id': 'run-bf1cd3c6-768f-4c16-b62d-ba6f17ad8b36',
'example': False,
'tool_calls': [],
'invalid_tool_calls': [],
'usage_metadata': None
}
]
}
list ¶
list(
thread_id: str,
*,
limit: int = 10,
offset: int = 0,
status: RunStatus | None = None,
select: list[RunSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Run]
get ¶
cancel ¶
cancel(
thread_id: str,
run_id: str,
*,
wait: bool = False,
action: CancelAction = "interrupt",
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
获取一个运行。
| 参数 | 描述 |
|---|---|
thread_id
|
要取消的线程 ID。
类型: |
run_id
|
要取消的运行 ID。
类型: |
wait
|
是否等待运行完成。
类型: |
action
|
取消运行时要采取的操作。可能的值为 `interrupt` 或 `rollback`。默认为 `interrupt`。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
None
|
|
join ¶
join(
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> dict
join_stream ¶
join_stream(
thread_id: str,
run_id: str,
*,
cancel_on_disconnect: bool = False,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
last_event_id: str | None = None,
) -> Iterator[StreamPart]
实时流式传输一个运行的输出,直到运行完成。输出不进行缓冲,因此在此调用之前产生的任何输出都将不会在这里接收到。
| 参数 | 描述 |
|---|---|
thread_id
|
要加入的线程 ID。
类型: |
run_id
|
要加入的运行 ID。
类型: |
stream_mode
|
要使用的流模式。必须是创建运行时传递的流模式的子集。后台运行默认具有所有流模式的并集。
类型: |
cancel_on_disconnect
|
当流断开连接时是否取消运行。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
last_event_id
|
用于流的最后一个事件 ID。
类型: |
| 返回 | 描述 |
|---|---|
Iterator[StreamPart]
|
|
delete ¶
delete(
thread_id: str,
run_id: str,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
SyncCronClient ¶
用于同步管理 LangGraph 中 cron 作业的客户端。
此类提供创建和管理定时任务(cron 作业)的方法,用于自动化图的执行。
示例
功能可用性
crons 客户端功能并非在所有许可证上都受支持。请查看相关许可证文档以获取有关功能可用性的最新详细信息。
| 方法 | 描述 |
|---|---|
create_for_thread |
为线程创建一个 cron 作业。 |
create |
创建一个 cron 运行。 |
delete |
删除一个 cron。 |
search |
获取 cron 作业列表。 |
count |
统计匹配筛选条件的 cron 作业数量。 |
create_for_thread ¶
create_for_thread(
thread_id: str,
assistant_id: str,
*,
schedule: str,
input: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | list[str] | None = None,
interrupt_after: All | list[str] | None = None,
webhook: str | None = None,
multitask_strategy: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Run
为线程创建一个 cron 作业。
| 参数 | 描述 |
|---|---|
thread_id
|
要在其上运行 cron 作业的线程 ID。
类型: |
assistant_id
|
用于 cron 作业的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
schedule
|
执行此作业的 cron 时间表。
类型: |
输入
|
图的输入。 |
metadata
|
分配给 cron 作业运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint_during
|
是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
Run
|
cron |
用法示例
client = get_sync_client(url="https://:8123")
cron_run = client.crons.create_for_thread(
thread_id="my-thread-id",
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
context={"model_name": "openai"},
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
create ¶
create(
assistant_id: str,
*,
schedule: str,
input: Mapping[str, Any] | None = None,
metadata: Mapping[str, Any] | None = None,
config: Config | None = None,
context: Context | None = None,
checkpoint_during: bool | None = None,
interrupt_before: All | list[str] | None = None,
interrupt_after: All | list[str] | None = None,
webhook: str | None = None,
multitask_strategy: str | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Run
创建一个 cron 运行。
| 参数 | 描述 |
|---|---|
assistant_id
|
用于 cron 作业的助手 ID 或图名称。如果使用图名称,将默认使用从该图创建的第一个助手。
类型: |
schedule
|
执行此作业的 cron 时间表。
类型: |
输入
|
图的输入。 |
metadata
|
分配给 cron 作业运行的元数据。 |
配置
|
助手的配置。
类型: |
context
|
要添加到助手的静态上下文。 在 0.6.0 版本中添加
类型: |
checkpoint_during
|
是否在运行期间设置检查点(或仅在结束/中断时设置)。
类型: |
interrupt_before
|
在节点执行前立即中断的节点。 |
interrupt_after
|
在节点执行后立即中断的节点。 |
webhook
|
在 LangGraph API 调用完成后调用的 Webhook。
类型: |
multitask_strategy
|
要使用的多任务策略。必须是 'reject'、'interrupt'、'rollback' 或 'enqueue' 之一。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
Run
|
cron |
用法示例
client = get_sync_client(url="https://:8123")
cron_run = client.crons.create(
assistant_id="agent",
schedule="27 15 * * *",
input={"messages": [{"role": "user", "content": "hello!"}]},
metadata={"name":"my_run"},
context={"model_name": "openai"},
checkpoint_during=True,
interrupt_before=["node_to_stop_before_1","node_to_stop_before_2"],
interrupt_after=["node_to_stop_after_1","node_to_stop_after_2"],
webhook="https://my.fake.webhook.com",
multitask_strategy="interrupt"
)
delete ¶
search ¶
search(
*,
assistant_id: str | None = None,
thread_id: str | None = None,
limit: int = 10,
offset: int = 0,
sort_by: CronSortBy | None = None,
sort_order: SortOrder | None = None,
select: list[CronSelectField] | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> list[Cron]
获取 cron 作业列表。
| 参数 | 描述 |
|---|---|
assistant_id
|
要搜索的助手 ID 或图名称。
类型: |
thread_id
|
要搜索的线程 ID。
类型: |
limit
|
要返回的最大结果数。
类型: |
offset
|
要跳过的结果数。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
list[Cron]
|
搜索返回的 cron 作业列表。 |
用法示例
client = get_sync_client(url="https://:8123")
cron_jobs = client.crons.search(
assistant_id="my_assistant_id",
thread_id="my_thread_id",
limit=5,
offset=5,
)
print(cron_jobs)
----------------------------------------------------------
[
{
'cron_id': '1ef3cefa-4c09-6926-96d0-3dc97fd5e39b',
'assistant_id': 'my_assistant_id',
'thread_id': 'my_thread_id',
'user_id': None,
'payload':
{
'input': {'start_time': ''},
'schedule': '4 * * * *',
'assistant_id': 'my_assistant_id'
},
'schedule': '4 * * * *',
'next_run_date': '2024-07-25T17:04:00+00:00',
'end_time': None,
'created_at': '2024-07-08T06:02:23.073257+00:00',
'updated_at': '2024-07-08T06:02:23.073257+00:00'
}
]
SyncStoreClient ¶
用于对键值存储进行同步操作的客户端。
提供与远程键值存储交互的方法,允许在命名空间层次结构中存储和检索项目。
示例
| 方法 | 描述 |
|---|---|
put_item |
存储或更新一个项目。 |
get_item |
检索单个项目。 |
delete_item |
删除一个项目。 |
search_items |
在命名空间前缀内搜索项目。 |
list_namespaces |
列出具有可选匹配条件的命名空间。 |
put_item ¶
put_item(
namespace: Sequence[str],
/,
key: str,
value: Mapping[str, Any],
index: Literal[False] | list[str] | None = None,
ttl: int | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
get_item ¶
get_item(
namespace: Sequence[str],
/,
key: str,
*,
refresh_ttl: bool | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> Item
检索单个项目。
| 参数 | 描述 |
|---|---|
key
|
项目的唯一标识符。
类型: |
namespace
|
表示命名空间路径的可选字符串列表。 |
refresh_ttl
|
是否在此读取操作上刷新 TTL。如果为 `None`,则使用存储的默认行为。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
Item
|
检索到的项目。 |
delete_item ¶
delete_item(
namespace: Sequence[str],
/,
key: str,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> None
删除一个项目。
| 参数 | 描述 |
|---|---|
key
|
项目的唯一标识符。
类型: |
namespace
|
表示命名空间路径的可选字符串列表。 |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
None
|
|
search_items ¶
search_items(
namespace_prefix: Sequence[str],
/,
filter: Mapping[str, Any] | None = None,
limit: int = 10,
offset: int = 0,
query: str | None = None,
refresh_ttl: bool | None = None,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> SearchItemsResponse
在命名空间前缀内搜索项目。
| 参数 | 描述 |
|---|---|
namespace_prefix
|
表示命名空间前缀的字符串列表。 |
filter
|
用于筛选结果的可选键值对字典。 |
limit
|
返回的最大项目数(默认为 10)。
类型: |
offset
|
返回结果前要跳过的项目数(默认为 0)。
类型: |
query
|
用于自然语言搜索的可选查询。
类型: |
refresh_ttl
|
是否刷新此搜索返回的项目的 TTL。如果为 `None`,则使用存储的默认行为。
类型: |
headers
|
请求中包含的可选自定义头部。 |
params
|
请求中包含的可选查询参数。
类型: |
| 返回 | 描述 |
|---|---|
SearchItemsResponse
|
匹配搜索条件的项目列表。 |
用法示例
client = get_sync_client(url="https://:8123")
items = client.store.search_items(
["documents"],
filter={"author": "John Doe"},
limit=5,
offset=0
)
print(items)
----------------------------------------------------------------
{
"items": [
{
"namespace": ["documents", "user123"],
"key": "item789",
"value": {
"title": "Another Document",
"author": "John Doe"
},
"created_at": "2024-07-30T12:00:00Z",
"updated_at": "2024-07-30T12:00:00Z"
},
# ... additional items ...
]
}
list_namespaces ¶
list_namespaces(
prefix: list[str] | None = None,
suffix: list[str] | None = None,
max_depth: int | None = None,
limit: int = 100,
offset: int = 0,
*,
headers: Mapping[str, str] | None = None,
params: QueryParamTypes | None = None,
) -> ListNamespaceResponse
列出具有可选匹配条件的命名空间。
| 参数 | 描述 |
|---|---|
prefix
|
表示用于筛选命名空间的前缀的可选字符串列表。 |
后缀
|
表示用于筛选命名空间的后缀的可选字符串列表。 |
max_depth
|
指定要返回的命名空间最大深度的可选整数。
TYPE: |
limit
|
返回的最大命名空间数(默认为 100)。
类型: |
offset
|
返回结果前要跳过的命名空间数(默认为 0)。
类型: |
headers
|
请求中包含的可选自定义头部。 |
| 返回 | 描述 |
|---|---|
ListNamespaceResponse
|
匹配条件的命名空间列表。 |
get_client ¶
get_client(
*,
url: str | None = None,
api_key: str | None = None,
headers: Mapping[str, str] | None = None,
timeout: TimeoutTypes | None = None,
) -> LangGraphClient
创建并配置一个 LangGraphClient。
该客户端提供对 LangSmith Deployment 的编程访问。它支持远程服务器和本地进程内连接(在 LangGraph 服务器内部运行时)。
| 参数 | 描述 |
|---|---|
url
|
LangGraph API 的基础 URL。– 如果为
类型: |
api_key
|
用于身份验证的 API 密钥。如果省略,客户端将按以下顺序从环境变量中读取: 1. 函数参数 2.
类型: |
headers
|
要在请求中包含的额外 HTTP 标头。与身份验证标头合并。 |
timeout
|
HTTP 超时配置。可以是: –
类型: |
| 返回 | 描述 |
|---|---|
LangGraphClient
|
一个顶级客户端,公开用于助手、线程、运行和 cron 操作的子客户端。
类型: |
连接到远程服务器
get_sync_client ¶
get_sync_client(
*,
url: str | None = None,
api_key: str | None = None,
headers: Mapping[str, str] | None = None,
timeout: TimeoutTypes | None = None,
) -> SyncLangGraphClient
获取一个同步的 LangGraphClient 实例。
| 参数 | 描述 |
|---|---|
url
|
LangGraph API 的 URL。
类型: |
api_key
|
API 密钥。如果未提供,将从环境中读取。优先级:1. 显式参数 2. LANGGRAPH_API_KEY 3. LANGSMITH_API_KEY 4. LANGCHAIN_API_KEY
类型: |
headers
|
可选的自定义标头 |
timeout
|
HTTP 客户端的可选超时配置。接受 httpx.Timeout 实例、浮点数(秒)或超时元组。元组格式为 (connect, read, write, pool)。如果未提供,则默认为 connect=5s, read=300s, write=300s, and pool=5s。
类型: |
返回:SyncLangGraphClient:用于访问 AssistantsClient、ThreadsClient、RunsClient 和 CronClient 的顶级同步客户端。
langgraph_sdk.schema ¶
用于与 LangGraph API 交互的数据模型。
RunStatus module-attribute ¶
RunStatus = Literal['pending', 'running', 'error', 'success', 'timeout', 'interrupted']
表示运行的状态:- "pending":运行正在等待开始。- "running":运行当前正在执行。- "error":运行遇到错误并停止。- "success":运行成功完成。- "timeout":运行超出其时间限制。- "interrupted":运行被人为停止或中断。
ThreadStatus module-attribute ¶
ThreadStatus = Literal['idle', 'busy', 'interrupted', 'error']
表示线程的状态:- "idle":线程当前未处理任何任务。- "busy":线程正在积极处理任务。- "interrupted":线程的执行被中断。- "error":任务处理期间发生异常。
ThreadStreamMode module-attribute ¶
ThreadStreamMode = Literal['run_modes', 'lifecycle', 'state_update']
定义流模式:- "run_modes":流式传输与线程上运行相同的事件,以及 run_done 事件。- "lifecycle":仅流式传输运行开始/结束事件。- "state_update":流式传输线程上的状态更新。
StreamMode module-attribute ¶
StreamMode = Literal[
"values",
"messages",
"updates",
"events",
"tasks",
"checkpoints",
"debug",
"custom",
"messages-tuple",
]
定义流模式:- "values":仅流式传输值。- "messages":流式传输完整的消息。- "updates":流式传输状态的更新。- "events":流式传输执行期间发生的事件。- "checkpoints":在创建检查点时流式传输它们。- "tasks":流式传输任务开始和结束事件。- "debug":流式传输详细的调试信息。- "custom":流式传输自定义事件。
DisconnectMode module-attribute ¶
DisconnectMode = Literal['cancel', 'continue']
指定断开连接时的行为:- "cancel":断开连接时取消操作。- "continue":即使断开连接也继续操作。
MultitaskStrategy module-attribute ¶
MultitaskStrategy = Literal['reject', 'interrupt', 'rollback', 'enqueue']
定义如何处理多个任务:- "reject":繁忙时拒绝新任务。- "interrupt":为新任务中断当前任务。- "rollback":回滚当前任务并开始新任务。- "enqueue":将新任务排队等待稍后执行。
OnConflictBehavior module-attribute ¶
OnConflictBehavior = Literal['raise', 'do_nothing']
指定冲突时的行为:- "raise":发生冲突时引发异常。- "do_nothing":忽略冲突并继续。
OnCompletionBehavior module-attribute ¶
OnCompletionBehavior = Literal['delete', 'keep']
定义完成后要执行的操作:- "delete":完成后删除资源。- "keep":完成后保留资源。
Durability module-attribute ¶
Durability = Literal['sync', 'async', 'exit']
图执行的持久性模式。- "sync":在下一步开始之前同步持久化更改。- "async":在下一步执行时异步持久化更改。- "exit":仅在图退出时持久化更改。
IfNotExists module-attribute ¶
IfNotExists = Literal['create', 'reject']
指定线程不存在时的行为:- "create":如果线程不存在,则创建新线程。- "reject":如果线程不存在,则拒绝操作。
CancelAction module-attribute ¶
CancelAction = Literal['interrupt', 'rollback']
取消运行时要采取的操作。- "interrupt":仅取消运行。- "rollback":取消运行。然后删除运行和相关的检查点。
AssistantSortBy module-attribute ¶
AssistantSortBy = Literal[
"assistant_id", "graph_id", "name", "created_at", "updated_at"
]
用于排序的字段。
ThreadSortBy module-attribute ¶
ThreadSortBy = Literal['thread_id', 'status', 'created_at', 'updated_at']
用于排序的字段。
CronSortBy module-attribute ¶
CronSortBy = Literal[
"cron_id", "assistant_id", "thread_id", "created_at", "updated_at", "next_run_date"
]
用于排序的字段。
AssistantVersion ¶
Assistant ¶
Interrupt ¶
ThreadUpdateStateResponse ¶
Run ¶
RunCreate ¶
基类:TypedDict
定义启动后台运行的参数。
multitask_strategy instance-attribute ¶
multitask_strategy: MultitaskStrategy | None
处理同一线程上并发运行的策略。
ListNamespaceResponse ¶
SearchItem ¶
SearchItemsResponse ¶
StreamPart ¶
基类:NamedTuple
表示流式响应的一部分。
Send ¶
langgraph_sdk.auth ¶
Auth ¶
向您的 LangGraph 应用程序添加自定义身份验证和授权管理。
Auth 类提供了一个统一的系统,用于处理 LangGraph 应用程序中的身份验证和授权。它支持自定义用户身份验证协议以及针对不同资源和操作的细粒度授权规则。
要使用,请创建一个单独的 python 文件,并将其路径添加到您的 LangGraph API 配置文件(langgraph.json)中。在该文件中,创建 Auth 类的实例,并根据需要注册身份验证和授权处理程序。
langgraph.json 文件示例
{
"dependencies": ["."],
"graphs": {
"agent": "./my_agent/agent.py:graph"
},
"env": ".env",
"auth": {
"path": "./auth.py:my_auth"
}
然后,LangGraph 服务器将在每次请求进入时加载您的 auth 文件并在服务器端运行它。
基本用法
from langgraph_sdk import Auth
my_auth = Auth()
async def verify_token(token: str) -> str:
# Verify token and return user_id
# This would typically be a call to your auth server
return "user_id"
@auth.authenticate
async def authenticate(authorization: str) -> str:
# Verify token and return user_id
result = await verify_token(authorization)
if result != "user_id":
raise Auth.exceptions.HTTPException(
status_code=401, detail="Unauthorized"
)
return result
# Global fallback handler
@auth.on
async def authorize_default(params: Auth.on.value):
return False # Reject all requests (default behavior)
@auth.on.threads.create
async def authorize_thread_create(params: Auth.on.threads.create.value):
# Allow the allowed user to create a thread
assert params.get("metadata", {}).get("owner") == "allowed_user"
@auth.on.store
async def authorize_store(ctx: Auth.types.AuthContext, value: Auth.types.on):
assert ctx.user.identity in value["namespace"], "Not authorized"
请求处理流程
- 首先对每个请求执行身份验证(您的
@auth.authenticate处理程序) - 对于授权,将调用最具体的匹配处理程序
- 如果存在针对确切资源和操作的处理程序,则使用它(例如,
@auth.on.threads.create) - 否则,如果存在针对该资源任何操作的处理程序,则使用它(例如,
@auth.on.threads) - 最后,如果没有匹配的特定处理程序,则使用全局处理程序(例如,
@auth.on) - 如果未设置全局处理程序,则接受该请求
- 如果存在针对确切资源和操作的处理程序,则使用它(例如,
这使您可以使用全局处理程序设置默认行为,同时根据需要覆盖特定路由。
| 方法 | 描述 |
|---|---|
authenticate |
注册一个身份验证处理函数。 |
types class-attribute instance-attribute ¶
types = types
对 auth 类型定义的引用。
提供对 auth 系统中使用的所有类型定义的访问,如 ThreadsCreate、AssistantsRead 等。
exceptions class-attribute instance-attribute ¶
exceptions = exceptions
对 auth 异常定义的引用。
提供对 auth 系统中使用的所有异常定义的访问,如 HTTPException 等。
on instance-attribute ¶
控制对特定资源访问的授权处理程序的入口点。
on 类提供了一种灵活的方式,用于为应用程序中的不同资源和操作定义授权规则。它支持三种主要的使用模式
- 为所有资源和操作运行的全局处理程序
- 为资源上的所有操作运行的特定于资源的处理程序
- 用于细粒度控制的资源和操作特定的处理程序
每个处理程序必须是一个接受两个参数的异步函数
- ctx (AuthContext):包含请求上下文和经过身份验证的用户信息
- value:正在授权的数据(类型因端点而异)
处理程序应返回以下之一
- None or True: Accept the request
- False: Reject with 403 error
- FilterType: Apply filtering rules to the response
示例
适用于所有请求的全局处理程序
@auth.on
async def reject_unhandled_requests(ctx: AuthContext, value: Any) -> None:
print(f"Request to {ctx.path} by {ctx.user.identity}")
return False
特定于资源的处理程序。对于 threads 资源上的所有操作,这将优先于全局处理程序
@auth.on.threads
async def check_thread_access(ctx: AuthContext, value: Any) -> bool:
# Allow access only to threads created by the user
return value.get("created_by") == ctx.user.identity
资源和操作特定的处理程序
@auth.on.threads.delete
async def prevent_thread_deletion(ctx: AuthContext, value: Any) -> bool:
# Only admins can delete threads
return "admin" in ctx.user.permissions
多个资源或操作
@auth.on(resources=["threads", "runs"], actions=["create", "update"])
async def rate_limit_writes(ctx: AuthContext, value: Any) -> bool:
# Implement rate limiting for write operations
return await check_rate_limit(ctx.user.identity)
store 资源的认证有点不同,因为它的结构是由开发者定义的。您通常希望在命名空间中强制执行用户凭据。
authenticate ¶
注册一个身份验证处理函数。
身份验证处理程序负责验证凭据并返回用户范围。它可以通过名称接受以下任何参数
- request (Request): The raw ASGI request object
- path (str): The request path, e.g., "/threads/abcd-1234-abcd-1234/runs/abcd-1234-abcd-1234/stream"
- method (str): The HTTP method, e.g., "GET"
- path_params (dict[str, str]): URL path parameters, e.g., {"thread_id": "abcd-1234-abcd-1234", "run_id": "abcd-1234-abcd-1234"}
- query_params (dict[str, str]): URL query parameters, e.g., {"stream": "true"}
- headers (dict[bytes, bytes]): Request headers
- authorization (str | None): The Authorization header value (e.g., "Bearer <token>")
| 参数 | 描述 |
|---|---|
fn
|
要注册的身份验证处理函数。必须返回用户的表示形式。这可以是:- 字符串(用户 ID)- 包含 {"identity": str, "permissions": list[str]} 的字典 - 或具有 identity 和 permissions 属性的对象。权限可以由您的下游处理程序选择性使用。
类型: |
| 返回 | 描述 |
|---|---|
AH
|
已注册的处理函数。 |
| 引发 | 描述 |
|---|---|
ValueError
|
如果已经注册了身份验证处理程序。 |
示例
基本令牌认证
@auth.authenticate
async def authenticate(authorization: str) -> str:
user_id = verify_token(authorization)
return user_id
接受完整的请求上下文
@auth.authenticate
async def authenticate(
method: str,
path: str,
headers: dict[str, bytes]
) -> str:
user = await verify_request(method, path, headers)
return user
返回用户名和权限
@auth.authenticate
async def authenticate(
method: str,
path: str,
headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
permissions, user = await verify_request(method, path, headers)
# Permissions could be things like ["runs:read", "runs:write", "threads:read", "threads:write"]
return {
"identity": user["id"],
"permissions": permissions,
"display_name": user["name"],
}
langgraph_sdk.auth.types ¶
LangGraph 的身份验证和授权类型。
此模块定义了用于 LangGraph 中身份验证、授权和请求处理的核心类型。它包括用户协议、身份验证上下文以及用于各种 API 操作的类型化字典。
注意
所有 typing.TypedDict 类都使用 total=False,使所有字段默认为 typing.Optional。
RunStatus module-attribute ¶
RunStatus = Literal['pending', 'error', 'success', 'timeout', 'interrupted']
运行执行的状态。
值
- pending:运行已排队或正在进行中
- error:运行因错误而失败
- success:运行成功完成
- timeout:运行超出时间限制
- interrupted:运行被手动中断
MultitaskStrategy module-attribute ¶
MultitaskStrategy = Literal['reject', 'rollback', 'interrupt', 'enqueue']
处理多个并发任务的策略。
值
- reject:当一个任务正在进行时拒绝新任务
- rollback:取消当前任务并开始新任务
- interrupt:中断当前任务并开始新任务
- enqueue:将新任务排队,在当前任务之后运行
OnConflictBehavior module-attribute ¶
OnConflictBehavior = Literal['raise', 'do_nothing']
遇到冲突时的行为。
值
- raise:发生冲突时引发异常
- do_nothing:静默忽略冲突
IfNotExists module-attribute ¶
IfNotExists = Literal['create', 'reject']
当实体不存在时的行为。
值
- create:创建实体
- reject:拒绝操作
FilterType module-attribute ¶
FilterType = (
dict[
str,
str
| dict[Literal["$eq", "$contains"], str]
| dict[Literal["$contains"], list[str]],
]
| dict[str, str]
)
授权处理程序的响应类型。
支持精确匹配和操作符
- 精确匹配简写:{"field": "value"}
- 精确匹配:{"field": {"$eq": "value"}}
- 包含(成员关系):{"field": {"$contains": "value"}}
- 包含(子集包含):{"field": {"$contains": ["value1", "value2"]}}
子集包含仅受较新版本的 LangGraph 开发服务器支持;安装 langgraph-runtime-inmem >= 0.14.1 以使用此过滤器变体。
示例
资源所有者的简单精确匹配过滤器
精确匹配过滤器的显式版本
包含(单个元素的成员关系)
包含(子集包含;所有值都必须存在,但顺序无关紧要)
组合过滤器(被视为逻辑 `AND`)
ThreadStatus module-attribute ¶
ThreadStatus = Literal['idle', 'busy', 'interrupted', 'error']
线程的状态。
值
- idle:线程可用于工作
- busy:线程当前正在处理中
- interrupted:线程被中断
- error:线程遇到错误
MetadataInput module-attribute ¶
HandlerResult module-attribute ¶
HandlerResult = None | bool | FilterType
处理程序的结果可以是: * None | True:接受请求。 * False:以 403 错误拒绝请求 * FilterType:要应用的过滤器
Authenticator module-attribute ¶
Authenticator = Callable[
..., Awaitable[MinimalUser | str | BaseUser | MinimalUserDict | Mapping[str, Any],]
]
身份验证函数的类型。
身份验证器可以返回:1. 一个字符串 (user_id) 2. 一个包含 {"identity": str, "permissions": list[str]} 的字典 3. 一个具有 identity 和 permissions 属性的对象
权限可以被您的授权逻辑下游使用,以确定对不同资源的访问权限。
如果您的函数签名中包含以下任何参数,authenticate 装饰器将自动按名称注入它们
| 参数 | 描述 |
|---|---|
request
|
原始 ASGI 请求对象
类型: |
body
|
已解析的请求正文
类型: |
path
|
请求路径
类型: |
method
|
HTTP 方法(GET、POST 等)
类型: |
path_params
|
URL 路径参数 |
query_params
|
URL 查询参数 |
headers
|
请求头 |
authorization
|
Authorization 标头值(例如 "Bearer
类型: |
示例
使用令牌的基本身份验证
from langgraph_sdk import Auth
auth = Auth()
@auth.authenticate
async def authenticate1(authorization: str) -> Auth.types.MinimalUserDict:
return await get_user(authorization)
使用多个参数进行身份验证
@auth.authenticate
async def authenticate2(
method: str,
path: str,
headers: dict[str, bytes]
) -> Auth.types.MinimalUserDict:
# Custom auth logic using method, path and headers
user = verify_request(method, path, headers)
return user
接受原始 ASGI 请求
MY_SECRET = "my-secret-key"
@auth.authenticate
async def get_current_user(request: Request) -> Auth.types.MinimalUserDict:
try:
token = (request.headers.get("authorization") or "").split(" ", 1)[1]
payload = jwt.decode(token, MY_SECRET, algorithms=["HS256"])
except (IndexError, InvalidTokenError):
raise HTTPException(
status_code=401,
detail="Invalid token",
headers={"WWW-Authenticate": "Bearer"},
)
async with httpx.AsyncClient() as client:
response = await client.get(
f"https://api.myauth-provider.com/auth/v1/user",
headers={"Authorization": f"Bearer {MY_SECRET}"}
)
if response.status_code != 200:
raise HTTPException(status_code=401, detail="User not found")
user_data = response.json()
return {
"identity": user_data["id"],
"display_name": user_data.get("name"),
"permissions": user_data.get("permissions", []),
"is_authenticated": True,
}
MinimalUser ¶
BaseUser ¶
StudioUser ¶
从 LangGraph studio 的已认证请求中填充的用户对象。
注意:Studio 认证可以在您的 langgraph.json 配置中禁用。
您可以在您的授权处理程序(@auth.on)中使用 isinstance 检查,以专门控制从 LangGraph Studio UI 访问实例的开发人员的访问权限。
示例
BaseAuthContext dataclass ¶
AuthContext dataclass ¶
包含资源和操作信息的完整身份验证上下文。
通过扩展 BaseAuthContext,增加了正在访问的特定资源和操作的信息,从而实现细粒度的访问控制决策。
resource instance-attribute ¶
resource: Literal['runs', 'threads', 'crons', 'assistants', 'store']
正在访问的资源。
action instance-attribute ¶
action: Literal[
"create",
"read",
"update",
"delete",
"search",
"create_run",
"put",
"get",
"list_namespaces",
]
正在对资源执行的操作。
大多数资源支持以下操作:- create:创建新资源 - read:读取有关资源的信息 - update:更新现有资源 - delete:删除资源 - search:搜索资源
存储支持以下操作:- put:在存储中添加或更新文档 - get:从存储中获取文档 - list_namespaces:列出存储中的命名空间
ThreadTTL ¶
ThreadsRead ¶
ThreadsUpdate ¶
ThreadsDelete ¶
RunsCreate ¶
基类:TypedDict
创建运行的负载。
示例
create_params = {
"assistant_id": UUID("123e4567-e89b-12d3-a456-426614174000"),
"thread_id": UUID("123e4567-e89b-12d3-a456-426614174001"),
"run_id": UUID("123e4567-e89b-12d3-a456-426614174002"),
"status": "pending",
"metadata": {"owner": "user123"},
"prevent_insert_if_inflight": True,
"multitask_strategy": "reject",
"if_not_exists": "create",
"after_seconds": 10,
"kwargs": {"key": "value"},
"action": "interrupt"
}
AssistantsDelete ¶
CronsDelete ¶
CronsRead ¶
StoreGet ¶
StoreDelete ¶
on ¶
用于不同 API 操作的类型定义的命名空间。
此类组织了跨不同资源(线程、助手、定时任务)的创建、读取、更新、删除和搜索操作的类型定义。
用法
from langgraph_sdk import Auth
auth = Auth()
@auth.on
def handle_all(params: Auth.on.value):
raise Exception("Not authorized")
@auth.on.threads.create
def handle_thread_create(params: Auth.on.threads.create.value):
# Handle thread creation
pass
@auth.on.assistants.search
def handle_assistant_search(params: Auth.on.assistants.search.value):
# Handle assistant search
pass
langgraph_sdk.auth.exceptions ¶
认证系统中使用的异常。
HTTPException ¶
基类: Exception
您可以引发此 HTTP 异常以返回特定的 HTTP 错误响应。
由于此异常定义在认证模块中,我们默认使用 401 状态码。
| 参数 | 描述 |
|---|---|
status_code
|
错误的 HTTP 状态码。默认为 401 "Unauthorized"。
类型: |
detail
|
详细的错误信息。如果为 `None`,则根据状态码使用默认消息。
类型: |
headers
|
要包含在错误响应中的额外 HTTP 头部。 |
示例
默认
添加头部
raise HTTPException(headers={"X-Custom-Header": "Custom Value"})
# HTTPException(status_code=401, detail='Unauthorized', headers={"WWW-Authenticate": "Bearer"})
自定义错误