0%

langgraph-demo项目1

构建第一个简单的langgraph项目如下:

.env配置(注意:.env 文件使用 KEY=VALUE 的等号格式,而不是冒号)
DEFAULT_LLM_MODEL_NAME=xxx

DEFAULT_LLM_API_KEY=xxx

DEFAULT_LLM_BASE_URL=xxx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import operator
import os
from typing import TypedDict, Annotated, Literal

from dotenv import load_dotenv
from langchain.tools import tool
from langchain_core.messages import HumanMessage, AIMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, START, END

# ----------------------------
# Load environment variables & initialise the LLM
# ----------------------------
# load_dotenv() must run before os.getenv() so the .env values are visible.
# (The ChatOpenAI import previously sat below this call; imports are now
# grouped at the top per PEP 8 — import order does not depend on the env.)
load_dotenv()

llm = ChatOpenAI(
    model=os.getenv("DEFAULT_LLM_MODEL_NAME"),
    api_key=os.getenv("DEFAULT_LLM_API_KEY"),
    base_url=os.getenv("DEFAULT_LLM_BASE_URL"),
    # temperature=0 keeps responses deterministic — suited to analysis tasks.
    temperature=0,
)

# ----------------------------
# 工具定义
# ----------------------------
@tool
def analyze_sales_data(query: str) -> str:
    # NOTE: the docstring below is consumed by @tool as the LLM-facing tool
    # description, so it is kept verbatim to preserve runtime behavior.
    """分析销售数据趋势,返回季度增长、品类表现等关键指标摘要"""
    # Stubbed result — a real implementation would query a sales backend.
    return "【销售分析】Q4销售额同比增长25%,主要来自电子产品类目。"

@tool
def analyze_sentiment(text: str) -> str:
    """分析用户评论的情感倾向,返回满意度、关键词和整体评价"""
    # Stubbed result standing in for a real sentiment-analysis service.
    return "【情感分析】用户评论整体积极,满意度达92%。"

# Registry mapping tool names to tool objects; node functions look tools
# up by name here instead of referencing them directly.
tools = {
    "analyze_sales_data": analyze_sales_data,
    "analyze_sentiment": analyze_sentiment,
}

# ----------------------------
# 状态定义
# ----------------------------
class AgentState(TypedDict):
    """Shared graph state; each node returns a partial update merged into it."""

    # Conversation history; Annotated with operator.add so updates coming
    # from parallel nodes are concatenated rather than overwritten.
    messages: Annotated[list, operator.add]
    # Output of the sales-analysis node; None until that node has run.
    sales_result: str | None
    # Output of the sentiment-analysis node; None until that node has run.
    sentiment_result: str | None
    # Final combined report produced by the summarize node.
    final_report: str | None

# ----------------------------
# 节点函数
# ----------------------------
def route_tasks(state: AgentState) -> Literal["start_parallel"]:
    """Orchestrator stub: decide which analyses to launch.

    NOTE(review): this function is never wired into the graph below — the
    fan-out is expressed statically with add_edge() instead. It is kept as
    a hook for future dynamic routing. The original return annotation
    (Literal["analyze_sales", "analyze_sentiment", "summarize"]) did not
    include the value actually returned, and the keyword checks on the
    query were computed but never used; both issues are corrected here.

    Args:
        state: Current graph state (unused; kept for signature parity with
            the other node functions).

    Returns:
        The name of the fan-out entry node — always "start_parallel".
    """
    # A conditional edge can only target one node, so the parallel split is
    # modelled in the graph structure itself
    # (start_parallel -> analyze_sales / analyze_sentiment).
    return "start_parallel"

def start_parallel(state: AgentState) -> dict:
    """No-op fan-out anchor; the parallel split lives in the graph edges."""
    # Nothing to compute — an empty dict means "update no state fields".
    return {}

def analyze_sales(state: AgentState) -> dict:
    """Invoke the sales-analysis tool on the user's original request."""
    # The first message in the conversation is the user's query.
    user_query = state["messages"][0].content
    sales_summary = tools["analyze_sales_data"].invoke(user_query)
    return {"sales_result": sales_summary}

def analyze_sentiment_node(state: AgentState) -> dict:
    """Invoke the sentiment-analysis tool on the user's original request."""
    user_query = state["messages"][0].content
    sentiment_summary = tools["analyze_sentiment"].invoke(user_query)
    return {"sentiment_result": sentiment_summary}

def summarizer(state: AgentState) -> dict:
    """Merge whichever analysis results are present into one briefing."""
    # Collect only the analyses that actually produced output
    # (skips both missing keys and None/empty values).
    sections = [
        state[key]
        for key in ("sales_result", "sentiment_result")
        if state.get(key)
    ]
    combined = "\n".join(sections)
    report = f"【综合简报】\n{combined}\n\n建议:继续保持产品品质,加大营销投入。"
    return {"final_report": report}

# ----------------------------
# 构建图(LangGraph 1.0 风格)
# ----------------------------
builder = StateGraph(AgentState)

# Register every node up front, keyed by the names used in the edges below.
for node_name, node_fn in (
    ("start_parallel", start_parallel),
    ("analyze_sales", analyze_sales),
    ("analyze_sentiment", analyze_sentiment_node),
    ("summarize", summarizer),
):
    builder.add_node(node_name, node_fn)

# START -> start_parallel, which fans out to both analyses in parallel;
# both fan back in to summarize, which ends the run.  Per the original
# author's note, LangGraph waits for all upstream nodes of a DAG node to
# finish, so summarize sees both results.
for src, dst in (
    (START, "start_parallel"),
    ("start_parallel", "analyze_sales"),
    ("start_parallel", "analyze_sentiment"),
    ("analyze_sales", "summarize"),
    ("analyze_sentiment", "summarize"),
    ("summarize", END),
):
    builder.add_edge(src, dst)

# Compile the graph description into a runnable application object.
app = builder.compile()

# ----------------------------
# 运行
# ----------------------------
if __name__ == "__main__":
    # Demo query designed to trigger both the sales and sentiment branches.
    demo_question = "请同时分析销售数据和用户评论情感,并汇总成一份简报。"
    initial_state = {"messages": [HumanMessage(content=demo_question)]}

    final_state = app.invoke(initial_state)

    print("\n=== 最终报告 ===")
    print(final_state.get("final_report", "未生成报告"))

模块化拆分

project/
├── main.py
├── schemas.py
├── agents/
│   ├── __init__.py
│   ├── tools.py
│   └── nodes.py
└── graph/
    ├── __init__.py
    └── workflow.py

schemas.py

是整个系统的 唯一数据源(Single Source of Truth)
使用 Annotated 声明合并策略(如 operator.add 实现消息累积)

1
2
3
4
5
6
7
8
9
from typing import TypedDict, Annotated, Optional
from langchain_core.messages import BaseMessage
import operator

class AgentState(TypedDict):
    """Single source of truth for the data flowing through the graph."""

    # Message history; operator.add concatenates updates from parallel nodes.
    messages: Annotated[list[BaseMessage], operator.add]
    # Result of the sales analysis; None until that node has run.
    sales_result: Optional[str]
    # Result of the sentiment analysis; None until that node has run.
    sentiment_result: Optional[str]
    # Combined report written by the summarize node.
    final_report: Optional[str]

agents/tools.py

原子能力单元:
使用 LangChain 的 @tool 装饰器封装业务逻辑
每个工具必须有 docstring(供 LLM 理解用途)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from langchain.tools import tool

@tool
def analyze_sales_data(query: str) -> str:
    # NOTE: @tool uses the docstring as the LLM-facing tool description,
    # so it is kept verbatim to preserve runtime behavior.
    """分析销售数据趋势..."""
    # Stubbed result — a real implementation would query a sales backend.
    return "【销售分析】Q4销售额同比增长25%..."

@tool
def analyze_sentiment(text: str) -> str:
    """分析用户评论的情感倾向..."""
    # Stubbed result standing in for a real sentiment-analysis service.
    return "【情感分析】用户评论整体积极..."

# Name -> tool registry; node functions resolve tools by name through this.
TOOLS = {
    "analyze_sales_data": analyze_sales_data,
    "analyze_sentiment": analyze_sentiment,
}

agents/nodes.py

Agent 行为节点:
每个函数是一个 状态转换器:state → partial_update
从全局 state 读取输入,返回要更新的字段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from .tools import TOOLS
from typing import Dict, Any

def start_parallel(state: Dict[str, Any]) -> Dict[str, Any]:
    """Placeholder fan-out node; the parallel split is defined by the edges."""
    # No state changes needed — an empty dict means "update nothing".
    return {}

def analyze_sales(state: Dict[str, Any]) -> Dict[str, Any]:
    """Run the registered sales-analysis tool against the user's query."""
    # The first message in the conversation is the user's request.
    user_query = state["messages"][0].content
    return {"sales_result": TOOLS["analyze_sales_data"].invoke(user_query)}

def analyze_sentiment_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Run the registered sentiment-analysis tool against the user's query."""
    user_query = state["messages"][0].content
    return {"sentiment_result": TOOLS["analyze_sentiment"].invoke(user_query)}

def summarizer(state: Dict[str, Any]) -> Dict[str, Any]:
    """Combine whichever analysis results are present into the final briefing."""
    # Keep only results that are present and non-empty.
    sections = [
        state[key]
        for key in ("sales_result", "sentiment_result")
        if state.get(key)
    ]
    merged = "\n".join(sections)
    return {"final_report": f"【综合简报】\n{merged}\n\n建议:..."}

graph/workflow.py 流程编排

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# graph/workflow.py — graph assembly, kept separate from the node logic.
#
# NOTE(review): the original snippet here was an accidental duplicate paste
# of agents/nodes.py and never defined create_workflow(), even though
# main.py imports it (`from graph.workflow import create_workflow`).
# Replaced with the actual workflow module.
from langgraph.graph import StateGraph, START, END

from schemas import AgentState
from agents.nodes import (
    start_parallel,
    analyze_sales,
    analyze_sentiment_node,
    summarizer,
)


def create_workflow():
    """Build and compile the parallel-analysis graph.

    Wiring: START -> start_parallel, which fans out to analyze_sales and
    analyze_sentiment in parallel; both fan back in to summarize, which
    ends the run.

    Returns:
        A compiled LangGraph application ready for .invoke().
    """
    builder = StateGraph(AgentState)

    # Nodes, keyed by the names used in the edges below.
    builder.add_node("start_parallel", start_parallel)
    builder.add_node("analyze_sales", analyze_sales)
    builder.add_node("analyze_sentiment", analyze_sentiment_node)
    builder.add_node("summarize", summarizer)

    # Fan-out / fan-in edge structure.
    builder.add_edge(START, "start_parallel")
    builder.add_edge("start_parallel", "analyze_sales")
    builder.add_edge("start_parallel", "analyze_sentiment")
    builder.add_edge("analyze_sales", "summarize")
    builder.add_edge("analyze_sentiment", "summarize")
    builder.add_edge("summarize", END)

    return builder.compile()

main.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from dotenv import load_dotenv
import os
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from graph.workflow import create_workflow

# ----------------------------
# Load environment variables & initialise the LLM
# ----------------------------
# load_dotenv() must run before os.getenv() reads the .env values.
load_dotenv()

# NOTE(review): a duplicate `from langchain_openai import ChatOpenAI`
# previously appeared here; it is already imported above and was removed.
llm = ChatOpenAI(
    model=os.getenv("DEFAULT_LLM_MODEL_NAME"),
    api_key=os.getenv("DEFAULT_LLM_API_KEY"),
    base_url=os.getenv("DEFAULT_LLM_BASE_URL"),
    temperature=0,
)
# NOTE(review): `llm` is not used by the current stub nodes; presumably it
# is kept for future LLM-backed nodes — confirm before deleting.


if __name__ == "__main__":
    # Build the compiled graph and run one demo request through it.
    app = create_workflow()
    inputs = {"messages": [HumanMessage(content="请同时分析销售和评论...")]}
    result = app.invoke(inputs)
    print("\n=== 最终报告 ===")
    print(result.get("final_report", "未生成报告"))

总结

langgraph框架下,简单的实现了一个分析报告agent,主要模块包括:

  • 环境与LLM初始化
  • 工具定义
  • 状态模型定义
  • 节点函数设计
  • 图构建
  • 运行调用

下一个项目中,我们会增加记忆存储,多agent协同等功能;