Skip to content

P1 阶段完成报告 - n8n 接管定时调度

实施时间:2024-12-14
状态:✅ 开发完成,待部署测试

一、实施概览

1.1 目标

PlanSchedulerService 的定时逻辑从 Node.js setInterval 迁移到 n8n Cron,实现:

  • 更稳定的定时调度
  • 可视化的任务监控
  • 灵活的工作流编排

1.2 涉及工作流

workflow_id迁移前迁移后
voiceover_videoNode setIntervaln8n Cron → Node API
music_video_creationNode setIntervaln8n Cron → Node API
content_repurposing手动触发n8n Webhook
content_plan_workflowNode setIntervaln8n Cron → Node API

二、核心改动清单

2.1 新增文件

文件路径说明
backend/src/routes/scheduler.jsScheduler API(供 n8n 调用)
n8n/workflows/plan_scheduler.jsonn8n 工作流配置
n8n/README.mdn8n 部署和使用指南
docs/architecture/P1_SCHEDULER_MIGRATION_COMPLETE.md本文档

2.2 修改文件

文件路径修改内容
backend/src/services/workflow/PlanSchedulerService.js移除 setInterval,新增 getPendingTasks() 方法
backend/src/index.js注册 /api/scheduler 路由
backend/src/workflows/manifest.json添加 engine 字段

三、新增 API

3.1 GET /api/scheduler/pending-tasks

功能:获取待执行任务列表(供 n8n Cron 调用)

请求

http
GET http://localhost:3001/api/scheduler/pending-tasks?limit=10
Authorization: Bearer YOUR_TOKEN

响应

json
{
  "success": true,
  "data": {
    "tasks": [
      {
        "workflow_id": "voiceover",
        "project_id": "uuid-xxx",
        "project_name": "美业账号",
        "strategy_task_id": "task-uuid-xxx",
        "inputs": {
          "summary": "美业选题技巧",
          "platform": "douyin"
        },
        "task_summary": "美业选题技巧",
        "task_due_date": "2024-12-14",
        "priority": "high"
      }
    ],
    "fetched_at": "2024-12-14T10:00:00.000Z",
    "count": 1
  }
}

3.2 POST /api/scheduler/trigger

功能:触发单个任务执行

请求

http
POST http://localhost:3001/api/scheduler/trigger
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json

{
  "workflow_id": "voiceover",
  "project_id": "uuid-xxx",
  "strategy_task_id": "task-uuid-xxx",
  "inputs": {
    "summary": "美业选题技巧",
    "platform": "douyin"
  },
  "trigger_source": "n8n_scheduler"
}

响应

json
{
  "success": true,
  "data": {
    "run_id": "run-uuid-xxx",
    "workflow_id": "voiceover",
    "project_id": "uuid-xxx",
    "status": "running",
    "triggered_at": "2024-12-14T10:00:05.000Z"
  }
}

3.3 GET /api/scheduler/status

功能:获取调度器状态

请求

http
GET http://localhost:3001/api/scheduler/status
Authorization: Bearer YOUR_TOKEN

响应

json
{
  "success": true,
  "data": {
    "mode": "n8n_managed",
    "description": "Scheduler is managed by n8n Cron",
    "interval_ms": 900000,
    "interval_minutes": 15,
    "max_projects_per_tick": 10,
    "max_runs_per_project": 2,
    "valid_workflows": ["voiceover", "graphic", "music", "video", "live", "short_drama"],
    "timestamp": "2024-12-14T10:00:00.000Z"
  }
}

四、PlanSchedulerService 变化

4.1 废弃方法

以下方法已标记为 DEPRECATED,保留用于向后兼容和降级场景:

  • start() - 启动内置调度器(已迁移到 n8n)
  • stop() - 停止内置调度器(无需手动停止)
  • tick() - 轮询执行任务(已迁移到 n8n)
  • handleProject() - 处理单个项目(已拆分为 getPendingTasksForProject
  • launchWorkflow() - 启动工作流(已迁移到 /api/scheduler/trigger

4.2 新增方法

方法说明
getPendingTasks(options)获取所有待执行任务列表(供 n8n 调用)
getPendingTasksForProject(project, dateStr)获取单个项目的待执行任务

4.3 保留方法

以下方法保持不变,继续使用:

  • shouldAutoRunTask() - 判断任务是否应该自动执行
  • markAutoRun() - 标记任务执行状态
  • buildWorkflowPayload() - 构建工作流输入参数

五、manifest.json 扩展

所有工作流添加了 engine 字段:

json
{
  "id": "voiceover_video",
  "engine": "node",           // 新增字段:执行引擎
  // ... 其他字段
}

可选值

  • "node" - 使用 Node.js WorkflowEngine(默认)
  • "langgraph" - 使用 LangGraph Runner(P2 阶段)
  • "n8n" - 直接由 n8n 编排(如 content_repurposing)
  • "flowise" - Flowise Chatflow(P3 阶段)

六、部署步骤

6.1 前提条件

  • ✅ Node.js 后端已启动(监听 3001 端口)
  • ⏳ n8n 已安装并运行(监听 5678 端口)
  • ⏳ 获取有效的 API Token

6.2 部署 n8n 工作流

选项 A:通过 UI 导入

  1. 打开 n8n:http://localhost:5678
  2. 导入 n8n/workflows/plan_scheduler.json
  3. 修改 Authorization Header 中的 Token
  4. 激活工作流

选项 B:通过 Docker Compose

yaml
# docker-compose.yml
version: '3.8'

services:
  n8n:
    image: n8nio/n8n:latest
    ports:
      - "5678:5678"
    volumes:
      - ./n8n/workflows:/home/node/.n8n/workflows
      - n8n_data:/home/node/.n8n
    environment:
      - N8N_HOST=0.0.0.0
      - N8N_PORT=5678
      - N8N_PROTOCOL=http
      - WEBHOOK_URL=http://localhost:5678

volumes:
  n8n_data:
bash
# 启动 n8n
docker-compose up -d n8n

# 导入工作流
docker exec -it n8n n8n import:workflow --input=/home/node/.n8n/workflows/plan_scheduler.json

6.3 验证部署

步骤 1:检查 Node API

bash
# 检查健康状态
curl http://localhost:3001/health

# 检查调度器状态
curl -H "Authorization: Bearer YOUR_TOKEN" \
  http://localhost:3001/api/scheduler/status

步骤 2:测试 pending-tasks API

bash
curl -H "Authorization: Bearer YOUR_TOKEN" \
  http://localhost:3001/api/scheduler/pending-tasks

预期响应:返回待执行任务列表(如果有项目配置了日历)

步骤 3:手动测试 trigger API

bash
curl -X POST \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "workflow_id": "voiceover",
    "project_id": "YOUR_PROJECT_ID",
    "inputs": {
      "summary": "测试任务",
      "platform": "douyin"
    }
  }' \
  http://localhost:3001/api/scheduler/trigger

步骤 4:在 n8n 中手动执行工作流

  1. 打开 n8n:http://localhost:5678
  2. 打开 "Plan Scheduler" 工作流
  3. 点击 "Execute Workflow" 按钮
  4. 查看执行结果和日志

七、测试用例

7.1 单元测试(API)

javascript
// test/scheduler.test.js

describe('Scheduler API', () => {
  test('GET /api/scheduler/pending-tasks', async () => {
    const response = await request(app)
      .get('/api/scheduler/pending-tasks')
      .set('Authorization', `Bearer ${token}`)
      .expect(200);
    
    expect(response.body.success).toBe(true);
    expect(response.body.data).toHaveProperty('tasks');
    expect(response.body.data).toHaveProperty('count');
  });
  
  test('POST /api/scheduler/trigger', async () => {
    const response = await request(app)
      .post('/api/scheduler/trigger')
      .set('Authorization', `Bearer ${token}`)
      .send({
        workflow_id: 'voiceover',
        project_id: testProjectId,
        inputs: { summary: 'test' }
      })
      .expect(200);
    
    expect(response.body.success).toBe(true);
    expect(response.body.data).toHaveProperty('run_id');
  });
  
  test('GET /api/scheduler/status', async () => {
    const response = await request(app)
      .get('/api/scheduler/status')
      .set('Authorization', `Bearer ${token}`)
      .expect(200);
    
    expect(response.body.data.mode).toBe('n8n_managed');
  });
});

7.2 集成测试(n8n + Node)

javascript
// test/integration/scheduler-n8n.test.js

describe('n8n Scheduler Integration', () => {
  test('Full scheduler flow', async () => {
    // 1. 创建测试项目和日历
    const project = await createTestProject({
      status: 'planning',
      meta: {
        strategy: {
          blueprint: {
            calendar: [
              {
                id: 'task-1',
                workflow: 'voiceover',
                due_date: new Date().toISOString().split('T')[0],
                summary: '测试任务'
              }
            ]
          }
        }
      }
    });
    
    // 2. 调用 pending-tasks API
    const tasksResponse = await request(app)
      .get('/api/scheduler/pending-tasks')
      .set('Authorization', `Bearer ${token}`)
      .expect(200);
    
    expect(tasksResponse.body.data.count).toBeGreaterThan(0);
    const task = tasksResponse.body.data.tasks[0];
    
    // 3. 调用 trigger API
    const triggerResponse = await request(app)
      .post('/api/scheduler/trigger')
      .set('Authorization', `Bearer ${token}`)
      .send({
        workflow_id: task.workflow_id,
        project_id: task.project_id,
        strategy_task_id: task.strategy_task_id,
        inputs: task.inputs
      })
      .expect(200);
    
    const runId = triggerResponse.body.data.run_id;
    
    // 4. 验证任务已标记为 running
    const updatedProject = await Project.findByPk(project.id);
    const autoRuns = updatedProject.meta?.strategy?.auto_runs || {};
    expect(autoRuns['task-1'].status).toBe('completed');
  });
});

八、监控指标

8.1 关键指标

指标说明告警阈值
scheduler_tick_success_raten8n Cron 成功率< 95%
pending_tasks_fetch_duration获取任务列表耗时> 5s
workflow_trigger_success_rate工作流触发成功率< 90%
n8n_execution_error_raten8n 执行错误率> 5%

8.2 日志示例

[PlanScheduler] Found 3 pending tasks (not executing, use n8n).
[Scheduler API] Fetching pending tasks...
[Scheduler API] Found 3 pending tasks
[Scheduler API] Triggering workflow: voiceover for project: uuid-xxx
[Scheduler API] Workflow triggered successfully: voiceover

九、降级方案

如果 n8n 服务不可用,可以快速回退到 Node 内置调度器:

9.1 回退步骤

  1. 取消注释代码
javascript
// backend/src/index.js

// 取消以下注释:
const { planScheduler } = require('./services/workflow');
planScheduler.start();
  1. 重启 Node 服务
bash
pm2 restart kkmusic-backend
# 或
npm run dev
  1. 验证调度器运行
bash
# 查看日志,应该看到:
# [PlanScheduler] Started (interval=15m)
tail -f backend/logs/app.log | grep "PlanScheduler"

9.2 回退影响

  • ✅ 任务正常触发
  • ⚠️ 失去 n8n 可视化监控
  • ⚠️ 失去 n8n 灵活编排能力

十、下一步

10.1 P2 阶段(1-2 周)

  • 创建 langgraph-runner Python 服务
  • 迁移 project_diagnosis_workflow 到 LangGraph
  • 实现 Run/Job 统一接口

10.2 P3 阶段(持续迭代)

  • 把灵感检索迁移到 Flowise
  • 把策略对话迁移到 Flowise

十一、相关文档


文档维护者:一刻工坊技术团队
最后更新:2024-12-14
状态:✅ P1 阶段开发完成,待部署测试

© 2024-2025 趣美丽 QuMeiLi · Powered by 刻流星引擎 KeLiuXing