编辑
2025-02-13
后端
00
请注意,本文编写于 86 天前,最后修改于 86 天前,其中某些信息可能已经过时。

目录

1. Workflow 类
GraphState 代码讲解
Nodes 代码讲解
解释:

github地址

以下是对代码的详细讲解和中文注释:


1. Workflow

Workflow 类定义了整个邮件处理的工作流程。它使用 StateGraph 来构建一个有状态的工作流,并通过 Nodes 类中的方法来实现每个节点的功能。

python
from langgraph.graph import END, StateGraph from .state import GraphState from .nodes import Nodes class Workflow(): def __init__(self): # 初始化图状态和节点 workflow = StateGraph(GraphState) nodes = Nodes() # 定义所有节点 workflow.add_node("load_inbox_emails", nodes.load_new_emails) workflow.add_node("is_email_inbox_empty", nodes.is_email_inbox_empty) workflow.add_node("categorize_email", nodes.categorize_email) workflow.add_node("construct_rag_queries", nodes.construct_rag_queries) workflow.add_node("retrieve_from_rag", nodes.retrieve_from_rag) workflow.add_node("email_writer", nodes.write_draft_email) workflow.add_node("email_proofreader", nodes.verify_generated_email) workflow.add_node("send_email", nodes.create_draft_response) workflow.add_node("skip_unrelated_email", nodes.skip_unrelated_email) # 设置入口节点:加载收件箱邮件 workflow.set_entry_point("load_inbox_emails") # 检查是否有邮件需要处理 workflow.add_edge("load_inbox_emails", "is_email_inbox_empty") workflow.add_conditional_edges( "is_email_inbox_empty", nodes.check_new_emails, { "process": "categorize_email", # 如果有邮件,进入分类节点 "empty": END # 如果收件箱为空,结束流程 } ) # 根据邮件类别路由 workflow.add_conditional_edges( "categorize_email", nodes.route_email_based_on_category, { "product related": "construct_rag_queries", # 产品相关邮件,进入 RAG 查询构建 "not product related": "email_writer", # 非产品相关邮件(如反馈或投诉),进入邮件撰写 "unrelated": "skip_unrelated_email" # 无关邮件,跳过 } ) # 将构建的 RAG 查询传递给 RAG 链以检索信息 workflow.add_edge("construct_rag_queries", "retrieve_from_rag") # 将检索到的信息传递给撰写代理以创建草稿邮件 workflow.add_edge("retrieve_from_rag", "email_writer") # 校对生成的草稿邮件 workflow.add_edge("email_writer", "email_proofreader") # 检查邮件是否可发送,如果不可发送则重写 workflow.add_conditional_edges( "email_proofreader", nodes.must_rewrite, { "send": "send_email", # 邮件可发送,进入发送节点 "rewrite": "email_writer", # 邮件需要重写,返回撰写节点 "stop": "categorize_email" # 达到最大重试次数,停止处理 } ) # 检查是否还有邮件需要处理 workflow.add_edge("send_email", "is_email_inbox_empty") workflow.add_edge("skip_unrelated_email", "is_email_inbox_empty") # 编译工作流 self.app = workflow.compile()

GraphState 代码讲解

GraphState 定义了工作流的状态结构,使用 TypedDictPydantic 模型来描述状态中的字段。

python
from pydantic import BaseModel, Field from typing import List, Annotated from typing_extensions import TypedDict from langgraph.graph.message import add_messages class Email(BaseModel): id: str = Field(..., description="邮件的唯一标识符") threadId: str = Field(..., description="邮件的线程标识符") messageId: str = Field(..., description="邮件的消息标识符") references: str = Field(..., description="邮件的引用信息") sender: str = Field(..., description="发件人的邮箱地址") subject: str = Field(..., description="邮件的主题") body: str = Field(..., description="邮件的正文内容") class GraphState(TypedDict): emails: List[Email] # 待处理的邮件列表 current_email: Email # 当前正在处理的邮件 email_category: str # 邮件的类别 generated_email: str # 生成的草稿邮件内容 rag_queries: List[str] # RAG 查询列表 retrieved_documents: str # 从 RAG 检索到的文档 writer_messages: Annotated[list, add_messages] # 撰写代理的消息历史 sendable: bool # 邮件是否可发送 trials: int # 重试次数

Nodes 代码讲解

Nodes 类定义了工作流中每个节点的具体实现。每个方法都接收 GraphState 作为输入,并返回更新后的状态。

python
from colorama import Fore, Style from .agents import Agents from .tools.GmailTools import GmailToolsClass from .state import GraphState, Email class Nodes: def __init__(self): self.agents = Agents() # 初始化代理 self.gmail_tools = GmailToolsClass() # 初始化 Gmail 工具 def load_new_emails(self, state: GraphState) -> GraphState: """从 Gmail 加载新邮件并更新状态""" print(Fore.YELLOW + "Loading new emails...\n" + Style.RESET_ALL) recent_emails = self.gmail_tools.fetch_unanswered_emails() emails = [Email(**email) for email in recent_emails] return {"emails": emails} def check_new_emails(self, state: GraphState) -> str: """检查是否有新邮件需要处理""" if len(state['emails']) == 0: print(Fore.RED + "No new emails" + Style.RESET_ALL) return "empty" else: print(Fore.GREEN + "New emails to process" + Style.RESET_ALL) return "process" def is_email_inbox_empty(self, state: GraphState) -> GraphState: """检查收件箱是否为空""" return state def categorize_email(self, state: GraphState) -> GraphState: """使用代理对当前邮件进行分类""" print(Fore.YELLOW + "Checking email category...\n" + Style.RESET_ALL) current_email = state["emails"][-1] result = self.agents.categorize_email.invoke({"email": current_email.body}) print(Fore.MAGENTA + f"Email category: {result.category.value}" + Style.RESET_ALL) return { "email_category": result.category.value, "current_email": current_email } def route_email_based_on_category(self, state: GraphState) -> str: """根据邮件类别路由""" print(Fore.YELLOW + "Routing email based on category...\n" + Style.RESET_ALL) category = state["email_category"] if category == "product_enquiry": return "product related" elif category == "unrelated": return "unrelated" else: return "not product related" def construct_rag_queries(self, state: GraphState) -> GraphState: """根据邮件内容构建 RAG 查询""" print(Fore.YELLOW + "Designing RAG query...\n" + Style.RESET_ALL) email_content = state["current_email"].body query_result = self.agents.design_rag_queries.invoke({"email": email_content}) return {"rag_queries": query_result.queries} def retrieve_from_rag(self, state: GraphState) -> GraphState: """从内部知识库中检索信息""" print(Fore.YELLOW + "Retrieving information from internal knowledge...\n" + Style.RESET_ALL) final_answer = "" for query in state["rag_queries"]: rag_result = self.agents.generate_rag_answer.invoke(query) final_answer += query + "\n" + rag_result + "\n\n" return {"retrieved_documents": final_answer} def write_draft_email(self, state: GraphState) -> GraphState: """根据当前邮件和检索到的信息撰写草稿邮件""" print(Fore.YELLOW + "Writing draft email...\n" + Style.RESET_ALL) inputs = ( f'# **EMAIL CATEGORY:** {state["email_category"]}\n\n' f'# **EMAIL CONTENT:**\n{state["current_email"].body}\n\n' f'# **INFORMATION:**\n{state["retrieved_documents"]}' ) writer_messages = state.get('writer_messages', []) draft_result = self.agents.email_writer.invoke({ "email_information": inputs, "history": writer_messages }) email = draft_result.email trials = state.get('trials', 0) + 1 writer_messages.append(f"**Draft {trials}:**\n{email}") return { "generated_email": email, "trials": trials, "writer_messages": writer_messages } def verify_generated_email(self, state: GraphState) -> GraphState: """使用校对代理验证生成的邮件""" print(Fore.YELLOW + "Verifying generated email...\n" + Style.RESET_ALL) review = self.agents.email_proofreader.invoke({ "initial_email": state["current_email"].body, "generated_email": state["generated_email"], }) writer_messages = state.get('writer_messages', []) writer_messages.append(f"**Proofreader Feedback:**\n{review.feedback}") return { "sendable": review.send, "writer_messages": writer_messages } def must_rewrite(self, state: GraphState) -> str: """根据校对结果和重试次数决定是否需要重写邮件""" email_sendable = state["sendable"] if email_sendable: print(Fore.GREEN + "Email is good, ready to be sent!!!" + Style.RESET_ALL) state["emails"].pop() state["writer_messages"] = [] return "send" elif state["trials"] >= 3: print(Fore.RED + "Email is not good, we reached max trials must stop!!!" + Style.RESET_ALL) state["emails"].pop() state["writer_messages"] = [] return "stop" else: print(Fore.RED + "Email is not good, must rewrite it..." + Style.RESET_ALL) return "rewrite" def create_draft_response(self, state: GraphState) -> GraphState: """在 Gmail 中创建草稿回复""" print(Fore.YELLOW + "Creating draft email...\n" + Style.RESET_ALL) self.gmail_tools.create_draft_reply(state["current_email"], state["generated_email"]) return {"retrieved_documents": "", "trials": 0} def send_email_response(self, state: GraphState) -> GraphState: """直接发送邮件回复""" print(Fore.YELLOW + "Sending email...\n" + Style.RESET_ALL) self.gmail_tools.send_reply(state["current_email"], state["generated_email"]) return {"retrieved_documents": "", "trials": 0} def skip_unrelated_email(self, state): """跳过无关邮件并从邮件列表中移除""" print("Skipping unrelated email...\n") state["emails"].pop() return state

image.png

解释:

  1. load_inbox_emails: 加载收件箱中的邮件。
  2. is_email_inbox_empty: 检查收件箱是否为空。
    • 如果为空,流程结束。
    • 如果不为空,进入 categorize_email
  3. categorize_email: 对邮件进行分类。
    • 如果邮件与产品相关,进入 construct_rag_queries
    • 如果邮件与产品无关(如反馈或投诉),进入 email_writer
    • 如果邮件无关,进入 skip_unrelated_email
  4. construct_rag_queries: 构建RAG查询。
  5. retrieve_from_rag: 从RAG中检索信息。
  6. email_writer: 撰写草稿邮件。
  7. email_proofreader: 校对生成的草稿邮件。
    • 如果邮件可以发送,进入 send_email
    • 如果需要重写,返回 email_writer
    • 如果停止处理,返回 categorize_email
  8. send_email: 发送邮件。
  9. skip_unrelated_email: 跳过无关邮件。
  10. END: 流程结束。

这个流程图展示了邮件处理的工作流程,从加载邮件到最终发送或跳过邮件的整个过程。

本文作者:yowayimono

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!