P2 阶段完成报告 - LangGraph 接管 AI 工作流
实施时间:2024-12-14
状态:✅ 开发完成,待部署测试
一、实施概览
1.1 目标
创建 langgraph-runner Python 服务,迁移第一个工作流 project_diagnosis_workflow 到 LangGraph,实现多 Agent 协作和状态管理。
1.2 迁移工作流
| workflow_id | 原引擎 | 新引擎 | 状态 |
|---|---|---|---|
project_diagnosis_workflow | Node WorkflowEngine | LangGraph | ✅ 已迁移 |
二、核心交付物
2.1 新增服务
langgraph-runner - Python 微服务(FastAPI + LangGraph)
langgraph-runner/
├── app/
│ ├── main.py # FastAPI 入口
│ ├── config.py # 配置管理
│ ├── graphs/
│ │ └── diagnosis_graph.py # 诊断工作流 StateGraph
│ ├── agents/
│ │ └── strategy_agent.py # 策略 Agent
│ └── tools/
│ └── node_api.py # Node API Tool
├── requirements.txt # Python 依赖
├── Dockerfile # Docker 配置
├── env.example.txt # 环境变量模板
└── README.md # 文档2.2 Node.js 侧改动
| 文件 | 改动内容 |
|---|---|
backend/src/services/workflow/LangGraphClient.js | 新增:调用 LangGraph Runner 的客户端 |
backend/src/services/workflow/OrchestratorService.js | 修改:支持 engine 字段路由(node/langgraph/n8n) |
backend/src/services/workflow/WorkflowRegistry.js | 修改:为 project_diagnosis_workflow 添加 engine: 'langgraph' |
三、架构设计
3.1 数据流
Frontend (Vue)
↓
POST /api/strategy/diagnosis
↓
Node.js Backend
↓
OrchestratorService.run('project_diagnosis_workflow')
↓
├─ Check manifest.engine = 'langgraph'
↓
LangGraphClient.run()
↓
POST http://localhost:8000/run
↓
LangGraph Runner (Python)
↓
StateGraph: diagnose → positioning
↓
├─ StrategyAgent.diagnose_project()
│ └─ ChatOpenAI → GPT-4o-mini
↓
├─ StrategyAgent.generate_positioning()
│ └─ ChatOpenAI → GPT-4o-mini
↓
Return outputs { diagnosis_result, positioning_options }
↓
Node.js Backend (store in DB / return to frontend)3.2 StateGraph 定义
python
class DiagnosisState(TypedDict):
# Inputs
creator_profile: Dict[str, Any]
goals: List[str]
strengths: List[str]
constraints: List[str]
main_platforms: List[str]
reference_accounts: List[str]
preferred_style: str
# Context
user_context: Dict[str, Any]
project_context: Dict[str, Any]
# Outputs
diagnosis_result: Dict[str, Any]
positioning_options: List[Dict[str, Any]]
# Errors
errors: List[str]Workflow:
Start
↓
diagnose_node (StrategyAgent.diagnose_project)
↓
should_continue? (check errors)
├─ Yes → positioning_node (StrategyAgent.generate_positioning)
│ ↓
│ END
└─ No → END四、API 规范
4.1 LangGraph Runner API
POST /run
请求
json
{
"workflow_id": "project_diagnosis_workflow",
"inputs": {
"creator_profile": {"creator_type": "个人创作者"},
"goals": ["建立个人品牌"],
"strengths": ["擅长写作"],
"constraints": ["时间有限"],
"main_platforms": ["douyin"],
"reference_accounts": [],
"preferred_style": null
},
"user_context": {
"user_id": 123
},
"project_context": {
"project_id": "uuid-xxx"
}
}响应
json
{
"success": true,
"run_id": "run-diagnosis-12345",
"workflow_id": "project_diagnosis_workflow",
"status": "completed",
"outputs": {
"diagnosis_result": {
"summary": "...",
"platform_fit": {...},
"opportunities": [...],
"challenges": [...],
"recommendations": [...]
},
"positioning_options": [
{
"name": "定位名称",
"tagline": "一句话介绍",
"target_audience": "目标受众",
"content_pillars": [...],
"unique_value": "独特价值",
"visual_style": "视觉风格",
"initial_topics": [...]
}
]
},
"trace": [...]
}4.2 Node.js LangGraphClient API
javascript
const langGraphClient = require('./services/workflow/LangGraphClient');
// Run workflow
const result = await langGraphClient.run(
'project_diagnosis_workflow',
{
creator_profile: {...},
goals: [...],
// ...
},
{ user_id: 123 },
{ project_id: 'uuid-xxx' }
);
// Health check
const health = await langGraphClient.health();
// { status: "healthy", graphs_loaded: 1, available_graphs: ["diagnosis"] }
// List graphs
const graphs = await langGraphClient.listGraphs();
// { success: true, graphs: ["diagnosis"], count: 1 }五、部署步骤
5.1 前提条件
- ✅ Python 3.11+ 已安装
- ✅ OpenAI API Key
- ✅ Node.js backend 运行中(3001 端口)
5.2 部署 LangGraph Runner
方式一:本地运行(开发环境)
bash
# 1. 创建虚拟环境
cd langgraph-runner
python -m venv venv
# 激活(Windows)
venv\Scripts\activate
# 激活(Linux/Mac)
source venv/bin/activate
# 2. 安装依赖
pip install -r requirements.txt
# 3. 配置环境变量
copy env.example.txt .env
# 编辑 .env 填入配置
# 4. 启动服务
python -m app.main服务启动在 http://localhost:8000
方式二:Docker 部署
bash
# 1. 构建镜像
cd langgraph-runner
docker build -t kkmusic/langgraph-runner:latest .
# 2. 运行容器
docker run -d \
--name langgraph-runner \
-p 8000:8000 \
--env-file .env \
kkmusic/langgraph-runner:latest
# 3. 查看日志
docker logs -f langgraph-runner5.3 验证部署
步骤 1:检查 LangGraph Runner
bash
# 健康检查
curl http://localhost:8000/health
# 预期响应
{
"status": "healthy",
"graphs_loaded": 1,
"available_graphs": ["diagnosis"]
}步骤 2:检查 Node → LangGraph 连接
bash
# 从 Node 项目目录
cd backend
# 测试 LangGraph 健康
node -e "const c = require('./src/services/workflow/LangGraphClient'); c.health().then(console.log);"
# 预期输出:
# { status: 'healthy', graphs_loaded: 1, available_graphs: [ 'diagnosis' ] }步骤 3:测试完整工作流
bash
# 使用 curl 测试
curl -X POST http://localhost:8000/run \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"workflow_id": "project_diagnosis_workflow",
"inputs": {
"creator_profile": {"creator_type": "个人创作者", "background": "科技博主"},
"goals": ["建立个人品牌", "实现知识变现"],
"strengths": ["擅长写作", "有技术背景"],
"constraints": ["时间有限"],
"main_platforms": ["douyin", "xiaohongshu"],
"reference_accounts": [],
"preferred_style": "专业且有温度"
},
"user_context": {"user_id": 1},
"project_context": {"project_id": "test-123"}
}'预期响应:返回 200,包含 outputs.diagnosis_result 和 outputs.positioning_options
六、测试用例
6.1 单元测试(Python)
python
# tests/test_diagnosis_graph.py
import pytest
from app.graphs.diagnosis_graph import build_diagnosis_graph
@pytest.mark.asyncio
async def test_diagnosis_graph():
graph = build_diagnosis_graph()
initial_state = {
"creator_profile": {"creator_type": "个人创作者"},
"goals": ["建立个人品牌"],
"strengths": ["擅长写作"],
"constraints": ["时间有限"],
"main_platforms": ["douyin"],
"reference_accounts": [],
"preferred_style": None,
"user_context": {},
"project_context": {},
"errors": []
}
final_state = await graph.ainvoke(initial_state)
assert "diagnosis_result" in final_state
assert "positioning_options" in final_state
assert len(final_state["positioning_options"]) >= 26.2 集成测试(Node.js + Python)
javascript
// tests/integration/langgraph.test.js
describe('LangGraph Integration', () => {
test('Full workflow execution', async () => {
const langGraphClient = require('../../src/services/workflow/LangGraphClient');
const result = await langGraphClient.run(
'project_diagnosis_workflow',
{
creator_profile: { creator_type: '个人创作者' },
goals: ['建立个人品牌'],
strengths: ['擅长写作'],
constraints: ['时间有限'],
main_platforms: ['douyin'],
reference_accounts: [],
preferred_style: null
},
{ user_id: 1 },
{ project_id: 'test-123' }
);
expect(result.status).toBe('completed');
expect(result.result).toHaveProperty('diagnosis_result');
expect(result.result).toHaveProperty('positioning_options');
expect(result.result.positioning_options.length).toBeGreaterThanOrEqual(2);
});
});6.3 端到端测试(通过 OrchestratorService)
javascript
// tests/e2e/orchestrator-langgraph.test.js
describe('Orchestrator with LangGraph', () => {
test('Routes to LangGraph for diagnosis workflow', async () => {
const orchestrator = require('../../src/services/workflow/OrchestratorService');
const result = await orchestrator.run('project_diagnosis_workflow', {
inputs: {
creator_profile: { creator_type: '个人创作者' },
goals: ['建立个人品牌'],
strengths: ['擅长写作'],
constraints: ['时间有限'],
main_platforms: ['douyin']
},
user_context: { user_id: 1 },
project_context: { project_id: 'test-123' }
});
expect(result.success).toBe(true);
expect(result.engine).toBe('langgraph');
expect(result.result).toHaveProperty('diagnosis_result');
});
});七、性能指标
| 指标 | 目标 | 实际 |
|---|---|---|
| LangGraph 启动时间 | < 5s | ___s |
| 健康检查响应时间 | < 100ms | ___ms |
| 诊断工作流执行时间 | < 30s | ___s |
| 内存占用 | < 500MB | ___MB |
八、监控与日志
8.1 LangGraph Runner 日志
bash
# 查看服务日志
docker logs -f langgraph-runner
# 或本地运行时
# 日志会输出到 stdout日志示例:
INFO: 🚀 LangGraph Runner starting up...
INFO: ✅ Loaded 1 graph(s)
INFO: 📥 Received run request: workflow=project_diagnosis_workflow
INFO: ▶️ Executing graph: diagnosis
INFO: ✅ Graph execution completed: diagnosis (status=completed)8.2 Node.js 侧日志
bash
# 查看 Orchestrator 日志
tail -f backend/logs/app.log | grep "Orchestrator"
# 或
tail -f backend/logs/app.log | grep "LangGraphClient"日志示例:
[Orchestrator] Running workflow: project_diagnosis_workflow (engine: langgraph)
[LangGraphClient] Calling LangGraph for workflow: project_diagnosis_workflow
[LangGraphClient] Workflow project_diagnosis_workflow completed with status: completed8.3 LangSmith 追踪(可选)
在
.env中启用:envLANGSMITH_TRACING=true LANGSMITH_API_KEY=your-key访问 https://smith.langchain.com/ 查看:
- 每次执行的完整 trace
- LLM 调用次数和 token 消耗
- 执行时间分布
- 错误堆栈
九、故障排查
9.1 常见问题
| 问题 | 可能原因 | 解决方案 |
|---|---|---|
| LangGraph 启动失败 | 缺少依赖 | pip install -r requirements.txt |
| 401 Unauthorized | Token 不匹配 | 检查 .env 中的 NODE_API_TOKEN |
| 工作流执行超时 | OpenAI API 慢 | 增加 LangGraphClient 超时时间 |
| 找不到 graph | Graph 未加载 | 检查 main.py 中的 graph 注册 |
9.2 降级方案
如果 LangGraph Runner 不可用:
临时降级:在
WorkflowRegistry.js中注释掉engine: 'langgraph'javascriptregistry.register('project_diagnosis_workflow', { // engine: 'langgraph', // 临时注释 // ... });重启 Node 服务
bashpm2 restart kkmusic-backend工作流将回退到 Node WorkflowEngine
十、下一步
10.1 P3 阶段(持续迭代)
Flowise 灵感空间:
- 把灵感检索迁移到 Flowise
- 把策略对话迁移到 Flowise
- 集成 Vector Store 实现语料检索
10.2 更多工作流迁移
根据 WORKFLOW_MIGRATION_ROADMAP.md,后续可迁移:
healing_night_mv(高复杂度,4 个 Agent)knowledge_explainer(中复杂度,3 个 Agent)product_showcase(高复杂度,4 个 Agent)
十一、相关文档
- KKMUSIC_ARCHITECTURE_BLUEPRINT.md - 整体架构蓝图
- UNIFIED_RUN_JOB_CONTRACT.md - Run/Job 统一接口规范
- WORKFLOW_MIGRATION_ROADMAP.md - 迁移路线图(包含 P2 详细计划)
- P1_SCHEDULER_MIGRATION_COMPLETE.md - P1 阶段报告
- langgraph-runner/README.md - LangGraph Runner 使用指南
文档维护者:一刻工坊技术团队
最后更新:2024-12-14
状态:✅ P2 阶段开发完成,待部署测试