Skip to content

27.3 自主规划算法

自主规划算法概述

自主规划算法是Agentic AI系统的核心能力之一,它允许AI系统根据目标自主分解任务、制定执行计划、监控执行进度,并根据实际情况调整计划。

规划算法的基本概念

1. 什么是自主规划

自主规划是指AI系统在没有人类干预的情况下,根据给定的目标,自动生成和执行任务计划的过程。

自主规划的特点 :

  • 目标驱动 : 以目标为导向进行规划
  • 任务分解 : 将复杂目标分解为可执行的子任务
  • 动态调整 : 根据执行情况动态调整计划
  • 资源管理 : 合理分配和利用资源

2. 规划算法分类

| 算法类型 | 特点 | 适用场景 |
| --- | --- | --- |
| 前向搜索 | 从初始状态到目标状态 | 目标明确,状态空间小 |
| 后向搜索 | 从目标状态到初始状态 | 目标状态明确,初始状态复杂 |
| 双向搜索 | 同时从两端搜索 | 状态空间大,两端明确 |
| 层次规划 | 分层规划,逐步细化 | 复杂任务,多层级目标 |
| 部分规划 | 规划部分路径,边执行边规划 | 动态环境,不确定性高 |

任务分解算法

1. 层次任务分解

示例:层次任务分解

用户请求: "实现一个层次任务分解算法"

Claude Code 生成的代码:

```python
    from typing import Dict, List, Any, Optional
    from dataclasses import dataclass, field
    from enum import Enum
    import logging

    logger = logging.getLogger(__name__)

    class TaskStatus(Enum):
        """Lifecycle state of a task."""

        PENDING = "pending"
        IN_PROGRESS = "in_progress"
        COMPLETED = "completed"
        FAILED = "failed"
        CANCELLED = "cancelled"


    class TaskPriority(Enum):
        """Priority level of a task (CRITICAL is 1, LOW is 4)."""

        CRITICAL = 1
        HIGH = 2
        MEDIUM = 3
        LOW = 4


    @dataclass
    class Task:
        """A unit of work that may depend on other tasks and own subtasks."""

        id: str
        name: str
        description: str
        status: TaskStatus = TaskStatus.PENDING
        priority: TaskPriority = TaskPriority.MEDIUM
        # Ids of tasks this task depends on.
        dependencies: List[str] = field(default_factory=list)
        # Child tasks attached when the task is decomposed.
        subtasks: List['Task'] = field(default_factory=list)
        estimated_duration: Optional[float] = None
        actual_duration: Optional[float] = None
        result: Optional[Any] = None
        error: Optional[str] = None
        # Free-form attributes; the planner reads the 'type' key from here.
        metadata: Dict[str, Any] = field(default_factory=dict)

    class HierarchicalTaskPlanner:
        """Planner that recursively splits tasks into subtasks and orders them.

        ``tasks`` maps a task id to its ``Task``. ``task_graph`` maps a task id
        to the ids that must be listed before it in an execution plan: its
        declared dependencies and, once decomposed, its subtasks.
        """

        def __init__(self):
            self.tasks: Dict[str, Task] = {}
            self.task_graph: Dict[str, List[str]] = {}

        def add_task(self, task: Task):
            """Register *task* and record its declared dependencies."""
            self.tasks[task.id] = task
            # Keep a copy so later graph edits never mutate task.dependencies.
            self.task_graph[task.id] = list(task.dependencies)
            logger.info("Task added: %s", task.id)

        def decompose_task(self, task_id: str, max_depth: int = 3) -> List[Task]:
            """Recursively decompose a task into subtasks.

            Args:
                task_id: Id of a task previously registered with ``add_task``.
                max_depth: Number of levels to decompose; ``0`` or less returns
                    the task unchanged.

            Returns:
                The task followed by every task reached during decomposition.

            Raises:
                ValueError: If ``task_id`` is not registered.
            """
            task = self.tasks.get(task_id)
            if not task:
                raise ValueError(f"Task not found: {task_id}")

            if max_depth <= 0:
                return [task]

            all_tasks = [task]
            children: List[Task] = []
            attached_ids = {subtask.id for subtask in task.subtasks}

            for generated in self._generate_subtasks(task):
                # Reuse an already-registered subtask so that decomposing the
                # same task twice neither resets it nor attaches it twice.
                child = self.tasks.get(generated.id)
                if child is None:
                    self.add_task(generated)
                    child = generated
                if child.id not in attached_ids:
                    task.subtasks.append(child)
                    attached_ids.add(child.id)
                children.append(child)
                all_tasks.extend(self.decompose_task(child.id, max_depth - 1))

            # Keep the declared dependencies and add the subtasks after them;
            # dict.fromkeys removes duplicates while preserving order.
            ordered_ids = [*task.dependencies, *(child.id for child in children)]
            self.task_graph[task.id] = list(dict.fromkeys(ordered_ids))

            return all_tasks

        def _generate_subtasks(self, task: Task) -> List[Task]:
            """Pick the subtask generator matching ``task.metadata['type']``."""
            generators = {
                'code_generation': self._generate_code_generation_subtasks,
                'code_review': self._generate_code_review_subtasks,
                'testing': self._generate_testing_subtasks,
                'deployment': self._generate_deployment_subtasks,
            }
            task_type = task.metadata.get('type', 'default')
            generate = generators.get(task_type, self._generate_default_subtasks)
            return generate(task)

        def _subtask(self, parent: Task, suffix: str, name: str, description: str,
                     priority: TaskPriority, task_type: str, after=()) -> Task:
            """Build one subtask of *parent*; *after* lists sibling suffixes it depends on."""
            return Task(
                id=f"{parent.id}_{suffix}",
                name=name,
                description=description,
                priority=priority,
                dependencies=[f"{parent.id}_{dep}" for dep in after],
                metadata={'type': task_type},
            )

        def _generate_code_generation_subtasks(self, task: Task) -> List[Task]:
            """Return the subtasks for a code-generation task."""
            return [
                self._subtask(task, "analyze_requirements", "Analyze Requirements",
                              "Analyze the requirements for code generation",
                              TaskPriority.HIGH, 'analysis'),
                self._subtask(task, "design_architecture", "Design Architecture",
                              "Design the system architecture",
                              TaskPriority.HIGH, 'design',
                              after=("analyze_requirements",)),
                self._subtask(task, "generate_code", "Generate Code",
                              "Generate the code",
                              TaskPriority.HIGH, 'code_generation',
                              after=("design_architecture",)),
                self._subtask(task, "review_code", "Review Code",
                              "Review the generated code",
                              TaskPriority.MEDIUM, 'code_review',
                              after=("generate_code",)),
            ]

        def _generate_code_review_subtasks(self, task: Task) -> List[Task]:
            """Return the subtasks for a code-review task."""
            return [
                self._subtask(task, "static_analysis", "Static Analysis",
                              "Perform static code analysis",
                              TaskPriority.HIGH, 'analysis'),
                self._subtask(task, "security_check", "Security Check",
                              "Check for security vulnerabilities",
                              TaskPriority.HIGH, 'security',
                              after=("static_analysis",)),
                self._subtask(task, "performance_check", "Performance Check",
                              "Check for performance issues",
                              TaskPriority.MEDIUM, 'performance',
                              after=("static_analysis",)),
                self._subtask(task, "generate_report", "Generate Report",
                              "Generate review report",
                              TaskPriority.MEDIUM, 'reporting',
                              after=("security_check", "performance_check")),
            ]

        def _generate_testing_subtasks(self, task: Task) -> List[Task]:
            """Return the subtasks for a testing task."""
            return [
                self._subtask(task, "unit_tests", "Unit Tests",
                              "Write and run unit tests",
                              TaskPriority.HIGH, 'testing'),
                self._subtask(task, "integration_tests", "Integration Tests",
                              "Write and run integration tests",
                              TaskPriority.HIGH, 'testing',
                              after=("unit_tests",)),
                self._subtask(task, "e2e_tests", "E2E Tests",
                              "Write and run end-to-end tests",
                              TaskPriority.MEDIUM, 'testing',
                              after=("integration_tests",)),
            ]

        def _generate_deployment_subtasks(self, task: Task) -> List[Task]:
            """Return the subtasks for a deployment task."""
            return [
                self._subtask(task, "build", "Build",
                              "Build the application",
                              TaskPriority.HIGH, 'build'),
                self._subtask(task, "test", "Test",
                              "Run tests",
                              TaskPriority.HIGH, 'testing',
                              after=("build",)),
                self._subtask(task, "deploy", "Deploy",
                              "Deploy to production",
                              TaskPriority.HIGH, 'deployment',
                              after=("test",)),
                self._subtask(task, "verify", "Verify",
                              "Verify deployment",
                              TaskPriority.HIGH, 'verification',
                              after=("deploy",)),
            ]

        def _generate_default_subtasks(self, task: Task) -> List[Task]:
            """Return the generic subtasks used for any other task type."""
            return [
                self._subtask(task, "prepare", "Prepare",
                              "Prepare for task execution",
                              TaskPriority.HIGH, 'preparation'),
                self._subtask(task, "execute", "Execute",
                              "Execute the task",
                              TaskPriority.HIGH, 'execution',
                              after=("prepare",)),
                self._subtask(task, "finalize", "Finalize",
                              "Finalize the task",
                              TaskPriority.MEDIUM, 'finalization',
                              after=("execute",)),
            ]

        def get_execution_plan(self, task_id: str) -> List[Task]:
            """Return the tasks reachable from *task_id*, prerequisites first.

            Performs a depth-first walk of ``task_graph``; each task is listed
            once, after everything its graph entry points to. Ids that are not
            registered are skipped.
            """
            plan: List[Task] = []
            visited = set()

            def visit(current_id):
                if current_id in visited:
                    return
                visited.add(current_id)

                # Everything this task points to goes into the plan first.
                for prerequisite_id in self.task_graph.get(current_id, []):
                    visit(prerequisite_id)

                task = self.tasks.get(current_id)
                if task:
                    plan.append(task)

            visit(task_id)
            return plan

        @staticmethod
        def _status_icon(status) -> str:
            """Return the one-character marker used to render *status*."""
            icons = {
                TaskStatus.PENDING: "○",
                TaskStatus.IN_PROGRESS: "◐",
                TaskStatus.COMPLETED: "●",
                TaskStatus.FAILED: "✗",
                TaskStatus.CANCELLED: "⊘",
            }
            return icons.get(status, "?")

        def visualize_plan(self, task_id: str) -> str:
            """Render the execution plan of *task_id* as numbered text lines.

            Each planned task is followed by an indented list of its direct
            subtasks.
            """
            lines = ["Execution Plan:\n"]
            for i, task in enumerate(self.get_execution_plan(task_id), 1):
                lines.append(f"{i}. {self._status_icon(task.status)} {task.name}\n")
                for j, subtask in enumerate(task.subtasks, 1):
                    lines.append(
                        f"   {j}. {self._status_icon(subtask.status)} {subtask.name}\n"
                    )
            return "".join(lines)
# Usage example
planner = HierarchicalTaskPlanner()

# Register the top-level task
main_task = Task(
    id="main",
    name="Develop User Authentication System",
    description="Develop a complete user authentication system",
    priority=TaskPriority.HIGH,
    metadata={'type': 'code_generation'}
)
planner.add_task(main_task)

# Decompose it two levels deep
planner.decompose_task("main", max_depth=2)

# Compute the execution order
plan = planner.get_execution_plan("main")

print("Execution Plan:")
for task in plan:
    print(f"  - {task.name}")

# Render the plan with status markers
print("\n" + planner.visualize_plan("main"))

```

> **层次任务分解特点**:

> - 递归分解复杂任务
> - 自动生成子任务
> - 维护任务依赖关系
> - 生成执行计划

### 2. 动态任务调整

示例:动态任务调整

用户请求: "实现动态任务调整算法"

Claude Code 生成的代码:

```python

from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class DynamicTaskPlanner(HierarchicalTaskPlanner):
    """Planner that records execution outcomes and adjusts plans from them."""

    def __init__(self):
        super().__init__()
        # task id -> chronological status records written by monitor_execution
        self.execution_history: Dict[str, List[Dict[str, Any]]] = {}
        # task id -> metrics last computed by analyze_performance
        self.performance_metrics: Dict[str, Dict[str, float]] = {}

    def monitor_execution(self, task_id: str, status: TaskStatus, result: Optional[Any] = None, error: Optional[str] = None):
        """Apply a status update to a task and append it to the task's history.

        Unknown task ids are ignored (a warning is logged).
        """
        # Local import: this snippet's header imports only datetime/timedelta.
        from datetime import timezone

        task = self.tasks.get(task_id)
        if not task:
            logger.warning("Cannot monitor unknown task: %s", task_id)
            return

        task.status = status
        task.result = result
        task.error = error

        self.execution_history.setdefault(task_id, []).append({
            # Timezone-aware UTC; datetime.utcnow() is deprecated.
            'timestamp': datetime.now(timezone.utc),
            'status': status,
            'result': result,
            'error': error
        })

        logger.info("Task %s status updated to %s", task_id, status)

    @staticmethod
    def _run_durations(history: List[Dict[str, Any]]) -> List[float]:
        """Return elapsed seconds for each IN_PROGRESS -> COMPLETED/FAILED pair."""
        durations = []
        started_at = None
        for record in history:
            status = record['status']
            if status == TaskStatus.IN_PROGRESS:
                started_at = record['timestamp']
            elif status in (TaskStatus.COMPLETED, TaskStatus.FAILED) and started_at is not None:
                durations.append((record['timestamp'] - started_at).total_seconds())
                started_at = None
        return durations

    def analyze_performance(self, task_id: str) -> Dict[str, float]:
        """Compute and cache performance metrics for a task.

        Rates are taken over finished attempts only (COMPLETED or FAILED
        records); counting IN_PROGRESS records would cap the failure rate of
        a start -> fail cycle at 0.5. ``avg_duration`` is the mean elapsed
        time, in seconds, between an IN_PROGRESS record and the next finished
        record.

        Returns:
            A dict with ``total_executions``, ``success_rate``,
            ``failure_rate`` and ``avg_duration``; empty if the task has no
            recorded history.
        """
        if task_id not in self.execution_history:
            return {}

        history = self.execution_history[task_id]
        succeeded = sum(1 for record in history if record['status'] == TaskStatus.COMPLETED)
        failed = sum(1 for record in history if record['status'] == TaskStatus.FAILED)
        finished = succeeded + failed
        durations = self._run_durations(history)

        metrics = {
            'total_executions': finished,
            'success_rate': succeeded / finished if finished else 0.0,
            'avg_duration': sum(durations) / len(durations) if durations else 0.0,
            'failure_rate': failed / finished if finished else 0.0
        }

        self.performance_metrics[task_id] = metrics
        return metrics

    def adjust_plan(self, task_id: str) -> List[Task]:
        """Return an execution plan adjusted to the task's measured performance.

        A failure rate above 0.5 adds a retry task; an average duration above
        3600 triggers further decomposition; otherwise the current plan is
        returned. Unknown task ids yield an empty list.
        """
        task = self.tasks.get(task_id)
        if not task:
            return []

        metrics = self.analyze_performance(task_id)

        if metrics.get('failure_rate', 0) > 0.5:
            return self._add_retry_tasks(task)
        if metrics.get('avg_duration', 0) > 3600:
            return self._optimize_tasks(task)
        return self.get_execution_plan(task_id)

    def _add_retry_tasks(self, task: Task) -> List[Task]:
        """Register a retry task for *task* and return a plan that ends with it."""
        retry_task = Task(
            id=f"{task.id}_retry",
            name=f"Retry {task.name}",
            description=f"Retry failed task: {task.description}",
            priority=TaskPriority.HIGH,
            dependencies=[task.id],
            metadata={'type': 'retry', 'original_task_id': task.id}
        )
        self.add_task(retry_task)

        # The retry depends on the original task, so planning from the retry
        # lists the original first. Adding the retry to the original's graph
        # entry as well would form a cycle and order the retry before it.
        return self.get_execution_plan(retry_task.id)

    def _optimize_tasks(self, task: Task) -> List[Task]:
        """Decompose long-running work further and return the updated plan."""
        if task.subtasks:
            # Already decomposed: split only subtasks estimated above 1800.
            for subtask in task.subtasks:
                if subtask.estimated_duration and subtask.estimated_duration > 1800:
                    self.decompose_task(subtask.id, max_depth=1)
        else:
            self.decompose_task(task.id, max_depth=2)

        return self.get_execution_plan(task.id)

    def predict_completion_time(self, task_id: str) -> Optional[float]:
        """Estimate the total duration of the plan for *task_id*.

        Uses each task's ``estimated_duration`` when set, otherwise the
        ``avg_duration`` cached by ``analyze_performance``. Returns ``None``
        when nothing contributes a positive duration.
        """
        total_time = 0.0
        for planned in self.get_execution_plan(task_id):
            if planned.estimated_duration:
                total_time += planned.estimated_duration
            elif planned.id in self.performance_metrics:
                total_time += self.performance_metrics[planned.id].get('avg_duration', 0)

        return total_time if total_time > 0 else None

# Usage example
planner = DynamicTaskPlanner()

# Register a task
task = Task(
    id="task1",
    name="Process Large Dataset",
    description="Process a large dataset",
    priority=TaskPriority.HIGH,
    estimated_duration=7200,
    metadata={'type': 'data_processing'}
)
planner.add_task(task)

# Decompose it
planner.decompose_task("task1", max_depth=2)

# Simulate the task starting
planner.monitor_execution("task1", TaskStatus.IN_PROGRESS)

# Simulate a failure
planner.monitor_execution("task1", TaskStatus.FAILED, error="Timeout")

# Analyze performance
metrics = planner.analyze_performance("task1")
print(f"Performance metrics: {metrics}")

# Adjust the plan
adjusted_plan = planner.adjust_plan("task1")

print("\nAdjusted plan:")
for planned_task in adjusted_plan:
    print(f"  - {planned_task.name}")

```

> **动态任务调整特点**:
> - 实时监控任务执行
> - 分析任务性能
> - 根据性能调整计划
> - 预测完成时间

## 规划算法优化

### 1. 并行任务调度

示例:并行任务调度

用户请求: "实现并行任务调度算法"

Claude Code 生成的代码:

```python
    import logging
    from collections import defaultdict, deque
    from typing import Any, Dict, List, Optional, Set

    logger = logging.getLogger(__name__)

    class ParallelTaskScheduler:
        """Dependency-aware scheduler that groups tasks into parallel batches.

        A batch never holds more than ``max_workers`` tasks.
        """

        def __init__(self, max_workers: int = 4):
            self.max_workers = max_workers
            self.tasks: Dict[str, "Task"] = {}
            # task id -> ids of its dependencies that have not completed yet
            self.dependencies: Dict[str, Set[str]] = defaultdict(set)
            # task id -> ids of the tasks that declared it as a dependency
            self.dependents: Dict[str, Set[str]] = defaultdict(set)
            self.ready_tasks: Set[str] = set()
            self.running_tasks: Set[str] = set()
            self.completed_tasks: Set[str] = set()

        def add_task(self, task: "Task"):
            """Register *task*; it is ready at once if nothing it needs is pending."""
            self.tasks[task.id] = task

            for dep_id in task.dependencies:
                self.dependents[dep_id].add(task.id)
                # A dependency that already completed must not block the task,
                # otherwise it could never become ready.
                if dep_id not in self.completed_tasks:
                    self.dependencies[task.id].add(dep_id)

            if not self.dependencies.get(task.id):
                self.ready_tasks.add(task.id)

            logger.info("Task added: %s", task.id)

        def get_ready_tasks(self) -> List["Task"]:
            """Return ready tasks that are not running, in registration order."""
            return [
                task
                for task_id, task in self.tasks.items()
                if task_id in self.ready_tasks and task_id not in self.running_tasks
            ]

        def start_task(self, task_id: str):
            """Move a ready task to the running set and mark it IN_PROGRESS."""
            if task_id not in self.ready_tasks or task_id in self.running_tasks:
                logger.warning("Task not ready: %s", task_id)
                return

            self.ready_tasks.remove(task_id)
            self.running_tasks.add(task_id)
            self.tasks[task_id].status = TaskStatus.IN_PROGRESS
            logger.info("Task started: %s", task_id)

        def complete_task(self, task_id: str, result: Optional[Any] = None, error: Optional[str] = None):
            """Finish a running task and release the tasks that waited on it.

            The task is marked FAILED when *error* is truthy, COMPLETED
            otherwise. Calls for tasks that are not running are ignored.
            """
            if task_id not in self.running_tasks:
                logger.warning("Task not running: %s", task_id)
                return

            task = self.tasks[task_id]
            task.status = TaskStatus.COMPLETED if not error else TaskStatus.FAILED
            task.result = result
            task.error = error

            # NOTE(review): a failed task is also recorded as completed and
            # releases its dependents - confirm this is intended.
            self.running_tasks.remove(task_id)
            self.completed_tasks.add(task_id)

            # .get avoids creating defaultdict entries for tasks nobody waits on.
            for dependent_id in self.dependents.get(task_id, ()):
                pending = self.dependencies[dependent_id]
                pending.discard(task_id)
                if not pending:
                    self.ready_tasks.add(dependent_id)

            logger.info("Task completed: %s", task_id)

        def get_schedule(self) -> List[List[str]]:
            """Plan the not-yet-completed tasks as ordered batches of task ids.

            A task enters a batch once all its dependencies are completed or
            placed in an earlier batch. The plan is simulated on a private
            copy of the completed set, so calling this method does not change
            scheduler state and repeated calls give the same result. Tasks are
            considered in registration order.

            Returns:
                A list of batches. If some tasks can never be placed, a
                warning is logged and they are left out.
            """
            done = set(self.completed_tasks)
            remaining = [task_id for task_id in self.tasks if task_id not in done]
            schedule: List[List[str]] = []

            while remaining:
                runnable = [
                    task_id
                    for task_id in remaining
                    if all(dep in done for dep in self.tasks[task_id].dependencies)
                ]
                batch = runnable[:self.max_workers]

                if not batch:
                    logger.warning("Circular dependency detected")
                    break

                schedule.append(batch)
                done.update(batch)
                placed = set(batch)
                remaining = [task_id for task_id in remaining if task_id not in placed]

            return schedule

        def visualize_schedule(self) -> str:
            """Render the schedule as text, one section per batch."""
            lines = ["Task Schedule:\n"]
            for i, batch in enumerate(self.get_schedule(), 1):
                lines.append(f"Batch {i} (parallel):\n")
                lines.extend(f"  - {self.tasks[task_id].name}\n" for task_id in batch)
            return "".join(lines)
# Usage example
scheduler = ParallelTaskScheduler(max_workers=3)

# Register tasks
tasks = [
    Task(id="A", name="Task A", description="Task A", priority=TaskPriority.HIGH),
    Task(id="B", name="Task B", description="Task B", priority=TaskPriority.HIGH, dependencies=["A"]),
    Task(id="C", name="Task C", description="Task C", priority=TaskPriority.HIGH, dependencies=["A"]),
    Task(id="D", name="Task D", description="Task D", priority=TaskPriority.HIGH, dependencies=["B", "C"]),
    Task(id="E", name="Task E", description="Task E", priority=TaskPriority.HIGH, dependencies=["B"]),
    Task(id="F", name="Task F", description="Task F", priority=TaskPriority.HIGH, dependencies=["C"]),
    Task(id="G", name="Task G", description="Task G", priority=TaskPriority.HIGH, dependencies=["D", "E", "F"])
]
for task in tasks:
    scheduler.add_task(task)

# Compute the batch schedule
schedule = scheduler.get_schedule()

print("Schedule:")
for i, batch in enumerate(schedule, 1):
    print(f"Batch {i}: {[scheduler.tasks[task_id].name for task_id in batch]}")

# Render the schedule
print("\n" + scheduler.visualize_schedule())

```

> **并行任务调度特点**:

> - 识别可并行执行的任务
> - 限制并发任务数量
> - 处理任务依赖关系
> - 生成调度计划

## 总结

自主规划算法包括:

1. **规划算法的基本概念**: 什么是自主规划、规划算法分类
2. **任务分解算法**: 层次任务分解、动态任务调整
3. **规划算法优化**: 并行任务调度

通过自主规划算法,Claude Code可以自主分解任务、制定执行计划,并根据实际情况动态调整。

在下一节中,我们将探讨记忆系统设计。

基于 MIT 许可发布