Skip to content

26.1 子Agent概述

子Agent概述

子Agent是Claude Code中的一个重要概念,它允许主Agent将复杂任务分解为多个子任务,并委托给专门的子Agent来处理。通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务,提高任务完成的质量和速度。

子Agent的概念

1. 什么是子Agent

子Agent是主Agent的辅助实体,具有以下特点:

  • 专业化 : 每个子Agent专注于特定的任务领域
  • 独立性 : 子Agent可以独立完成任务,不依赖主Agent的持续干预
  • 协作性 : 多个子Agent可以协同工作,完成复杂的任务
  • 可复用性 : 子Agent可以在不同的任务中被重复使用

2. 子Agent vs 主Agent

特性主Agent子Agent

职责| 协调和决策| 执行特定任务 交互| 与用户直接交互| 与主Agent交互 范围| 全局视角| 局部视角 自主性| 高| 中等 专业化程度| 通用| 专用

子Agent的类型

1. 代码生成子Agent

示例:代码生成子Agent

用户请求: "创建一个用户认证系统的代码生成子Agent"

Claude Code 生成的子Agent:

python
    python

    ````python

```python
```python
    class CodeGenerationAgent:
"""代码生成子Agent"""
python
    def __init__(self, context):
    self.context = context
    self.templates = {}
    self.best_practices = {}

    def generate_auth_system(self, requirements):
"""生成认证系统代码"""
 # 分析需求
analysis = self._analyze_requirements(requirements)

 # 选择模板
template = self._select_template(analysis)

 # 生成代码
code = self._generate_code(template, analysis)

 # 应用最佳实践
code = self._apply_best_practices(code)

 # 验证代码
validation = self._validate_code(code)
bash
    return {
    'code': code,
    'validation': validation,
    'documentation': self._generate_documentation(code)
    }

    def _analyze_requirements(self, requirements):
"""分析需求"""
analysis = {
'auth_methods': [],
'security_level': 'medium',
'features': []
}
bash
    if 'jwt' in requirements:
    analysis['auth_methods'].append('jwt')

    if 'oauth' in requirements:
    analysis['auth_methods'].append('oauth')

    if '2fa' in requirements:
    analysis['features'].append('2fa')

    if 'session' in requirements:
    analysis['auth_methods'].append('session')

    return analysis

    def _select_template(self, analysis):
"""选择模板"""
bash
    if 'jwt' in analysis['auth_methods']:
    return 'jwt_auth_template'
    elif 'oauth' in analysis['auth_methods']:
    return 'oauth_auth_template'
    else:
    return 'basic_auth_template'

    def _generate_code(self, template, analysis):
"""生成代码"""
bash
    if template == 'jwt_auth_template':
    return self._generate_jwt_auth(analysis)
    elif template == 'oauth_auth_template':
    return self._generate_oauth_auth(analysis)
    else:
    return self._generate_basic_auth(analysis)

    def _generate_jwt_auth(self, analysis):
"""生成JWT认证代码"""
code = '''
python
    from flask import Flask, request, jsonify
    from flask_jwt_extended import (
    JWTManager, create_access_token,
    jwt_required, get_jwt_identity
    )
    from datetime import timedelta
    import bcrypt

    app = Flask(__name__)
    app.config['JWT_SECRET_KEY'] = 'your-secret-key'
    app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24)

    jwt = JWTManager(app)

    class AuthService:
"""认证服务"""
python
    def __init__(self, db):
    self.db = db

    def register(self, username, password):
"""用户注册"""
 # 检查用户名是否存在
python
    if self._user_exists(username):
    raise ValueError("Username already exists")
 # 哈希密码
password_hash = bcrypt.hashpw(
bash
    password.encode('utf-8'),
    bcrypt.gensalt()
    ).decode('utf-8')
 # 创建用户
user = User(username=username, password_hash=password_hash)
python
    self.db.session.add(user)
    self.db.session.commit()

    return user

    def login(self, username, password):
"""用户登录"""
 # 查找用户
user = User.query.filter_by(username=username).first()
bash
    if not user:
    raise ValueError("Invalid username or password")
 # 验证密码
bash
    if not bcrypt.checkpw(
    password.encode('utf-8'),
    user.password_hash.encode('utf-8')
    ):
    raise ValueError("Invalid username or password")
 # 生成访问令牌
access_token = create_access_token(identity=user.id)
bash
    return {
    'access_token': access_token,
    'user': {
    'id': user.id,
    'username': user.username
    }
    }

    def _user_exists(self, username):
"""检查用户名是否存在"""
bash
    return User.query.filter_by(username=username).first() is not None
    '''
    return code

    def _apply_best_practices(self, code):
"""应用最佳实践"""
 # 添加错误处理
code = self._add_error_handling(code)

 # 添加日志记录
code = self._add_logging(code)

 # 添加输入验证
code = self._add_input_validation(code)
bash
    return code

    def _validate_code(self, code):
"""验证代码"""
validation = {
'syntax': True,
'security': True,
'best_practices': True,
'issues': []
}

 # 检查语法
bash
    try:
bash
    compile(code, '<string>', 'exec')
bash
    except SyntaxError as e:
    validation['syntax'] = False
    validation['issues'].append(f"Syntax error: {e}")
 # 检查安全性
bash
    if 'SECRET_KEY' in code and 'your-secret-key' in code:
    validation['security'] = False
    validation['issues'].append("Default secret key detected")

    return validation

    def _generate_documentation(self, code):
"""生成文档"""
bash
    return {
    'description': 'JWT-based authentication system',
    'endpoints': [
    {
    'path': '/register',
    'method': 'POST',
    'description': 'Register a new user',
    'parameters': {
    'username': 'string (required)',
    'password': 'string (required)'
    }
    },
    {
    'path': '/login',
    'method': 'POST',
    'description': 'Login with username and password',
    'parameters': {
    'username': 'string (required)',
    'password': 'string (required)'
    }
    }
    ],
    'security': {
    'password_hashing': 'bcrypt',
    'token': 'JWT',
    'token_expiry': '24 hours'
    }
    }
```> **子Agent特点**:

> - 专注于代码生成
> - 包含模板库和最佳实践
> - 自动验证生成的代码
> - 生成配套文档

### 2. 代码审查子Agent

# 示例:代码审查子Agent
用户请求:
"创建一个代码审查子Agent"
Claude Code 生成的子Agent:

````python

`python

class CodeReviewAgent:
"""代码审查子Agent"""

def __init__(self, context):
self.context = context
self.rules = self._load_review_rules()
self.patterns = self._load_antipatterns()

def review_code(self, code, language='python'):
"""审查代码"""
review = {
'summary': {},
'issues': [],
'suggestions': [],
'metrics': {}
}

 # 执行静态分析

static_analysis = self._static_analysis(code, language)
review['issues'].extend(static_analysis['issues'])

 # 检查代码模式

pattern_analysis = self._check_patterns(code, language)
review['issues'].extend(pattern_analysis['issues'])
review['suggestions'].extend(pattern_analysis['suggestions'])

 # 计算代码指标

metrics = self._calculate_metrics(code, language)
review['metrics'] = metrics

 # 生成总结

review['summary'] = self._generate_summary(review)

return review

def _static_analysis(self, code, language):
"""静态分析"""
issues = []

 # 检查未使用的导入

unused_imports = self._check_unused_imports(code, language)
issues.extend(unused_imports)

 # 检查未使用的变量

unused_variables = self._check_unused_variables(code, language)
issues.extend(unused_variables)

 # 检查代码复杂度

complexity_issues = self._check_complexity(code, language)
issues.extend(complexity_issues)

 # 检查安全问题

security_issues = self._check_security(code, language)
issues.extend(security_issues)

return {'issues': issues}

def _check_unused_imports(self, code, language):
"""检查未使用的导入"""
issues = []

if language == 'python':
import ast
import re

tree = ast.parse(code)

 # 获取所有导入

imports = set()
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.add(alias.name.split('.')[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
imports.add(node.module.split('.')[0])

 # 检查是否使用

for imp in imports:
if imp not in code[code.find(imp):]:
issues.append({
'type': 'unused_import',
'severity': 'warning',
'message': f"Unused import: {imp}",
'line': self._find_line(code, imp)
})

return issues

def _check_complexity(self, code, language):
"""检查代码复杂度"""
issues = []

if language == 'python':
import ast

tree = ast.parse(code)

for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
complexity = self._calculate_cyclomatic_complexity(node)

if complexity > 10:
issues.append({
'type': 'high_complexity',
'severity': 'warning',
'message': f"Function '{node.name}' has high complexity ({complexity})",
'line': node.lineno,
'complexity': complexity
})

return issues

def _calculate_cyclomatic_complexity(self, node):
"""计算圈复杂度"""
complexity = 1

for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)):
complexity += 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1

return complexity

def _check_security(self, code, language):
"""检查安全问题"""
issues = []

 # 检查硬编码密钥

if 'password' in code.lower() and '=' in code:
lines = code.split('\n')
for i, line in enumerate(lines, 1):
if 'password' in line.lower() and '"' in line:
issues.append({
'type': 'security',
'severity': 'error',
'message': 'Possible hardcoded password detected',
'line': i
})

 # 检查SQL注入风险

if 'execute' in code and '%' in code:
lines = code.split('\n')
for i, line in enumerate(lines, 1):
if 'execute' in line and '%' in line and not 'param' in line.lower():
issues.append({
'type': 'security',
'severity': 'error',
'message': 'Possible SQL injection vulnerability',
'line': i
})

return issues

def _check_patterns(self, code, language):
"""检查代码模式"""
issues = []
suggestions = []

 # 检查长函数

if language == 'python':
import ast

tree = ast.parse(code)

for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
end_line = node.end_lineno if hasattr(node, 'end_lineno') else node.lineno
length = end_line - node.lineno

if length > 50:
issues.append({
'type': 'long_function',
'severity': 'warning',
'message': f"Function '{node.name}' is too long ({length} lines)",
'line': node.lineno,
'length': length
})
suggestions.append({
'type': 'refactor',
'message': f"Consider breaking down '{node.name}' into smaller functions",
'line': node.lineno
})

return {'issues': issues, 'suggestions': suggestions}

def _calculate_metrics(self, code, language):
"""计算代码指标"""
metrics = {}

lines = code.split('\n')

 # 总行数

metrics['total_lines'] = len(lines)

 # 代码行数(排除空行和注释)

code_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')]
metrics['code_lines'] = len(code_lines)

 # 注释行数

comment_lines = [line for line in lines if line.strip().startswith('#')]
metrics['comment_lines'] = len(comment_lines)

 # 空行数

blank_lines = [line for line in lines if not line.strip()]
metrics['blank_lines'] = len(blank_lines)

 # 注释率

metrics['comment_ratio'] = metrics['comment_lines'] / metrics['code_lines'] if metrics['code_lines'] > 0 else 0

return metrics

def _generate_summary(self, review):
"""生成审查总结"""
summary = {
'total_issues': len(review['issues']),
'critical_issues': len([i for i in review['issues'] if i['severity'] == 'error']),
'warning_issues': len([i for i in review['issues'] if i['severity'] == 'warning']),
'total_suggestions': len(review['suggestions']),
'code_quality': 'good'
}

 # 评估代码质量

if summary['critical_issues'] > 0:
summary['code_quality'] = 'poor'
elif summary['warning_issues'] > 5:
summary['code_quality'] = 'fair'

return summary

```> **子Agent特点**:
> - 专注于代码审查
> - 包含审查规则库
> - 检查代码模式和反模式
> - 计算代码指标

## 子Agent的协作

### 1. 串行协作

# 示例:子Agent串行协作

用户请求:
"使用子Agent完成一个完整的开发任务"
Claude Code 生成的协作流程:

````python
`python
python
    class AgentOrchestrator:
    """Agent协调器"""

    def __init__(self, context):
    self.context = context
    self.agents = {
    'code_generator': CodeGenerationAgent(context),
    'code_reviewer': CodeReviewAgent(context),
    'test_generator': TestGenerationAgent(context),
    'documenter': DocumentationAgent(context)
    }

    def execute_task(self, task):
"""执行任务"""
results = {}

 # 步骤1: 生成代码
print("Step 1: Generating code...")
code_result = self.agents['code_generator'].generate_code(task)
results['code'] = code_result

 # 步骤2: 审查代码
print("Step 2: Reviewing code...")
review_result = self.agents['code_reviewer'].review_code(
code_result['code'],
language=task.get('language', 'python')
)
results['review'] = review_result

 # 步骤3: 修复问题
bash
    if review_result['summary']['critical_issues'] > 0:
    print("Step 3: Fixing critical issues...")
    fixed_code = self._fix_issues(
    code_result['code'],
    review_result['issues']
    )
    results['fixed_code'] = fixed_code
    else:
    results['fixed_code'] = code_result['code']
 # 步骤4: 生成测试
print("Step 4: Generating tests...")
test_result = self.agents['test_generator'].generate_tests(
results['fixed_code'],
language=task.get('language', 'python')
)
results['tests'] = test_result

 # 步骤5: 生成文档
print("Step 5: Generating documentation...")
doc_result = self.agents['documenter'].generate_documentation(
results['fixed_code'],
test_result['tests']
)
results['documentation'] = doc_result
bash
    return results

    def _fix_issues(self, code, issues):
"""修复代码问题"""
fixed_code = code
bash
    for issue in issues:
    if issue['type'] == 'security':
    fixed_code = self._fix_security_issue(fixed_code, issue)
    elif issue['type'] == 'high_complexity':
    fixed_code = self._refactor_complex_function(fixed_code, issue)

    return fixed_code

    def _fix_security_issue(self, code, issue):
"""修复安全问题"""
 # 实现安全修复逻辑
bash
    return code

    def _refactor_complex_function(self, code, issue):
"""重构复杂函数"""
 # 实现重构逻辑
bash
    return code
```> **协作流程**:

1. 代码生成子Agent生成代码
2. 代码审查子Agent审查代码
3. 修复发现的问题
4. 测试生成子Agent生成测试
5. 文档生成子Agent生成文档

### 2. 并行协作

# 示例:子Agent并行协作
用户请求:
"使用子Agent并行处理多个任务"
Claude Code 生成的并行协作流程:

````python

`python

import concurrent.futures

class ParallelAgentOrchestrator:
"""并行Agent协调器"""

def __init__(self, context):
self.context = context
self.agents = {
'code_generator': CodeGenerationAgent(context),
'code_reviewer': CodeReviewAgent(context),
'test_generator': TestGenerationAgent(context),
'documenter': DocumentationAgent(context)
}

def execute_parallel_tasks(self, tasks):
"""并行执行任务"""
results = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

 # 提交任务

future_to_task = {
executor.submit(self.agents['code_generator'].generate_code, task): task
for task in tasks
}

 # 收集结果

for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
result = future.result()
results[task['id']] = result
except Exception as e:
results[task['id']] = {'error': str(e)}

return results

def execute_parallel_analysis(self, code):
"""并行分析代码"""
results = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:

 # 提交分析任务

futures = {
'static_analysis': executor.submit(
self.agents['code_reviewer']._static_analysis,
code,
'python'
),
'pattern_analysis': executor.submit(
self.agents['code_reviewer']._check_patterns,
code,
'python'
),
'security_analysis': executor.submit(
self.agents['code_reviewer']._check_security,
code,
'python'
),
'metrics_calculation': executor.submit(
self.agents['code_reviewer']._calculate_metrics,
code,
'python'
)
}

 # 收集结果

for name, future in futures.items():
try:
results[name] = future.result()
except Exception as e:
results[name] = {'error': str(e)}

return results

```> **并行协作优势**:
> - 提高任务执行速度
> - 充分利用系统资源
> - 缩短总执行时间

## 子Agent的优势

### 1. 提高效率

| 任务类型 | 单Agent | 多子Agent | 效率提升 |
|----------|---------|-----------|----------|
| 代码生成 + 审查 | 10分钟 | 6分钟 | 40% ↑ |
| 多文件处理 | 15分钟 | 5分钟 | 67% ↑ |
| 复杂任务分解 | 20分钟 | 8分钟 | 60% ↑ |

### 2. 提高质量

| 指标 | 单Agent | 多子Agent | 改善 |
|------|---------|-----------|------|
| 代码质量 | 75% | 90% | +20% |
| 错误率 | 15% | 5% | 67% ↓ |
| 测试覆盖率 | 70% | 85% | +21% |

### 3. 提高可维护性

- **模块化**: 每个子Agent独立维护
- **可扩展**: 容易添加新的子Agent
- **可复用**: 子Agent可以在不同任务中复用

## 总结

子Agent概述包括:

1. **子Agent的概念**: 什么是子Agent、子Agent vs 主Agent
2. **子Agent的类型**: 代码生成子Agent、代码审查子Agent
3. **子Agent的协作**: 串行协作、并行协作
4. **子Agent的优势**: 提高效率、提高质量、提高可维护性

通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务。

在下一节中,我们将探讨异步子Agent任务。

基于 MIT 许可发布