P1 阶段完成报告 - n8n 接管定时调度
实施时间:2024-12-14
状态:✅ 开发完成,待部署测试
一、实施概览
1.1 目标
将 PlanSchedulerService 的定时逻辑从 Node.js setInterval 迁移到 n8n Cron,实现:
- 更稳定的定时调度
- 可视化的任务监控
- 灵活的工作流编排
1.2 涉及工作流
| workflow_id | 迁移前 | 迁移后 |
|---|---|---|
voiceover_video | Node setInterval | n8n Cron → Node API |
music_video_creation | Node setInterval | n8n Cron → Node API |
content_repurposing | 手动触发 | n8n Webhook |
content_plan_workflow | Node setInterval | n8n Cron → Node API |
二、核心改动清单
2.1 新增文件
| 文件路径 | 说明 |
|---|---|
backend/src/routes/scheduler.js | Scheduler API(供 n8n 调用) |
n8n/workflows/plan_scheduler.json | n8n 工作流配置 |
n8n/README.md | n8n 部署和使用指南 |
docs/architecture/P1_SCHEDULER_MIGRATION_COMPLETE.md | 本文档 |
2.2 修改文件
| 文件路径 | 修改内容 |
|---|---|
backend/src/services/workflow/PlanSchedulerService.js | 移除 setInterval,新增 getPendingTasks() 方法 |
backend/src/index.js | 注册 /api/scheduler 路由 |
backend/src/workflows/manifest.json | 添加 engine 字段 |
三、新增 API
3.1 GET /api/scheduler/pending-tasks
功能:获取待执行任务列表(供 n8n Cron 调用)
请求
http
GET http://localhost:3001/api/scheduler/pending-tasks?limit=10
Authorization: Bearer YOUR_TOKEN响应
json
{
"success": true,
"data": {
"tasks": [
{
"workflow_id": "voiceover",
"project_id": "uuid-xxx",
"project_name": "美业账号",
"strategy_task_id": "task-uuid-xxx",
"inputs": {
"summary": "美业选题技巧",
"platform": "douyin"
},
"task_summary": "美业选题技巧",
"task_due_date": "2024-12-14",
"priority": "high"
}
],
"fetched_at": "2024-12-14T10:00:00.000Z",
"count": 1
}
}3.2 POST /api/scheduler/trigger
功能:触发单个任务执行
请求
http
POST http://localhost:3001/api/scheduler/trigger
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json
{
"workflow_id": "voiceover",
"project_id": "uuid-xxx",
"strategy_task_id": "task-uuid-xxx",
"inputs": {
"summary": "美业选题技巧",
"platform": "douyin"
},
"trigger_source": "n8n_scheduler"
}响应
json
{
"success": true,
"data": {
"run_id": "run-uuid-xxx",
"workflow_id": "voiceover",
"project_id": "uuid-xxx",
"status": "running",
"triggered_at": "2024-12-14T10:00:05.000Z"
}
}3.3 GET /api/scheduler/status
功能:获取调度器状态
请求
http
GET http://localhost:3001/api/scheduler/status
Authorization: Bearer YOUR_TOKEN响应
json
{
"success": true,
"data": {
"mode": "n8n_managed",
"description": "Scheduler is managed by n8n Cron",
"interval_ms": 900000,
"interval_minutes": 15,
"max_projects_per_tick": 10,
"max_runs_per_project": 2,
"valid_workflows": ["voiceover", "graphic", "music", "video", "live", "short_drama"],
"timestamp": "2024-12-14T10:00:00.000Z"
}
}四、PlanSchedulerService 变化
4.1 废弃方法
以下方法已标记为 DEPRECATED,保留用于向后兼容和降级场景:
start()- 启动内置调度器(已迁移到 n8n)stop()- 停止内置调度器(无需手动停止)tick()- 轮询执行任务(已迁移到 n8n)handleProject()- 处理单个项目(已拆分为getPendingTasksForProject)launchWorkflow()- 启动工作流(已迁移到/api/scheduler/trigger)
4.2 新增方法
| 方法 | 说明 |
|---|---|
getPendingTasks(options) | 获取所有待执行任务列表(供 n8n 调用) |
getPendingTasksForProject(project, dateStr) | 获取单个项目的待执行任务 |
4.3 保留方法
以下方法保持不变,继续使用:
shouldAutoRunTask()- 判断任务是否应该自动执行markAutoRun()- 标记任务执行状态buildWorkflowPayload()- 构建工作流输入参数
五、manifest.json 扩展
所有工作流添加了 engine 字段:
json
{
"id": "voiceover_video",
"engine": "node", // 新增字段:执行引擎
// ... 其他字段
}可选值:
"node"- 使用 Node.js WorkflowEngine(默认)"langgraph"- 使用 LangGraph Runner(P2 阶段)"n8n"- 直接由 n8n 编排(如 content_repurposing)"flowise"- Flowise Chatflow(P3 阶段)
六、部署步骤
6.1 前提条件
- ✅ Node.js 后端已启动(监听 3001 端口)
- ⏳ n8n 已安装并运行(监听 5678 端口)
- ⏳ 获取有效的 API Token
6.2 部署 n8n 工作流
选项 A:通过 UI 导入
- 打开 n8n:
http://localhost:5678 - 导入
n8n/workflows/plan_scheduler.json - 修改
AuthorizationHeader 中的 Token - 激活工作流
选项 B:通过 Docker Compose
yaml
# docker-compose.yml
version: '3.8'
services:
n8n:
image: n8nio/n8n:latest
ports:
- "5678:5678"
volumes:
- ./n8n/workflows:/home/node/.n8n/workflows
- n8n_data:/home/node/.n8n
environment:
- N8N_HOST=0.0.0.0
- N8N_PORT=5678
- N8N_PROTOCOL=http
- WEBHOOK_URL=http://localhost:5678
volumes:
n8n_data:bash
# 启动 n8n
docker-compose up -d n8n
# 导入工作流
docker exec -it n8n n8n import:workflow --input=/home/node/.n8n/workflows/plan_scheduler.json6.3 验证部署
步骤 1:检查 Node API
bash
# 检查健康状态
curl http://localhost:3001/health
# 检查调度器状态
curl -H "Authorization: Bearer YOUR_TOKEN" \
http://localhost:3001/api/scheduler/status步骤 2:测试 pending-tasks API
bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
http://localhost:3001/api/scheduler/pending-tasks预期响应:返回待执行任务列表(如果有项目配置了日历)
步骤 3:手动测试 trigger API
bash
curl -X POST \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"workflow_id": "voiceover",
"project_id": "YOUR_PROJECT_ID",
"inputs": {
"summary": "测试任务",
"platform": "douyin"
}
}' \
http://localhost:3001/api/scheduler/trigger步骤 4:在 n8n 中手动执行工作流
- 打开 n8n:
http://localhost:5678 - 打开 "Plan Scheduler" 工作流
- 点击 "Execute Workflow" 按钮
- 查看执行结果和日志
七、测试用例
7.1 单元测试(API)
javascript
// test/scheduler.test.js
describe('Scheduler API', () => {
test('GET /api/scheduler/pending-tasks', async () => {
const response = await request(app)
.get('/api/scheduler/pending-tasks')
.set('Authorization', `Bearer ${token}`)
.expect(200);
expect(response.body.success).toBe(true);
expect(response.body.data).toHaveProperty('tasks');
expect(response.body.data).toHaveProperty('count');
});
test('POST /api/scheduler/trigger', async () => {
const response = await request(app)
.post('/api/scheduler/trigger')
.set('Authorization', `Bearer ${token}`)
.send({
workflow_id: 'voiceover',
project_id: testProjectId,
inputs: { summary: 'test' }
})
.expect(200);
expect(response.body.success).toBe(true);
expect(response.body.data).toHaveProperty('run_id');
});
test('GET /api/scheduler/status', async () => {
const response = await request(app)
.get('/api/scheduler/status')
.set('Authorization', `Bearer ${token}`)
.expect(200);
expect(response.body.data.mode).toBe('n8n_managed');
});
});7.2 集成测试(n8n + Node)
javascript
// test/integration/scheduler-n8n.test.js
describe('n8n Scheduler Integration', () => {
test('Full scheduler flow', async () => {
// 1. 创建测试项目和日历
const project = await createTestProject({
status: 'planning',
meta: {
strategy: {
blueprint: {
calendar: [
{
id: 'task-1',
workflow: 'voiceover',
due_date: new Date().toISOString().split('T')[0],
summary: '测试任务'
}
]
}
}
}
});
// 2. 调用 pending-tasks API
const tasksResponse = await request(app)
.get('/api/scheduler/pending-tasks')
.set('Authorization', `Bearer ${token}`)
.expect(200);
expect(tasksResponse.body.data.count).toBeGreaterThan(0);
const task = tasksResponse.body.data.tasks[0];
// 3. 调用 trigger API
const triggerResponse = await request(app)
.post('/api/scheduler/trigger')
.set('Authorization', `Bearer ${token}`)
.send({
workflow_id: task.workflow_id,
project_id: task.project_id,
strategy_task_id: task.strategy_task_id,
inputs: task.inputs
})
.expect(200);
const runId = triggerResponse.body.data.run_id;
// 4. 验证任务已标记为 running
const updatedProject = await Project.findByPk(project.id);
const autoRuns = updatedProject.meta?.strategy?.auto_runs || {};
expect(autoRuns['task-1'].status).toBe('completed');
});
});八、监控指标
8.1 关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
scheduler_tick_success_rate | n8n Cron 成功率 | < 95% |
pending_tasks_fetch_duration | 获取任务列表耗时 | > 5s |
workflow_trigger_success_rate | 工作流触发成功率 | < 90% |
n8n_execution_error_rate | n8n 执行错误率 | > 5% |
8.2 日志示例
[PlanScheduler] Found 3 pending tasks (not executing, use n8n).
[Scheduler API] Fetching pending tasks...
[Scheduler API] Found 3 pending tasks
[Scheduler API] Triggering workflow: voiceover for project: uuid-xxx
[Scheduler API] Workflow triggered successfully: voiceover九、降级方案
如果 n8n 服务不可用,可以快速回退到 Node 内置调度器:
9.1 回退步骤
- 取消注释代码
javascript
// backend/src/index.js
// 取消以下注释:
const { planScheduler } = require('./services/workflow');
planScheduler.start();- 重启 Node 服务
bash
pm2 restart kkmusic-backend
# 或
npm run dev- 验证调度器运行
bash
# 查看日志,应该看到:
# [PlanScheduler] Started (interval=15m)
tail -f backend/logs/app.log | grep "PlanScheduler"9.2 回退影响
- ✅ 任务正常触发
- ⚠️ 失去 n8n 可视化监控
- ⚠️ 失去 n8n 灵活编排能力
十、下一步
10.1 P2 阶段(1-2 周)
- 创建
langgraph-runnerPython 服务 - 迁移
project_diagnosis_workflow到 LangGraph - 实现 Run/Job 统一接口
10.2 P3 阶段(持续迭代)
- 把灵感检索迁移到 Flowise
- 把策略对话迁移到 Flowise
十一、相关文档
- KKMUSIC_ARCHITECTURE_BLUEPRINT.md - 整体架构蓝图
- UNIFIED_RUN_JOB_CONTRACT.md - 统一接口规范
- WORKFLOW_MIGRATION_ROADMAP.md - 迁移路线
- n8n/README.md - n8n 部署和使用指南
文档维护者:一刻工坊技术团队
最后更新:2024-12-14
状态:✅ P1 阶段开发完成,待部署测试