1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| import operator from typing import TypedDict, Annotated, Literal from langgraph.graph import StateGraph, START, END from langchain_core.messages import HumanMessage, AIMessage from langchain.tools import tool from dotenv import load_dotenv import os
load_dotenv() from langchain_openai import ChatOpenAI
llm = ChatOpenAI( model=os.getenv("DEFAULT_LLM_MODEL_NAME"), api_key=os.getenv("DEFAULT_LLM_API_KEY"), base_url=os.getenv("DEFAULT_LLM_BASE_URL"), temperature=0, )
@tool def analyze_sales_data(query: str) -> str: """分析销售数据趋势,返回季度增长、品类表现等关键指标摘要""" return "【销售分析】Q4销售额同比增长25%,主要来自电子产品类目。"
@tool def analyze_sentiment(text: str) -> str: """分析用户评论的情感倾向,返回满意度、关键词和整体评价""" return "【情感分析】用户评论整体积极,满意度达92%。"
tools = { "analyze_sales_data": analyze_sales_data, "analyze_sentiment": analyze_sentiment, }
class AgentState(TypedDict): messages: Annotated[list, operator.add] sales_result: str | None sentiment_result: str | None final_report: str | None
def route_tasks(state: AgentState) -> Literal["analyze_sales", "analyze_sentiment", "summarize"]: """Orchestrator:决定启动哪些分析""" query = state["messages"][0].content has_sales = "销售" in query or "sales" in query.lower() has_sentiment = "评论" in query or "情感" in query or "sentiment" in query.lower() return "start_parallel"
def start_parallel(state: AgentState) -> dict: """占位节点,实际并行由图结构实现""" return {}
def analyze_sales(state: AgentState) -> dict: query = state["messages"][0].content result = tools["analyze_sales_data"].invoke(query) return {"sales_result": result}
def analyze_sentiment_node(state: AgentState) -> dict: query = state["messages"][0].content result = tools["analyze_sentiment"].invoke(query) return {"sentiment_result": result}
def summarizer(state: AgentState) -> dict: parts = [] if state.get("sales_result"): parts.append(state["sales_result"]) if state.get("sentiment_result"): parts.append(state["sentiment_result"]) full_text = "\n".join(parts) summary = f"【综合简报】\n{full_text}\n\n建议:继续保持产品品质,加大营销投入。" return {"final_report": summary}
builder = StateGraph(AgentState)
builder.add_node("start_parallel", start_parallel) builder.add_node("analyze_sales", analyze_sales) builder.add_node("analyze_sentiment", analyze_sentiment_node) builder.add_node("summarize", summarizer)
builder.add_edge(START, "start_parallel")
builder.add_edge("start_parallel", "analyze_sales") builder.add_edge("start_parallel", "analyze_sentiment")
builder.add_edge("analyze_sales", "summarize") builder.add_edge("analyze_sentiment", "summarize")
builder.add_edge("summarize", END)
app = builder.compile()
if __name__ == "__main__": user_input = "请同时分析销售数据和用户评论情感,并汇总成一份简报。" inputs = {"messages": [HumanMessage(content=user_input)]} result = app.invoke(inputs) print("\n=== 最终报告 ===") print(result.get("final_report", "未生成报告"))
|