以下是对代码的详细讲解和中文注释:
Workflow
类Workflow
类定义了整个邮件处理的工作流程。它使用 StateGraph
来构建一个有状态的工作流,并通过 Nodes
类中的方法来实现每个节点的功能。
pythonfrom langgraph.graph import END, StateGraph
from .state import GraphState
from .nodes import Nodes
class Workflow():
def __init__(self):
# 初始化图状态和节点
workflow = StateGraph(GraphState)
nodes = Nodes()
# 定义所有节点
workflow.add_node("load_inbox_emails", nodes.load_new_emails)
workflow.add_node("is_email_inbox_empty", nodes.is_email_inbox_empty)
workflow.add_node("categorize_email", nodes.categorize_email)
workflow.add_node("construct_rag_queries", nodes.construct_rag_queries)
workflow.add_node("retrieve_from_rag", nodes.retrieve_from_rag)
workflow.add_node("email_writer", nodes.write_draft_email)
workflow.add_node("email_proofreader", nodes.verify_generated_email)
workflow.add_node("send_email", nodes.create_draft_response)
workflow.add_node("skip_unrelated_email", nodes.skip_unrelated_email)
# 设置入口节点:加载收件箱邮件
workflow.set_entry_point("load_inbox_emails")
# 检查是否有邮件需要处理
workflow.add_edge("load_inbox_emails", "is_email_inbox_empty")
workflow.add_conditional_edges(
"is_email_inbox_empty",
nodes.check_new_emails,
{
"process": "categorize_email", # 如果有邮件,进入分类节点
"empty": END # 如果收件箱为空,结束流程
}
)
# 根据邮件类别路由
workflow.add_conditional_edges(
"categorize_email",
nodes.route_email_based_on_category,
{
"product related": "construct_rag_queries", # 产品相关邮件,进入 RAG 查询构建
"not product related": "email_writer", # 非产品相关邮件(如反馈或投诉),进入邮件撰写
"unrelated": "skip_unrelated_email" # 无关邮件,跳过
}
)
# 将构建的 RAG 查询传递给 RAG 链以检索信息
workflow.add_edge("construct_rag_queries", "retrieve_from_rag")
# 将检索到的信息传递给撰写代理以创建草稿邮件
workflow.add_edge("retrieve_from_rag", "email_writer")
# 校对生成的草稿邮件
workflow.add_edge("email_writer", "email_proofreader")
# 检查邮件是否可发送,如果不可发送则重写
workflow.add_conditional_edges(
"email_proofreader",
nodes.must_rewrite,
{
"send": "send_email", # 邮件可发送,进入发送节点
"rewrite": "email_writer", # 邮件需要重写,返回撰写节点
"stop": "categorize_email" # 达到最大重试次数,停止处理
}
)
# 检查是否还有邮件需要处理
workflow.add_edge("send_email", "is_email_inbox_empty")
workflow.add_edge("skip_unrelated_email", "is_email_inbox_empty")
# 编译工作流
self.app = workflow.compile()
GraphState
定义了工作流的状态结构,使用 TypedDict
和 Pydantic
模型来描述状态中的字段。
pythonfrom pydantic import BaseModel, Field
from typing import List, Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
class Email(BaseModel):
id: str = Field(..., description="邮件的唯一标识符")
threadId: str = Field(..., description="邮件的线程标识符")
messageId: str = Field(..., description="邮件的消息标识符")
references: str = Field(..., description="邮件的引用信息")
sender: str = Field(..., description="发件人的邮箱地址")
subject: str = Field(..., description="邮件的主题")
body: str = Field(..., description="邮件的正文内容")
class GraphState(TypedDict):
emails: List[Email] # 待处理的邮件列表
current_email: Email # 当前正在处理的邮件
email_category: str # 邮件的类别
generated_email: str # 生成的草稿邮件内容
rag_queries: List[str] # RAG 查询列表
retrieved_documents: str # 从 RAG 检索到的文档
writer_messages: Annotated[list, add_messages] # 撰写代理的消息历史
sendable: bool # 邮件是否可发送
trials: int # 重试次数
Nodes
类定义了工作流中每个节点的具体实现。每个方法都接收 GraphState
作为输入,并返回更新后的状态。
pythonfrom colorama import Fore, Style
from .agents import Agents
from .tools.GmailTools import GmailToolsClass
from .state import GraphState, Email
class Nodes:
def __init__(self):
self.agents = Agents() # 初始化代理
self.gmail_tools = GmailToolsClass() # 初始化 Gmail 工具
def load_new_emails(self, state: GraphState) -> GraphState:
"""从 Gmail 加载新邮件并更新状态"""
print(Fore.YELLOW + "Loading new emails...\n" + Style.RESET_ALL)
recent_emails = self.gmail_tools.fetch_unanswered_emails()
emails = [Email(**email) for email in recent_emails]
return {"emails": emails}
def check_new_emails(self, state: GraphState) -> str:
"""检查是否有新邮件需要处理"""
if len(state['emails']) == 0:
print(Fore.RED + "No new emails" + Style.RESET_ALL)
return "empty"
else:
print(Fore.GREEN + "New emails to process" + Style.RESET_ALL)
return "process"
def is_email_inbox_empty(self, state: GraphState) -> GraphState:
"""检查收件箱是否为空"""
return state
def categorize_email(self, state: GraphState) -> GraphState:
"""使用代理对当前邮件进行分类"""
print(Fore.YELLOW + "Checking email category...\n" + Style.RESET_ALL)
current_email = state["emails"][-1]
result = self.agents.categorize_email.invoke({"email": current_email.body})
print(Fore.MAGENTA + f"Email category: {result.category.value}" + Style.RESET_ALL)
return {
"email_category": result.category.value,
"current_email": current_email
}
def route_email_based_on_category(self, state: GraphState) -> str:
"""根据邮件类别路由"""
print(Fore.YELLOW + "Routing email based on category...\n" + Style.RESET_ALL)
category = state["email_category"]
if category == "product_enquiry":
return "product related"
elif category == "unrelated":
return "unrelated"
else:
return "not product related"
def construct_rag_queries(self, state: GraphState) -> GraphState:
"""根据邮件内容构建 RAG 查询"""
print(Fore.YELLOW + "Designing RAG query...\n" + Style.RESET_ALL)
email_content = state["current_email"].body
query_result = self.agents.design_rag_queries.invoke({"email": email_content})
return {"rag_queries": query_result.queries}
def retrieve_from_rag(self, state: GraphState) -> GraphState:
"""从内部知识库中检索信息"""
print(Fore.YELLOW + "Retrieving information from internal knowledge...\n" + Style.RESET_ALL)
final_answer = ""
for query in state["rag_queries"]:
rag_result = self.agents.generate_rag_answer.invoke(query)
final_answer += query + "\n" + rag_result + "\n\n"
return {"retrieved_documents": final_answer}
def write_draft_email(self, state: GraphState) -> GraphState:
"""根据当前邮件和检索到的信息撰写草稿邮件"""
print(Fore.YELLOW + "Writing draft email...\n" + Style.RESET_ALL)
inputs = (
f'# **EMAIL CATEGORY:** {state["email_category"]}\n\n'
f'# **EMAIL CONTENT:**\n{state["current_email"].body}\n\n'
f'# **INFORMATION:**\n{state["retrieved_documents"]}'
)
writer_messages = state.get('writer_messages', [])
draft_result = self.agents.email_writer.invoke({
"email_information": inputs,
"history": writer_messages
})
email = draft_result.email
trials = state.get('trials', 0) + 1
writer_messages.append(f"**Draft {trials}:**\n{email}")
return {
"generated_email": email,
"trials": trials,
"writer_messages": writer_messages
}
def verify_generated_email(self, state: GraphState) -> GraphState:
"""使用校对代理验证生成的邮件"""
print(Fore.YELLOW + "Verifying generated email...\n" + Style.RESET_ALL)
review = self.agents.email_proofreader.invoke({
"initial_email": state["current_email"].body,
"generated_email": state["generated_email"],
})
writer_messages = state.get('writer_messages', [])
writer_messages.append(f"**Proofreader Feedback:**\n{review.feedback}")
return {
"sendable": review.send,
"writer_messages": writer_messages
}
def must_rewrite(self, state: GraphState) -> str:
"""根据校对结果和重试次数决定是否需要重写邮件"""
email_sendable = state["sendable"]
if email_sendable:
print(Fore.GREEN + "Email is good, ready to be sent!!!" + Style.RESET_ALL)
state["emails"].pop()
state["writer_messages"] = []
return "send"
elif state["trials"] >= 3:
print(Fore.RED + "Email is not good, we reached max trials must stop!!!" + Style.RESET_ALL)
state["emails"].pop()
state["writer_messages"] = []
return "stop"
else:
print(Fore.RED + "Email is not good, must rewrite it..." + Style.RESET_ALL)
return "rewrite"
def create_draft_response(self, state: GraphState) -> GraphState:
"""在 Gmail 中创建草稿回复"""
print(Fore.YELLOW + "Creating draft email...\n" + Style.RESET_ALL)
self.gmail_tools.create_draft_reply(state["current_email"], state["generated_email"])
return {"retrieved_documents": "", "trials": 0}
def send_email_response(self, state: GraphState) -> GraphState:
"""直接发送邮件回复"""
print(Fore.YELLOW + "Sending email...\n" + Style.RESET_ALL)
self.gmail_tools.send_reply(state["current_email"], state["generated_email"])
return {"retrieved_documents": "", "trials": 0}
def skip_unrelated_email(self, state):
"""跳过无关邮件并从邮件列表中移除"""
print("Skipping unrelated email...\n")
state["emails"].pop()
return state
这个流程图展示了邮件处理的工作流程,从加载邮件到最终发送或跳过邮件的整个过程。
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!