在构建智能系统时,理解“工作流”和“代理”的区别至关重要。Anthropic 对此提供了一个清晰的解释:工作流是通过预定义代码路径来编排 LLM 与工具的系统,而代理则由 LLM 动态地决定自身的执行流程和工具使用方式。
LangGraph 提供了构建这些系统的强大工具,支持持久化、流式处理、调试和部署。以下是详细指南:
首先,安装依赖并配置 API 密钥:
python%%capture --no-stderr
%pip install -U langgraph langsmith langchain_anthropic
import os
import getpass
def _set_env(var: str):
if not os.environ.get(var):
os.environ[var] = getpass.getpass(f"{var}: ")
_set_env("ANTHROPIC_API_KEY")
from langchain_anthropic import ChatAnthropic
llm = ChatAnthropic(model="claude-3-5-sonnet-latest")
LLM 可以通过结构化输出和工具调用进行增强。以下是示例:
from pydantic import BaseModel, Field


class SearchQuery(BaseModel):
    """Structured schema the augmented LLM must return instead of free text."""

    # Query string optimized for web search.
    search_query: str = Field(None, description="优化网络搜索的查询。")
    # Rationale linking the query back to the user's request.
    justification: str = Field(None, description="为什么此查询与用户请求相关。")


# Augment the LLM so its output is parsed into a SearchQuery instance.
structured_llm = llm.with_structured_output(SearchQuery)

# Invoke the augmented LLM.
output = structured_llm.invoke("钙 CT 评分与高胆固醇有何关系?")
print(output)
提示链将任务分解为一系列步骤,每个 LLM 调用处理前一个调用的输出:
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict


# Shared graph state: each node reads and writes a subset of these keys.
class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str
# 节点函数
def generate_joke(state: State):
    """First LLM call: draft a short joke about the requested topic."""
    response = llm.invoke(f"写一个关于 {state['topic']} 的短笑话。")
    return {"joke": response.content}
def check_punchline(state: State):
    """Gate node: decide whether the joke goes through the improvement chain."""
    # NOTE(review): "Fail" (which the conditional edge routes to improve_joke) is
    # returned when the joke DOES contain "?" or "!" — confirm this gate direction
    # is intended.
    has_punctuation = "?" in state["joke"] or "!" in state["joke"]
    return "Fail" if has_punctuation else "Pass"
def improve_joke(state: State):
    """Second LLM call: punch up the draft joke with wordplay."""
    response = llm.invoke(f"通过添加文字游戏使这个笑话更有趣:{state['joke']}")
    return {"improved_joke": response.content}
def polish_joke(state: State):
    """Third LLM call: finish the joke with a surprise twist."""
    response = llm.invoke(f"为这个笑话添加一个意想不到的转折:{state['improved_joke']}")
    return {"final_joke": response.content}
# Build the prompt-chaining workflow.
workflow = StateGraph(State)
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)
workflow.add_edge(START, "generate_joke")
# "Fail" sends the joke through the improvement chain; "Pass" ends the run early.
workflow.add_conditional_edges("generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END})
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)
chain = workflow.compile()

# Invoke.
state = chain.invoke({"topic": "猫"})
# Bug fix: when the gate returns "Pass" the run ends at END before polish_joke
# executes, so "final_joke" is never written — fall back to the initial joke
# instead of raising KeyError.
print(state.get("final_joke", state["joke"]))
并行化允许 LLM 同时处理任务,并将输出聚合:
# Shared state for the parallel fan-out/fan-in workflow.
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str
# 节点函数
def call_llm_1(state: State):
    """Fan-out branch 1: write a joke about the topic."""
    response = llm.invoke(f"写一个关于 {state['topic']} 的笑话。")
    return {"joke": response.content}
def call_llm_2(state: State):
    """Fan-out branch 2: write a story about the topic."""
    response = llm.invoke(f"写一个关于 {state['topic']} 的故事。")
    return {"story": response.content}
def call_llm_3(state: State):
    """Fan-out branch 3: write a poem about the topic."""
    response = llm.invoke(f"写一首关于 {state['topic']} 的诗。")
    return {"poem": response.content}
def aggregator(state: State):
    """Fan-in node: merge the three parallel outputs into one combined report."""
    sections = [
        f"这是关于 {state['topic']} 的故事、笑话和诗!\n",
        f"故事:\n{state['story']}\n",
        f"笑话:\n{state['joke']}\n",
        f"诗:\n{state['poem']}",
    ]
    return {"combined_output": "\n".join(sections)}
# Build the parallel workflow: three branches fan out from START and fan in
# at the aggregator.
parallel_builder = StateGraph(State)
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)
for branch in ("call_llm_1", "call_llm_2", "call_llm_3"):
    parallel_builder.add_edge(START, branch)
    parallel_builder.add_edge(branch, "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()

# Invoke.
state = parallel_workflow.invoke({"topic": "狗"})
print(state["combined_output"])
代理通过工具调用和环境反馈在循环中执行任务:
from langgraph.graph import MessagesState
from langchain_core.messages import SystemMessage, HumanMessage, ToolMessage
from langchain_core.tools import tool


# Define tools. Bug fix: @tool derives each tool's description from the
# function docstring and raises if none is present, so every tool needs one.
@tool
def multiply(a: int, b: int) -> int:
    """Multiply a and b."""
    return a * b


@tool
def add(a: int, b: int) -> int:
    """Add a and b."""
    return a + b


@tool
def divide(a: int, b: int) -> float:
    """Divide a by b."""
    return a / b


# Augment the LLM with tool-calling.
tools = [add, multiply, divide]
tools_by_name = {t.name: t for t in tools}
llm_with_tools = llm.bind_tools(tools)
# 节点函数
def llm_call(state: MessagesState):
    """LLM node: call the tool-augmented model with a system prompt prepended."""
    system = SystemMessage(content="你是一个帮助执行算术运算的助手。")
    reply = llm_with_tools.invoke([system] + state["messages"])
    return {"messages": [reply]}
def tool_node(state: dict):
    """Tool node: execute every tool call on the last message and wrap results."""
    outputs = []
    for call in state["messages"][-1].tool_calls:
        observation = tools_by_name[call["name"]].invoke(call["args"])
        outputs.append(ToolMessage(content=observation, tool_call_id=call["id"]))
    return {"messages": outputs}
from typing import Literal


def should_continue(state: MessagesState) -> Literal["Action", END]:
    """Decide whether to route to the tool node (edge label "Action") or finish (END)."""
    # Bug fix: `Literal` was used without being imported (NameError at definition
    # time); the annotation also said "environment" while the function actually
    # returns the edge label "Action".
    if state["messages"][-1].tool_calls:
        return "Action"
    return END
# 构建代理
# Build the agent loop: the LLM decides, tools execute, and observations
# feed back into the LLM until it stops requesting tool calls.
agent_builder = StateGraph(MessagesState)
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("environment", tool_node)
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges("llm_call", should_continue, {"Action": "environment", END: END})
agent_builder.add_edge("environment", "llm_call")
agent = agent_builder.compile()

# Invoke.
messages = agent.invoke({"messages": [HumanMessage(content="3 加 4 等于多少?")]})
for message in messages["messages"]:
    message.pretty_print()
下一步:可以在这些示例的基础上,进一步探索 LangGraph 的持久化、流式处理、调试与部署能力,将工作流和代理扩展到生产环境。
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!